In [5]:
import torch
import os
import shutil
import random
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Set random seed for reproducibility
torch.manual_seed(42)

# Define dataset paths and parameters
source_dir = "captured_dataset"
train_ratio = 0.64  # 64% for train
val_ratio = 0.16   # 16% for validation (total train+val = 80%)
test_ratio = 0.20  # 20% for test
batch_size = 64
num_epochs = 20
model_path = "model.pth"

# Create train/validation/test directories if they don't exist
train_dir = os.path.join(source_dir, 'train')
val_dir = os.path.join(source_dir, 'validation')
test_dir = os.path.join(source_dir, 'test')

# Split dataset into train/validation/test if not already done
if not os.path.exists(train_dir) or not os.path.exists(val_dir) or not os.path.exists(test_dir):
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(val_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    for class_name in os.listdir(source_dir):
        class_dir = os.path.join(source_dir, class_name)
        if not os.path.isdir(class_dir) or class_name in ['train', 'validation', 'test']:
            continue

        os.makedirs(os.path.join(train_dir, class_name), exist_ok=True)
        os.makedirs(os.path.join(val_dir, class_name), exist_ok=True)
        os.makedirs(os.path.join(test_dir, class_name), exist_ok=True)

        files = [f for f in os.listdir(class_dir) if os.path.isfile(os.path.join(class_dir, f))]
        if not files:
            print(f"Warning: No files in {class_dir}")
            continue

        random.shuffle(files)
        num_files = len(files)
        num_train = int(num_files * train_ratio)
        num_val = int(num_files * val_ratio)

        # Split files
        train_files = files[:num_train]
        val_files = files[num_train:num_train + num_val]
        test_files = files[num_train + num_val:]  # Remaining 20% for test

        # Copy to respective directories
        for f in train_files:
            shutil.copy2(os.path.join(class_dir, f), os.path.join(train_dir, class_name, f))
        for f in val_files:
            shutil.copy2(os.path.join(class_dir, f), os.path.join(val_dir, class_name, f))
        for f in test_files:
            shutil.copy2(os.path.join(class_dir, f), os.path.join(test_dir, class_name, f))

    print(f"Dataset split complete: Train={train_dir}, Validation={val_dir}, Test={test_dir}")
else:
    print("Using existing train/validation/test split")

# Define training transformations with augmentation
train_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.Grayscale(),
    transforms.RandomRotation(10),
    transforms.RandomAffine(0, translate=(0.1, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Define validation/test transformations (no augmentation)
val_test_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Load datasets
try:
    training_data = datasets.ImageFolder(root=train_dir, transform=train_transform)
    validation_data = datasets.ImageFolder(root=val_dir, transform=val_test_transform)
    test_data = datasets.ImageFolder(root=test_dir, transform=val_test_transform)
except Exception as e:
    print(f"Error loading datasets: {e}")
    exit()

# Create data loaders
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Check dataset structure
print(f"Training samples: {len(training_data)}")
print(f"Validation samples: {len(validation_data)}")
print(f"Test samples: {len(test_data)}")
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Get device
device = (
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)
print(f"Using {device} device")

# Define model
class ConvNeuralNetwork(nn.Module):
    def __init__(self, num_classes=6):
        super().__init__()
        self.conv_stack = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        self.flatten = nn.Flatten()
        self.linear_stack = nn.Sequential(
            nn.Linear(32 * 8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.conv_stack(x)
        x = self.flatten(x)
        logits = self.linear_stack(x)
        return logits

model = ConvNeuralNetwork().to(device)
print(model)

# Define loss and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training function
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if batch % 5 == 0:
            print(f"loss: {loss.item():>7f}  [{(batch + 1) * len(X):>5d}/{size:>5d}]")

# Validation/test function
def evaluate(dataloader, model, loss_fn, dataset_name="Validation"):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"{dataset_name} Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return correct

# Training loop with early stopping
best_accuracy = 0.0
patience = 5
patience_counter = 0

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    accuracy = evaluate(val_dataloader, model, loss_fn, "Validation")
    
    # Early stopping
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        patience_counter = 0
        torch.save(model.state_dict(), model_path)
        print(f"Saved best model to {model_path} (Validation Accuracy: {100*accuracy:.1f}%)")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered")
            break

print("Training complete!")

# Evaluate on test set
print("Evaluating on test set:")
model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
evaluate(test_dataloader, model, loss_fn, "Test")

Using existing train/validation/test split
Training samples: 6159
Validation samples: 1063
Test samples: 2391
Shape of X [N, C, H, W]: torch.Size([64, 1, 32, 32])
Shape of y: torch.Size([64]) torch.int64
Using cpu device
ConvNeuralNetwork(
  (conv_stack): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_stack): Sequential(
    (0): Linear(in_features=2048, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=6, bias=True)
  )
)
Epoch 1
-------------------------------
loss: 1.786522  [   64/ 6159]
loss: 1.808214  [  384/ 6159]
loss: 1.749138  [  704/ 6159]
loss: 1.732627  [ 1024/ 6159

0.998745294855709