In [1]:
import torch
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import os
import copy

# Set preference to CUDA (GPU) if avail, fallback to CPU otherwise
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


## Data Augmentation

In [None]:
# Purpose of having seperate transformations for training and validation datasets is to introduce randomness and variability in training
data_transforms = {
    'train': transforms.Compose([
        # randomly crops input image to size of 224 x 224 pixels, helps model learn to recognize objs in diff scales and proportions
        transforms.RandomResizedCrop(224),
        
        # default prob of 0.5, learn from mirrored versions
        transforms.RandomHorizontalFlip(),
        
        # rotate by random angle betwn -10 and 10 degrees
        transforms.RandomRotation(10),
        
        # converts image into a PyTorch tensor, necessary as PyTorch models operate on tensors
        transforms.ToTensor(),
        
        # normalizes the tensor image w a mean and std dev for each RGB, ensures input features similar scale. Specific values used are standard for models pre-trained on ImageNet dataset.
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        
        # resize input image to 256 pixels on shorter side
        transforms.Resize(256),
        
        # crops image to 224x224 pixels around cntre
        transforms.CenterCrop(224),
        
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

## Data Loading

In [None]:
#loads image from a specified directory 
data_dir = 'a5_data'

In [None]:
# create a dictionary for training and validation. Uses ImageFolder class, assigns labels based on their subdirectory names. 
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']}

# load data into memory in batches, making it easier to manage memory usage and improve computation efficiency.
# data will be loaded in batches of 4 in random order as data will be shuffled at every epoch, reduce overfitting
# num_workers=4 allow parallel data loading, speeding up process
dataloaders = {x: DataLoader(image_datasets[x], batch_size=4, shuffle=True, num_workers=4) for x in ['train', 'val']}

# create dict to store no of imgs avail in training and validation datasets. Use for calculating accuracy or loss per epoch
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

In [None]:
#retrieve list of class names from training dataset
class_names = image_datasets['train'].classes

## Model Preparation, Loss Function and Optimizer

In [2]:
# Load a pretrained model and reset final fully connected layer.
model = models.resnet18(pretrained=True)

# retrieve the size of input features produced by the preultimate layer
num_ftrs = model.fc.in_features

# modifies final fully connect layer (model.fc) to match no. of classes in dataset
model.fc = nn.Linear(num_ftrs, len(class_names))

model = model.to(device)

# create an instance of 'CrossEntropyClass', for classification tasks. Measures diff betwn probability distributions of predicted o/p and true labels
# citerion is used later on to calculate loss prediction and actual. the lower, the better.
criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
# SGD updates model parameters using the gradient of the loss with respect to a subset of the data. Makes training faster and can also help model to escape local minima.
# smaller learning rate, model updates its parameters more slowly, lead to more precise convergence, require more epoches
# momentum is to accelerate SGD optimizer and dampen oscillations
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)



## Training, Validation Function and Training Execution

In [3]:
# Decay LR by a factor of 0.1 every 7 epochs
# Purpose of this is to balance the exploration and exploitation phases of training the model. Initially, main objective is to explore the parameter space broadly, but as you get closer to optimal parameters, you want to fine-tune and exploit promising regions.
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward
                # Track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                

        print()

    print(f'Best val Acc: {best_acc:4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

# Now we can train the model
model_ft = train_model(model, criterion, optimizer, exp_lr_scheduler, num_epochs=25)

Epoch 0/24
----------
train Loss: 0.6686 Acc: 0.7031
val Loss: 0.3567 Acc: 0.8267

Epoch 1/24
----------
train Loss: 0.5094 Acc: 0.7568
val Loss: 0.3565 Acc: 0.8299

Epoch 2/24
----------
train Loss: 0.4565 Acc: 0.7834
val Loss: 0.3294 Acc: 0.8403

Epoch 3/24
----------
train Loss: 0.4304 Acc: 0.7954
val Loss: 0.3669 Acc: 0.8195

Epoch 4/24
----------
train Loss: 0.4065 Acc: 0.8101
val Loss: 0.2883 Acc: 0.8698

Epoch 5/24
----------
train Loss: 0.3794 Acc: 0.8226
val Loss: 0.2975 Acc: 0.8674

Epoch 6/24
----------
train Loss: 0.3775 Acc: 0.8249
val Loss: 0.3024 Acc: 0.8618

Epoch 7/24
----------
train Loss: 0.3321 Acc: 0.8467
val Loss: 0.2674 Acc: 0.8730

Epoch 8/24
----------
train Loss: 0.3235 Acc: 0.8564
val Loss: 0.2702 Acc: 0.8850

Epoch 9/24
----------
train Loss: 0.3104 Acc: 0.8606
val Loss: 0.2654 Acc: 0.8778

Epoch 10/24
----------
train Loss: 0.3112 Acc: 0.8559
val Loss: 0.2744 Acc: 0.8810

Epoch 11/24
----------
train Loss: 0.2967 Acc: 0.8679
val Loss: 0.2627 Acc: 0.8786

Ep

## Test Data Evaluation

In [4]:
# Load the test dataset
test_dataset = datasets.ImageFolder(os.path.join(data_dir, 'test'), transform=transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]))

test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

In [5]:
# Ensure model in eval mode
model.eval()

#init the predictn and true label lists
true_labels = []
pred_labels = []

# Disable gradient computation
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        
        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(preds.cpu().numpy())
        


In [7]:
import numpy as np

# Calculate accuracy
accuracy = np.sum(np.array(pred_labels) == np.array(true_labels)) / len(true_labels)
print(f'Accuracy on the test set: {accuracy:.4f}')

Accuracy on the test set: 0.8851


In [8]:
print(class_names)
print(len(class_names))

['MEL', 'NV']
2


In [9]:
PATH = "model_state_dict.pth"

In [10]:
# Save the model state dictionary
torch.save(model.state_dict(), PATH)


## Running based on Saved data

In [11]:
import torch
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import numpy as np
import os


device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [12]:
test_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [13]:
data_dir = 'a5_data'  # Or the path to your dataset
test_dataset = datasets.ImageFolder(os.path.join(data_dir, 'test'), transform=test_transforms)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

In [14]:
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, 2)

In [15]:
model.load_state_dict(torch.load('model_state_dict.pth'))
model = model.to(device)


In [16]:
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [17]:
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Accuracy of the model on the test images: {accuracy * 100:.2f}%')

Accuracy of the model on the test images: 88.51%


## No augmentation

In [18]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Transformation without data augmentation for training data
data_transforms_no_aug = {
    'train': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    # Assuming the validation transforms are already defined correctly
    'val': data_transforms['val']  
}


In [19]:
image_datasets_no_aug = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms_no_aug[x])
                         for x in ['train', 'val']}
dataloaders_no_aug = {x: DataLoader(image_datasets_no_aug[x], batch_size=4, shuffle=True, num_workers=4)
                      for x in ['train', 'val']}

In [20]:
def initialize_model(num_classes, use_pretrained=True):
    # Load a pretrained ResNet-18 model
    model = models.resnet18(pretrained=use_pretrained)
    
    # Get the number of input features for the final layer (fully connected layer)
    num_ftrs = model.fc.in_features
    
    # Replace the final fully connected layer with a new one with the correct number of classes
    model.fc = nn.Linear(num_ftrs, num_classes)
    
    return model


In [23]:
model_without_aug = initialize_model(len(class_names), use_pretrained=True).to(device)

In [24]:
def train_model_no_aug(model, criterion, optimizer, scheduler, num_epochs=25):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders_no_aug[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward
                # Track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                

        print()

    print(f'Best val Acc: {best_acc:4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [25]:
# Assuming loss criterion, optimizer, and scheduler setup are done
model_ft_no_aug = train_model_no_aug(model_without_aug, criterion, optimizer, exp_lr_scheduler, num_epochs=25)


Epoch 0/24
----------
train Loss: 0.8084 Acc: 0.5009
val Loss: 0.7603 Acc: 0.5184

Epoch 1/24
----------
train Loss: 0.8078 Acc: 0.4988
val Loss: 0.7527 Acc: 0.5272

Epoch 2/24
----------
train Loss: 0.8097 Acc: 0.4961
val Loss: 0.7498 Acc: 0.5184

Epoch 3/24
----------
train Loss: 0.8082 Acc: 0.4992
val Loss: 0.7898 Acc: 0.5184

Epoch 4/24
----------
train Loss: 0.8095 Acc: 0.4984
val Loss: 0.7796 Acc: 0.5056

Epoch 5/24
----------
train Loss: 0.8098 Acc: 0.5003
val Loss: 0.7968 Acc: 0.5056

Epoch 6/24
----------
train Loss: 0.8090 Acc: 0.5005
val Loss: 0.7789 Acc: 0.5080

Epoch 7/24
----------
train Loss: 0.8077 Acc: 0.4972
val Loss: 0.7618 Acc: 0.5288

Epoch 8/24
----------
train Loss: 0.8124 Acc: 0.4967
val Loss: 0.7561 Acc: 0.5080

Epoch 9/24
----------
train Loss: 0.8083 Acc: 0.5020
val Loss: 0.7722 Acc: 0.5160

Epoch 10/24
----------
train Loss: 0.8084 Acc: 0.4984
val Loss: 0.7751 Acc: 0.5144

Epoch 11/24
----------
train Loss: 0.8076 Acc: 0.4944
val Loss: 0.7878 Acc: 0.5144

Ep

In [26]:
def evaluate_model(model, dataloader, device):
    model.eval()  # Set the model to evaluation mode
    correct_predictions = 0
    total_predictions = 0
    
    with torch.no_grad():  # Inference mode, gradients not needed
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)  # Forward pass
            _, predictions = torch.max(outputs, 1)  # Get the index of the max log-probability as the prediction
            
            correct_predictions += (predictions == labels).sum().item()
            total_predictions += labels.size(0)
    
    accuracy = correct_predictions / total_predictions
    print(f'Accuracy: {accuracy:.4f}')
    return accuracy


In [27]:
evaluate_model(model_ft_no_aug, test_loader, device)

Accuracy: 0.5066


0.5065885797950219

In [28]:
PATH_NO_AUG = "model_state_dict_no_aug.pth"

In [29]:
# Save the model state dictionary
torch.save(model_without_aug.state_dict(), PATH_NO_AUG)