In [1]:
import os
import time
import copy

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [3]:
from pathlib import Path
import shutil
import random

def split_dataset(source_dir: str, split_ratio: float = 0.8, image_ext: str = "*.jpg"):
    """
    Splits a dataset into training and validation sets.

    Parameters:
        source_dir (str): Path to the original dataset containing 'original' subdirectory.
        split_ratio (float): Percentage of images used for training (default: 0.8).
        image_ext (str): Image file extension to search for (default: "*.jpg").
    """
    source_path = Path(source_dir) / "original"
    train_path = Path(source_dir) / "train"
    val_path = Path(source_dir) / "val"

    categories = [category.name for category in source_path.iterdir() if category.is_dir()]

    # Create train and val directories
    for category in categories:
        (train_path / category).mkdir(parents=True, exist_ok=True)
        (val_path / category).mkdir(parents=True, exist_ok=True)

    # Process each category
    for category in categories:
        image_files = list((source_path / category).glob(image_ext))
        random.shuffle(image_files)

        split_index = int(len(image_files) * split_ratio)
        train_files = image_files[:split_index]
        val_files = image_files[split_index:]

        # Copy images to respective folders
        for file in train_files:
            shutil.copy(file, train_path / category / file.name)

        for file in val_files:
            shutil.copy(file, val_path / category / file.name)

        print(f"Category '{category}': {len(train_files)} train, {len(val_files)} val")

    print("Dataset successfully split into training and validation sets.")

# Split if output folders don't exist already
if not Path("brain_tumor_dataset/train").exists():
    split_dataset("brain_tumor_dataset", split_ratio=0.8, image_ext="*.jpg")

In [4]:
data_dir = 'brain_tumor_dataset'

# Data augmentation and normalization for training
# Just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=32,
                                             shuffle=True, num_workers=4)
              for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
class_names

['no', 'yes']

In [5]:
model = models.vgg16(pretrained=True)

# Freeze all the parameters
for param in model.parameters():
    param.requires_grad = False

# Modify the classifier
num_features = model.classifier[6].in_features
model.classifier[6] = nn.Linear(num_features, 2)  # Assuming binary classification

model = model.to(device)
# model



In [6]:
model.parameters

<bound method Module.parameters of VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size

In [15]:
criterion = nn.CrossEntropyLoss()

# Only parameters of the final layer are being optimized
# optimizer = optim.SGD(model.classifier[6].parameters(), lr=0.001, momentum=0.9)
optimizer = optim.Adam(model.classifier[6].parameters(), lr=0.001)

In [16]:
from sklearn.metrics import f1_score

def train_model(model, criterion, optimizer, dataloaders, dataset_sizes, device, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_f1 = 0.0  # Track best F1-score

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluation mode

            running_loss = 0.0
            running_corrects = 0
            all_labels = []
            all_preds = []

            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Store labels and predictions for F1-score
                all_labels.extend(labels.cpu().numpy())
                all_preds.extend(preds.cpu().numpy())

                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            # Compute F1-score
            epoch_f1 = f1_score(all_labels, all_preds, average='macro')

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} F1: {epoch_f1:.4f}')

            # Save model if it has the best F1-score
            if phase == 'val' and epoch_f1 > best_f1:
                best_f1 = epoch_f1
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val F1: {best_f1:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model


In [18]:
model = train_model(model, criterion, optimizer, dataloaders, dataset_sizes, device, num_epochs=5) # num_epochs=25 

Epoch 0/4
----------
train Loss: 0.4867 Acc: 0.7897 F1: 0.7544
val Loss: 0.3261 Acc: 0.9000 F1: 0.8949

Epoch 1/4
----------
train Loss: 0.4358 Acc: 0.7897 F1: 0.7832
val Loss: 0.3280 Acc: 0.8600 F1: 0.8498

Epoch 2/4
----------
train Loss: 0.5031 Acc: 0.7436 F1: 0.6962
val Loss: 0.3377 Acc: 0.7800 F1: 0.7512

Epoch 3/4
----------
train Loss: 0.5048 Acc: 0.7795 F1: 0.7640
val Loss: 0.3310 Acc: 0.9200 F1: 0.9167

Epoch 4/4
----------
train Loss: 0.4800 Acc: 0.8000 F1: 0.7820
val Loss: 0.3093 Acc: 0.9000 F1: 0.8949

Training complete in 3m 30s
Best val F1: 0.9167


In [29]:
for inputs, labels in dataloaders['val']:
    y_pred = model(inputs)
y_pred 

tensor([[ 1.7426, -2.2357],
        [ 0.0782, -1.0827],
        [ 0.9488, -1.2596],
        [-1.4362,  1.4291],
        [-0.2174,  0.3437],
        [ 0.5777, -1.1007],
        [ 1.7262, -2.9884],
        [-2.3679,  2.0415],
        [ 0.0793, -0.7718],
        [-2.4263,  1.6497],
        [-0.7366,  0.3211],
        [-0.8009,  0.5786],
        [-1.3129,  1.9209],
        [-0.2972,  0.5597],
        [ 1.1532, -1.2753],
        [-0.5341, -0.3901],
        [ 1.4990, -1.6322],
        [ 2.0352, -2.6346]], grad_fn=<AddmmBackward0>)

In [27]:
from PIL import Image

model.eval()
# Define image transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def preprocess_image(image_path):
    image = Image.open(image_path).convert("RGB")  # Ensure it's RGB
    image = transform(image)  # Apply transformations
    image = image.unsqueeze(0)  # Add batch dimension (1, C, H, W)
    return image


def predict(image_path, model, class_names, device):
    image = preprocess_image(image_path).to(device)  # Send image to device (GPU/CPU)
    
    # Perform inference
    model.eval()  # Ensure model is in eval mode
    with torch.no_grad():
        output = model(image)
        _, predicted_class = torch.max(output, 1)  # Get the class index

    return class_names[predicted_class.item()]  # Return class label

# Define class names based on training data
class_names = ["no", "yes"]  # Modify based on your dataset structure

# Path to a sample MRI image
sample_image = "brain_tumor_dataset/val/yes/Y6.jpg"  # Replace with an actual image path

# Run inference
predicted_label = predict(sample_image, model, class_names, device)
print(f"Predicted Label: {predicted_label}")


Predicted Label: yes


In [38]:
import torch
import torch.nn.functional as F

def predict_proba(model, dataloader, device):
    all_probs = []
    all_labels = []

    model.eval()  # Set model to evaluation mode
    with torch.no_grad():  # Disable gradient calculations
        for inputs, labels in dataloader:
            inputs = inputs.to(device)  # Move input to GPU/CPU
            outputs = model(inputs)  # Get raw model outputs (logits)

            # Apply Softmax if using CrossEntropyLoss (multi-class)
            probs = F.softmax(outputs, dim=1)[:, 1]  # Extract probability of positive class (index 1)

            all_probs.append(probs.cpu())  # Move to CPU and store
            all_labels.append(labels.cpu())


    return torch.cat(all_labels), torch.cat(all_probs)  # Concatenate all probabilities into a single tensor

# Run inference on validation dataset
val_labels, val_probs = predict_proba(model, dataloaders['val'], device)

# Print shape of output tensor
print(val_probs.shape)  # Should match number of validation samples
print("validation labels:")
print(val_labels)
print("validation probs:")
print(val_probs)  # Display tensor of probabilities


torch.Size([50])
validation labels:
tensor([1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1,
        1, 0])
validation probs:
tensor([0.9880, 0.8528, 0.5359, 0.9536, 0.2383, 0.0444, 0.3597, 0.9580, 0.9609,
        0.2840, 0.9420, 0.9739, 0.2992, 0.1738, 0.6367, 0.0181, 0.2385, 0.8792,
        0.8150, 0.0089, 0.5652, 0.8885, 0.7020, 0.0587, 0.7989, 0.9463, 0.8787,
        0.1573, 0.9461, 0.5362, 0.8940, 0.9783, 0.0418, 0.4019, 0.0736, 0.9797,
        0.0990, 0.1118, 0.0093, 0.6828, 0.3422, 0.9833, 0.0184, 0.7118, 0.9621,
        0.0810, 0.6749, 0.7422, 0.9514, 0.1080])
