In [1]:
import os
import random
import zipfile
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torchvision
from PIL import Image
from torch import nn
from torch.utils.data import DataLoader

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
# Define the dataset directory
dataset_dir = Path('./HAM10000')
dataset_zip = 'skin-cancer-mnist-ham10000.zip'

# Download the dataset if it hasn't been downloaded yet
if not dataset_dir.exists():
    # Download the dataset
    !kaggle datasets download -d kmader/skin-cancer-mnist-ham10000

    # Unzip the dataset
    with zipfile.ZipFile(dataset_zip, 'r') as zip_ref:
        zip_ref.extractall(dataset_dir)

    # Remove the zip file after extraction
    os.remove(dataset_zip)
    
    print(f'Dataset downloaded and extracted to {dataset_dir}')
else:
    print(f'Dataset already exists in {dataset_dir}')


  0%|          | 0.00/5.20G [00:00<?, ?B/s]
  0%|          | 1.00M/5.20G [00:00<52:57, 1.76MB/s]
  0%|          | 2.00M/5.20G [00:00<29:03, 3.20MB/s]
  0%|          | 3.00M/5.20G [00:00<21:44, 4.28MB/s]
  0%|          | 4.00M/5.20G [00:01<20:08, 4.62MB/s]
  0%|          | 5.00M/5.20G [00:01<17:25, 5.34MB/s]
  0%|          | 6.00M/5.20G [00:01<17:40, 5.26MB/s]
  0%|          | 7.00M/5.20G [00:01<19:39, 4.73MB/s]
  0%|          | 8.00M/5.20G [00:01<20:12, 4.60MB/s]
  0%|          | 9.00M/5.20G [00:02<20:57, 4.43MB/s]
  0%|          | 10.0M/5.20G [00:02<22:30, 4.13MB/s]
  0%|          | 11.0M/5.20G [00:02<21:36, 4.30MB/s]
  0%|          | 12.0M/5.20G [00:02<19:57, 4.65MB/s]
  0%|          | 13.0M/5.20G [00:03<21:47, 4.26MB/s]
  0%|          | 14.0M/5.20G [00:03<21:39, 4.29MB/s]
  0%|          | 15.0M/5.20G [00:03<22:23, 4.14MB/s]
  0%|          | 16.0M/5.20G [00:03<22:06, 4.20MB/s]
  0%|          | 17.0M/5.20G [00:04<22:31, 4.12MB/s]
  0%|          | 18.0M/5.20G [00:04<21:51, 4.24MB/s]
 

Dataset URL: https://www.kaggle.com/datasets/kmader/skin-cancer-mnist-ham10000
License(s): CC-BY-NC-SA-4.0
Downloading skin-cancer-mnist-ham10000.zip to c:\Users\alexa\Documents\Uni\Personal Projects\Skin_Cancer_Detection

Dataset downloaded and extracted to HAM10000


In [None]:
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

## Helper Functions

In [None]:
import os
def walk_through_dir(dir_path):
  """
  Prints a one-line summary for dir_path and every directory below it.

  Args:
    dir_path (str or pathlib.Path): target directory

  Returns:
    None. For each directory visited by os.walk, one line is printed with:
      the number of subdirectories it contains
      the number of files it contains (reported as "images")
      the path of the directory
  """
  for current_dir, subdirs, files in os.walk(dir_path):
    n_dirs = len(subdirs)
    n_files = len(files)
    print(f"There are {n_dirs} directories and {n_files} images in '{current_dir}'.")

https://www.kaggle.com/datasets/kmader/skin-cancer-mnist-ham10000

Original Challenge: https://challenge2018.isic-archive.com

https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/DBW86T
[1] Noel Codella, Veronica Rotemberg, Philipp Tschandl, M. Emre Celebi, Stephen Dusza, David Gutman, Brian Helba, Aadi Kalloo, Konstantinos Liopyris, Michael Marchetti, Harald Kittler, Allan Halpern: “Skin Lesion Analysis Toward Melanoma Detection 2018: A Challenge Hosted by the International Skin Imaging Collaboration (ISIC)”, 2018; https://arxiv.org/abs/1902.03368

[2] Tschandl, P., Rosendahl, C. & Kittler, H. The HAM10000 dataset, a large collection of multi-source dermatoscopic images of common pigmented skin lesions. Sci. Data 5, 180161 doi:10.1038/sdata.2018.161 (2018).

A good reference notebook: https://www.kaggle.com/code/sid321axn/step-wise-approach-cnn-model-77-0344-accuracy

In [None]:
# Root folder of the HAM10000 files. On Kaggle the dataset lives under
# /kaggle/input; anywhere else fall back to the folder the download cell
# extracted the archive into, so the notebook also runs outside Kaggle.
kaggle_input_path = Path("/kaggle/input/skin-cancer-mnist-ham10000")
general_path = kaggle_input_path if kaggle_input_path.exists() else dataset_dir
walk_through_dir(general_path)

In [None]:
# Load the per-image metadata table that ships with the dataset
df = pd.read_csv(general_path.joinpath('HAM10000_metadata.csv'))
print(f"The shape of the metadata is {df.shape}")
df.head()

In [2]:
# One summary per metadata column: describe() for the numeric 'age' column,
# value_counts() for the categorical columns.
summary_dict = {
    f'{column}_distribution': (
        df[column].describe() if column == 'age' else df[column].value_counts()
    )
    for column in ('dx', 'dx_type', 'age', 'sex', 'localization')
}
summary_dict

IndexError: Cannot choose from an empty sequence

In [None]:
# Let's make a dictionary for each image with its metadata, just for convenience:
# {image_id: {column: value, ...}}. orient='index' builds this mapping directly,
# without first transposing the whole DataFrame.
image_metadata_dict = df.set_index('image_id').to_dict(orient='index')

In [None]:
# Now, we can always look up the corresponding information from an image_id:
image_id = 'ISIC_0027419'
# dict.get() returns None instead of raising when the id is not a key
metadata = image_metadata_dict.get(image_id)
metadata

From the competition, we know that "Cases include a representative collection of all important diagnostic categories in the realm of pigmented lesions: Actinic keratoses and intraepithelial carcinoma / Bowen's disease (akiec), basal cell carcinoma (bcc), benign keratosis-like lesions (solar lentigines / seborrheic keratoses and lichen-planus like keratoses, bkl), dermatofibroma (df), melanoma (mel), melanocytic nevi (nv) and vascular lesions (angiomas, angiokeratomas, pyogenic granulomas and hemorrhage, vasc)."

Let's plot some of the information so that it becomes a bit more clear.

# Let's first look at the different types of pigmented lesions and how many we have available to us in the dataset.
import matplotlib.pyplot as plt
import seaborn as sns

dx_data = summary_dict['dx_distribution']

# One bar per lesion type, bar height = number of rows with that `dx` value
plt.figure(figsize=(10, 6))
sns.barplot(x=dx_data.index, y=dx_data.values)
plt.gca().set(title='Dx Distribution',
              xlabel='type of pigmented lesion',
              ylabel='frequency')
plt.show()

In [None]:
# One bar per `dx_type` value, bar height = number of rows with that value
dx_data = summary_dict['dx_type_distribution']

plt.figure(figsize=(10, 6))
sns.barplot(x=dx_data.index, y=dx_data.values)
plt.gca().set(title='Dx Type Distribution',
              xlabel='How the lesion type got confirmed',
              ylabel='frequency')
plt.show()

In [None]:

# Histogram of the `age` column of the metadata
plt.figure(figsize=(10, 6))
sns.histplot(df['age'])
plt.gca().set(title='Age Distribution', xlabel='Age', ylabel='Frequency')
plt.show()

In [None]:
# NOTE(review): this cell draws the same 'Dx Distribution' bar chart as the earlier
# plotting cell. summary_dict also holds 'sex_distribution' and
# 'localization_distribution', which are never plotted — presumably one of those
# was intended here; confirm.
dx_data = summary_dict['dx_distribution']

plt.figure(figsize=(10, 6))
sns.barplot(x=dx_data.index, y=dx_data.values)
plt.title('Dx Distribution')
plt.xlabel('type of pigmented lesion')
plt.ylabel('frequency')
plt.show()

In [None]:
# The images are split over two folders; collect every .jpg from both
image_path_part_1 = general_path / "HAM10000_images_part_1"
image_path_part_2 = general_path / "HAM10000_images_part_2"

path_list_part_1 = [*image_path_part_1.glob('*.jpg')]
path_list_part_2 = [*image_path_part_2.glob('*.jpg')]

# Combined list of all image paths; its length is the cell output
all_images_path = [*path_list_part_1, *path_list_part_2]
len(all_images_path)

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms

In [None]:
# Transform pipeline that turns a PIL image into a tensor:
# resize to 64x64, flip horizontally with probability 0.5, convert to a tensor.
data_transform = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ToTensor(),
    ]
)

In [None]:
# NOTE(review): repeats the part-1 listing already done in an earlier cell — confirm whether this cell is still needed.
image_path_part_1 = general_path / Path("HAM10000_images_part_1")
path_list_part_1 = list(image_path_part_1.glob('*.jpg'))
len(path_list_part_1)

In [None]:
# NOTE(review): repeats the part-2 listing already done in an earlier cell — confirm whether this cell is still needed.
image_path_part_2 = general_path / Path("HAM10000_images_part_2")
path_list_part_2 = list(image_path_part_2.glob('*.jpg'))
len(path_list_part_2)

In [None]:
# Concatenate both folder listings into one list of image paths.
# NOTE(review): the same list is already built in an earlier cell — confirm whether this cell is still needed.
all_images_path = path_list_part_1 + path_list_part_2 
len(all_images_path)

In [None]:
def plot_transformed_images(image_paths, transform, n=3, seed=42):
    """Shows randomly chosen images next to their transformed versions.

    Seeds Python's global `random` module with `seed`, samples `n` paths from
    `image_paths` and, for each one, draws a figure with the original image on
    the left and the transformed image on the right.

    Args:
        image_paths (list): List of target image paths.
        transform (PyTorch Transforms): Transforms to apply to images.
        n (int, optional): Number of images to plot. Defaults to 3.
        seed (int, optional): Random seed for the random generator. Defaults to 42.
    """
    random.seed(seed)
    for chosen_path in random.sample(image_paths, k=n):
        with Image.open(chosen_path) as original:
            fig, (ax_original, ax_transformed) = plt.subplots(1, 2)

            # Left panel: the image as stored on disk
            ax_original.imshow(original)
            ax_original.set_title(f"Original \nSize: {original.size}")
            ax_original.axis("off")

            # Right panel: the transformed image. permute() moves the channel
            # axis last because PyTorch uses [C, H, W] and Matplotlib [H, W, C].
            hwc_image = transform(original).permute(1, 2, 0)
            ax_transformed.imshow(hwc_image)
            ax_transformed.set_title(f"Transformed \nSize: {hwc_image.shape}")
            ax_transformed.axis("off")

            # The figure title shows the name of the folder that contains the image
            fig.suptitle(f"Class: {chosen_path.parent.stem}", fontsize=16)

# Preview three sampled images before/after data_transform (seed left at its default of 42)
plot_transformed_images(all_images_path, 
                        transform=data_transform, 
                        n=3)

In [None]:
from sklearn.model_selection import train_test_split

# Shuffle the image paths and hold out 20% of them; random_state fixes the shuffle.
# NOTE(review): this split does not use the labels, and a second, stratified split of
# the metadata is made further down — confirm which of the two is meant to be used.
train_paths, test_paths = train_test_split(all_images_path, test_size=0.20, shuffle=True, random_state=42)

In [None]:
# Report how many image paths ended up on each side of the split
print(f"Number of train_paths: {len(train_paths)} and number of test_paths: {len(test_paths)}")

In [None]:
# Display the dataset root path as the cell output
general_path    

In [None]:
import os
import shutil

# Writable working folder that will hold the train/test image folders
base_path = '/kaggle/working'

train_dir = os.path.join(base_path, 'train_folder')
test_dir = os.path.join(base_path, 'test_folder')

# Create whichever of the two folders does not exist yet
for split_dir in (train_dir, test_dir):
    if not os.path.exists(split_dir):
        os.makedirs(split_dir)

def copy_images(image_paths, target_dir):
    """Copy every file in image_paths into target_dir, keeping its file name."""
    for source in image_paths:
        destination = os.path.join(target_dir, os.path.basename(source))
        shutil.copy(source, destination)

# Only copy into a folder that is still empty, so re-running the cell is cheap.
# NOTE(review): these files are copied directly into the split folders, while a later
# cell copies images into per-class sub-folders — confirm the flat copies are needed.
for split_paths, split_dir in ((train_paths, train_dir), (test_paths, test_dir)):
    if not os.listdir(split_dir):
        copy_images(split_paths, split_dir)

In [None]:
# Print directory/file counts for everything under the working folder
walk_through_dir('/kaggle/working')

In [None]:
# Distinct diagnosis labels found in the metadata
lesion_types = df['dx'].unique()

# One sub-folder per label inside both the train and the test folder
for lesion_type in lesion_types:
    for split_root in (train_dir, test_dir):
        os.makedirs(os.path.join(split_root, lesion_type), exist_ok=True)

# 80/20 split of the metadata rows, stratified on the diagnosis label.
# NOTE(review): rows are split individually; if several rows belong to the same
# lesion they may end up on both sides — confirm whether a grouped split is needed.
train_df, test_df = train_test_split(df, test_size=0.20, stratify=df['dx'], random_state=42)

def copy_images_to_subdir(df, base_dir, images_dir1, images_dir2):
    """Copy each image listed in df into base_dir/<dx>/.

    For every row, '<image_id>.jpg' is looked up in images_dir1 first and then
    in images_dir2. Rows whose file is in neither folder are reported with a
    printed message and skipped.

    Args:
        df: DataFrame with an 'image_id' and a 'dx' column.
        base_dir: Folder that contains one sub-folder per 'dx' value.
        images_dir1: First folder to search for the image file.
        images_dir2: Second folder to search for the image file.
    """
    for image_id, lesion_type in zip(df['image_id'], df['dx']):
        file_name = image_id + '.jpg'

        # Candidate locations in lookup order; the first existing one wins
        candidates = (os.path.join(images_dir1, file_name),
                      os.path.join(images_dir2, file_name))
        source_path = next((path for path in candidates if os.path.exists(path)), None)

        if source_path is None:
            print(f"Image {file_name} not found in both directories.")
            continue  # Skip this file

        shutil.copy(source_path, os.path.join(base_dir, lesion_type))
        
# The two source folders that hold the image files
images_part1, images_part2 = (
    general_path / folder
    for folder in ('HAM10000_images_part_1', 'HAM10000_images_part_2')
)

# Fill the per-class sub-folders, train split first, then test split
for split_df, split_dir in ((train_df, train_dir), (test_df, test_dir)):
    copy_images_to_subdir(split_df, split_dir, images_part1, images_part2)

In [None]:
from torchvision import datasets

# Build datasets from the train/test folders, applying data_transform to every image
train_data = datasets.ImageFolder(train_dir, transform=data_transform)
test_data = datasets.ImageFolder(test_dir, transform=data_transform)

print(f"Train data:\n{train_data}\nTest data:\n{test_data}")

In [None]:
# Class labels as a list, and the label -> integer index mapping, as exposed by the dataset
class_names = train_data.classes
class_dict = train_data.class_to_idx
class_names, class_dict

In [None]:
def set_seeds(seed: int=42):
    """Sets random seeds for torch operations.

    Args:
        seed (int, optional): Random seed to set. Defaults to 42.
    """
    # First the general torch seed, then the seed for CUDA (GPU) operations
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)

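There are very good pretrained models out there that have been extensively trained and tested. Some of these have been trained on ImageNet, and therefore we need to apply the same preprocessing to our images (resizing and normalization with the ImageNet mean and standard deviation) that was used during pretraining, so that the models perform well on our data.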

from torch.utils.data import Subset

# Default number of DataLoader worker processes: one per CPU reported by the OS
NUM_WORKERS = os.cpu_count()
# Number of samples per batch used for every DataLoader in this notebook
BATCH_SIZE = 32

def create_dataloaders(
    train_dir: str, 
    test_dir: str, 
    transform: transforms.Compose, 
    batch_size: int, 
    fraction: float=1.0,
    num_workers: int=NUM_WORKERS
):
    """Creates training and testing DataLoaders.

    Takes in a training directory and testing directory path and turns
    them into PyTorch Datasets and then into PyTorch DataLoaders. Optionally
    only a random fraction of each dataset is kept.

    Note: the subsets are drawn with NumPy's global random generator on every
    call, so two calls generally return different train AND test subsets.

    Args:
    train_dir: Path to training directory.
    test_dir: Path to testing directory.
    transform: torchvision transforms to perform on training and testing data.
    batch_size: Number of samples per batch in each of the DataLoaders.
    fraction: Share of each dataset to keep, in the interval (0, 1]. The
        samples are drawn without replacement, independently for the
        training and the testing dataset. Defaults to 1.0 (keep everything).
    num_workers: An integer for number of workers per DataLoader.

    Returns:
    A tuple of (train_dataloader, test_dataloader, class_names).
    Where class_names is a list of the target classes.

    Raises:
    ValueError: If fraction is not in the interval (0, 1].

    Example usage:
        train_dataloader, test_dataloader, class_names = \
            create_dataloaders(train_dir=path/to/train_dir,
                               test_dir=path/to/test_dir,
                               transform=some_transform,
                               batch_size=32,
                               num_workers=4)
    """
    # Fail early with a clear message instead of producing empty DataLoaders
    # (which later cause a division by zero when metrics are averaged).
    if not 0 < fraction <= 1:
        raise ValueError(f"fraction must be in the interval (0, 1], got {fraction}")

    # Use ImageFolder to create dataset(s)
    train_data = datasets.ImageFolder(train_dir, transform=transform)
    test_data = datasets.ImageFolder(test_dir, transform=transform)

    # Get class names
    class_names = train_data.classes

    def _random_subset(dataset):
        """Return a random Subset holding `fraction` of dataset (at least one sample)."""
        # max(1, ...) keeps a tiny fraction of a small dataset from rounding down to zero
        subset_size = max(1, int(len(dataset) * fraction))
        subset_indices = np.random.choice(len(dataset), subset_size, replace=False)
        return Subset(dataset, subset_indices)

    # Draw the train subset first, then the test subset
    train_subset = _random_subset(train_data)
    test_subset = _random_subset(test_data)

    # Turn images into data loaders
    train_dataloader = DataLoader(
        train_subset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
    )
    test_dataloader = DataLoader(
        test_subset,
        batch_size=batch_size,
        shuffle=False, # don't need to shuffle test data
        num_workers=num_workers,
        pin_memory=True,
    )

    return train_dataloader, test_dataloader, class_names

In [None]:
# These are the transforms we will use as they have been used for training the model
import torchvision
# Pretrained weight set for EfficientNet-B0; DEFAULT selects the weights torchvision marks as default
weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT

# The weights object provides the preprocessing transforms that belong to it
automatic_transforms = weights.transforms()
print(f"Automatically created transforms: {automatic_transforms}")

In [None]:
# Let's first start by training the model on 5 percent and 10 percent of the data
train_dataloader_5_percent, test_dataloader_5_percent, class_names = create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=automatic_transforms, # use automatic created transforms
    fraction=0.05,
    batch_size=BATCH_SIZE
)

train_dataloader_10_percent, test_dataloader_10_percent, class_names = create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=automatic_transforms, # use automatic created transforms
    fraction=0.1,
    batch_size=BATCH_SIZE
)

# create_dataloaders() samples a separate test subset on every call, so pick ONE
# of them and reuse it for every experiment; otherwise the runs would be scored
# on different images and their metrics would not be comparable.
test_dataloader = test_dataloader_10_percent

# Find the number of samples/batches per dataloader (using the same test_dataloader for both experiments)
print(f"Number of batches of size {BATCH_SIZE} in 5 percent training data: {len(train_dataloader_5_percent)}")
print(f"Number of batches of size {BATCH_SIZE} in 10 percent training data: {len(train_dataloader_10_percent)}")
print(f"Number of batches of size {BATCH_SIZE} in testing data: {len(test_dataloader)} (all experiments will use the same test set)")
print(f"Number of classes: {len(class_names)}, class names: {class_names}")

In [None]:
# Let's have functions for creating the two models, as we will be experimenting with different hyperparameters.
OUT_FEATURES = len(class_names)

def create_effnetb0():
    """Builds an EfficientNet-B0 with frozen features and a new classifier head.

    Returns:
        The model, placed on `device`, with its `name` attribute set to "effnetb0".
    """
    # 1. Get the base model with pretrained weights and send to target device
    model = torchvision.models.efficientnet_b0(
        weights=torchvision.models.EfficientNet_B0_Weights.DEFAULT
    ).to(device)

    # 2. Freeze the base model layers so only the new head receives gradients
    for frozen_param in model.features.parameters():
        frozen_param.requires_grad = False

    # 3. Set the seeds (before the new head is created and initialised)
    set_seeds()

    # 4. Change the classifier head: one output per class
    head = nn.Sequential(
        nn.Dropout(p=0.2),
        nn.Linear(in_features=1280, out_features=OUT_FEATURES),
    )
    model.classifier = head.to(device)

    # 5. Give the model a name
    model.name = "effnetb0"
    print(f"[INFO] Created new {model.name} model.")
    return model

def create_effnetb2():
    """Builds an EfficientNet-B2 with frozen features and a new classifier head.

    Returns:
        The model, placed on `device`, with its `name` attribute set to "effnetb2".
    """
    # 1. Get the base model with pretrained weights and send to target device
    model = torchvision.models.efficientnet_b2(
        weights=torchvision.models.EfficientNet_B2_Weights.DEFAULT
    ).to(device)

    # 2. Freeze the base model layers so only the new head receives gradients
    for frozen_param in model.features.parameters():
        frozen_param.requires_grad = False

    # 3. Set the seeds (before the new head is created and initialised)
    set_seeds()

    # 4. Change the classifier head: one output per class
    head = nn.Sequential(
        nn.Dropout(p=0.3),
        nn.Linear(in_features=1408, out_features=OUT_FEATURES),
    )
    model.classifier = head.to(device)

    # 5. Give the model a name
    model.name = "effnetb2"
    print(f"[INFO] Created new {model.name} model.")
    return model

In [None]:
# Build one instance of each architecture.
# NOTE(review): the experiment loop further down creates a fresh model for every run,
# so these two instances look like a smoke test only — confirm.
effnetb0 = create_effnetb0()
effnetb2 = create_effnetb2()

In [None]:
# Create epochs list
num_epochs = [5, 10]

# Create models list
models = ["effnetb0", "effnetb2"]

# Create dataloaders dictionary.
# The keys end up in the TensorBoard log paths and checkpoint file names, so
# they must name the fraction of data each DataLoader actually holds.
train_dataloaders = {"data_5_percent": train_dataloader_5_percent,
                     "data_10_percent": train_dataloader_10_percent}

In [None]:
from typing import Dict, List, Tuple
from tqdm.auto import tqdm

def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> Tuple[float, float]:
  """Runs one training epoch over a DataLoader.

  Switches the model to training mode, then for every batch performs the
  forward pass, the loss calculation, backpropagation and an optimizer step.

  Args:
    model: A PyTorch model to be trained.
    dataloader: A DataLoader instance for the model to be trained on.
    loss_fn: A PyTorch loss function to minimize.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
    A tuple (train_loss, train_accuracy), each being the mean of the
    per-batch values. For example:

    (0.1112, 0.8743)
  """
  model.train()

  # Sums of the per-batch loss and per-batch accuracy
  running_loss, running_acc = 0, 0

  for features, targets in dataloader:
      features, targets = features.to(device), targets.to(device)

      # Forward pass and loss for this batch
      logits = model(features)
      batch_loss = loss_fn(logits, targets)
      running_loss += batch_loss.item()

      # Backpropagation and parameter update
      optimizer.zero_grad()
      batch_loss.backward()
      optimizer.step()

      # Share of samples in this batch whose predicted class matches the target
      predicted = torch.softmax(logits, dim=1).argmax(dim=1)
      running_acc += (predicted == targets).sum().item() / len(logits)

  # Average the per-batch values over the number of batches
  n_batches = len(dataloader)
  return running_loss / n_batches, running_acc / n_batches

def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:
  """Tests a PyTorch model for a single epoch.

  Turns a target PyTorch model to "eval" mode and then performs
  a forward pass on a testing dataset.

  Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function to calculate loss on the test data.
    device: A target device to compute on (e.g. "cuda" or "cpu").

  Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy). For example:

    (0.0223, 0.8985)
  """
  # Put model in eval mode
  model.eval() 

  # Setup test loss and test accuracy values
  test_loss, test_acc = 0, 0

  # Turn on inference context manager
  with torch.inference_mode():
      # Loop through DataLoader batches
      for batch, (X, y) in enumerate(dataloader):
          # Send data to target device
          X, y = X.to(device), y.to(device)

          # 1. Forward pass
          test_pred_logits = model(X)

          # 2. Calculate and accumulate loss
          loss = loss_fn(test_pred_logits, y)
          test_loss += loss.item()

          # Calculate and accumulate accuracy
          test_pred_labels = test_pred_logits.argmax(dim=1)
          test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

  # Adjust metrics to get average loss and accuracy per batch 
  test_loss = test_loss / len(dataloader)
  test_acc = test_acc / len(dataloader)
  return test_loss, test_acc

In [None]:
from torch.utils.tensorboard import SummaryWriter

# Create a SummaryWriter with its default settings.
# NOTE(review): the experiment loop further down builds its own writers through
# create_writer(), so this instance looks unused — confirm before relying on it.
writer = SummaryWriter()

In [None]:
from typing import Dict, List
from tqdm.auto import tqdm

# Add writer parameter to train()
def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          device: torch.device, 
          writer: torch.utils.tensorboard.writer.SummaryWriter # new parameter to take in a writer
          ) -> Dict[str, List]:
    """Trains and tests a PyTorch model.

    Passes a target PyTorch models through train_step() and test_step()
    functions for a number of epochs, training and testing the model
    in the same epoch loop.

    Calculates, prints and stores evaluation metrics throughout.

    Stores metrics to specified writer log_dir if present. The writer is
    closed once, after the last epoch.

    Args:
      model: A PyTorch model to be trained and tested.
      train_dataloader: A DataLoader instance for the model to be trained on.
      test_dataloader: A DataLoader instance for the model to be tested on.
      optimizer: A PyTorch optimizer to help minimize the loss function.
      loss_fn: A PyTorch loss function to calculate loss on both datasets.
      epochs: An integer indicating how many epochs to train for.
      device: A target device to compute on (e.g. "cuda" or "cpu").
      writer: A SummaryWriter() instance to log model results to. A falsy
        value (e.g. None) disables logging.

    Returns:
      A dictionary of training and testing loss as well as training and
      testing accuracy metrics. Each metric has a value in a list for 
      each epoch.
      In the form: {train_loss: [...],
                train_acc: [...],
                test_loss: [...],
                test_acc: [...]} 
      For example if training for epochs=2: 
              {train_loss: [2.0616, 1.0537],
                train_acc: [0.3945, 0.3945],
                test_loss: [1.2641, 1.5706],
                test_acc: [0.3400, 0.2973]} 
    """
    # Create empty results dictionary
    results = {"train_loss": [],
               "train_acc": [],
               "test_loss": [],
               "test_acc": []
    }

    # Loop through training and testing steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer,
                                           device=device)
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device=device)

        # Print out what's happening
        print(
          f"Epoch: {epoch+1} | "
          f"train_loss: {train_loss:.4f} | "
          f"train_acc: {train_acc:.4f} | "
          f"test_loss: {test_loss:.4f} | "
          f"test_acc: {test_acc:.4f}"
        )

        # Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

        # See if there's a writer, if so, log this epoch's metrics to it
        if writer:
            writer.add_scalars(main_tag="Loss", 
                               tag_scalar_dict={"train_loss": train_loss,
                                                "test_loss": test_loss},
                               global_step=epoch)
            writer.add_scalars(main_tag="Accuracy", 
                               tag_scalar_dict={"train_acc": train_acc,
                                                "test_acc": test_acc}, 
                               global_step=epoch)

    # Close the writer once, after all epochs have been logged. Closing it inside
    # the epoch loop would shut the writer down while later epochs still log to it.
    if writer:
        writer.close()

    # Return the filled results at the end of the epochs
    return results

In [None]:
def save_model(model: torch.nn.Module,
               target_dir: str,
               model_name: str):
  '''Writes a model's state_dict() to a file inside a target directory.

  The target directory (including missing parents) is created first; the
  file name is validated afterwards.

  Args:
    model: A target PyTorch model to save.
    target_dir: A directory for saving the model to.
    model_name: A filename for the saved model. Should include
      either ".pth" or ".pt" as the file extension.

  Example usage:
    save_model(model=model_0,
               target_dir="models",
               model_name="05_going_modular_tingvgg_model.pth")
  '''
  # Make sure the destination folder exists
  destination = Path(target_dir)
  destination.mkdir(parents=True, exist_ok=True)

  # Only the usual PyTorch checkpoint extensions are accepted
  assert model_name.endswith((".pth", ".pt")), "model_name should end with '.pt' or '.pth'"
  model_save_path = destination / model_name

  # Save only the learned parameters, not the whole model object
  print(f"[INFO] Saving model to: {model_save_path}")
  torch.save(model.state_dict(), model_save_path)

In [None]:
def create_writer(experiment_name: str, 
                  model_name: str, 
                  extra: str=None) -> torch.utils.tensorboard.writer.SummaryWriter:
    """Creates a torch.utils.tensorboard.writer.SummaryWriter() instance saving to a specific log_dir.

    log_dir is a combination of runs/timestamp/experiment_name/model_name/extra.

    Where timestamp is the current date in YYYY-MM-DD format.

    Args:
        experiment_name (str): Name of experiment.
        model_name (str): Name of model.
        extra (str, optional): Anything extra to add to the directory. Defaults to None.

    Returns:
        torch.utils.tensorboard.writer.SummaryWriter: Instance of a writer saving to log_dir.

    Example usage:
        # Create a writer saving to "runs/2022-06-04/data_10_percent/effnetb2/5_epochs/"
        writer = create_writer(experiment_name="data_10_percent",
                               model_name="effnetb2",
                               extra="5_epochs")
        # The above is the same as:
        writer = SummaryWriter(log_dir="runs/2022-06-04/data_10_percent/effnetb2/5_epochs/")
    """
    # Note: the return annotation names the SummaryWriter class. Writing it with
    # parentheses would construct a writer every time this function is defined.
    from datetime import datetime
    import os

    # Get timestamp of current date (all experiments on certain day live in same folder)
    timestamp = datetime.now().strftime("%Y-%m-%d") # returns current date in YYYY-MM-DD format

    # Build the log directory path; `extra` is appended only when it is given
    path_parts = ["runs", timestamp, experiment_name, model_name]
    if extra:
        path_parts.append(extra)
    log_dir = os.path.join(*path_parts)

    print(f"[INFO] Created SummaryWriter, saving to: {log_dir}...")
    return SummaryWriter(log_dir=log_dir)

In [None]:
%%time

# 1 set the random seeds
set_seeds(42)

# 2 Keep track of experiment numbers
experiment_number = 0

# 3 Loop through each DataLoader
for dataloader_name, train_dataloader in train_dataloaders.items():

    # 4 Loop through each number of epochs
    for epochs in num_epochs:

        # 5 Loop through each model name
        for model_name in models:

            experiment_number += 1
            print(f"[INFO] Experiment number: {experiment_number}")
            print(f"[INFO] Model: {model_name}")
            print(f"[INFO] DataLoader: {dataloader_name}")
            print(f"[INFO] Number of epochs: {epochs}")  

            # 7. Select the model
            if model_name == "effnetb0":
                model = create_effnetb0() # creates a new model each time (important because we want each experiment to start from scratch)
            else:
                model = create_effnetb2() # creates a new model each time (important because we want each experiment to start from scratch)
            
            # 8. Create a new loss and optimizer for every model
            loss_fn = nn.CrossEntropyLoss()
            optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

            # 9. Train target model with target dataloaders and track experiments
            train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=test_dataloader, 
                  optimizer=optimizer,
                  loss_fn=loss_fn,
                  epochs=epochs,
                  device=device,
                  writer=create_writer(experiment_name=dataloader_name,
                                       model_name=model_name,
                                       extra=f"{epochs}_epochs"))
            
            # 10. Save the model to file so we can get back the best model
            save_filepath = f"07_{model_name}_{dataloader_name}_{epochs}_epochs.pth"
            save_model(model=model,
                       target_dir="/kaggle/working/models",
                       model_name=save_filepath)
            print("-"*50 + "\n")

We are dealing with an imbalanced dataset: the classes occur with very different frequencies. There are a number of possible solutions for this:

1. **Resampling the Dataset:**
   - **Oversampling the minority class:** You can increase the frequency of the minority classes by duplicating samples or generating synthetic samples, for example, using techniques like SMOTE (Synthetic Minority Over-sampling Technique).
   - **Undersampling the majority class:** This involves reducing the frequency of the majority classes to balance the dataset. However, this might lead to a loss of potentially important data.

2. **Modify Class Weights:**
   - Assign higher weights to the minority classes and lower weights to the majority classes during training. Many machine learning frameworks allow you to set class weights as parameters in the training process.

3. **Data Augmentation:**
   - Especially for image data, you can use various data augmentation techniques such as rotation, flipping, zooming, or color variation to increase the number of minority class samples.

4. **Change the Evaluation Metric:**
   - Instead of using accuracy, use metrics that give better insight into class imbalances like precision, recall, F1 score, or the area under the ROC curve (AUC-ROC).

5. **Ensemble Different Resampled Datasets:**
   - Train multiple models on differently resampled datasets and combine their predictions, which could be as simple as voting or more complex meta-learning approaches.

6. **Use of Anomaly Detection Techniques:**
   - Treat the minority class as an anomaly and use anomaly detection methods which are often robust to imbalanced datasets.

7. **Transfer Learning:**
   - Utilize a pre-trained model on a large and balanced dataset and fine-tune it on your dataset. This can sometimes mitigate the imbalance problem because the model has learned robust feature representations.

8. **Cost-Sensitive Learning:**
   - Modify the learning algorithm to make the misclassification of minority classes more costly than the misclassification of majority classes.

In practice, you may need to combine several of these strategies to achieve the best result. Experiment with different techniques and evaluate the performance of your model with each change. Keep in mind that each dataset is unique, so there is no one-size-fits-all solution, and careful experimentation is key.

In [None]:
# Checkpoint written by save_model() in the experiment loop. The loop saves into
# "/kaggle/working/models", so use the same absolute folder here instead of a
# path that only resolves when the working directory happens to be /kaggle/working.
best_model_path = "/kaggle/working/models/07_effnetb2_data_10_percent_10_epochs.pth" 

# Re-create the architecture, then load the saved parameters into it
best_model = create_effnetb2()

# map_location moves the stored tensors onto the current device, so a checkpoint
# saved on a GPU machine can also be loaded on a CPU-only machine.
best_model.load_state_dict(torch.load(best_model_path, map_location=device))

In [None]:
# Check the model file size
from pathlib import Path

# Get the model size in bytes then convert to megabytes
# (floor division by 1024*1024: the result is a whole number, any remainder is dropped)
effnetb2_model_size = Path(best_model_path).stat().st_size // (1024*1024)
print(f"EfficientNetB2 feature extractor model size: {effnetb2_model_size} MB")

Now that we have our current best model, let's plot some images and see how our model is doing. 


In [None]:
def pred_and_plot_image(model: torch.nn.Module,
                        image_path: str,
                        class_names: List[str],
                        image_size: Tuple[int, int] = (224, 224),
                        transform: torchvision.transforms = None,
                        device: torch.device=device):
    """Predict the class of one image file and plot the image with the result.

    Args:
        model: Classifier to run; it is moved to `device` and put in eval mode.
        image_path: Location of the image file to open.
        class_names: Class labels, indexed by the position of the model output.
        image_size: Target size for the fallback resize (unused when
            `transform` is given).
        transform: Optional preprocessing callable applied to the PIL image.
            When None, a Resize + ToTensor + Normalize pipeline is built with
            the mean/std values hard-coded below.
        device: Device on which the forward pass runs.

    Returns:
        None. A matplotlib figure is created whose title shows the predicted
        class, its probability and the image path.
    """
    # Open inside a context manager so the file handle is released, and force
    # three channels so the 3-channel Normalize below also works for
    # grayscale / RGBA / palette files. convert() returns a loaded copy, so the
    # image stays usable after the file is closed.
    with Image.open(image_path) as image_file:
        img = image_file.convert("RGB")

    # Create transformation for image (if one doesn't exist)
    if transform is not None:
        image_transform = transform
    else:
        image_transform = transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    # Make sure the model is on the right device
    model.to(device)

    # Turn on model evaluation mode and inference mode
    model.eval()
    with torch.inference_mode():
        # Add a leading batch dimension: the model is fed a batch of one image
        transformed_image = image_transform(img).unsqueeze(dim=0)

        target_image_pred = model(transformed_image.to(device))

        # Scores -> probabilities -> index of the most probable class
        target_image_pred_probs = torch.softmax(target_image_pred, dim=1)
        target_image_pred_label = torch.argmax(target_image_pred_probs, dim=1)

    # Pull plain Python numbers out of the one-element tensors before using
    # them as a sequence index / format argument.
    predicted_class = class_names[target_image_pred_label.item()]
    predicted_prob = target_image_pred_probs.max().item()

    plt.figure()
    plt.imshow(img)
    plt.title(f"Pred: {predicted_class} | Prob: {predicted_prob:.3f} | image_path: {image_path}")
    plt.axis(False)

In [None]:
# How many example predictions to visualise
num_images_to_plot = 3

# Draw that many distinct paths at random (no seed is set in this cell, so the picks vary between runs)
test_image_path_sample = random.sample(all_images_path, num_images_to_plot)

# Run the best model on each sampled path and plot the prediction
for image_path in test_image_path_sample:
    pred_and_plot_image(best_model,
                        image_path,
                        class_names,
                        image_size=(224, 224),
                        device=device)

Now let's create a confusion matrix to delve into how our model is making errors, so that we can then improve it. Let's test it on all of our testing data.

In [None]:
# Build the train/test dataloaders used for the full evaluation below.
# fraction=1.0 presumably means "use all of the data" - confirm against create_dataloaders.
# Note: this call also rebinds the notebook-level `class_names`.
train_dataloader_100_percent, test_dataloader_100_percent, class_names = create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=automatic_transforms, # use automatic created transforms
    fraction=1.0,
    batch_size=BATCH_SIZE
)

In [None]:
import torch
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix, accuracy_score
import matplotlib.pyplot as plt
import numpy as np

In [None]:
def get_information_model(model, test_dataloader):
    """Collect ground-truth and predicted class indices over a whole dataloader.

    The model is put in eval mode and moved to the notebook-level `device`
    before the batches are processed.

    Args:
        model: Network to evaluate.
        test_dataloader: Iterable yielding `(images, labels)` batches.

    Returns:
        A `(true_labels, predicted_labels)` pair of flat lists, filled in the
        order the batches are yielded.
    """
    model.eval()
    model.to(device)

    true_labels, predicted_labels = [], []

    # Evaluation only: skip gradient tracking to save memory and compute
    with torch.no_grad():
        for batch_images, batch_labels in test_dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # The predicted class is the position of the largest score along dim 1
            batch_predictions = model(batch_images).argmax(dim=1)

            true_labels.extend(batch_labels.cpu().numpy())
            predicted_labels.extend(batch_predictions.cpu().numpy())

    return true_labels, predicted_labels

# Gather the best model's predictions over the full test dataloader
true_labels, predicted_labels = get_information_model(best_model, test_dataloader_100_percent)

# One tick per class. `class_names` is the label list returned together with the
# evaluated dataloader; labels are assumed to be the indices 0..len(class_names)-1
# into it (the same indexing pred_and_plot_image uses).
tick_marks = np.arange(len(class_names))

# Calculate confusion matrix and accuracy. Passing `labels` pins the matrix to one
# row/column per class, in tick order, even if a class is absent from this split.
cm = confusion_matrix(true_labels, predicted_labels, labels=tick_marks)
accuracy = accuracy_score(true_labels, predicted_labels)

# Plot the confusion matrix
fig, ax = plt.subplots(figsize=(10, 10))
heatmap = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
ax.set_title('Confusion Matrix')
fig.colorbar(heatmap, ax=ax)
ax.set_xticks(tick_marks)
ax.set_xticklabels(class_names, rotation=45)
ax.set_yticks(tick_marks)
ax.set_yticklabels(class_names)
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
plt.show()

print(f"Accuracy: {accuracy * 100:.2f}%")

In [None]:
# Number of batches in the full test dataloader
num_test_batches = len(test_dataloader_100_percent)
print(num_test_batches)

# Upper bound on the number of test samples, derived from the dataloader instead of
# hard-coded numbers (the final batch may hold fewer than BATCH_SIZE items)
print(num_test_batches * BATCH_SIZE)

Our model currently reaches 71.54% accuracy. Keep in mind that accuracy alone is a poor measure of performance here because our classes are imbalanced: a model that mostly predicts the majority class can still score well.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    roc_auc_score, roc_curve, precision_recall_curve, auc,
    f1_score, precision_score, recall_score
)

In [None]:
# Headline metrics for the baseline predictions.
# Precision / recall / F1 use average='macro': computed per class, then averaged
# without weighting, so every class counts equally regardless of its size.
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='macro')  # per class: TP / (TP + FP)
recall = recall_score(true_labels, predicted_labels, average='macro')  # per class: TP / (TP + FN)
f1 = f1_score(true_labels, predicted_labels, average='macro')  # harmonic mean of precision and recall

# Print every metric with two decimals
for metric_name, metric_value in (
    ("Accuracy", accuracy),
    ("Precision", precision),
    ("Recall", recall),
    ("F1 Score", f1),
):
    print(f"{metric_name}: {metric_value:.2f}")

In [None]:
# Detailed classification report for the baseline predictions.
# No target names are passed, so rows are keyed by the raw label values.
report = classification_report(true_labels, predicted_labels)
print("Classification Report:\n", report)
# Printed next to the report - presumably the label <-> class-name mapping; verify where class_dict is built
print(class_dict)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

In [None]:
# Pull the 'dx_distribution' entry out of the summary computed earlier.
# Presumably the number of samples per diagnosis (dx) class - confirm where summary_dict is built.
dx_data = summary_dict['dx_distribution']
dx_data

In [None]:
# Inverse-frequency weights: weight_c = N / (K * n_c), where N is the sum of all entries,
# K the number of entries and n_c the entry for class c. Smaller n_c gives a larger weight.
total_samples = dx_data.sum()
class_weights = total_samples / (len(dx_data) * dx_data)
total_samples, class_weights

In [None]:
# Dataloaders for the class-weighted experiment.
# fraction=0.5 presumably means "use half of the data" - confirm against create_dataloaders.
# Note: this call also rebinds the notebook-level `class_names`.
# NOTE(review): test_dataloader_50_percent is not referenced by the cells that follow.
train_dataloader_50_percent, test_dataloader_50_percent, class_names = create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=automatic_transforms, # use automatic created transforms
    fraction=0.5,
    batch_size=BATCH_SIZE
)

In [None]:
# Convert class weights to a float32 PyTorch tensor. np.asarray hands torch a plain
# array of values, whether class_weights is a pandas object or already an array.
# NOTE(review): CrossEntropyLoss applies weight[i] to class index i - confirm that
# class_weights is ordered like class_names (a frequency-sorted table would not be).
class_weights_tensor = torch.tensor(np.asarray(class_weights, dtype=np.float32), dtype=torch.float)

# Keep the weights on the same `device` that is handed to train(), instead of a
# hard-coded 'cuda', so the loss weights and the model outputs always live together.
class_weights_tensor = class_weights_tensor.to(device)

In [None]:
# Experiment grid: one training run is launched per (dataloader, epochs, model name) combination.

# Create epochs list
num_epochs = [10] # This can be tested, for example by using 5 epochs or 20

# Create models list
# NOTE(review): this rebinds `models`, which earlier cells import from torchvision
# (`from torchvision import datasets, models, transforms`); from here on `models` is this list.
models = ["effnetb2"] # Here you could use effnetb0 as well

# Create dataloaders dictionary
train_dataloaders = {"data_50_percent": train_dataloader_50_percent} # Again, feel free to include more splits of data.

In [None]:
# Run one training experiment per (dataloader, epoch count, model name) combination
for dataloader_name, train_dataloader in train_dataloaders.items():

    # Loop through each number of epochs
    for epochs in num_epochs:

        # Loop through each model name
        for model_name in models:

            print(f"[INFO] Model: {model_name}")
            print(f"[INFO] DataLoader: {dataloader_name}")
            print(f"[INFO] Number of epochs: {epochs}")

            # Select the model. A new instance is created for every run so that no
            # experiment starts from weights trained by a previous one.
            # Any name other than "effnetb0" falls through to EfficientNet-B2.
            if model_name == "effnetb0":
                model = create_effnetb0()
            else:
                model = create_effnetb2()

            # Create a new loss and optimizer for every model.
            # The loss is weighted per class with class_weights_tensor.
            loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)
            optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

            # Train target model with target dataloaders and track experiments.
            # NOTE(review): `test_dataloader` is not created in this section of the
            # notebook - confirm it is the intended evaluation split.
            train(model=model,
                  train_dataloader=train_dataloader,
                  test_dataloader=test_dataloader,
                  optimizer=optimizer,
                  loss_fn=loss_fn,
                  epochs=epochs,
                  device=device,
                  writer=create_writer(experiment_name=dataloader_name,
                                       model_name=model_name,
                                       extra=f"{epochs}_epochs"))

            # Save the model to file so we can get back the best model.
            # A relative "models" directory matches the "models/..." path the notebook
            # loads checkpoints from, and does not depend on a Kaggle-only absolute path.
            save_filepath = f"07_{model_name}_{dataloader_name}_{epochs}_epochs.pth"
            save_model(model=model,
                       target_dir="models",
                       model_name=save_filepath)
            print("-"*50 + "\n")

In [None]:
# Evaluate the class-weighted model (the `model` left over from the training loop)
# on the full test dataloader
true_labels_50_percent, predicted_labels_50_percent = get_information_model(model, test_dataloader_100_percent)

# One tick per class. `class_names` is the label list returned by create_dataloaders;
# labels are assumed to be the indices 0..len(class_names)-1 into it.
tick_marks = np.arange(len(class_names))

# Calculate confusion matrix and accuracy. Passing `labels` pins the matrix to one
# row/column per class, in tick order, even if a class is absent from this split.
cm = confusion_matrix(true_labels_50_percent, predicted_labels_50_percent, labels=tick_marks)
accuracy = accuracy_score(true_labels_50_percent, predicted_labels_50_percent)

# Plot the confusion matrix
fig, ax = plt.subplots(figsize=(10, 10))
heatmap = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
ax.set_title('Confusion Matrix')
fig.colorbar(heatmap, ax=ax)
ax.set_xticks(tick_marks)
ax.set_xticklabels(class_names, rotation=45)
ax.set_yticks(tick_marks)
ax.set_yticklabels(class_names)
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
plt.show()

print(f"Accuracy: {accuracy * 100:.2f}%")

In [None]:
# Detailed classification report for the class-weighted model.
# It must be built from the *_50_percent label lists produced by that model's evaluation;
# `true_labels` / `predicted_labels` belong to the earlier baseline model.
report_weighted_classes = classification_report(true_labels_50_percent, predicted_labels_50_percent)
print("Classification Report:\n", report_weighted_classes)
print(class_dict)

In [None]:
# Let's compare to our previous model:
# true_labels / predicted_labels are the lists collected from best_model earlier in the notebook.
report = classification_report(true_labels, predicted_labels)
print("Classification Report:\n", report)
# Printed next to the report - presumably the label <-> class-name mapping; verify where class_dict is built
print(class_dict)

Let's now focus on the data a bit more, starting with separate transform pipelines for training and validation.

In [None]:
# Preprocessing pipelines keyed by split name.
# 'train' adds random augmentation (random resized crop + horizontal flip);
# 'val' is deterministic (resize, then centre crop).
# Both end with ToTensor and normalisation using the ImageNet mean / std.
data_transforms = dict(
    train=transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ]),
    val=transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ]),
)

In [None]:
import torchvision.models

# Load MobileNetV3-Small to inspect its architecture.
# `pretrained=True` is deprecated in torchvision; the pretrained weights are
# requested explicitly through the `weights` argument instead.
mobilenet_v3_small = torchvision.models.mobilenet_v3_small(
    weights=torchvision.models.MobileNet_V3_Small_Weights.DEFAULT
)
mobilenet_v3_small

In [None]:
# Load EfficientNet-B2 with its default pretrained weights to inspect the architecture.
# (The previous line ended in a dangling `.` and referred to a non-existent `EffNetb2` attribute.)
effnetb2 = torchvision.models.efficientnet_b2(
    weights=torchvision.models.EfficientNet_B2_Weights.DEFAULT
)
effnetb2

In [None]:
def create_mobile_net_v3():
    """Create a MobileNetV3-Small feature extractor with a new classifier head.

    The pretrained backbone (`model.features`) is frozen, and the classifier is
    replaced by Dropout + Linear with OUT_FEATURES outputs. The model is placed
    on the notebook-level `device`.

    Returns:
        The model, with a `name` attribute set to "mobilenet_v3_small".
    """
    # 1. Get the base model with pretrained weights and send to target device
    weights = torchvision.models.MobileNet_V3_Small_Weights.DEFAULT
    model = torchvision.models.mobilenet_v3_small(weights=weights).to(device)

    # 2. Freeze the base model layers
    for param in model.features.parameters():
        param.requires_grad = False

    # 3. Set the seeds
    set_seeds()

    # 4. Change the classifier head.
    # The input width is read from the first layer of the stock head, because it
    # differs between architectures (1408 is the EfficientNet-B2 width, not this model's).
    head_in_features = model.classifier[0].in_features
    model.classifier = nn.Sequential(
        nn.Dropout(p=0.1),
        nn.Linear(in_features=head_in_features, out_features=OUT_FEATURES)
    ).to(device)

    # 5. Give the model a name
    model.name = "mobilenet_v3_small"
    print(f"[INFO] Created new {model.name} model.")
    return model

Realistically, we should focus more on the data and smaller models before attempting something like this.

In [None]:
import pandas