In [1]:
import os
import re
import pandas as pd
from PIL import Image
import numpy as np
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from sklearn.model_selection import train_test_split

In [2]:
# Define models
# Every backbone is built with weights=None. The pairs are listed in the same
# order as before, so iterating over models_dict visits them in that order.
models_dict = {
    label: constructor(weights=None)
    for label, constructor in (
        ("MobileNetV3S", models.mobilenet_v3_small),
        ("ShuffleNetV2", models.shufflenet_v2_x1_0),
        ("MobileNetV2", models.mobilenet_v2),
        ("ResNet18", models.resnet18),
        ("SqueezeNet", models.squeezenet1_0),
    )
}

In [3]:
# Training and evaluation settings
dataset_path = "./dataset"  # root folder whose sub-directories hold the images
augment_data = True  # when True, colour-jittered copies are written to disk
num_epochs, batch_size = 100, 64
learning_rate = 0.001
criterion = nn.MSELoss()

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

device(type='cuda', index=0)

In [4]:
# Define the augmentations
# One pipeline per ColorJitter property, each with a jitter strength of 0.5.
# Every pipeline applies the jitter, then converts to a tensor and back to a
# PIL image, so the caller receives a PIL image.
augmentations = {
    jitter_property: transforms.Compose(
        [
            transforms.ColorJitter(**{jitter_property: 0.5}),
            transforms.ToTensor(),
            transforms.ToPILImage(),
        ]
    )
    for jitter_property in ("brightness", "contrast", "saturation", "hue")
}

In [5]:
# Every sub-directory of the dataset root is processed separately; `dirs` is
# reused later when the labels are loaded.
dirs = [d for d in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, d))]

if augment_data:
    for dir_name in dirs:
        files = os.listdir(os.path.join(dataset_path, dir_name))

        for file_name in tqdm(files, desc=f"Processing {dir_name}"):
            # Files without any digit in their name (e.g. OS metadata files)
            # have no photo id; skip them instead of crashing on `None.group()`.
            id_match = re.search(r"\d+", file_name)
            if id_match is None:
                continue
            photo_id = id_match.group()

            # Only files named exactly "<id>.jpg" are augmented. This also
            # skips the "<id>_<name>.jpg" copies written by an earlier run.
            if file_name != f"{photo_id}.jpg":
                continue

            file_path = os.path.join(dataset_path, dir_name, file_name)
            image = Image.open(file_path).convert("RGB")

            for name, augmentation in augmentations.items():
                augmented_image = augmentation(image)
                augmented_image = np.array(augmented_image)
                # cv2.imwrite expects BGR channel order, PIL delivers RGB.
                augmented_image = cv2.cvtColor(augmented_image, cv2.COLOR_RGB2BGR)
                save_path = os.path.join(dataset_path, dir_name, f"{photo_id}_{name}.jpg")
                cv2.imwrite(save_path, augmented_image)

Processing 1652875851.3497071: 100%|██████████| 144/144 [00:05<00:00, 26.67it/s]
Processing 1652875901.3107166: 100%|██████████| 708/708 [00:26<00:00, 27.12it/s]
Processing 1652876013.741493: 100%|██████████| 513/513 [00:18<00:00, 27.04it/s]
Processing 1652876206.2541456: 100%|██████████| 828/828 [00:29<00:00, 27.83it/s]
Processing 1652876485.8123376: 100%|██████████| 977/977 [00:35<00:00, 27.31it/s]
Processing 1652959186.4507334: 100%|██████████| 805/805 [00:29<00:00, 26.92it/s]
Processing 1652959347.972946: 100%|██████████| 612/612 [00:23<00:00, 26.36it/s]
Processing 1653042695.4914637: 100%|██████████| 324/324 [00:11<00:00, 27.91it/s]
Processing 1653042775.5213027: 100%|██████████| 382/382 [00:13<00:00, 28.06it/s]
Processing 1653043202.5073502: 100%|██████████| 405/405 [00:14<00:00, 28.13it/s]
Processing 1653043345.3415065: 100%|██████████| 288/288 [00:10<00:00, 27.51it/s]
Processing 1653043428.8546412: 100%|██████████| 648/648 [00:24<00:00, 26.57it/s]


In [6]:
# Custom Dataset Class
class JetBotDataset(Dataset):
    """Map-style dataset over a sequence of ``(image_path, target)`` pairs.

    Each item is the image at ``image_path`` loaded as RGB (passed through
    ``transform`` when one is given) together with ``target`` converted to a
    float32 tensor.
    """

    def __init__(self, target_map, transform=None):
        """Store the sample list and the optional per-image transform."""
        self.target_map, self.transform = target_map, transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.target_map)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, target_tensor)``."""
        img_path, target = self.target_map[idx]
        rgb_image = Image.open(img_path).convert("RGB")
        image = self.transform(rgb_image) if self.transform else rgb_image
        return image, torch.tensor(target, dtype=torch.float32)

In [7]:
def get_target(path, dirs):
    """Pair every labelled image below ``path`` with its (speed, turn) target.

    For each name in ``dirs`` the labels are read from ``<path>/<name>.csv``
    (no header row; the first column is used as the index, the remaining
    columns are named ``speed`` and ``turn``) and the images are listed from
    ``<path>/<name>/``. The first run of digits in a file name is taken as
    the photo id and looked up in the label index.

    Files whose name contains no digits, or whose id is missing from the
    labels, are skipped.

    Args:
        path: Dataset root directory.
        dirs: Names of the sub-directories (and matching CSV files) to read.

    Returns:
        A list of ``(file_path, (speed, turn))`` tuples.
    """
    result = []
    for dir_name in dirs:
        labels = pd.read_csv(
            f"{path}/{dir_name}.csv", header=None, index_col=0, names=["speed", "turn"]
        )
        for file_name in os.listdir(os.path.join(path, dir_name)):
            # A name without digits carries no photo id; skipping it avoids
            # calling .group() on a failed match.
            id_match = re.search(r"\d+", file_name)
            if id_match is None:
                continue
            photo_id = int(id_match.group())
            if photo_id in labels.index:
                target = labels.loc[photo_id]
                file_path = os.path.join(path, dir_name, file_name)
                result.append((file_path, (target["speed"], target["turn"])))
    return result

In [8]:
# Data loading and transformation
target_map = get_target(dataset_path, dirs)


def _photo_key(sample):
    """Identify the source photo of a ``(file_path, target)`` sample.

    The key is the containing directory plus the integer formed by the first
    run of digits in the file name, so "<id>.jpg" and its
    "<id>_<augmentation>.jpg" copies share one key.
    """
    file_path = sample[0]
    photo_id = int(re.search(r"\d+", os.path.basename(file_path)).group())
    return os.path.dirname(file_path), photo_id


# Split by source photo rather than by file. A per-file split can put the
# augmented copies of a photo into the training set and the original into the
# held-out set, which leaks data and makes the validation loss look too good.
# Keys are sorted so the split does not depend on set iteration order.
photo_keys = sorted({_photo_key(sample) for sample in target_map})
train_keys, test_keys = train_test_split(photo_keys, test_size=0.2, random_state=42)
train_keys, test_keys = set(train_keys), set(test_keys)
train_data = [sample for sample in target_map if _photo_key(sample) in train_keys]
test_data = [sample for sample in target_map if _photo_key(sample) in test_keys]

# Resize to 224x224, convert to a tensor and normalise each channel.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

train_dataset = JetBotDataset(train_data, transform=transform)
test_dataset = JetBotDataset(test_data, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [9]:
# Modify models for regression
def modify_model(model, model_name):
    """Replace the final layer of ``model`` with a fresh 2-output layer.

    ``model_name`` selects which attribute holds that layer:
    ``classifier[1]`` (a 1x1 convolution with 512 input channels) for
    "SqueezeNet", ``fc`` for "ResNet18" and "ShuffleNetV2", ``classifier[1]``
    for "MobileNetV2" and ``classifier[3]`` for "MobileNetV3S". Any other
    name leaves the model untouched.

    The model is modified in place and also returned.
    """
    if model_name == "SqueezeNet":
        model.classifier[1] = nn.Conv2d(512, 2, kernel_size=(1, 1), stride=(1, 1))
        return model

    if model_name in ("ResNet18", "ShuffleNetV2"):
        model.fc = nn.Linear(model.fc.in_features, 2)
        return model

    # The remaining models keep their last linear layer inside `classifier`.
    for known_name, head_index in (("MobileNetV2", 1), ("MobileNetV3S", 3)):
        if model_name == known_name:
            old_head = model.classifier[head_index]
            model.classifier[head_index] = nn.Linear(old_head.in_features, 2)
            break
    return model

In [10]:
# Training function
def train_model(model, dataloaders, criterion, optimizer, num_epochs, device=None):
    """Train ``model`` and report the train and validation loss for every epoch.

    Each epoch runs a "train" phase (gradients enabled, optimizer stepped
    after every batch) followed by a "val" phase (model in eval mode, no
    gradients). The printed loss is the per-sample average over the phase's
    dataset.

    Args:
        model: The ``nn.Module`` to train; it is updated in place.
        dataloaders: Mapping with "train" and "val" keys, each an iterable of
            ``(inputs, targets)`` batches that exposes ``.dataset``.
        criterion: Loss called as ``criterion(outputs, targets)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of epochs to run.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if the model has none),
            so the function no longer depends on a notebook-level global.

    Returns:
        The same ``model`` object, left in eval mode if any epoch ran.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    for epoch in range(num_epochs):
        print(f" Epoch {epoch}/{num_epochs - 1}")
        for phase in ["train", "val"]:
            is_training = phase == "train"
            model.train(is_training)  # train(False) is the same as eval()

            running_loss = 0.0
            for inputs, targets in dataloaders[phase]:
                inputs, targets = inputs.to(device), targets.to(device)
                if is_training:
                    optimizer.zero_grad()
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)
                    if is_training:
                        loss.backward()
                        optimizer.step()
                # Weight by batch size so a smaller final batch does not skew
                # the epoch average.
                running_loss += loss.item() * inputs.size(0)
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            print(f"  {phase} loss: {epoch_loss:.4f}")
    return model

In [11]:
# The ONNX files are written into ./models. Create the directory before any
# training starts, so a missing folder cannot discard a finished run at export.
onnx_dir = "./models"
os.makedirs(onnx_dir, exist_ok=True)

# The same loaders are used for every model, so build the mapping once.
dataloaders = {"train": train_loader, "val": test_loader}

for model_name in models_dict:
    model = modify_model(models_dict[model_name], model_name).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    print(f"Training {model_name}...")
    trained_model = train_model(
        model, dataloaders, criterion, optimizer, num_epochs=num_epochs
    )
    torch.save(trained_model.state_dict(), f"{model_name}_model.pth")
    print(f"{model_name} training complete.")

    # Export model to ONNX
    # Export in eval mode; set it explicitly in case no epoch was run.
    trained_model.eval()
    dummy_input = torch.randn(1, 3, 224, 224).to(device)
    torch.onnx.export(
        trained_model,
        dummy_input,
        f"{onnx_dir}/{model_name}.onnx",
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=["input"],
        output_names=["output"],
    )
    print(f"{model_name} exported as ONNX.\n")

Training MobileNetV3S...
 Epoch 0/99
