# Imports & Drive

In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')

%cd drive/MyDrive/ProjectML/data

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchvision import models, transforms
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from sklearn.utils.class_weight import compute_class_weight

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data Loading & Preprocessing

In [None]:
TRAIN_PATH = "training.npz"
TEST_PATH  = "testing.npz"

In [None]:
train_npz = np.load(TRAIN_PATH, allow_pickle=True)
test_npz  = np.load(TEST_PATH,  allow_pickle=True)

X_full      = train_npz['images']
y_full      = train_npz['labels']
class_names = train_npz['class_names']

X_test = test_npz['images']
y_test = test_npz['labels']

print("Full (to split):", X_full.shape, y_full.shape)
print("Test:", X_test.shape, y_test.shape)
print("Class names:", class_names)

X_train, X_val, y_train, y_val = train_test_split(
    X_full, y_full,
    test_size=0.2,
    random_state=42,
    stratify=y_full
)

print("Train:", X_train.shape, y_train.shape)
print("Val:",   X_val.shape,   y_val.shape)
print("Test:",  X_test.shape,  y_test.shape)

In [None]:
from PIL import Image

#first experiment
class NumpyDataset(Dataset):
    def __init__(self, X, y, transform=None):
        self.X = X
        self.y = y
        self.transform = transform

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        img = self.X[idx]
        label = self.y[idx]

        if img.ndim == 3 and img.shape[-1] in [1, 3]:
            pass
        else:
            raise ValueError(f"Image with shape not compatible: {img.shape}")

        if img.dtype != np.uint8:
            img = (img * 255).astype(np.uint8)
        img = Image.fromarray(img)

        if self.transform:
            img = self.transform(img)

        label = torch.tensor(label).long()
        return img, label


"""
#experiment
class TargetedAugmentDataset(Dataset):
    def __init__(self, X, y, strong_classes=[0,1], transform_basic=None, transform_strong=None):
        self.X = X
        self.y = y
        self.strong_classes = strong_classes  # es: [0,1] = glioma, meningioma
        self.transform_basic = transform_basic
        self.transform_strong = transform_strong

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        img = self.X[idx]
        label = self.y[idx]

        if img.dtype != np.uint8:
            img = (img * 255).astype(np.uint8)

        img = Image.fromarray(img)

        if label in self.strong_classes:
            if self.transform_strong:
                img = self.transform_strong(img)
        else:
            if self.transform_basic:
                img = self.transform_basic(img)

        label = torch.tensor(label).long()
        return img, label

#end experiment
"""


In [None]:

#first experiment
data_transforms = {
    #more augmentation becauase no pretrained data
    'train': transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.RandomResizedCrop(size=(X_train.shape[1], X_train.shape[2]), scale=(0.9,1.0)),
        transforms.ColorJitter(brightness=0.15, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
    ]),
    'val': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
    ]),
    'test': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
    ]),
}
"""
#experiment
# Augmentation STANDARD (easy classes)
basic_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
])

# Augmentation STRONG (glioma + meningioma)
strong_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.RandomResizedCrop(size=(X_train.shape[1], X_train.shape[2]), scale=(0.85,1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.25),
    transforms.RandomAffine(degrees=0, translate=(0.05,0.05)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
])
#end experiment
"""
batch_size = 32
num_workers = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


#first experiment
train_ds = NumpyDataset(X_train, y_train, transform=data_transforms['train'])
val_ds   = NumpyDataset(X_val,   y_val,   transform=data_transforms['val'])
test_ds  = NumpyDataset(X_test,  y_test,  transform=data_transforms['test'])

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  num_workers=num_workers, pin_memory=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True)
test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True)

"""
experiment
train_ds = TargetedAugmentDataset(
    X_train, y_train,
    strong_classes=[0,1],
    transform_basic=basic_transform,
    transform_strong=strong_transform
)

val_ds = NumpyDataset(X_val, y_val, transform=basic_transform)
test_ds = NumpyDataset(X_test, y_test, transform=basic_transform)
"""
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=2)
val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False, num_workers=2)
test_loader  = DataLoader(test_ds,  batch_size=32, shuffle=False, num_workers=2)

dataset_sizes = {'train': len(train_ds), 'val': len(val_ds), 'test': len(test_ds)}
print(dataset_sizes)

#experiment
class_weights = torch.tensor([1.0, 1.4, 1.0, 1.0], dtype=torch.float).to(device)
#end experiment

# Model

In [None]:
num_classes = len(np.unique(y_train))
"""
model = models.mobilenet_v2(pretrained=True)
num_ftrs = model.classifier[1].in_features
model.classifier[1] = nn.Linear(num_ftrs, num_classes)
model = model.to(device)

# Freeze
for param in model.features.parameters():
    param.requires_grad = False
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.classifier.parameters(), lr=1e-3)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
"""


In [None]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        #Kernel 3x3 standard
        #Padding = 1 -> preserve spatial dimension (needed for fully connected layer)
        #channels: 32->64->128->256
        #maxPool2d = 2 -> halves H and W at each layer, making the model more robust and reducing parameters
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.pool = nn.AdaptiveAvgPool2d((1,1))

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256*1*1, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.pool(x) #reduce HxW to 1x1
        x = self.classifier(x)
        return x
model = SimpleCNN(num_classes=num_classes).to(device)
"""
#experiment
class FocalLoss(nn.Module):
    def __init__(self, gamma=2, weight=None):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.weight = weight
        self.ce = nn.CrossEntropyLoss(weight=weight)

    def forward(self, inputs, targets):
        logp = self.ce(inputs, targets)
        p = torch.exp(-logp)
        loss = ((1 - p) ** self.gamma * logp).mean()
        return loss
"""

criterion = nn.CrossEntropyLoss(weight=class_weights)
#criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.5)

#end experiment

"""first experiment
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.5)
"""

# Training

**Early stopping logic**

In [None]:
class EarlyStopping:
    def __init__(self, patience=5, min_delta=0.0, verbose=False):
        """
        Args:
            patience (int): how many epochs to wait after last improvement
            min_delta (float): minimal change to qualify as an improvement
            verbose (bool): whether to print when early‚Äêstop is triggered
        """
        self.patience    = patience
        self.min_delta   = min_delta
        self.verbose     = verbose
        self.best_loss   = float('inf')
        self.counter     = 0
        self.early_stop  = False

    def __call__(self, val_loss, model, best_model_wts):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss  = val_loss
            self.counter    = 0
            best_model_wts  = model.state_dict().copy()
        else:
            self.counter += 1
            if self.verbose:
                print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        return best_model_wts


In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=5, early_stopping=None):
    best_model_wts = model.state_dict().copy()
    best_acc = 0.0

    history = {
        'train_loss': [],
        'val_loss':   [],
        'train_acc':  [],
        'val_acc':    []
    }

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        print('-'*20)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
                dataloader = train_loader
            else:
                model.eval()
                dataloader = val_loader

            running_loss = 0.0
            running_corrects = 0

            with tqdm(dataloader, unit="batch", desc=f"{phase} Epoch {epoch+1}") as tepoch:
                for inputs, labels in tepoch:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    optimizer.zero_grad()
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                    current_loss = running_loss / ((tepoch.n * tepoch.last_print_n) if tepoch.last_print_n else 1)
                    tepoch.set_postfix(loss=f"{loss.item():.4f}")

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc  = running_corrects.double() / dataset_sizes[phase]

            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

            if phase == 'train':
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc.item())
            else:
                history['val_loss'].append(epoch_loss)
                history['val_acc'].append(epoch_acc.item())

                if early_stopping is not None:
                    best_model_wts = early_stopping(epoch_loss, model, best_model_wts)
                    if early_stopping.early_stop:
                        print("Early stopping triggered.")
                        model.load_state_dict(best_model_wts)
                        return model, history

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict().copy()



    print(f"Best val Acc: {best_acc:.4f}")
    model.load_state_dict(best_model_wts)
    return model, history


**Hyperparamms**

In [None]:

es = EarlyStopping(patience=5, min_delta=0.01, verbose=True)
model, history = train_model(model, criterion, optimizer, scheduler, num_epochs=40, early_stopping=es)

"""LR = 1e-5
STEP_SIZE = 7
GAMMA = 0.1
EPOCHS = 30
BATCH_SIZE = 32
PATIENCE = 3
"""

In [None]:
"""
for param in model.features.parameters():
    param.requires_grad = True
"""
"""optimizer_ft = optim.Adam(model.parameters(), lr=LR)
exp_lr_scheduler_ft = lr_scheduler.StepLR(optimizer_ft, step_size=STEP_SIZE, gamma=GAMMA)

es = EarlyStopping(patience=PATIENCE, min_delta=0.01, verbose=True)

model, history = train_model(
    model,
    criterion,
    optimizer_ft,
    exp_lr_scheduler_ft,
    num_epochs=EPOCHS,
    early_stopping=es
)"""

In [None]:
epochs = range(1, len(history['train_loss']) + 1)

plt.figure(figsize=(12,5))

# Loss subplot
plt.subplot(1,2,1)
plt.plot(epochs, history['train_loss'], label='Train Loss')
plt.plot(epochs, history['val_loss'],   label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training & Validation Loss')
plt.legend()

# Accuracy subplot
plt.subplot(1,2,2)
plt.plot(epochs, history['train_acc'], label='Train Acc')
plt.plot(epochs, history['val_acc'],   label='Val Acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training & Validation Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

# Testing

In [None]:
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)

print("Classification Report:")
print(classification_report(all_labels, all_preds, target_names=class_names))

cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d",
            xticklabels=class_names,
            yticklabels=class_names,
            cmap=plt.cm.Blues)
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.title('Confusion Matrix')
plt.show()

# Save Checkpoint

In [None]:
os.makedirs("models", exist_ok=True)
torch.save(model.state_dict(), 'models/simple_cnn.pth')