In [None]:
import os
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, datasets, models
from torchvision.io import read_image
from torchvision.utils import make_grid
from torch.utils.tensorboard import SummaryWriter

import matplotlib.pyplot as plt
%matplotlib inline


DATA_PATH = "./data/gtsrb-german-traffic-sign/"

TRAIN_FILE = "./data/gtsrb-german-traffic-sign/Train.csv"
TEST_FILE = "./data/gtsrb-german-traffic-sign/Test.csv"

TRAIN_DATA_PATH = "./data/gtsrb-german-traffic-sign/Train/"
TEST_DATA_PATH = "./data/gtsrb-german-traffic-sign/Test/"

TENSORBOARD_PATH = "./runs/gtsrb-german-traffic-sign/"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device: ", device)

In [None]:
transform_train = transforms.Compose([
    transforms.Resize((42, 42)),                                # Resize the image to 42x42
    transforms.RandomRotation(15),                              # Randomly rotate the image by up to 15 degrees
    # transforms.RandomHorizontalFlip(),                        # Randomly flip the image horizontally
    transforms.ColorJitter(brightness=0.2,
                           contrast=0.2,
                           saturation=0.2),                     # Randomly adjust brightness, contrast and saturation
    transforms.RandomAdjustSharpness(sharpness_factor=2),       # Randomly ajust sharpness
    transforms.RandomPerspective(distortion_scale=.1, p=0.8),   # Rendomly changes the persective of the image
    transforms.RandomCrop((32, 32)),                            # Randomly crop the image to 32x32
    transforms.ToTensor(),
    # transforms.Normalize(mean=[0.5, 0.5, 0.5],
    #                      std=[0.5, 0.5, 0.5])
])

transform_test = transforms.Compose([
    transforms.Resize((42, 42)),                                # Resize the image to 42x42
    transforms.CenterCrop((32, 32)),                            # Crop the center 32x32 portion of the image
    transforms.ToTensor(),
    # transforms.Normalize(mean=[0.5, 0.5, 0.5],
    #                      std=[0.5, 0.5, 0.5])
])

In [None]:
class GTSRBDataset(Dataset):
    def __init__(self, annotations_file, img_dir , transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file).loc[:, ["Path","ClassId"]]
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        label = self.img_labels.iloc[idx, 1]
        image = read_image(img_path)
        image = transforms.ToTensor(image)
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [None]:
train_dataset = datasets.ImageFolder(root=TRAIN_DATA_PATH, transform=transform_train)

train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=True, num_workers=2)

In [None]:
def ShowImages(dl, rows=6, cols=10):
    for images, labels in dl:
        fig, ax = plt.subplots(figsize=(12, 12))
        ax.set_xticks([]); ax.set_yticks([])
        ax.imshow(make_grid(images[:rows*cols], nrow=cols).permute(1, 2, 0).clamp(0,1))
        break

ShowImages(train_dataloader)

In [None]:
# Define your neural network
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT, progress=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 43)

In [None]:
# Define the optimizer and loss function
# Common range for weight decay 1e-4 and 1e-2
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.0)
criterion = nn.CrossEntropyLoss()

# Define the learning rate scheduler
# scheduler = lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
# scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)

In [None]:
# checkpoint = torch.load("./model_checkpoints/best-model.pt")

# model.load_state_dict(checkpoint['model_state_dict'])
# optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Train the model
n_epochs = 10

# Load model on device
model.to(device)

# Initialize TensorBoard writer
writer = SummaryWriter()

for epoch in range(n_epochs):
    model.train()
    train_running_loss = 0.0
    train_correct = 0
    train_total = 0
    for i, data in enumerate(train_dataloader, 0):
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_running_loss += loss.item()

        _, predicted = torch.max(outputs.data, 1)
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()

        # Write the training loss and accuracy to TensorBoard
        iteration = epoch * len(train_dataloader) + i
        writer.add_scalar('Train/Loss', loss.item(), iteration)
        writer.add_scalar('Train/Accuracy', (predicted == labels).sum().item() / labels.size(0), iteration)
    
    # Reduce learning rate every epoch
    # scheduler.step()

    train_loss = train_running_loss / len(train_dataloader)
    train_acc = train_correct / train_total
    # print(f"Epoch {epoch+1} Training Loss: {train_loss}")
    # print(f"Epoch {epoch+1} Training Accuracy: {train_acc}")

    # Calculate the validation loss and accuracy
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for data in val_dataloader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_loss = val_loss / len(val_dataloader)
    val_acc = val_correct / val_total
    # print(f"Epoch {epoch+1} Validation Loss: {val_loss}")
    # print(f"Epoch {epoch+1} Validation Accuracy: {val_acc}")

    # Plot the epoch results
    print(f'Epoch [{epoch+1}/{n_epochs}] | '
          f'Train Loss: {train_loss:.4f} | '
          f'Val Loss: {val_loss:.4f} | '
          f'Train Acc: {train_acc:.4f} | '
          f'Val Acc: {val_acc:.4f}')

writer.flush()


In [None]:
torch.save({
    # 'epoch': epoch+1,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
            }, os.path.join("./model_checkpoints/", f'best-model-1.pt'))