In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from tqdm import tqdm 
import matplotlib.pyplot as plt
import numpy as np

In [2]:
# Define the CNN model
class SCNN(nn.Sequential):
    def __init__(self):
        super(SCNN, self).__init__(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(64 * 20 * 20, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )
class regCNN(nn.Sequential):
    def __init__(self):
        super(regCNN, self).__init__(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Dropout2d(0.25),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(64 * 20 * 20, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 10)
        )
    


In [9]:
import torch
import torch.nn as nn
from tqdm import tqdm
import numpy as np

class PSO:
    def __init__(self, model, loss_fn, train_loader, num_particles=10, lr=0.1, inertia=0.5, c1=1.5, c2=1.5):
        self.model = model
        self.loss_fn = loss_fn
        self.train_loader = train_loader
        self.num_particles = num_particles
        self.lr = lr
        self.inertia = inertia
        self.c1 = c1
        self.c2 = c2
        
        self.param_shapes = [p.shape for p in model.parameters()]
        self.param_size = sum(p.numel() for p in model.parameters())

        self.swarm_pos = [torch.randn(self.param_size) for _ in range(num_particles)]
        self.swarm_vel = [torch.randn(self.param_size) * 0.1 for _ in range(num_particles)]
        self.pbest_pos = self.swarm_pos.copy()
        self.pbest_scores = [float('inf')] * num_particles
        self.gbest_pos = None
        self.gbest_score = float('inf')

    def _set_model_params(self, flat_params):
        idx = 0
        with torch.no_grad():
            for p in self.model.parameters():
                numel = p.numel()
                p.copy_(flat_params[idx:idx+numel].view(p.shape))
                idx += numel

    def _evaluate(self, flat_params):
        self._set_model_params(flat_params)
        self.model.eval()
        loss_total = 0.0
        with torch.no_grad():
            for images, labels in self.train_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = self.model(images)
                loss = self.loss_fn(outputs, labels)
                loss_total += loss.item()
        return loss_total / len(self.train_loader)

    def step(self):
        for i in range(self.num_particles):
            fitness = self._evaluate(self.swarm_pos[i])

            # Update personal best
            if fitness < self.pbest_scores[i]:
                self.pbest_scores[i] = fitness
                self.pbest_pos[i] = self.swarm_pos[i].clone()

            # Update global best
            if fitness < self.gbest_score:
                self.gbest_score = fitness
                self.gbest_pos = self.swarm_pos[i].clone()

        for i in range(self.num_particles):
            r1 = torch.rand(self.param_size)
            r2 = torch.rand(self.param_size)
            cognitive = self.c1 * r1 * (self.pbest_pos[i] - self.swarm_pos[i])
            social = self.c2 * r2 * (self.gbest_pos - self.swarm_pos[i])
            self.swarm_vel[i] = self.inertia * self.swarm_vel[i] + cognitive + social
            self.swarm_pos[i] += self.lr * self.swarm_vel[i]

        # Set model to best found so far
        self._set_model_params(self.gbest_pos)

In [10]:
# Device setup for M3 Pro
if torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = torch.device("mps")
    print("Using MPS (M3 Pro GPU)")
else:
    device = torch.device("cpu")
    print("Using CPU (MPS not available)")

Using MPS (M3 Pro GPU)


In [11]:
def train_with_pso(model, num_epochs, patience, train_loader, val_loader, model_path):
    best_val_loss = float('inf')
    patience_counter = 0
    criterion = nn.CrossEntropyLoss()
    pso = PSO(model, criterion, train_loader)
    train_loss_arr = []
    val_loss_arr = []

    for epoch in range(num_epochs):
        print(f"\nEpoch [{epoch+1}/{num_epochs}] PSO optimizing...")
        pso.step()  # run one step of PSO (i.e., update all particles)
        
        # Evaluate best model
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        val_loss /= len(val_loader)
        accuracy = 100 * correct / total
        val_loss_arr.append(val_loss)
        train_loss_arr.append(pso.gbest_score)
        print(f"Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.2f}%, Best Val Loss: {best_val_loss:.2f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), f'{model_path}best.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break

    return train_loss_arr, val_loss_arr

In [12]:
def train(model,num_epochs,patience,train_loader,val_loader,model_path):
    best_val_loss = float('inf')
    patience_counter = 0
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    train_loss_arr =[]
    val_loss_arr = []
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        train_loop = tqdm(train_loader, desc=f"Epoch [{epoch+1}/{num_epochs}]")
        for images, labels in train_loop:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            train_loop.set_postfix(loss=running_loss / (train_loop.n + 1))
        
        avg_loss = running_loss / len(train_loader)
        train_loss_arr.append(avg_loss)
        print(f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_loss:.4f}")

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        val_loss /= len(val_loader)
        val_loss_arr.append(val_loss)
        accuracy = 100 * correct / total
        print(f"Validation Loss: {val_loss:.4f}, Accuracy: {accuracy:.2f}% , Best Val Loss: {best_val_loss:.2f}")

        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), f'{model_path}best.pth')
            # print("Best Model saved successfully!")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered. Training stopped.")
                break
    return train_loss_arr,val_loss_arr

In [13]:
def test(model_class ,model_path,val_loader):
    model = model_class
    model.load_state_dict(torch.load(model_path, map_location=device)) 
    model.to(device)
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Validation Accuracy: {accuracy:.2f}%")

In [14]:
def plot(array,filename):
    epochs = np.arange(len(array))  # Epochs are the indices of the array
    plt.figure(figsize=(10, 6))
    plt.plot(epochs, array, marker='o', linestyle='-')
    plt.title('Average Loss vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Average Loss')
    plt.grid(True)
    plt.savefig(f'plots/{filename}.png')
    plt.close() 

# Simple

In [7]:
simple_transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


# Load Imagenette dataset
qual = '160px'
folder = f'./data/Simple{qual}/'
train_dataset = datasets.Imagenette(root=folder, split='train', size=qual, download=True, transform=simple_transform)
val_dataset = datasets.Imagenette(root=folder, split='val', size=qual, download=True, transform=simple_transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)



In [None]:
model = SCNN().to(device)
train_loss_arr,val_loss_arr = train_with_pso(model,5,1,train_loader,val_loader,'./model/PSO/')
plot(train_loss_arr,'Simple_Train_loss')
plot(val_loss_arr,'Simple_Val_loss')


In [None]:
test(SCNN(),'./model/Simple/best.pth',val_loader)

Validation Accuracy: 63.49%


# Regularisation

In [15]:
# Define transforms
reg_transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),  
    transforms.ColorJitter( brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


# Load Imagenette dataset
qual = '160px'
folder = f'./data/reg{qual}/'
train_dataset_reg = datasets.Imagenette(root=folder, split='train', size=qual, download=True, transform=reg_transform)
val_dataset_reg = datasets.Imagenette(root=folder, split='val', size=qual, download=True, transform=reg_transform)

train_loader_reg = DataLoader(train_dataset_reg, batch_size=64, shuffle=True, num_workers=4, pin_memory=True)
val_loader_reg = DataLoader(val_dataset_reg, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)



In [17]:
model2 = regCNN().to(device)

train_loss_arr,val_loss_arr = train_with_pso(model2,5,5,train_loader_reg,val_loader_reg,'./model/PSO/')
plot(train_loss_arr,'RegP_Train_loss')
plot(val_loss_arr,'RegP_Val_loss')


Epoch [1/5] PSO optimizing...
Validation Loss: 453088.4250, Accuracy: 9.53%, Best Val Loss: inf

Epoch [2/5] PSO optimizing...
Validation Loss: 379957.2857, Accuracy: 8.25%, Best Val Loss: 453088.43

Epoch [3/5] PSO optimizing...
Validation Loss: 255019.2958, Accuracy: 11.57%, Best Val Loss: 379957.29

Epoch [4/5] PSO optimizing...
Validation Loss: 154827.1638, Accuracy: 8.64%, Best Val Loss: 255019.30

Epoch [5/5] PSO optimizing...
Validation Loss: 81036.8987, Accuracy: 10.93%, Best Val Loss: 154827.16


In [None]:
test(regCNN(),'./model/Reg/best.pth',val_loader_reg)

Validation Accuracy: 67.90%


# Transfer Learning

In [None]:
cifar_transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),  
    transforms.ColorJitter( brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


cifar_train = datasets.CIFAR10(root='./data/cifar/', train=True, download=True, transform=cifar_transform)


cifar_test = datasets.CIFAR10(root='./data/cifar/', train=False, download=True, transform=cifar_transform)


cifar_train_loader = DataLoader(cifar_train, batch_size=64, shuffle=True)
cifar_test_loader = DataLoader(cifar_test, batch_size=64, shuffle=False)

In [None]:
best_model = regCNN().to(device)
best_model.load_state_dict(torch.load("./model/Reg/best.pth"))
for layer in best_model.modules():
    if isinstance(layer, nn.Conv2d):
        for param in layer.parameters():
            param.requires_grad = False

train_loss_arr,val_loss_arr = train(best_model,15,3, cifar_train_loader, cifar_test_loader,'./model/cifar/')

plot(train_loss_arr,'cifar_Train_loss')
plot(val_loss_arr,'cifar_Val_loss')

Epoch [1/10]: 100%|██████████| 782/782 [01:12<00:00, 10.84it/s, loss=1.97]


Epoch [1/10], Training Loss: 1.9654
Validation Loss: 1.6052, Accuracy: 42.70% , Best Val Loss: inf


Epoch [2/10]: 100%|██████████| 782/782 [01:11<00:00, 10.89it/s, loss=1.74]


Epoch [2/10], Training Loss: 1.7377
Validation Loss: 1.5422, Accuracy: 46.56% , Best Val Loss: 1.61


Epoch [3/10]: 100%|██████████| 782/782 [05:21<00:00,  2.43it/s, loss=1.68]


Epoch [3/10], Training Loss: 1.6792
Validation Loss: 1.4796, Accuracy: 48.20% , Best Val Loss: 1.54


Epoch [4/10]: 100%|██████████| 782/782 [01:12<00:00, 10.75it/s, loss=1.65]


Epoch [4/10], Training Loss: 1.6456
Validation Loss: 1.4487, Accuracy: 49.62% , Best Val Loss: 1.48


Epoch [5/10]: 100%|██████████| 782/782 [02:32<00:00,  5.12it/s, loss=1.62]  


Epoch [5/10], Training Loss: 1.6229
Validation Loss: 1.4257, Accuracy: 49.69% , Best Val Loss: 1.45


Epoch [6/10]: 100%|██████████| 782/782 [01:12<00:00, 10.78it/s, loss=1.6] 


Epoch [6/10], Training Loss: 1.5977
Validation Loss: 1.3965, Accuracy: 50.90% , Best Val Loss: 1.43


Epoch [7/10]: 100%|██████████| 782/782 [01:12<00:00, 10.76it/s, loss=1.59]


Epoch [7/10], Training Loss: 1.5922
Validation Loss: 1.3954, Accuracy: 51.29% , Best Val Loss: 1.40


Epoch [8/10]: 100%|██████████| 782/782 [01:12<00:00, 10.77it/s, loss=1.58]


Epoch [8/10], Training Loss: 1.5794
Validation Loss: 1.3774, Accuracy: 51.47% , Best Val Loss: 1.40


Epoch [9/10]: 100%|██████████| 782/782 [01:12<00:00, 10.80it/s, loss=1.56]


Epoch [9/10], Training Loss: 1.5644
Validation Loss: 1.3775, Accuracy: 51.93% , Best Val Loss: 1.38


Epoch [10/10]: 100%|██████████| 782/782 [01:12<00:00, 10.80it/s, loss=1.56]


Epoch [10/10], Training Loss: 1.5532
Validation Loss: 1.3690, Accuracy: 51.28% , Best Val Loss: 1.38


In [None]:
test(regCNN(),'./model/cifar/best.pth',cifar_test_loader)

Validation Accuracy: 51.72%
