In [1]:
import torch
from google.colab import drive
drive.mount('/content/drive')
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import ToTensor
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pickle
import numpy as np



Mounted at /content/drive


In [2]:

# Notebook-wide configuration. These globals are read by the cells below.

# Number of per-model dataset files; file indices run from 1 to gnDataset.
gnDataset = 3
# Folder holding the train/val .pth files, the class-name list and the saved class mappings.
gBase_path = '/content/drive/My Drive/Colab Notebooks/Task2_data/Task2_data'
# Presumably the number of classes each model predicts — TODO confirm; the models below default to 5.
gnClasses = 5
# When False, get_data_augmentation() returns None for every mode.
gbData_augmentation = True
# Batch size used for both the training and validation DataLoaders.
gData_batch_size = 64

In [3]:
class CAS771Dataset(Dataset):
    """In-memory dataset that pairs each sample with its label.

    Args:
        data: Indexable container of samples; its length is the dataset length.
        labels: Indexable container of labels, aligned with ``data``.
        transform: Optional callable applied lazily in ``__getitem__`` to samples
            that are not already ``torch.Tensor`` objects.
    """

    def __init__(self, data, labels, transform=None):
        self.data, self.labels, self.transform = data, labels, transform

    def __len__(self):
        # The data container alone dictates the number of samples.
        return len(self.data)

    def __getitem__(self, idx):
        sample, target = self.data[idx], self.labels[idx]
        # Tensor samples bypass the transform entirely.
        # NOTE(review): a dataset built from a tensor therefore never sees the
        # transform (augmentation or normalisation) — confirm this is intended.
        if not self.transform or isinstance(sample, torch.Tensor):
            return sample, target
        return self.transform(sample), target

def _load_data(train_data_path):
    """Read one saved dataset file and return ``(data, labels)``.

    The file is read with ``torch.load`` and must provide the keys ``'data'``
    and ``'labels'``.

    Args:
        train_data_path: Path of the ``.pth`` file (train or validation split).

    Returns:
        A tuple ``(data, labels)``: ``data`` with its axes reordered as
        ``(0, 3, 1, 2)``, and ``labels`` as a Python list when they were stored
        as a tensor (otherwise returned unchanged).
    """
    payload = torch.load(train_data_path)
    images, targets = payload['data'], payload['labels']

    # Bring the last axis (channels) forward so it becomes the second axis.
    images = images.permute(0, 3, 1, 2)

    # Tensor labels become a plain list; any other container passes through.
    if isinstance(targets, torch.Tensor):
        targets = targets.tolist()
    return images, targets

def remap_labels(labels, class_mapping):
    """Translate every label through ``class_mapping`` and return a new list.

    Raises ``KeyError`` (for a dict mapping) when a label has no entry.
    """
    remapped = []
    for original in labels:
        remapped.append(class_mapping[original])
    return remapped

def load_class_names(filepath):
    """Return the lines of ``filepath`` as a list, each stripped of surrounding whitespace.

    Blank lines are kept as empty strings, so list indices match line numbers.
    """
    names = []
    with open(filepath, 'r') as handle:
        for raw_line in handle:
            names.append(raw_line.strip())
    return names
def get_data_augmentation(mode, image_size=64):
    """Return the torchvision transform pipeline for ``mode``, or ``None``.

    Args:
        mode: ``"train"`` selects the augmenting pipeline; any other value
            selects the evaluation pipeline (tensor conversion + normalisation).
        image_size: Side length produced by the random resized crop. Defaults to
            64 because the models in this notebook are sized for 64x64 inputs;
            the previous hard-coded 32 would have shrunk every training image
            and broken the fixed-size classifiers.

    Returns:
        A ``transforms.Compose`` instance, or ``None`` when the global
        ``gbData_augmentation`` switch is off.
    """
    if not gbData_augmentation:
        return None

    # Both pipelines share the same normalisation: map [0, 1] to [-1, 1] per channel.
    normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])

    if mode == "train":
        return transforms.Compose([
            transforms.ToTensor(),
            transforms.RandomHorizontalFlip(),  # mirror left-right
            transforms.RandomRotation(degrees=15),  # rotate by up to +/-15 degrees
            transforms.RandomResizedCrop(image_size, scale=(0.8, 1.0)),  # crop 80-100% of the area, resize back
            normalize,
        ])

    return transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])



# One training file and one validation file per model; file indices run from 1 to gnDataset.
train_data_paths = [f'{gBase_path}/train_dataB_model_{i}.pth' for i in range(1, gnDataset+1)]
test_data_paths = [f'{gBase_path}/val_dataB_model_{i}.pth' for i in range(1, gnDataset+1)]
print(train_data_paths)
print(test_data_paths)

# Class names are read from a text file, one stripped line per list entry.
classes_path = gBase_path + '/cifar100_classes.txt'
classes = load_class_names(classes_path)
print(classes)

In [4]:
# Rebuilds the same path lists as the previous cell, so this cell can be re-run on its own.
train_data_paths = [f'{gBase_path}/train_dataB_model_{i}.pth' for i in range(1, gnDataset+1)]
test_data_paths = [f'{gBase_path}/val_dataB_model_{i}.pth' for i in range(1, gnDataset+1)]
#print(train_data_paths)
#print(test_data_paths)

# Sanity check: load every train/validation pair and print the tensor shapes.
# Only the pair loaded in the last iteration stays bound once the loop ends.
for m in range(gnDataset):
  tr_data, tr_label= _load_data(train_data_paths[m])
  va_data, va_label= _load_data(test_data_paths[m])
  print(tr_data.shape, va_data.shape)

#classes_path = gBase_path + '/cifar100_classes.txt'
#classes = load_class_names(classes_path)
#print(classes)

  raw_data = torch.load(train_data_path)


torch.Size([3197, 3, 64, 64]) torch.Size([250, 3, 64, 64])
torch.Size([3222, 3, 64, 64]) torch.Size([250, 3, 64, 64])
torch.Size([3232, 3, 64, 64]) torch.Size([250, 3, 64, 64])


In [5]:
def load_data(train_data_path, test_data_path, m=0, save_class_mapping=True):
    """Build the training and validation DataLoaders for one model.

    Original label ids are remapped to contiguous indices ``0..K-1`` using the
    sorted set of labels found in the training file.

    Args:
        train_data_path: Path of the training ``.pth`` file, or ``None`` to skip
            building a training loader.
        test_data_path: Path of the validation ``.pth`` file, or ``None`` to skip
            building a validation loader.
        m: Zero-based model index; it only selects the class-mapping file name
            (``class_mapping_model_{m+1}.pkl`` inside ``gBase_path``).
        save_class_mapping: When true and training data is given, the class
            mapping is pickled to that file.

    Returns:
        ``(train_dataloader, test_dataloader)``; either entry is ``None`` when
        the corresponding path was ``None``.

    Raises:
        FileNotFoundError: A validation-only call was made but no class mapping
            was saved earlier for model ``m``.
    """
    train_dataloader = None
    test_dataloader = None
    class_mapping = None

    # The training file is read only when a path is given. Previously it was
    # loaded unconditionally, so passing None crashed before the None check.
    if train_data_path is not None:
        train_data, train_labels = _load_data(train_data_path)
        unique_labels = sorted(set(train_labels))
        class_mapping = {label: i for i, label in enumerate(unique_labels)}

        train_dataset = CAS771Dataset(
            train_data,
            remap_labels(train_labels, class_mapping),
            transform=get_data_augmentation("train"),
        )
        train_dataloader = DataLoader(train_dataset, batch_size=gData_batch_size, shuffle=True, drop_last=False)

        if save_class_mapping:
            # Persist the mapping so predictions can be translated back later.
            class_mapping_path = f'{gBase_path}/class_mapping_model_{m+1}.pkl'
            with open(class_mapping_path, "wb") as f:
                pickle.dump(class_mapping, f)

    if test_data_path is not None:
        if class_mapping is None:
            # Validation-only call: reuse the mapping written by an earlier run.
            # NOTE: pickle.load can execute arbitrary code; only read mapping
            # files that this notebook produced itself.
            class_mapping_path = f'{gBase_path}/class_mapping_model_{m+1}.pkl'
            with open(class_mapping_path, "rb") as f:
                class_mapping = pickle.load(f)

        test_data, test_labels = _load_data(test_data_path)
        test_dataset = CAS771Dataset(
            test_data,
            remap_labels(test_labels, class_mapping),
            transform=get_data_augmentation("test"),
        )
        test_dataloader = DataLoader(test_dataset, batch_size=gData_batch_size, shuffle=False, drop_last=False)

    return train_dataloader, test_dataloader

In [6]:
# for m in range(gnDataset):
#   #print(m)
#   train_dataloader, test_dataloader = load_data(train_data_paths[0], test_data_paths[0], 0)
#   print(train_dataloader)

In [7]:
# batch_size = 64
# print(train_dataloader, test_dataloader)

In [8]:
class CNNClassifier_1(nn.Module):
    """Plain three-convolution CNN baseline (no batch normalisation).

    Args:
        num_classes: Number of output logits.

    The classifier head is sized for 3x64x64 inputs: the first convolution is
    not followed by pooling, and the two 2x2 max-pools reduce 64x64 to 16x16.
    """

    def __init__(self, num_classes):
        # Zero-argument super(): the previous call named a different class
        # (CNNClassifier), which made instantiation fail.
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            # No pooling after the first convolution (kept at full resolution).

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(256 * 16 * 16, 512),  # 256 channels x 16 x 16 after two pools on a 64x64 input
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)  # one logit per class
        )

    def forward(self, x):
        """Return the class logits for a batch ``x`` of shape ``(N, 3, 64, 64)``."""
        x = self.conv_layers(x)  # convolutional feature extractor
        x = x.view(x.size(0), -1)  # flatten everything but the batch axis
        return self.fc_layers(x)  # classifier head



In [9]:
class CNN2(nn.Module):
    """Three-convolution CNN with batch normalisation and an optional embedding output.

    Args:
        num_classes: Number of output logits.
        dropout_rate: Dropout probability applied after the first fully connected layer.

    Channel widths are chained consistently (3 -> 32 -> 64 -> 64): each
    convolution's input width equals the previous output width, and each
    BatchNorm matches the convolution it follows. The earlier definition had
    ``conv2`` expecting 64 input channels and ``bn3`` expecting 256 features,
    so the constructor raised a shape error in its own probe pass.
    """

    def __init__(self, num_classes, dropout_rate=0.5):
        super(CNN2, self).__init__()

        # Convolutional layers, each followed by a BatchNorm of the same width
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)

        # Pooling layers
        self.pool1 = nn.MaxPool2d(2)
        self.pool2 = nn.MaxPool2d(2)

        # Infer the flattened feature size from a 64x64 probe. The probe runs in
        # eval mode without gradients so it cannot disturb the BatchNorm running
        # statistics the model starts training with.
        self.eval()
        with torch.no_grad():
            dummy_output = self._extract_features(torch.zeros(1, 3, 64, 64))
        self.train()
        flattened_size = dummy_output.view(1, -1).size(1)
        print(f"falttened size is {flattened_size}")

        # Fully connected layers
        self.fc1 = nn.Linear(flattened_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def _extract_features(self, x):
        """Run the convolutional stack and return the (unflattened) feature maps."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool1(x)  # first pooling
        x = F.relu(self.bn3(self.conv3(x)))
        return self.pool2(x)  # second pooling

    def forward(self, x, return_embedding=False):
        """Return class logits, or the 512-d feature vector when ``return_embedding`` is true."""
        x = self._extract_features(x)

        # Flatten
        x = x.contiguous().view(x.size(0), -1)

        # Fully connected layer + ReLU + Dropout
        x = self.dropout(F.relu(self.fc1(x)))

        if return_embedding:
            return x  # return the feature vector

        return self.fc2(x)

In [10]:
class CNNClassifier(nn.Module):
    """Three-stage CNN (Conv -> BatchNorm -> ReLU -> MaxPool per stage) with a small dense head.

    Args:
        num_classes: Number of output logits (default 5).

    The dense head expects 256 x 8 x 8 features, i.e. a 64x64 input halved by
    each of the three pooling steps.
    """

    def __init__(self, num_classes=5):
        super().__init__()

        # Assemble the stages in order so module indices (and therefore
        # state_dict keys) are 0..11: four modules per stage.
        stages = []
        in_channels = 3
        for out_channels in (64, 128, 256):
            stages.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool2d(2, 2))
            in_channels = out_channels
        self.conv_layers = nn.Sequential(*stages)

        # Dense head: 256-unit hidden layer, light dropout, then the logits.
        hidden = nn.Linear(256 * 8 * 8, 256)
        logits = nn.Linear(256, num_classes)
        self.fc_layers = nn.Sequential(hidden, nn.ReLU(), nn.Dropout(0.3), logits)

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        features = self.conv_layers(x)
        flat = features.view(features.size(0), -1)  # keep the batch axis, flatten the rest
        return self.fc_layers(flat)


In [11]:


class ImprovedCNN(nn.Module):
    """Deeper CNN: three stages of two Conv-BatchNorm-ReLU units plus a max-pool,
    followed by global average pooling and a compact dense head.

    Args:
        num_classes: Number of output logits (default 5).
    """

    def __init__(self, num_classes=5):
        super().__init__()

        # Each stage holds 7 modules: (Conv, BN, ReLU) x 2, then MaxPool.
        # Modules are created in the same order as a hand-written Sequential,
        # so indices and state_dict keys run 0..20.
        modules = []
        in_channels = 3
        for width in (64, 128, 256):
            for _ in range(2):
                modules.extend([
                    nn.Conv2d(in_channels, width, kernel_size=3, padding=1),
                    nn.BatchNorm2d(width),
                    nn.ReLU(),
                ])
                in_channels = width
            modules.append(nn.MaxPool2d(2, 2))
        self.conv_layers = nn.Sequential(*modules)

        # Global average pooling collapses every feature map to 1x1: (N, 256, 1, 1).
        self.gap = nn.AdaptiveAvgPool2d(1)

        # Dense head on the 256 pooled features.
        self.fc_layers = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        pooled = self.gap(self.conv_layers(x))
        return self.fc_layers(pooled.view(pooled.size(0), -1))


In [12]:
def validate_model(model, val_loader, criterion, num_classes=None):
    """Evaluate ``model`` on ``val_loader`` and print overall and per-class accuracy.

    Side effects: the model is moved to CUDA when available (CPU otherwise)
    and left in evaluation mode.

    Args:
        model: Network returning logits of shape ``(batch, classes)``.
        val_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        num_classes: Number of classes for the per-class report. When ``None``
            it is taken from the width of the model output, instead of being
            hard-coded to 5.

    Returns:
        ``(val_loss, val_acc)``: the mean loss over batches and the accuracy in
        percent. Both are ``0.0`` when the loader yields no batches.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()  # Set to evaluation mode

    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    # Per-class tallies; allocated on the first batch, once the class count is known.
    class_correct = None
    class_total = None

    with torch.no_grad():  # Disable gradient calculation
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Mixed precision on CUDA only. torch.autocast replaces the
            # deprecated torch.cuda.amp.autocast and stays silent on CPU.
            with torch.autocast(device_type=device.type, enabled=device.type == "cuda"):
                outputs = model(inputs)
                loss = criterion(outputs, labels)  # Compute validation loss

            running_loss += loss.item()
            num_batches += 1

            # Get predicted class and update the overall counters
            _, predicted = torch.max(outputs, 1)
            hits = predicted == labels
            correct += hits.sum().item()
            total += labels.size(0)

            if class_total is None:
                n_classes = num_classes if num_classes is not None else outputs.size(1)
                class_correct = np.zeros(n_classes)
                class_total = np.zeros(n_classes)

            # Vectorised per-class tallies (replaces a per-sample Python loop).
            n_classes = len(class_total)
            class_total += torch.bincount(labels, minlength=n_classes).cpu().numpy()
            class_correct += torch.bincount(labels[hits], minlength=n_classes).cpu().numpy()

    # Guard the averages so an empty loader reports zeros instead of dividing by zero.
    val_loss = running_loss / num_batches if num_batches else 0.0
    val_acc = 100 * correct / total if total else 0.0

    print(f"\n✅ Validation Loss: {val_loss:.4f} | Accuracy: {val_acc:.2f}%")
    print("📊 Per-Class Accuracy:")
    if class_total is not None:
        # The epsilon keeps classes absent from the loader at 0% rather than NaN.
        class_acc = 100 * class_correct / (class_total + 1e-6)
        for i in range(len(class_acc)):
            print(f"  Class {i}: {class_acc[i]:.2f}%")

    return val_loss, val_acc  # Return validation loss and accuracy

In [13]:
def train_model(model, train_loader, val_loader, num_epochs=50, lr=0.001, patience=5,
                checkpoint_path="best_model.pth"):
    """Train ``model`` with per-epoch validation, step LR decay and early stopping.

    Args:
        model: Network to train; moved to CUDA when available.
        train_loader: Iterable (with ``len``) of ``(inputs, labels)`` training batches.
        val_loader: Validation batches, passed to ``validate_model`` after every epoch.
        num_epochs: Maximum number of epochs.
        lr: Initial Adam learning rate (halved every 10 epochs).
        patience: Number of consecutive epochs without a validation-accuracy
            improvement after which training stops.
        checkpoint_path: File that receives the best weights whenever they improve.

    Returns:
        ``(model, train_loss_list, train_acc_list, val_loss_history, val_acc_history)``
        where ``model`` carries the weights of the best validation epoch.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)  # weight decay for regularization
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)  # halve LR every 10 epochs

    # Track loss and accuracy
    train_loss_list = []
    train_acc_list = []
    val_loss_history = []
    val_acc_history = []

    best_val_acc = 0.0  # Track best validation accuracy
    # In-memory copy of the best weights. Restoring from this snapshot, not from
    # the checkpoint file, means a stale file left by an earlier run (or by a
    # previously trained model) can never be loaded by mistake.
    best_state = None
    patience_counter = 0  # Track early stopping condition

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct, total = 0, 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()

            # Gradient clipping (prevents exploding gradients)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()

            running_loss += loss.item()

            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        avg_loss = running_loss / len(train_loader)
        accuracy = 100 * correct / total

        # Store loss and accuracy for plotting
        train_loss_list.append(avg_loss)
        train_acc_list.append(accuracy)

        # Validate after each epoch
        val_loss, val_acc = validate_model(model, val_loader, criterion)

        # Store loss and accuracy for plotting
        val_loss_history.append(val_loss)
        val_acc_history.append(val_acc)

        # Print epoch results
        print(f"Epoch {epoch+1}: Train_Loss={avg_loss:.4f}, Train_Accuracy={accuracy:.2f}%," f"Val_Loss: {val_loss:.4f}, Val_Acc: {val_acc:.2f}%")

        # Adjust learning rate
        scheduler.step()

        # Early stopping bookkeeping. The first epoch always becomes the
        # baseline, so a snapshot exists even if accuracy never exceeds 0%.
        if best_state is None or val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0  # Reset patience
            best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
            torch.save(best_state, checkpoint_path)  # Save best model
        else:
            patience_counter += 1  # Increase patience counter

        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs. Best Val Acc = {best_val_acc:.2f}%")
            break

    # Restore the best weights before returning (nothing to restore if no epoch ran).
    if best_state is not None:
        model.load_state_dict(best_state)
    print(f"Training complete. Best validation accuracy: {best_val_acc:.2f}%")

    return model, train_loss_list, train_acc_list, val_loss_history, val_acc_history



In [14]:
# def get_plot_1(train_history):
#   plt.figure(figsize=(12, 6))

#   for name, history in train_history.items():
#       plt.plot(history["train_loss"], label=f"Train Loss - Model {name+1}")
#       plt.plot(history["val_loss"], linestyle="dashed", label=f"Val Loss - Model {name+1}")

#   plt.xlabel("Epochs")
#   plt.ylabel("Loss")
#   plt.title("Training & Validation Loss of Overlapping Models")
#   plt.legend()
#   plt.grid()
#   plt.show()

In [15]:
def get_plot(train_history):
    """Plot loss and accuracy curves for every trained model, side by side.

    Args:
        train_history: Mapping from a zero-based model index to a dict with the
            keys ``"train_loss"``, ``"val_loss"``, ``"train_accuracy"`` and
            ``"val_accuracy"``, each a per-epoch sequence.

    Training curves are drawn solid and validation curves dashed. Axis
    decoration happens once, after all curves are drawn: calling ``plt.grid()``
    without arguments inside the model loop toggled the grid on every pass, so
    it ended up switched off for an even number of models.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 6))

    for m, history in train_history.items():
        # Loss curves
        loss_ax.plot(history["train_loss"], label=f"Train Loss - Model {m+1}")
        loss_ax.plot(history["val_loss"], linestyle="dashed", label=f"Validation Loss - Model {m+1}")

        # Accuracy curves
        acc_ax.plot(history["train_accuracy"], label=f"Train Accuracy - Model {m+1}")
        acc_ax.plot(history["val_accuracy"], linestyle="dashed", label=f"Validation Accuracy - Model {m+1}")

    loss_ax.set_xlabel("Epochs")
    loss_ax.set_ylabel("Loss")
    loss_ax.set_title("Training & Validation Loss")

    acc_ax.set_xlabel("Epochs")
    acc_ax.set_ylabel("Accuracy (%)")
    acc_ax.set_title("Training & Validation Accuracy")

    for ax in (loss_ax, acc_ax):
        if train_history:
            ax.legend()  # skipped for an empty history to avoid a "no handles" warning
        ax.grid(True)  # explicit on, never a toggle

    plt.show()


In [16]:
# Train one network per dataset and keep the trained models and their learning curves.
models = {}
train_history = {}

for m in range(gnDataset):
    train_dataloader, test_dataloader = load_data(train_data_paths[m], test_data_paths[m], m)
    print(f"\nTraining Model {m+1}...")

    # A fresh network for every dataset. The class count comes from the config
    # cell instead of a repeated literal 5.
    # Swap in CNNClassifier(num_classes=gnClasses) to train the smaller baseline.
    model = ImprovedCNN(num_classes=gnClasses)

    trained_model, loss_list, acc_list, val_loss_list, val_acc_list = train_model(model, train_dataloader, test_dataloader)

    models[m] = trained_model
    train_history[m] = {"train_loss": loss_list, "train_accuracy": acc_list, "val_loss": val_loss_list, "val_accuracy": val_acc_list}

get_plot(train_history)


  raw_data = torch.load(train_data_path)



Training Model 1...


  with torch.cuda.amp.autocast():



✅ Validation Loss: 1.7481 | Accuracy: 31.60%
📊 Per-Class Accuracy:
  Class 0: 2.00%
  Class 1: 0.00%
  Class 2: 12.00%
  Class 3: 86.00%
  Class 4: 58.00%
Epoch 1: Train_Loss=1.4233, Train_Accuracy=40.19%,Val_Loss: 1.7481, Val_Acc: 31.60%

✅ Validation Loss: 1.5995 | Accuracy: 33.20%
📊 Per-Class Accuracy:
  Class 0: 20.00%
  Class 1: 4.00%
  Class 2: 98.00%
  Class 3: 38.00%
  Class 4: 6.00%
Epoch 2: Train_Loss=1.3064, Train_Accuracy=46.39%,Val_Loss: 1.5995, Val_Acc: 33.20%

✅ Validation Loss: 1.1841 | Accuracy: 52.40%
📊 Per-Class Accuracy:
  Class 0: 52.00%
  Class 1: 20.00%
  Class 2: 64.00%
  Class 3: 74.00%
  Class 4: 52.00%
Epoch 3: Train_Loss=1.2206, Train_Accuracy=50.92%,Val_Loss: 1.1841, Val_Acc: 52.40%

✅ Validation Loss: 1.3303 | Accuracy: 43.20%
📊 Per-Class Accuracy:
  Class 0: 62.00%
  Class 1: 58.00%
  Class 2: 28.00%
  Class 3: 62.00%
  Class 4: 6.00%
Epoch 4: Train_Loss=1.1586, Train_Accuracy=55.02%,Val_Loss: 1.3303, Val_Acc: 43.20%

✅ Validation Loss: 1.0932 | Accuracy

  model.load_state_dict(torch.load("best_model.pth"))



Training Model 2...

✅ Validation Loss: 1.1882 | Accuracy: 49.20%
📊 Per-Class Accuracy:
  Class 0: 20.00%
  Class 1: 46.00%
  Class 2: 74.00%
  Class 3: 92.00%
  Class 4: 14.00%
Epoch 1: Train_Loss=1.3603, Train_Accuracy=42.86%,Val_Loss: 1.1882, Val_Acc: 49.20%

✅ Validation Loss: 1.2320 | Accuracy: 48.40%
📊 Per-Class Accuracy:
  Class 0: 82.00%
  Class 1: 46.00%
  Class 2: 90.00%
  Class 3: 20.00%
  Class 4: 4.00%
Epoch 2: Train_Loss=1.1980, Train_Accuracy=51.68%,Val_Loss: 1.2320, Val_Acc: 48.40%

✅ Validation Loss: 1.0872 | Accuracy: 56.00%
📊 Per-Class Accuracy:
  Class 0: 18.00%
  Class 1: 76.00%
  Class 2: 76.00%
  Class 3: 32.00%
  Class 4: 78.00%
Epoch 3: Train_Loss=1.0951, Train_Accuracy=57.05%,Val_Loss: 1.0872, Val_Acc: 56.00%

✅ Validation Loss: 0.8440 | Accuracy: 70.80%
📊 Per-Class Accuracy:
  Class 0: 88.00%
  Class 1: 62.00%
  Class 2: 84.00%
  Class 3: 44.00%
  Class 4: 76.00%
Epoch 4: Train_Loss=0.9741, Train_Accuracy=63.69%,Val_Loss: 0.8440, Val_Acc: 70.80%

✅ Validatio

KeyboardInterrupt: 