In [1]:
import os
import torch
import PIL
from torch import nn
from torch.utils.data.dataloader import default_collate
import pickle
import torchvision.datasets as datasets
from torchvision.transforms import ToTensor
import torchvision.transforms as transforms
import torchvision.models
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import utils

#ACTUALLY needed for OUR PROJECT
import pickle
from sklearn.model_selection import train_test_split

device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu')
print(f'Using device:',device)

# Set a random seed for everything important
def seed_everything(seed: int):
    import random, os
    import numpy as np
    import torch
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = False
    torch.backends.cudnn.benchmark = False

# Set a seed with a random integer, in this case, I choose my verymost favourite sequence of numbers
seed_everything(123)

Using device: mps


In [29]:
class TumorDataset(torch.utils.data.Dataset): 
    def __init__(self, data):
        self.data = data

    def __getitem__(self, idx): 
        target = torch.tensor(self.data[idx][0])
        image = torch.tensor(self.data[idx][1])
        return image, target

    def __len__(self):
        return (self.data).shape[0]

def collate_fn(batch):
    return tuple(x_.to(device) for x_ in default_collate(batch))

def get_dataset(test_size, val_size, v=True): 
    with open("dataset.pkl", "rb") as f: 
        rawdata = pickle.load(f)

    dataset = TumorDataset(data=rawdata)

    test_amount, val_amount = int(dataset.__len__() * test_size), int(dataset.__len__() * val_size)

    # this function will automatically randomly split your dataset but you could also implement the split yourself
    train_set, val_set, test_set = torch.utils.data.random_split(dataset, [
                (dataset.__len__() - (test_amount + val_amount)), 
                test_amount, 
                val_amount
    ])
    
    print(f"There are {len(train_set)} examples in the training set")
    print(f"There are {len(test_set)} examples in the test set \n")
    print(f"Image shape is: {train_set[0][0].shape}, label example is {train_set[0][1]}")
    
    return train_set, val_set, test_set

train_set, val_set, test_set = get_dataset(test_size=0.15, val_size=0.15)

# Make dataloaders
batch_size=16

train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
validation_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_dataloader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)


There are 3220 examples in the training set
There are 690 examples in the test set 

Image shape is: torch.Size([128, 128]), label example is 0


In [None]:
class VGG16D(torch.nn.Module):
    def __init__(self, num_classes, in_channels=1, features_fore_linear=25088, dataset=None):
        super().__init__()
        
        # Helper hyperparameters to keep track of VGG16 architecture
        conv_stride = ...
        pool_stride = ...
        conv_kernel = ...
        pool_kernel = ...
        dropout_probs = ...
        optim_momentum = ...
        weight_decay = ...
        learning_rate = ...

        # Define features and classifier each individually, this is how the VGG16-D model is orignally defined
        # Define the VGG16-D convolutional (feature extraction) layers
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Flatten(),
        )

        # Define the VGG16-D fully connected (classification) layers
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),

            nn.Linear(4096, num_classes),
        )

        # In the paper, they mention updating towards the 'multinomial logistic regression objective'
        # As can be read in Bishop p. 159, taking the logarithm of this equates to the cross-entropy loss function.
        self.criterion = nn.CrossEntropyLoss()

        # Optimizer - For now just set to Adam to test the implementation
        self.optim = torch.optim.Adam(list(self.features.parameters()) + list(self.classifier.parameters()), lr=0.001)
        # self.optim = torch.optim.SGD(list(self.features.parameters()) + list(self.classifier.parameters()), lr=learning_rate, momentum=optim_momentum, weight_decay=weight_decay)

        self.dataset = dataset

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

    def train_model(self, train_dataloader, epochs=1, val_dataloader=None):
        
        # Call .train() on self to turn on dropout
        self.train()

        # To hold accuracy during training and testing
        train_accs = []
        test_accs = []

        for epoch in range(epochs):
            
            epoch_acc = 0

            for inputs, targets in tqdm(train_dataloader):
                logits = self(inputs)
                loss = self.criterion(logits, targets)
                loss.backward()

                self.optim.step()
                self.optim.zero_grad()

                # Keep track of training accuracy
                epoch_acc += (torch.argmax(logits, dim=1) == targets).sum().item()
            train_accs.append(epoch_acc / len(train_dataloader.dataset))

            # If val_dataloader, evaluate after each epoch
            if val_dataloader is not None:
                # Turn off dropout for testing
                self.eval()
                acc = self.eval_model(val_dataloader)
                test_accs.append(acc)
                print(f"Epoch {epoch} validation accuracy: {acc}")
                # turn on dropout after being done
                self.train()
        
        return train_accs, test_accs

    def eval_model(self, test_dataloader):
        
        self.eval()
        total_acc = 0

        for input_batch, label_batch in test_dataloader:
            logits = self(input_batch)

            total_acc += (torch.argmax(logits, dim=1) == label_batch).sum().item()

        total_acc = total_acc / len(test_dataloader.dataset)

        return total_acc

    def predict(self, img_path):
        img = PIL.Image.open(img_path)
        img = self.dataset.dataset.transform(img)
        classification = torch.argmax(self(img.unsqueeze(dim=0)), dim=1)
        return img, classification
    
    def predict_random(self, num_predictions=16):
        """
        Plot random images from own given dataset
        """
        random_indices = np.random.choice(len(self.dataset)-1, num_predictions, replace=False)
        classifcations = []
        labels = []
        images = []
        for idx in random_indices:
            img, label = self.dataset.__getitem__(idx)

            classifcation = torch.argmax(self(img.unsqueeze(dim=0)), dim=1)

            classifcations.append(classifcation)
            labels.append(label)
            images.append(img)

        return classifcations, labels, images

def get_vgg_weights(model):
    """
    Loads VGG16-D weights for the classifier to an already existing model
    Also sets training to only the classifier
    """
    # Load the complete VGG16 model
    temp = torchvision.models.vgg16(weights='DEFAULT')

    # Get its state dict
    state_dict = temp.state_dict()

    # Change the last layer to fit our, smaller network
    state_dict['classifier.6.weight'] = torch.randn(10, 4096)
    state_dict['classifier.6.bias'] = torch.randn(10)

    # Apply the state dict and set the classifer (layer part) to be the only thing we train
    model.load_state_dict(state_dict)

    for param in model.features.parameters():
        param.requires_grad = False

    model.optim = torch.optim.Adam(model.classifier.parameters())


    return model

In [None]:
# Get the in_channels and input dimensions from the first batch
in_channels = next(iter(train_dataloader))[0].shape[1]
in_width_height = next(iter(train_dataloader))[0].shape[-1]

# Create a dummy model to find the dimension before the first linear layer
CNN_model = VGG16D(num_classes=10, in_channels=in_channels)

# WARNING - THIS PART MIGHT BREAK
features_fore_linear = utils.get_dim_before_first_linear(CNN_model.features, in_width_height, in_channels, brain=True)

dummy_input = torch.randn(1, in_channels, in_width_height, in_width_height)
dummy_output = CNN_model.features(dummy_input)
n_features = dummy_output.shape[1]

# Path to save and load model weights
weights_path = 'vgg16d_weights.pth'

# Initialize the model
CNN_model = VGG16D(num_classes=10, in_channels=in_channels, features_fore_linear=n_features, dataset=test_set)
CNN_model.to(device)  # Move model to MPS, CUDA, or CPU

# **Load model weights if available**
if os.path.isfile(weights_path):
    print(f"Loading existing weights from {weights_path}...")
    CNN_model.load_state_dict(torch.load(weights_path))
else:
    print("No pre-trained weights found. Initializing new model...")


# Train model
train_epochs = 2
train_accs, test_accs = CNN_model.train_model(train_dataloader, epochs=train_epochs, val_dataloader=test_dataloader)

# **Save model weights after training**
torch.save(CNN_model.state_dict(), weights_path)
print(f"Model weights saved to {weights_path}.")

# Check if previous accuracy files exist, and load them
if os.path.isfile('train_accs.pkl'):
    with open('train_accs.pkl', 'rb') as f:
        old_train_accs = pickle.load(f)
else:
    old_train_accs = []

if os.path.isfile('test_accs.pkl'):
    with open('test_accs.pkl', 'rb') as f:
        old_test_accs = pickle.load(f)
else:
    old_test_accs = []

# Append new accuracies to the existing list
all_train_accs = old_train_accs + train_accs
all_test_accs = old_test_accs + test_accs

# Save back to the same files
with open('train_accs.pkl', 'wb') as f:
    pickle.dump(all_train_accs, f)

with open('test_accs.pkl', 'wb') as f:
    pickle.dump(all_test_accs, f)


No pre-trained weights found. Initializing new model...


  2%|▏         | 10/592 [00:11<11:15,  1.16s/it]


KeyboardInterrupt: 