In [1]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torch.utils.data import random_split
import time
import os
import time
import numpy as np
import torch.nn.functional as F
from torch.utils.data import WeightedRandomSampler
from collections import Counter
from sklearn.metrics import precision_recall_fscore_support

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cuda


In [3]:
# ======================
#  DATASET & NORMALIZATION
# ======================
train_dir = './data/Training'

# Temporary loader for mean/std
temp_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])
temp_dataset = datasets.ImageFolder(train_dir, transform=temp_transform)
temp_loader = DataLoader(temp_dataset, batch_size=64, shuffle=False, num_workers=min(2, os.cpu_count() // 2), pin_memory=(device.type == "cuda"))

imgs = torch.cat([img for img, _ in temp_loader])
mean, std = imgs.mean([0, 2, 3]), imgs.std([0, 2, 3])
print("Mean:", mean.item(), "Std:", std.item())

# Final transforms
train_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((224, 224)),
    # These 3 transformations are recommended to not get caught up on orientation or lighting
    # Shouldn't use more as too many can distort or cause slower convergence
    transforms.RandomHorizontalFlip(p=0.2),
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    # -------------------------------------------------------------------------------------
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

val_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

# Reload full dataset with final transforms
full_dataset = datasets.ImageFolder(train_dir, transform=train_transform)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

# Loaders
train_labels = [train_dataset.dataset.samples[i][1] for i in train_dataset.indices]
label_counts = Counter(train_labels)
weights = 1. / np.array([label_counts[i] for i in range(len(label_counts))])
sample_weights = [weights[label] for label in train_labels]
sampler = WeightedRandomSampler(sample_weights, len(sample_weights)) # Good for heavy class imbalance & custom CNN which is common w/ medical datasets
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler, num_workers=min(2, os.cpu_count() // 2), pin_memory=(device.type == "cuda"))
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=min(2, os.cpu_count() // 2), pin_memory=(device.type == "cuda"))

print(f"Train images: {len(train_dataset)}, Validation images: {len(val_dataset)}")
print("Classes:", train_dataset.dataset.classes)

Mean: 0.17943531274795532 Std: 0.18682362139225006
Train images: 2296, Validation images: 574
Classes: ['glioma_tumor', 'meningioma_tumor', 'no_tumor', 'pituitary_tumor']


In [4]:
class BrainTumorCNN(nn.Module):
    def __init__(self, num_classes=4, in_channels=1):
        super(BrainTumorCNN, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2)  # 112x112
        )

        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout2d(0.1),
            nn.Conv2d(64, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2) # 56x56
        )

        self.layer3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.MaxPool2d(2)  # 28x28
        )

        self.layer4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1)  # 1x1
        )

        self.fc = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

In [None]:
# ======================
# MODEL SETUP
# ======================
model = BrainTumorCNN(num_classes=4, in_channels=1).to(device)
criterion = nn.CrossEntropyLoss() # Can use weight=weights here to have milder imbalance adjustment, don't use with WeightedRandomSampler as it's redundent
# Typical optimizer and scheduler for custom medical CNN, specifies steps and convergence
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10)

# ======================
# VALIDATION FUNCTION
# ======================
def validate(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    avg_val_loss = val_loss / len(val_loader)
    avg_val_acc = correct / total

    print(f'Validation Loss: {avg_val_loss:.4f} | Validation Accuracy: {avg_val_acc*100:.2f}%')
    return avg_val_loss, avg_val_acc

# ======================
#  SAVE UTIL
# ======================
save_dir = "./data"
def next_version(prefix, ext):
    existing = [int(f.split("_")[-1].split(".")[0])
                for f in os.listdir(os.path.join(save_dir, "models"))
                if f.startswith(prefix) and f.endswith(ext)
                and f.split("_")[-1].split(".")[0].isdigit()]
    return max(existing) + 1 if existing else 1

model_number = next_version("Brain_Tumor_Model_custom", ".pth")
model_path = os.path.join(save_dir, "models", f"Brain_Tumor_Model_custom_{model_number}.pth")
print(f"ðŸ§© Saving model to: {model_path}")

# ======================
# TRAINING LOOP
# ======================
total_start = time.time()
best_val_loss = float('inf')
epochs_no_improve = 0
early_stop_patience = 7
num_epochs = 70

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    loop = tqdm(train_loader, leave=True)
    loop.set_description(f"Epoch [{epoch+1}/{num_epochs}]")

    for imgs, labels in loop:
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        loop.set_postfix(loss=loss.item())

    avg_train_loss = running_loss / len(train_loader)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    scheduler.step(val_loss)

    print(f"Epoch [{epoch+1}/{num_epochs}] | Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc*100:.2f}%")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        checkpoint = {
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "epoch": epoch + 1,
            "val_loss": val_loss,
            "training_time": time.time() - total_start
        }
        torch.save(checkpoint, model_path)
        print(f"ðŸ’¾ New best model saved (Val Loss: {val_loss:.4f})")
    else:
        epochs_no_improve += 1
        print(f"ðŸ•’ No improvement for {epochs_no_improve} epoch(s)")
        if epochs_no_improve >= early_stop_patience:
            print(f"\nðŸ›‘ Early stopping at epoch {epoch+1}")
            break

print(f"\nâœ… Total Training Time: {(time.time()-total_start)/60:.2f} min")

ðŸ§© Saving model to: ./data\models\Brain_Tumor_Model_custom_1.pth


Epoch [1/70]:   0%|          | 0/72 [00:00<?, ?it/s]

In [None]:
# ======================
# LOAD SPECIFIC SAVED MODEL
# ======================

import os
import torch
import torch.nn as nn
from torchvision import models
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Re-create the same model architecture used during training
save_dir = "./data"
model_number = 7 # Change to version you want
model_path = os.path.join(save_dir, "models", f"Brain_Tumor_Model_custom_{model_number}.pth")

print(f"ðŸ§© Loading custom CNN model from: {model_path}")

model = BrainTumorCNN(num_classes=4, in_channels=1).to(device)

# âœ… Load full checkpoint (not just weights)
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])
training_time = checkpoint.get("training_time", 0)
epoch_trained = checkpoint.get("epoch", "N/A")

print(f"âœ… Model loaded from checkpoint trained for {training_time/60:.2f} minutes "
      f"({epoch_trained} epochs)")

# Set to evaluation mode
model.eval()
print("âœ… Model ready for inference.")

In [None]:
# ======================
# TEST + REPORT
# ======================
test_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

test_dir = "./data/Testing"
test_dataset = datasets.ImageFolder(root=test_dir, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2, pin_memory=True)
class_names = test_dataset.classes

model.eval()
y_true, y_pred = [], []

with torch.no_grad():
    for imgs, labels in test_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        outputs = model(imgs)
        _, preds = torch.max(outputs, 1)
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(preds.cpu().numpy())

y_true, y_pred = np.array(y_true), np.array(y_pred)
acc = (y_true == y_pred).sum() / len(y_true)
print(f"\nâœ… Test Accuracy: {acc*100:.2f}%")

# Classification report
report = classification_report(y_true, y_pred, target_names=class_names)
print("\nClassification Report:\n", report)

report_path = os.path.join(save_dir, "reports", f"eval_report_{model_number}.txt")

cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

metrics = precision_recall_fscore_support(y_true, y_pred, labels=[0,1,2,3])
print("Per-class Precision:", metrics[0])
print("Per-class Recall:", metrics[1])

# ======================
# SAVE REPORT
# ======================
with open(report_path, "w") as f:
    f.write(f"Model file: {os.path.basename(model_path)}\n")
    f.write(f"Total Training Time: {training_time/60:.2f} minutes\n")
    f.write(f"\nTest Accuracy: {acc*100:.2f}%\n\n")
    f.write(report)
print(f"ðŸ“„ Report saved to: {report_path}")