In [83]:
import copy
import os
import shutil
import time

import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision import models


# Dataset locations: the raw PNGs sit under pics/<class>/..., and the flattened,
# renamed copies are written under Cleaned/.
data_dir = "/Users/quentin/Desktop/ML_MRIqc_DATASET/ML_Data/pics"
new_dir = "/Users/quentin/Desktop/ML_MRIqc_DATASET/ML_Data/Cleaned"
accepted_dir = os.path.join(data_dir, "Accepted")
rejected_dir = os.path.join(data_dir, "Rejected")

# Recursively collect the path of every ".png" file in each raw class folder.
accepted = [
    os.path.join(parent, name)
    for parent, _subdirs, names in os.walk(accepted_dir)
    for name in names
    if name.endswith(".png")
]
rejected = [
    os.path.join(parent, name)
    for parent, _subdirs, names in os.walk(rejected_dir)
    for name in names
    if name.endswith(".png")
]

# Copy the raw PNGs into one flat folder per class, prefixing each file name with its parent folder name (the raw files were never given unique names)
def rename_files_in_folder(src_folder, dest_folder):
    """Copy every ``.png`` under ``src_folder`` into the flat folder ``dest_folder``.

    Each copy is named ``<parent folder name>_<original file name>``. The source
    files are left untouched (despite the function name, nothing is renamed in
    place). Files that do not end in ``.png`` are ignored.

    Because the tree is flattened, two files can map to the same destination name
    (e.g. ``a/scan01/img.png`` and ``b/scan01/img.png``). Instead of silently
    overwriting the earlier copy, later colliding files get a numeric suffix
    (``scan01_img_1.png``, ``scan01_img_2.png``, ...).

    Args:
        src_folder: Root directory that is walked recursively.
        dest_folder: Directory receiving the copies; created if missing.

    Returns:
        None.
    """
    os.makedirs(dest_folder, exist_ok=True)
    # Names handed out during this call only, so re-running the function still
    # overwrites its own earlier output instead of piling up suffixed duplicates.
    used_names = set()
    for root, dirs, files in os.walk(src_folder):
        # Sorted traversal keeps the collision suffixes stable across runs;
        # os.walk otherwise follows the filesystem's arbitrary listing order.
        dirs.sort()
        folder_name = os.path.basename(root)
        for file in sorted(files):
            if not file.endswith(".png"):
                continue
            new_name = f"{folder_name}_{file}"
            if new_name in used_names:
                stem, ext = os.path.splitext(new_name)
                counter = 1
                while f"{stem}_{counter}{ext}" in used_names:
                    counter += 1
                new_name = f"{stem}_{counter}{ext}"
            used_names.add(new_name)
            shutil.copy(os.path.join(root, file), os.path.join(dest_folder, new_name))

# Flatten each raw class folder into Cleaned/<class>/ with prefixed file names.
rename_files_in_folder(accepted_dir, os.path.join(new_dir, "Accepted"))
rename_files_in_folder(rejected_dir, os.path.join(new_dir, "Rejected"))


### train/test/validation data
# os.walk lists entries in filesystem order, which is not guaranteed to be stable.
# Sorting the paths makes the seeded splits below pick the same files on every
# run and machine; without it the fixed random_state does not pin the split.
accepted_clean = sorted(
    os.path.join(root, file)
    for root, _, files in os.walk(os.path.join(new_dir, "Accepted"))
    for file in files if file.endswith(".png")
)
rejected_clean = sorted(
    os.path.join(root, file)
    for root, _, files in os.walk(os.path.join(new_dir, "Rejected"))
    for file in files if file.endswith(".png")
)

# 80% train, 10% validation, 10% test: hold out 20%, then halve the hold-out.
# Each class is split separately, so every split keeps the class ratio.
# NOTE(review): train_acc / val_acc / test_acc hold file paths here; the training
# cells later rebind the same names to accuracy values.
train_acc, temp_acc = train_test_split(accepted_clean, test_size=0.2, random_state=420)
val_acc, test_acc = train_test_split(temp_acc, test_size=0.5, random_state=420)

train_rej, temp_rej = train_test_split(rejected_clean, test_size=0.2, random_state=420)
val_rej, test_rej = train_test_split(temp_rej, test_size=0.5, random_state=420)

# Helper: copy one class's files into <output_dir>/<class_name>/
def create_split_folder(output_dir, data, class_name):
    """Copy the files listed in ``data`` into ``<output_dir>/<class_name>/``.

    Args:
        output_dir: Directory of one split (its class sub-folder is created here).
        data: Iterable of source file paths to copy.
        class_name: Name of the class sub-folder.

    Returns:
        None. The target folder is created if missing; existing files with the
        same name are overwritten by ``shutil.copy``.
    """
    target_dir = os.path.join(output_dir, class_name)
    os.makedirs(target_dir, exist_ok=True)
    for src_path in data:
        shutil.copy(src_path, target_dir)

output_dir = "/Users/quentin/Desktop/ML_MRIqc_DATASET/Ready_Data"

# For every split: (accepted file paths, rejected file paths).
splits = dict(
    train=(train_acc, train_rej),
    val=(val_acc, val_rej),
    test=(test_acc, test_rej),
)

# Build Ready_Data/<split>/<class>/ by copying each list of files into place.
for split, class_lists in splits.items():
    split_dir = os.path.join(output_dir, split)
    for label, paths in zip(("Accepted", "Rejected"), class_lists):
        create_split_folder(split_dir, paths, label)



In [69]:
#### Preprocessing pipelines (train / val / test)

# Every split resizes to 224x224 (the VGG-16 input size), converts to a tensor and
# normalizes with mean 0.5 / std 0.5. Only the training split adds a random
# horizontal flip as augmentation.
data_transforms = {
    split_name: transforms.Compose(
        [transforms.Resize((224, 224))]
        + ([transforms.RandomHorizontalFlip()] if split_name == "train" else [])
        + [transforms.ToTensor(), transforms.Normalize([0.5], [0.5])]
    )
    for split_name in ("train", "val", "test")
}

NameError: name 'class_name' is not defined

In [None]:
# Load a pretrained VGG-16 and replace its last classifier layer with a small
# two-class head.
# NOTE(review): newer torchvision releases prefer the `weights=` argument over
# `pretrained=True` — confirm against the installed version before changing it.
vgg16 = models.vgg16(pretrained=True)
num_features = vgg16.classifier[6].in_features
# The head ends in raw logits on purpose: nn.CrossEntropyLoss applies log-softmax
# internally, so a trailing nn.Softmax would normalize twice and weaken the
# gradients. Argmax predictions are unaffected; apply torch.softmax to the
# outputs explicitly wherever class probabilities are needed.
vgg16.classifier[6] = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, 2),
)
vgg16 = vgg16.to(DEVICE)  # DEVICE is not defined in this cell; it must already exist in the kernel

# Adagrad optimizer and cross-entropy loss
optimizer = optim.Adagrad(vgg16.parameters(), lr=0.001, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()


# Training function
def _run_epoch(model, criterion, batches, optimizer=None):
    """Run one pass over ``batches`` and return ``(loss_sum, n_correct, n_seen)``.

    When ``optimizer`` is given, each batch also performs a backward pass and an
    optimizer step; otherwise the pass is forward-only. Switching the model
    between train/eval mode and disabling gradients is left to the caller.
    """
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    for images, labels in batches:
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        if optimizer is not None:
            optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        if optimizer is not None:
            loss.backward()
            optimizer.step()

        loss_sum += loss.item()
        predicted = outputs.max(1)[1]  # index of the highest score per sample
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()
    return loss_sum, n_correct, n_seen


def train_model(model, criterion, optimizer, train_loader, val_loader, epochs=10):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    Batches are moved to the module-level ``DEVICE``; training batches are
    wrapped in a ``tqdm`` progress bar. A one-line summary is printed per epoch.

    Args:
        model: Network to optimize (updated in place).
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        epochs: Number of passes over ``train_loader``.

    Returns:
        Tuple ``(train_loss, val_loss, train_acc, val_acc)`` of per-epoch lists.
        Losses are the mean of the per-batch losses; accuracies are percentages.
    """
    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

    for epoch in range(epochs):
        # Optimization pass over the training data
        model.train()
        progress = tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{epochs}")
        loss_sum, n_correct, n_seen = _run_epoch(model, criterion, progress, optimizer)
        history["train_loss"].append(loss_sum / len(train_loader))
        history["train_acc"].append(100.0 * n_correct / n_seen)

        # Forward-only pass over the validation data
        model.eval()
        with torch.no_grad():
            loss_sum, n_correct, n_seen = _run_epoch(model, criterion, val_loader)
        history["val_loss"].append(loss_sum / len(val_loader))
        history["val_acc"].append(100.0 * n_correct / n_seen)

        print(f"Epoch [{epoch+1}/{epochs}] -> Train Loss: {history['train_loss'][-1]:.4f}, "
              f"Train Acc: {history['train_acc'][-1]:.2f}%, Val Loss: {history['val_loss'][-1]:.4f}, Val Acc: {history['val_acc'][-1]:.2f}%")

    return history["train_loss"], history["val_loss"], history["train_acc"], history["val_acc"]

# Train the model for 10 epochs and keep the per-epoch loss/accuracy curves.
# NOTE: train_acc / val_acc are rebound here; they previously held the file-path
# lists produced by the dataset-split cell.
train_loss, val_loss, train_acc, val_acc = train_model(
    model=vgg16,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=dataloaders["train"],
    val_loader=dataloaders["val"],
    epochs=10,
)

# Test function
def test_model(model, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print/return loss and accuracy.

    Uses the module-level ``criterion`` for the loss and moves batches to the
    module-level ``DEVICE``. Runs in eval mode with gradients disabled.

    Args:
        model: Network to evaluate.
        test_loader: Iterable of ``(images, labels)`` batches.

    Returns:
        Tuple ``(test_loss, test_acc)``: the mean of the per-batch losses and the
        accuracy as a percentage.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(DEVICE)
            batch_labels = batch_labels.to(DEVICE)
            scores = model(batch_images)
            loss_sum += criterion(scores, batch_labels).item()
            predicted = scores.max(1)[1]  # index of the highest score per sample
            n_seen += batch_labels.size(0)
            n_correct += predicted.eq(batch_labels).sum().item()
    mean_loss = loss_sum / len(test_loader)
    accuracy = 100.0 * n_correct / n_seen
    print(f"Test Loss: {mean_loss:.4f}, Test Acc: {accuracy:.2f}%")
    return mean_loss, accuracy

# Evaluate on the held-out test split.
# NOTE: test_acc is rebound here; it previously held the list of test file paths.
test_loss, test_acc = test_model(model=vgg16, test_loader=dataloaders["test"])


In [None]:
# Training loop that tracks the weights with the best validation accuracy.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vgg16 = vgg16.to(device)

num_epochs = 10
# state_dict() returns references to the live parameter tensors, so the snapshot
# must be deep-copied; otherwise the "best" weights silently follow every later
# optimizer step and always equal the final weights.
best_model_wts = copy.deepcopy(vgg16.state_dict())
best_acc = 0.0

for epoch in range(num_epochs):
    print(f"Epoch {epoch}/{num_epochs - 1}")
    print("-" * 10)

    # Each epoch runs a training phase followed by a validation phase.
    for phase in ["train", "val"]:
        if phase == "train":
            vgg16.train()
        else:
            vgg16.eval()

        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in dataloaders[phase]:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Gradients are only tracked (and applied) during the training phase.
            with torch.set_grad_enabled(phase == "train"):
                outputs = vgg16(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                if phase == "train":
                    loss.backward()
                    optimizer.step()

            # Weight the batch-mean loss by batch size so the epoch loss is a
            # per-sample average even when the last batch is smaller.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)

        epoch_loss = running_loss / dataset_sizes[phase]
        epoch_acc = running_corrects.double() / dataset_sizes[phase]

        print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

        if phase == "val" and epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(vgg16.state_dict())

# Leave the model at its best-validation checkpoint rather than the last epoch.
vgg16.load_state_dict(best_model_wts)


In [None]:
# Confusion matrix of the model's predictions on the validation split.
all_preds = []
all_labels = []

vgg16.eval()
# Inference only: without no_grad() every forward pass builds an autograd graph,
# wasting memory for results that are never back-propagated.
with torch.no_grad():
    for inputs, labels in dataloaders["val"]:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = vgg16(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

conf_mat = confusion_matrix(all_labels, all_preds)

# Rows are the actual classes, columns the predicted ones.
# NOTE(review): class_names is not defined in this cell; presumably it lists the
# classes in label-index order — confirm against the cell that builds the datasets.
fig, ax = plt.subplots()
sns.heatmap(conf_mat, annot=True, fmt="d", cmap="Blues",
            xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_ylabel("Actual")
ax.set_xlabel("Predicted")
ax.set_title("Confusion Matrix")
plt.show()