# Basic Version w/ Custom Architecture
For this approach, it will be used a custom architecture, inspired by Inception and Densenet.

In [None]:
import os

DATA_DIR = "../data/"
IMG_DIR = DATA_DIR + "/images/"
ANNOTATION_DIR = DATA_DIR + "/annotations/"
SPLITS_DIR = DATA_DIR + "/dl-split/"
OUT_DIR = "./out/basic_custom/"

os.makedirs(OUT_DIR, exist_ok=True)

SEED = 42

# Utility Function

In [None]:
import matplotlib.pyplot as plt
def plot_training_history(train_history, val_history, model_name, phase_name, out_dir):
    fig, (loss_ax, acc_ax) = plt.subplots(figsize=(12, 8), nrows=2)
    fig.suptitle(f"{model_name} - {phase_name} History")
    loss_ax.set_title("Cross Entropy Loss")
    loss_ax.plot(train_history["loss"], label="train")
    loss_ax.plot(val_history["loss"], label="val")
    loss_ax.legend(loc="best")

    acc_ax.set_title("Classification accuracy")
    acc_ax.plot(train_history["accuracy"], label="train")
    acc_ax.plot(val_history["accuracy"], label="val")
    loss_ax.legend(loc="best")

    plt.tight_layout()

    fig.savefig(f"{out_dir}/{model_name}_{phase_name}_history.png", dpi=150, bbox_inches='tight')

    return fig

## Load Dataset

In [None]:
# Fetching pre-defined splits
train_split = []
test_split = []

with open(SPLITS_DIR + "/train.txt") as train_split_f:
    train_split = [line.strip("\n") for line in train_split_f.readlines()]

with open(SPLITS_DIR + "/test.txt") as test_split_f:
    test_split = [line.strip("\n") for line in test_split_f.readlines()]

In [None]:
# Label mapping
label_encode_map = {
    "background": -100,
    "trafficlight": 0,
    "speedlimit": 1,
    "crosswalk": 2,
    "stop": 3,
}

label_decode_map = {
    -100: "background",
    0: "trafficlight",
    1: "speedlimit",
    2: "crosswalk",
    3: "stop",
}

# Cull training dataset to balance classes better

In [None]:
from utils.utils import parse_annotation

def get_annotations(split):
    annotations = {}
    for id in split:
        annotation = parse_annotation(f"{ANNOTATION_DIR}/{id}.xml", label_encode_map, return_biggest = True)
        annotation["labels"] = annotation["labels"][0]
        annotations[id] = annotation

    return annotations

annotations = get_annotations(train_split)

In [None]:
import pandas as pd
train_df = pd.DataFrame.from_dict(annotations, orient="index")
train_df

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

print("Label Ratios")
label_ratios = (train_df["labels"].value_counts() / len(train_df["labels"])).sort_index()
print(label_ratios)

fig, ax = plt.subplots()
sns.countplot(data=train_df, x="labels", ax=ax, tick_label=["trafficlight", "speedlimit", "crosswalk", "stop"])
ax.set_title("Label Distribution")

plt.show()
plt.clf()

In [None]:
import numpy as np
non_max_labels = label_ratios[label_ratios != label_ratios.max()]
ratio_to_return = np.sum(non_max_labels) + np.max(non_max_labels) * 2

print(f"Ratio of training dataset to return: {ratio_to_return}")

In [None]:
label_ratios[1] /= 2
label_ratios

In [None]:
weights = (1 - label_ratios) / np.sum(1 - label_ratios)
print("Weights")
print(weights)
train_df["weights"] = 0
for i, weight in enumerate(weights):
    train_df.loc[train_df["labels"] == i, "weights"] = weight / len(train_df[train_df["labels"] == i])

train_df

In [None]:
culled_df = train_df.sample(frac=ratio_to_return, weights="weights", random_state=SEED)
print("Label Ratios")
label_ratios = (culled_df["labels"].value_counts() / len(culled_df["labels"])).sort_index()
print(label_ratios)

fig, ax = plt.subplots()
sns.countplot(data=culled_df, x="labels", ax=ax, tick_label=["trafficlight", "speedlimit", "crosswalk", "stop"])
ax.set_title("Label Distribution on culled training dataset")

plt.show()
plt.clf()

culled_df

In [None]:
# Maintain the ratios of labels across splits
train_sample = culled_df.groupby("labels").sample(frac=0.8, random_state=SEED)
val_sample = culled_df.loc[culled_df.index.difference(train_sample.index)]

fig, (train_ax, val_ax) = plt.subplots(figsize=(16, 4), ncols=2)
for split, df, ax in zip(["Train", "Validation"], [train_sample, val_sample], [train_ax, val_ax]):
    sns.countplot(x=df["labels"], ax=ax)
    ax.set_title(f"Distribution of labels on {split} sample")
    ax.set_xlabel("Label")
    ax.set_ylabel("Count")
    
    print(f"{split} Label Ratios")
    print(pd.Series(df["labels"]).value_counts() / len(df["labels"]))
plt.show()
plt.clf()

In [None]:
from datasets.road_sign_dataset import RoadSignDataset

# Training dataset
training_data = RoadSignDataset(
    img_names=train_sample.index.tolist(),
    img_dir=IMG_DIR,
    annotation_dir=ANNOTATION_DIR,
    classes=label_encode_map,
    is_train=True,
    multilabel=False
)

# Validation dataset
validation_data = RoadSignDataset(
    img_names=val_sample.index.tolist(),
    img_dir=IMG_DIR,
    annotation_dir=ANNOTATION_DIR,
    classes=label_encode_map,
    is_train=True,
    multilabel=False
)

# Test dataset
testing_data = RoadSignDataset(
    img_names=test_split,
    img_dir=IMG_DIR,
    annotation_dir=ANNOTATION_DIR,
    classes=label_encode_map,
    is_train=False,
    multilabel=False,
)

# Create Dataloaders

In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 8 # Tested on 1050TI with 4GB (can load at least 64 as well, but doesn't make sense to use 64 with low amount of data)
NUM_WORKERS = 4

train_dataloader = DataLoader(
    dataset=training_data,
    #sampler=train_sampler,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=True,
    drop_last=True,
    collate_fn=training_data.collate_fn
)

val_dataloader = DataLoader(
    dataset=validation_data,
    #sampler=val_sampler,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=True,
    drop_last=True,
    collate_fn=training_data.collate_fn
)

test_dataloader = DataLoader(
    dataset=testing_data,
    batch_size=1,
    num_workers=NUM_WORKERS,
    shuffle=False,
    drop_last=False,
    collate_fn=testing_data.collate_fn
)

# Model Definition

In [None]:
from models.TrafficSignifier import TrafficSignifier
from torch import nn
from torchinfo import summary

MODEL_NAME = "BASIC_CUSTOM"
N_CLASSES = 4

def get_model():
    return TrafficSignifier(
        num_classes=N_CLASSES,
        num_blocks=6,
        num_internal_layers=4,
    )

model = get_model()

summary(model)

# Define Optimizer, LR Scheduler, Loss function and Metric Scorer

In [None]:
from torch import optim
import torchmetrics

optimizer = optim.Adam(
    params=model.parameters(),
    lr=1e-2,
    betas=(0.9, 0.999),
    weight_decay=5e-4,
    amsgrad=True
)

lr_scheduler = optim.lr_scheduler.ExponentialLR(
    optimizer=optimizer,
    gamma=0.99,
    verbose=True
)

loss_fn = nn.CrossEntropyLoss(ignore_index=label_encode_map["background"])

metric_scorer = torchmetrics.Accuracy(
    threshold=0.5,
    num_classes=N_CLASSES,
    average="micro",
    ignore_index=label_encode_map["background"]
)

# Define Epoch Iteration

In [None]:
import torch
import torch.nn.functional as F
from tqdm import tqdm

def epoch_iter(dataloader, model, loss_fn, device, is_train = True, optimizer=None, lr_scheduler=None):
    if is_train:
        assert optimizer is not None, "When training, please provide an optimizer."
      
    num_batches = len(dataloader)

    if is_train:
        model.train()
    else:
        model.eval()

    probs = []
    preds = []
    expected_labels = []
    imageIds = []

    total_loss = 0.0

    scaler = torch.cuda.amp.GradScaler()

    with torch.set_grad_enabled(is_train):
        for _batch, (X, y) in enumerate(tqdm(dataloader)):
            labels = y["labels"]
            ids = y["imageIds"]

            X, y = X.to(device), labels.to(device)

            with torch.cuda.amp.autocast():
                pred = model(X)
                loss = loss_fn(pred, y)

            if is_train:
                optimizer.zero_grad()
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            
            total_loss += loss.item()


            prob = F.softmax(pred, dim=1)
            final_pred = torch.argmax(prob, dim=1)

            probs.extend(prob.detach().cpu().numpy())
            preds.extend(final_pred.detach().cpu().numpy())
            expected_labels.extend(y.detach().cpu().numpy())
            imageIds.extend([f"road{imageId}" for imageId in ids.detach().cpu().numpy()])

        if is_train and lr_scheduler is not None:
            lr_scheduler.step()
    

    return (expected_labels, preds, probs, imageIds), total_loss / num_batches

# Train Model

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

print(f"Using device: {device}")

In [None]:
import numpy as np
NUM_EPOCHS = 50

model.to(device)    
# model.features.requires_grad_(False) # Freeze feature layer

train_history = {
    "loss": [],
    "accuracy": [],
}

val_history = {
    "loss": [],
    "accuracy": [],
}

best_val_loss = np.inf
best_val_accuracy = 0
best_epoch = -1

print(f"Starting {MODEL_NAME} training...")

for epoch in range(1, NUM_EPOCHS + 1):
    print(f"Epoch[{epoch}/{NUM_EPOCHS}]")
    (train_target, train_preds, train_probs, _), train_loss = epoch_iter(
        dataloader=train_dataloader,
        model=model,
        loss_fn=loss_fn,
        device=device,
        is_train=True,
        optimizer=optimizer,
        lr_scheduler=lr_scheduler
    )

    train_accuracy = metric_scorer(torch.tensor(np.array(train_probs)), torch.tensor(np.array(train_target))).item()
    print(f"Training loss: {train_loss:.3f}\t Training micro accuracy: {train_accuracy:.3f}")

    (val_target, val_preds, val_probs, _), val_loss = epoch_iter(
        dataloader=val_dataloader,
        model=model,
        loss_fn=loss_fn,
        device=device,
        is_train=False,
    )

    val_accuracy = metric_scorer(torch.tensor(np.array(val_probs)), torch.tensor(np.array(val_target))).item()
    print(f"Validation loss: {val_loss:.3f}\t Validation micro accuracy: {val_accuracy:.3f}")

    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_val_accuracy = val_accuracy
        best_epoch = epoch
        save_dict = {"model": model.state_dict(), "optimizer": optimizer.state_dict(), "lr_scheduler": lr_scheduler.state_dict(), "epoch": epoch}
        torch.save(save_dict, f"{OUT_DIR}/{MODEL_NAME}_best_model.pth")

    # Save latest model
    save_dict = {"model": model.state_dict(), "optimizer": optimizer.state_dict(), "lr_scheduler": lr_scheduler.state_dict(), "epoch": epoch}
    torch.save(save_dict, f"{OUT_DIR}/{MODEL_NAME}_latest_model.pth")

    # Save loss and accuracy in history
    train_history["loss"].append(train_loss)
    train_history["accuracy"].append(train_accuracy)

    val_history["loss"].append(val_loss)
    val_history["accuracy"].append(val_accuracy)

    print("----------------------------------------------------------------")

print(
    f"\nFinished training..."
    f"\nBest epoch: {best_epoch}\t Validation loss on best epoch: {best_val_loss}\t Accuracy on best epoch: {best_val_accuracy}"
)

In [None]:
fig = plot_training_history(train_history=train_history, val_history=val_history, model_name=MODEL_NAME, phase_name="Training", out_dir=OUT_DIR)
fig.show()

Clear GPU memory for guarantees

In [None]:
import gc
model = None

gc.collect()
torch.cuda.empty_cache()
torch.cuda.mem_get_info(device)

In [None]:
!nvidia-smi

# Fine-tune the model

Load best model from first training session

In [None]:
BATCH_SIZE = 8 # Have to reduce batch size otherwise GPU memory dies (tested on 1050TI with 4GB)

train_dataloader = DataLoader(
    dataset=training_data,
    sampler=train_sampler,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=False, # Must be False because we're using a random sampler already
    drop_last=True,
    collate_fn=training_data.collate_fn
)

val_dataloader = DataLoader(
    dataset=training_data,
    sampler=val_sampler,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    shuffle=False, # Must be False because we're using a random sampler already
    drop_last=True,
    collate_fn=training_data.collate_fn
)

In [None]:
best_checkpoint = torch.load(f"{OUT_DIR}/{MODEL_NAME}_best_model.pth")
model = get_model()
model.load_state_dict(best_checkpoint["model"])
best_checkpoint = None

model.to(device)


print("Loaded best model...")

In [None]:
ft_optimizer = optim.Adam(
    params=model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    weight_decay=5e-4,
    amsgrad=True
)

ft_lr_scheduler = optim.lr_scheduler.ExponentialLR(
    optimizer=ft_optimizer,
    gamma=0.99
)

In [None]:
FT_NUM_EPOCHS = 30
#model.features.requires_grad_(True) # Unfreeze feature layer for fine-tuning

ft_train_history = {
    "loss": [],
    "accuracy": [],
}

ft_val_history = {
    "loss": [],
    "accuracy": [],
}

ft_best_val_loss = best_val_loss
ft_best_val_accuracy = best_val_accuracy
ft_best_epoch = -1

print(f"Starting {MODEL_NAME} fine-tuning...")

for epoch in range(1, FT_NUM_EPOCHS + 1):
    print(f"Epoch[{epoch}/{FT_NUM_EPOCHS}]")
    (train_target, train_preds, train_probs, _), train_loss = epoch_iter(
        dataloader=train_dataloader,
        model=model,
        loss_fn=loss_fn,
        device=device,
        is_train=True,
        optimizer=ft_optimizer,
        lr_scheduler=ft_lr_scheduler
    )

    train_accuracy = metric_scorer(torch.tensor(np.array(train_probs)), torch.tensor(np.array(train_target))).item()
    print(f"Training loss: {train_loss:.3f}\t Training micro accuracy: {train_accuracy:.3f}")

    (val_target, val_preds, val_probs, _), val_loss = epoch_iter(
        dataloader=val_dataloader,
        model=model,
        loss_fn=loss_fn,
        device=device,
        is_train=False,
    )

    val_accuracy = metric_scorer(torch.tensor(np.array(val_probs)), torch.tensor(np.array(val_target))).item()
    print(f"Validation loss: {val_loss:.3f}\t Validation micro accuracy: {val_accuracy:.3f}")

    # Save best model
    if val_loss < ft_best_val_loss:
        ft_best_val_loss = val_loss
        ft_best_val_accuracy = val_accuracy
        ft_best_epoch = epoch
        save_dict = {"model": model.state_dict(), "optimizer": ft_optimizer.state_dict(), "lr_scheduler": ft_lr_scheduler.state_dict(), "epoch": epoch}
        torch.save(save_dict, f"{OUT_DIR}/{MODEL_NAME}_ft_best_model.pth")

    # Save latest model
    save_dict = {"model": model.state_dict(), "optimizer": ft_optimizer.state_dict(), "lr_scheduler": ft_lr_scheduler.state_dict(), "epoch": epoch}
    torch.save(save_dict, f"{OUT_DIR}/{MODEL_NAME}_ft_latest_model.pth")

    # Save loss and accuracy in history
    ft_train_history["loss"].append(train_loss)
    ft_train_history["accuracy"].append(train_accuracy)

    ft_val_history["loss"].append(val_loss)
    ft_val_history["accuracy"].append(val_accuracy)

    print("----------------------------------------------------------------")

print(
    f"\nFinished fine-tuning..."
    f"\nBest epoch: {ft_best_epoch}\t Validation loss on best epoch: {ft_best_val_loss}\t Accuracy on best epoch: {ft_best_val_accuracy}"
)

In [None]:
fig = plot_training_history(train_history=ft_train_history, val_history=ft_val_history, model_name=MODEL_NAME, phase_name="FINE-TUNE", out_dir=OUT_DIR)
fig.show()

Clean GPU memory again

In [None]:
model = None

gc.collect()
torch.cuda.empty_cache()
torch.cuda.mem_get_info(device)

# Test the model

In [None]:
best_checkpoint = torch.load(f"{OUT_DIR}/{MODEL_NAME}_best_model.pth")
model = get_model()
model.load_state_dict(best_checkpoint["model"])
best_checkpoint = None

model.to(device)


print("Loaded best fine-tuned model...")

In [None]:
(test_target, test_preds, test_probs, test_ids), test_loss = epoch_iter(
    dataloader=test_dataloader,
    model=model,
    loss_fn=loss_fn,
    device=device,
    is_train=False,
)

In [None]:
test_metrics = torchmetrics.MetricCollection(
    metrics={
        "micro_accuracy": metric_scorer,
        "macro_accuracy": torchmetrics.Accuracy(
            threshold=0.5,
            num_classes=N_CLASSES,
            average="macro",
        ),
        "weighted_accuracy": torchmetrics.Accuracy(
            threshold=0.5,
            num_classes=N_CLASSES,
            average="weighted",
        ),
        "micro_f1_score": torchmetrics.F1Score(
            threshold=0.5,
            num_classes=N_CLASSES,
            average="micro",
        ),
        "macro_f1_score": torchmetrics.F1Score(
            threshold=0.5,
            num_classes=N_CLASSES,
            average="macro",
        ),
        "weighted_f1_score": torchmetrics.F1Score(
            threshold=0.5,
            num_classes=N_CLASSES,
            average="weighted",
        )
    }
)

test_probs_tensor = torch.tensor(np.array(test_probs))
test_target_tensor = torch.tensor(np.array(test_target))

test_metrics_scores = test_metrics(test_probs_tensor, test_target_tensor)

print(test_metrics_scores)

In [None]:
from sklearn.metrics import classification_report
print(classification_report(y_true=test_target, y_pred=test_preds, target_names=list(label_encode_map.keys())[1:]))

In [None]:
from PIL import Image
def showErrors(imageIds, y_true, y_pred, limit=None, img_dir = "./"):
    N = limit if limit is not None else len(imageIds)
    N_COLS = 4
    N_ROWS = N // 4 + N % 4
    shown = 0
    fig = plt.figure(figsize=(8*N_COLS, 12*N_ROWS))
    for (imageId, correct, predicted) in zip(imageIds, y_true, y_pred):
        if correct == predicted:
            continue
        image = np.array(Image.open(f"{img_dir}/{imageId}.png").convert("RGB"))
        target_label = f"TRUE: {label_decode_map[correct]}"
        predicted_label = f"PRED: {label_decode_map[predicted]}"

        ax = fig.add_subplot(N_ROWS, N_COLS, shown + 1)
        ax.axis("off")
        ax.text(0, -8, imageId, fontsize=14, color="black")
        ax.text((len(imageId) + 1) * 8, -8, target_label, fontsize=14, color='green') # correct
        ax.text((len(imageId) + len(target_label) + 1) * 8, -8, predicted_label, fontsize=14, color='red')  # predicted
        ax.imshow(image)
        
        shown += 1

        if shown >= N:
            break

    plt.tight_layout()
    plt.show()

In [None]:
showErrors(test_ids, test_target, test_preds, limit=None, img_dir=IMG_DIR)