In [23]:
import os
import shutil
from PIL import Image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms

from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler

from typing import Tuple

In [6]:
device = torch.device("cuda" if torch.mps.is_available() else "cpu")

device

device(type='cpu')

In [7]:
SEED = 42

In [8]:
image_paths = []
labels = []

classes = {v: i for i, v in enumerate(sorted(os.listdir('Vehicles/')))}

for root, _, files in os.walk('Vehicles/'):
    label = os.path.basename(root)
    if label in classes:
        for file in files:
            if file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                image_paths.append(os.path.join(root, file))
                labels.append(label)

X_train, X_test, y_train, y_test = train_test_split(
    image_paths, labels, test_size=0.2, random_state=SEED, stratify=labels
)

for split_name, paths, split_labels in [('train', X_train, y_train), 
                                         ('test', X_test, y_test)]:
    for path, label in zip(paths, split_labels):
        dest_dir = os.path.join('Vehicles_split', split_name, label)
        os.makedirs(dest_dir, exist_ok=True)
        shutil.copy2(path, os.path.join(dest_dir, os.path.basename(path)))

In [9]:
class VehiclesDataset:

    def __init__(self, data_path: str, transforms=None):
        self.data_path = data_path
        self.transforms = transforms
        
        self.classes = {v: i for i, v in enumerate(sorted(os.listdir(data_path)))}
        
        self.image_paths = []
        for root, _, files in os.walk(data_path):
            label = os.path.basename(root)
            if label in self.classes:
                for file in files:
                    if file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                        self.image_paths.append((os.path.join(root, file), label))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path, label = self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')
        if self.transforms:
            image = self.transforms(image)
        return image, self.classes[label]

    def generate_csv(self):
        data = []

        print(self.classes)
        
        for image_path, label in self.image_paths:
            image = Image.open(image_path).convert('RGB')
            width, height = image.size
            
            data.append({
                'label': label,
                'path': image_path,
                'width': width,
                'height': height
            })
        
        df = pd.DataFrame(data)
        return df

In [10]:
def get_mean_and_std(dataloader):
    channels_sum, channels_squared_sum, num_batches = 0, 0, 0
    for data, _ in dataloader:
        channels_sum += torch.mean(data, dim=[0,2,3])
        channels_squared_sum += torch.mean(data**2, dim=[0,2,3])
        num_batches += 1
    
    mean = channels_sum / num_batches

    std = (channels_squared_sum / num_batches - mean ** 2) ** 0.5

    return mean, std

In [11]:
dataset = VehiclesDataset('Vehicles/', transforms=transforms.Compose([transforms.Resize((100, 100)), transforms.ToTensor()]))
loader = DataLoader(dataset, batch_size=16, num_workers=0, shuffle=False)

In [12]:
df = dataset.generate_csv()

{'Auto Rickshaws': 0, 'Bikes': 1, 'Cars': 2, 'Motorcycles': 3, 'Planes': 4, 'Ships': 5, 'Trains': 6}




In [13]:
df

Unnamed: 0,label,path,width,height
0,Planes,Vehicles/Planes/Plane (566).jpg,800,516
1,Planes,Vehicles/Planes/Plane (450).jpg,1600,1200
2,Planes,Vehicles/Planes/Plane (209).jpg,1000,541
3,Planes,Vehicles/Planes/Plane (706).jpg,800,473
4,Planes,Vehicles/Planes/Plane (231).jpg,800,478
...,...,...,...,...
5583,Motorcycles,Vehicles/Motorcycles/Motorcycle (397).png,225,192
5584,Motorcycles,Vehicles/Motorcycles/Motorcycle (634).jpg,275,183
5585,Motorcycles,Vehicles/Motorcycles/Motorcycle (88).jpg,225,225
5586,Motorcycles,Vehicles/Motorcycles/Motorcycle (177).jpg,275,183


In [14]:
MEAN, STD = get_mean_and_std(loader)



In [15]:
def load_data(config, mean: torch.Tensor, std: torch.Tensor, size: Tuple[int]) -> Tuple[DataLoader, DataLoader]: 
    train_transforms = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomCrop((224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])

    test_transforms = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std)
    ])

    train_dataset = VehiclesDataset('Vehicles_split/train', transforms=train_transforms)
    test_dataset = VehiclesDataset('Vehicles_split/test', transforms=test_transforms)

    train_loader = DataLoader(train_dataset, batch_size=int(config["batch_size"]), shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=int(config["batch_size"]), shuffle=False)

    return train_loader, test_loader

In [16]:
class LeNetFive(nn.Module):

    def __init__(self, classes):
        super(LeNetFive, self).__init__()
        self.l1 = nn.Sequential(
            *self.make_layers(3, 6, kernel_size=5),
            nn.AvgPool2d(kernel_size=2, stride=2)
        )
        self.l2 = nn.Sequential(
            *self.make_layers(6, 16, kernel_size=5),
            nn.AvgPool2d(kernel_size=2, stride=2)
        )
        self.flatten = nn.Flatten()
        self.fc = nn.Sequential([
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, classes)
        ])

    def make_layers(self, in_channels, out_channels, kernel_size, padding=0, stride=1):
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        ]

    def forward(self, x):
        x = self.l1(x)
        x = self.l2(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

In [17]:
class AlexNet(nn.Module):

    def __init__(self, classes, dropout_prob=0.5):
        super(AlexNet, self).__init__()
        
        self.features = nn.Sequential(
            *self.make_layers(3, 96, kernel_size=11, stride=4),
            nn.MaxPool2d(kernel_size=3, stride=2),
            
            *self.make_layers(96, 256, kernel_size=5, stride=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
            
            *self.make_layers(256, 384, kernel_size=3, padding=1),
            *self.make_layers(384, 384, kernel_size=3, padding=1),
            *self.make_layers(384, 256, kernel_size=3, padding=1),
            
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(),
            nn.Dropout(dropout_prob),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(dropout_prob),
            nn.Linear(4096, classes)
        )

    def make_layers(self, in_channels, out_channels, kernel_size, padding=0, stride=1):
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        ]

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

In [None]:
def train_epoch(model: nn.Module, train_loader: DataLoader, criterion: nn.Module, optimizer: optim.Optimizer, device: torch.device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for data, target in train_loader:
        data = data.float().to(device)
        target = target.long().to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        pred = output.argmax(dim=1)
        correct += pred.eq(target).sum().item()
        total += target.size(0)
    
    return total_loss / len(train_loader), 100. * correct / total


def test_epoch(model: nn.Module, test_loader: DataLoader, criterion: nn.Module, optimizer: optim.Optimizer, device: torch.device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in test_loader:
            data = data.float().to(device)
            target = target.long().to(device)

            output = model(data)
            loss = criterion(output, target)
            
            total_loss += loss.item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += target.size(0)

    return total_loss / len(test_loader), 100. * correct / total


def evaluate(model: nn.Module, loader: DataLoader, criterion: nn.Module, optimizer: optim.Optimizer, device: torch.device):
    model.eval()
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            pred = output.argmax(dim=1)
            
            all_preds.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)

    accuracy = (all_preds == all_targets).mean()
    precision_weighted = precision_score(all_targets, all_preds, average='weighted', zero_division=0)
    recall_weighted = recall_score(all_targets, all_preds, average='weighted', zero_division=0)
    f1_weighted = f1_score(all_targets, all_preds, average='weighted', zero_division=0)
    conf_matrix = confusion_matrix(all_targets, all_preds)

    print(f"Accuracy:  {accuracy:.4f}")
    print(f"Precision: {precision_weighted:.4f}")
    print(f"Recall:    {recall_weighted:.4f}")
    print(f"F1-Score:  {f1_weighted:.4f}")
    
    fig = go.Figure(data=go.Heatmap(
        z=conf_matrix,
        x=[str(i) for i in range(conf_matrix.shape[1])],
        y=[str(i) for i in range(conf_matrix.shape[0])],
        colorscale='Blues',
        text=conf_matrix,
        texttemplate='%{text}',
        textfont={"size": 12},
        colorbar=dict(title="Count")
    ))
    
    fig.update_layout(
        title='Confusion Matrix',
        xaxis_title='Predicted Label',
        yaxis_title='True Label',
        width=700,
        height=700
    )
    
    fig.show()

In [None]:
def train_lenetfive(config):
    train_loader, test_loader = load_data(config, MEAN, STD, (32, 32))
    
    model = LeNetFive(10)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config['lr'], weight_decay=config["weight_decay"])
    
    epochs = config.get('epochs', 10)
    
    train_losses = []
    test_losses = []
    train_accs = []
    test_accs = []
    
    for epoch in range(epochs):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        test_loss, test_acc = test_epoch(model, test_loader, criterion, device)
        
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        train_accs.append(train_acc)
        test_accs.append(test_acc)
        
        print(f'Epoch {epoch+1}/{epochs}')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {test_loss:.4f}, Val Acc: {test_acc:.2f}%')
        
        tune.report(loss=test_loss, accuracy=test_acc, train_loss=train_loss, train_accuracy=train_acc)
    
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=train_losses, mode='lines+markers', name='Train Loss'))
    fig.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=test_losses, mode='lines+markers', name='Val Loss'))
    fig.update_layout(title='Training and Validation Loss', xaxis_title='Epoch', yaxis_title='Loss')
    fig.show()
    
    fig2 = go.Figure()
    fig2.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=train_accs, mode='lines+markers', name='Train Acc'))
    fig2.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=test_accs, mode='lines+markers', name='Val Acc'))
    fig2.update_layout(title='Training and Validation Accuracy', xaxis_title='Epoch', yaxis_title='Accuracy (%)')
    fig2.show()

lenetfive_config = {
    "lr": tune.loguniform(1e-4, 1e-1),
    "weight_decay": tune.loguniform(1e-6, 1e-2),
    "batch_size": tune.choice([32, 64, 128]),
    "epochs": 10
}

In [None]:
scheduler = ASHAScheduler(
    metric="accuracy",
    mode="max",
    max_t=10,
    grace_period=3,
    reduction_factor=2
)

reporter = CLIReporter(
    metric_columns=["loss", "accuracy", "train_loss", "train_accuracy", "training_iteration"]
)

result = tune.run(
    train_lenetfive,
    resources_per_trial={"cpu": 2, "gpu": 0.5 if torch.cuda.is_available() else 0},
    config=lenetfive_config,
    num_samples=20,
    scheduler=scheduler,
    progress_reporter=reporter,
    name="lenet5_tune"
)
    
best_trial = result.get_best_trial("accuracy", "max", "last")
print(f"\nBest trial config: {best_trial.config}")
print(f"Best trial final validation accuracy: {best_trial.last_result['accuracy']:.2f}%")
print(f"Best trial final validation loss: {best_trial.last_result['loss']:.4f}")

In [None]:
def train_alexnet(config):
    train_loader, test_loader = load_data(config, MEAN, STD, (227, 227))
    
    model = AlexNet(10)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config['lr'], weight_decay=config["weight_decay"])
    
    epochs = config.get('epochs', 10)
    
    train_losses = []
    test_losses = []
    train_accs = []
    test_accs = []
    
    for epoch in range(epochs):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        test_loss, test_acc = test_epoch(model, test_loader, criterion, device)
        
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        train_accs.append(train_acc)
        test_accs.append(test_acc)
        
        print(f'Epoch {epoch+1}/{epochs}')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {test_loss:.4f}, Val Acc: {test_acc:.2f}%')
        
        tune.report(loss=test_loss, accuracy=test_acc, train_loss=train_loss, train_accuracy=train_acc)
    
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=train_losses, mode='lines+markers', name='Train Loss'))
    fig.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=test_losses, mode='lines+markers', name='Val Loss'))
    fig.update_layout(title='Training and Validation Loss', xaxis_title='Epoch', yaxis_title='Loss')
    fig.show()
    
    fig2 = go.Figure()
    fig2.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=train_accs, mode='lines+markers', name='Train Acc'))
    fig2.add_trace(go.Scatter(x=list(range(1, epochs+1)), y=test_accs, mode='lines+markers', name='Val Acc'))
    fig2.update_layout(title='Training and Validation Accuracy', xaxis_title='Epoch', yaxis_title='Accuracy (%)')
    fig2.show()

alexnet_config = {
    "dropout_rate": tune.uniform(0.1, 0.5),
    "lr": tune.loguniform(1e-5, 1e-2),
    "batch_size": tune.choice([32, 64, 128]),
    "weight_decay": tune.loguniform(1e-6, 1e-3),
    "epochs": 20
}

In [None]:
scheduler = ASHAScheduler(
    metric="accuracy",
    mode="max",
    max_t=10,
    grace_period=3,
    reduction_factor=2
)

reporter = CLIReporter(
    metric_columns=["loss", "accuracy", "train_loss", "train_accuracy", "training_iteration"]
)

result = tune.run(
    train_alexnet,
    resources_per_trial={"cpu": 2, "gpu": 0.5 if torch.cuda.is_available() else 0},
    config=alexnet_config,
    num_samples=20,
    scheduler=scheduler,
    progress_reporter=reporter,
    name="lenet5_tune"
)
    
best_trial = result.get_best_trial("accuracy", "max", "last")
print(f"\nBest trial config: {best_trial.config}")
print(f"Best trial final validation accuracy: {best_trial.last_result['accuracy']:.2f}%")
print(f"Best trial final validation loss: {best_trial.last_result['loss']:.4f}")