# Environment setup

In [60]:
import os
import gc
import cv2
import math
import copy
import time
import random
import glob
from typing import Dict, List
from matplotlib import pyplot as plt
import seaborn as sns
from random import sample
from matplotlib.offsetbox import OffsetImage, AnnotationBbox

# For data manipulation
import numpy as np
import pandas as pd

# Pytorch Imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
from torch.cuda import amp
import torchvision

# Utils
import joblib
from tqdm import tqdm
from collections import defaultdict

# Sklearn Imports
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import balanced_accuracy_score

# For Image Models
import timm
from PIL import Image

# Albumentations for augmentations
import albumentations as A
from albumentations.pytorch import ToTensorV2

# For colored terminal text
from colorama import Fore, Back, Style
b_ = Fore.BLUE
sr_ = Style.RESET_ALL

import warnings
warnings.filterwarnings("ignore")

from pathlib import Path

# For descriptive error messages
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
os.environ["OPENCV_IO_MAX_IMAGE_PIXELS"] = str(pow(2,60))
print(f"torch version {torch.__version__}") 
print(f'Torchvision version {torchvision.__version__}')

torch version 2.0.0
Torchvision version 0.15.1


In [61]:
CONFIG = {
    "seed": 42,
    "img_size": 224,
    "model_name": "swin",
    "num_classes": 5,
    "valid_batch_size": 64,
    "test_batch_size": 1,
    "device": torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    "train": False, # To train and save the model. Should be False when submitting
    "split_ratio": 0.2,
    "num_workers": os.cpu_count(),
    "epochs": 50,
    "sandbox": False, # True when finding optimal hyperparameters. Should be False when submitting.
}

In [62]:
def set_seed(seed=42):
    '''Sets the seed of the entire notebook so results are the same every time we run.
    This is for REPRODUCIBILITY.'''
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # When running on the CuDNN backend, two further options must be set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed
    os.environ['PYTHONHASHSEED'] = str(seed)
    
set_seed(CONFIG['seed'])

In [63]:
ROOT_DIR = '/kaggle/input/UBC-OCEAN'
TEST_DIR = '/kaggle/input/UBC-OCEAN/test_thumbnails'
ALT_TEST_DIR = '/kaggle/input/UBC-OCEAN/test_images'
TRAIN_DIR = '/kaggle/input/UBC-OCEAN/train_thumbnails'
ALT_TRAIN_DIR = '/kaggle/input/UBC-OCEAN/train_images'

# Data processing

In [64]:
def get_test_file_path(image_id):
    if os.path.exists(f"{TEST_DIR}/{image_id}_thumbnail.png"):
        return f"{TEST_DIR}/{image_id}_thumbnail.png"
    else: 
        return f"{ALT_TEST_DIR}/{image_id}.png"

In [65]:
df = pd.read_csv(f"{ROOT_DIR}/test.csv")
df['file_path'] = df['image_id'].apply(get_test_file_path)
df['label'] = 0 # dummy
df

Unnamed: 0,image_id,image_width,image_height,file_path,label
0,41,28469,16987,/kaggle/input/UBC-OCEAN/test_thumbnails/41_thu...,0


In [66]:
def trim(im):
    """
    Converts the image to grayscale using cv2, then computes binary matrix
    of the pixels that are above a certrain threshold, then takes out the first
    row where a certain percentage of the pixels are above the threshold will
    be the first clip point. Same idea for col, max row, max col.
    """

    percentage = 0.002
    upper_percentage = 0.97

    img = (np.array(im))
    img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    row_sums = np.sum(img_gray, axis=1)
    col_sums = np.sum(img_gray, axis=0)
    rows = np.where(np.logical_or(row_sums < img.shape[1] * percentage,
                                  row_sums > img.shape[1] * upper_percentage))[0]
    cols = np.where(np.logical_or(col_sums < img.shape[0] * percentage,
                                  col_sums > img.shape[0] * upper_percentage))[0]
    im_crop = np.delete(img, rows, axis=0)
    im_crop = np.delete(im_crop, cols, axis=1)
    return im_crop*255

In [67]:
class UBCDataset(Dataset):
    def __init__(self, df, transforms=None):
        self.df = df
        self.file_names = df['file_path'].values
        print(df['label'].values)
        self.labels = df['label'].values
        self.transforms = transforms
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index):
        img_path = self.file_names[index]
        img = plt.imread(img_path)
        img = trim(img)
        label = self.labels[index]
        
        if self.transforms:
            img = self.transforms(image=img)["image"]
            
        return torch.tensor(img), torch.tensor(label, dtype=torch.long)
        

In [68]:
data_transforms = {
    "valid": A.Compose([
        A.Resize(CONFIG['img_size'], CONFIG['img_size']),
        A.Normalize(
                mean=[0.485, 0.456, 0.406], 
                std=[0.229, 0.224, 0.225], 
                max_pixel_value=255.0, 
                p=1.0
            ),
        ToTensorV2()], p=1.)
}

In [69]:
def get_train_file_path(image_id):
    if os.path.exists(f"{TRAIN_DIR}/{image_id}_thumbnail.png"):
        return f"{TRAIN_DIR}/{image_id}_thumbnail.png"
    else:
        return f"{ALT_TRAIN_DIR}/{image_id}.png"

In [70]:
traindf = pd.read_csv('/kaggle/input/UBC-OCEAN/train.csv')
traindf['file_path'] = traindf['image_id'].apply(get_train_file_path)


# # Step 1: Group by 'label'
# grouped = traindf.groupby('label')

# # Create a 5x3 subplot grid (5 rows and 3 columns)
# fig, axes = plt.subplots(5, 3, figsize=(25, 25))

# # Step 2 and 3: Select 3 random rows from each group and plot the images
# for i, (label, group) in enumerate(grouped):
#     # Select 3 random rows from the group
#     random_rows = group.sample(3)
    
#     for j, (_, row) in enumerate(random_rows.iterrows()):
#         ax = axes[i, j]  # Select the current subplot
#         ax.set_title(f"Label {label} | ID: {row['image_id']}")  # Set the subplot title

#         # Load and display the image from 'file_path'
#         img = plt.imread(row['file_path'])
#         img = (trim(img)).astype(np.uint8)
#         ax.imshow(Image.fromarray(img))
#         ax.axis('off')

# # Hide any empty subplots
# for i in range(len(grouped), 5 * 3):
#     axes[i // 3, i % 3].axis('off')

# plt.tight_layout()
# plt.show()

## Preparing the training data frame for training

In [71]:
traindf['label'][traindf['label']=="HGSC"] = 0
traindf['label'][traindf['label']=="EC"] = 1
traindf['label'][traindf['label']=="CC"] = 2
traindf['label'][traindf['label']=="LGSC"] = 3
traindf['label'][traindf['label']=="MC"] = 4
traindf

Unnamed: 0,image_id,label,image_width,image_height,is_tma,file_path
0,4,0,23785,20008,False,/kaggle/input/UBC-OCEAN/train_thumbnails/4_thu...
1,66,3,48871,48195,False,/kaggle/input/UBC-OCEAN/train_thumbnails/66_th...
2,91,0,3388,3388,True,/kaggle/input/UBC-OCEAN/train_images/91.png
3,281,3,42309,15545,False,/kaggle/input/UBC-OCEAN/train_thumbnails/281_t...
4,286,1,37204,30020,False,/kaggle/input/UBC-OCEAN/train_thumbnails/286_t...
...,...,...,...,...,...,...
533,65022,3,53355,46675,False,/kaggle/input/UBC-OCEAN/train_thumbnails/65022...
534,65094,4,55042,45080,False,/kaggle/input/UBC-OCEAN/train_thumbnails/65094...
535,65300,0,75860,27503,False,/kaggle/input/UBC-OCEAN/train_thumbnails/65300...
536,65371,0,42551,41800,False,/kaggle/input/UBC-OCEAN/train_thumbnails/65371...


In [72]:
train_dataset = UBCDataset(traindf, transforms=data_transforms["valid"]) # TODO: Add training transforms
train_dataloader = DataLoader(train_dataset, batch_size=CONFIG['valid_batch_size'], 
                          num_workers=CONFIG["num_workers"], shuffle=True, pin_memory=True)

[0 3 0 3 1 0 0 0 0 0 0 0 0 0 2 0 0 0 2 2 0 4 3 1 0 1 0 4 3 0 2 1 0 1 3 0 4
 4 0 1 4 2 2 0 0 0 0 0 1 0 4 0 3 2 0 0 0 1 2 1 2 0 4 1 0 3 0 1 1 2 1 2 3 0
 0 2 0 1 0 0 3 0 3 4 4 0 0 2 0 1 0 1 3 1 2 0 1 0 2 0 1 3 0 0 2 3 2 1 1 0 3
 2 3 2 0 2 0 4 1 1 4 0 1 0 1 0 0 2 0 2 2 3 1 1 0 1 0 1 0 2 0 2 0 2 0 2 0 0
 1 0 1 0 2 1 0 0 0 1 1 2 0 0 0 1 0 0 0 1 2 0 0 4 0 0 0 1 4 1 3 3 1 0 4 1 2
 2 1 2 2 2 0 1 1 4 1 3 1 0 2 0 0 0 3 4 0 1 0 2 2 3 0 0 2 2 0 0 3 1 2 0 0 2
 0 0 1 0 0 1 0 0 0 4 2 1 0 0 3 0 0 0 0 2 0 0 2 2 2 2 0 1 0 1 0 1 0 4 4 2 4
 3 0 1 0 1 3 2 0 1 0 0 1 0 1 3 3 3 1 0 3 4 0 3 1 0 1 0 2 0 4 0 0 1 4 1 1 0
 0 2 0 2 0 0 3 0 4 4 4 4 0 3 0 0 4 0 0 0 1 2 3 1 0 3 1 2 2 3 1 3 0 0 0 4 4
 0 3 1 0 0 0 3 0 4 2 0 0 1 1 2 1 0 3 2 3 0 1 2 0 2 4 2 2 2 1 0 0 0 2 0 0 1
 1 2 0 4 2 2 1 1 1 2 0 0 1 1 1 2 2 0 0 0 1 0 2 1 0 2 0 4 0 1 1 4 0 0 2 0 4
 2 4 1 0 0 0 4 3 1 1 1 1 1 0 0 0 0 1 0 1 1 1 2 1 1 0 4 1 2 0 0 2 2 0 2 3 1
 0 1 1 2 0 1 2 1 1 0 1 1 0 1 3 0 0 0 2 0 0 1 0 2 1 4 0 4 3 4 0 2 1 2 4 0 3
 4 0 2 2 1 0 2 2 1 0 0 2 

In [73]:
def visualize_transformed_image(dataframe, transform):
    # Select a random row from the DataFrame
    random_row = dataframe.sample(n=1).iloc[0]

    # Get the image file path from the selected row
    file_path = random_row['file_path']
    print(random_row['is_tma'])
    print(file_path)
    # Load the original image
    original_image = plt.imread(file_path)
    trimmed_image = trim(original_image)
    
    # Apply the transform to the original image
    transformed_image = transform(image=trimmed_image)["image"]

    # Visualize the original and transformed images
    fig, axes = plt.subplots(1, 2, figsize=(20, 10))

    axes[0].set_title('Original Image')
    axes[0].imshow(original_image)
    axes[0].axis('off')

    axes[1].set_title('Transformed Image')
    axes[1].imshow(transformed_image.permute(1, 2, 0))
    axes[1].axis('off')

    plt.show()

# Visualize a random image from the DataFrame with the transform applied
if CONFIG["sandbox"]:
    visualize_transformed_image(traindf, data_transforms["valid"])


## Training and validation split from the training set

In [74]:
# Split the data into training and validation sets
if CONFIG["sandbox"]:
    train_data, val_data = train_test_split(traindf, test_size=CONFIG["split_ratio"], random_state=CONFIG["seed"])

    train_dataset = UBCDataset(train_data, transforms=data_transforms["valid"]) # TODO: Add training transforms
    train_dataloader = DataLoader(train_dataset, batch_size=CONFIG['valid_batch_size'], 
                              num_workers=CONFIG["num_workers"], shuffle=True, pin_memory=True)

    val_dataset = UBCDataset(val_data, transforms=data_transforms["valid"]) 
    val_dataloader = DataLoader(val_dataset, batch_size=CONFIG['valid_batch_size'], 
                              num_workers=CONFIG["num_workers"], shuffle=False, pin_memory=True)

# Train and save model

## Save model

In [75]:
def save_model(model: torch.nn.Module,
               target_dir: str,
               model_name: str):
    """Saves a PyTorch model to a target directory.
    
    Args:
        model: A target PyTorch model to save.
        target_dir: A directory for saving the model to.
        model_name: A filename for the saved model. Should include either ".pth" or ".pt" as the file extension.
        
    Example usage:
        save_model(model=model_0,
        targer_dir="models", 
        model_name="model_1")
    """

    # Create target directory
    target_dir_path = Path(target_dir)
    target_dir_path.mkdir(parents=True,
                          exist_ok=True)

    # Create model save path
    assert model_name.endswith(".pth") or model_name.endswith(".pt"),  "model_name should end with '.pt' or '.pth'"
    model_save_path = target_dir_path / model_name

    # Save the model state_dict()
    print(f"[INFO] Saving model to : {model_save_path}")
    torch.save(obj=model,
               f=model_save_path)


## Importing a pretrained ViT
This is done online before we turn off the internet access. 

In [76]:
if CONFIG["train"] or CONFIG["sandbox"]:
    # 1. Get pretrained weights for ViT-base
    pretrained_weights = torchvision.models.Swin_V2_B_Weights.DEFAULT
    #pretrained_vit_weights=torch.load('/kaggle/input/vit-weights/vit_b_16-c867db91.pth')
    # 2. Setup a ViT model instance with pretrained weights
    pretrained_model = torchvision.models.swin_v2_b(weights=pretrained_weights).to(CONFIG["device"])

    # 3. Freeze the base parameters
    for parameter in pretrained_model.parameters():
        parameter.requires_grad = False
        parameter.to(CONFIG["device"])

    # 4. Change the classifier head
    set_seed()
    pretrained_model.head = nn.Sequential(
            nn.Linear(in_features=pretrained_model.head.in_features, out_features = 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(in_features=256, out_features=64),
            nn.ReLU(),
            nn.Dropout(0.15),
            nn.Linear(in_features=64, out_features = CONFIG["num_classes"])
            
    ).to(CONFIG["device"])
    pretrained_model

    save_model(pretrained_model, # Needs to be saved as a dataset so that it works offline
             "/kaggle/working/",
             f"{CONFIG['model_name']}-pretrained.pth")

## Training the model

In [77]:
"""
Contains functions for training and testing a PyTorch model.
"""
import torch

from tqdm.auto import tqdm
from typing import Dict, List, Tuple

def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> Tuple[float, float]:
    """Trains a PyTorch model for a single epoch.

    Turns a target PyTorch model to training mode and then
    runs through all of the required training steps (forward
    pass, loss calculation, optimizer step).

    Args:
    model: A PyTorch model to be trained.
    dataloader: A DataLoader instance for the model to be trained on.
    loss_fn: A PyTorch loss function to minimize.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A tuple of training loss and training accuracy metrics.
    In the form (train_loss, train_accuracy). For example:

    (0.1112, 0.8743)
    """
    # Put model in train mode
    model.train()

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0

    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch 
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:
    """Tests a PyTorch model for a single epoch.

    Turns a target PyTorch model to "eval" mode and then performs
    a forward pass on a testing dataset.

    Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function to calculate loss on the test data.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy). For example:

    (0.0223, 0.8985)
    """
    # Put model in eval mode
    model.eval() 

    # Setup test loss and test accuracy values
    test_loss, test_acc, balanced_acc = 0, 0, 0
    pred_labels = []
    true_labels = []
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            pred_labels = pred_labels + test_pred_labels.cpu().tolist()
            true_labels = true_labels + y.cpu().tolist()

    # Adjust metrics to get average loss and accuracy per batch 
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    balanced_acc = balanced_accuracy_score(true_labels, pred_labels)
    return test_loss, test_acc, balanced_acc

def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          device: torch.device) -> Dict[str, List]:
    """Trains and tests a PyTorch model.

    Passes a target PyTorch models through train_step() and test_step()
    functions for a number of epochs, training and testing the model
    in the same epoch loop.

    Calculates, prints and stores evaluation metrics throughout.

    Args:
    model: A PyTorch model to be trained and tested.
    train_dataloader: A DataLoader instance for the model to be trained on.
    test_dataloader: A DataLoader instance for the model to be tested on.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    loss_fn: A PyTorch loss function to calculate loss on both datasets.
    epochs: An integer indicating how many epochs to train for.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A dictionary of training and testing loss as well as training and
    testing accuracy metrics. Each metric has a value in a list for 
    each epoch.
    In the form: {train_loss: [...],
                  train_acc: [...],
                  test_loss: [...],
                  test_acc: [...]} 
    For example if training for epochs=2: 
                 {train_loss: [2.0616, 1.0537],
                  train_acc: [0.3945, 0.3945],
                  test_loss: [1.2641, 1.5706],
                  test_acc: [0.3400, 0.2973]} 
    """
    # Create empty results dictionary
    results = {"train_loss": [],
                "train_acc": [],
                "test_loss": [],
                "test_acc": [],
                "balanced_acc": [],
                }

    # Loop through training and testing steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                          dataloader=train_dataloader,
                                          loss_fn=loss_fn,
                                          optimizer=optimizer,
                                          device=device)
        test_loss, test_acc, balanced_acc = test_step(model=model,
          dataloader=test_dataloader,
          loss_fn=loss_fn,
          device=device)

        # Print out what's happening
        print(
                f"Epoch: {epoch+1} | "
                f"train_loss: {train_loss:.4f} | "
                f"train_acc: {train_acc:.4f} | "
                f"test_loss: {test_loss:.4f} | "
                f"test_acc: {test_acc:.4f} | "
                f"balanced_acc: {balanced_acc:.4f}"
        )

        # Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)
        results["balanced_acc"].append(balanced_acc)

    # Return the filled results at the end of the epochs
    return results

In [78]:
# Create optimizer and loss function
def train_model(train_dl, test_dl, model_name, store_model=True):
    optimizer = torch.optim.Adam(params=pretrained_model.parameters(),
                                 lr=1e-3)
    loss_fn = torch.nn.CrossEntropyLoss()
    
    if test_dl is None:
        test_dl = train_dl
    # Train the classifier head of the pretrained ViT feature extractor model
    set_seed()
    results = train(model=pretrained_model,
                   train_dataloader=train_dl,
                   test_dataloader=test_dl,
                   optimizer=optimizer,
                   loss_fn=loss_fn,
                   epochs=CONFIG["epochs"],
                   device=CONFIG["device"])
    if store_model:
        save_model(pretrained_model, # Needs to be saved as a dataset so that it works offline
                 "/kaggle/working/",
                 model_name)
        
    return results

In [79]:
if CONFIG["train"]:
    results = train_model(train_dl=train_dataloader, test_dl=train_dataloader, model_name=f"{CONFIG['model_name']}.pth")
elif CONFIG["sandbox"]:
    results = train_model(train_dl=train_dataloader, test_dl=val_dataloader, model_name=f"{CONFIG['model_name']}.pth")

In [80]:
def plot_loss_curves(results: Dict[str, List[float]]):
    """Plots training curves of a results dictionary.

    Args:
        results (dict): dictionary containing list of values, e.g.
            {"train_loss": [...],
             "train_acc": [...],
             "test_loss": [...],
             "test_acc": [...]}
    """

    # Get the loss values of the results dictionary (training and test)
    loss = results['train_loss']
    test_loss = results['test_loss']

    # Get the accuracy values of the results dictionary (training and test)
    accuracy = results['train_acc']
    test_accuracy = results['test_acc']

    # Figure out how many epochs there were
    epochs = range(len(results['train_loss']))

    # Setup a plot
    plt.figure(figsize=(15, 7))

    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, label='train_loss')
    plt.plot(epochs, test_loss, label='test_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # Plot accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs, accuracy, label='train_accuracy')
    plt.plot(epochs, test_accuracy, label='test_accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()
    
    # Plot accuracy
    plt.subplot(1, 1, 1)
    plt.plot(epochs, results["balanced_acc"], label='balanced_acc')
    plt.title('Balanced Accuracy for validation data')
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()

In [81]:
# Plot the loss curves
if CONFIG["train"] or CONFIG["sandbox"]:
    plot_loss_curves(results) 

## Testing the model

In [82]:
if CONFIG["train"] or CONFIG["sandbox"]:
    model = torch.load(f"/kaggle/working/{CONFIG['model_name']}.pth")
else:
    model = torch.load(f"/kaggle/input/{CONFIG['model_name']}-trained/{CONFIG['model_name']}-trained.pth") # Needs to be loaded from the input folder so that it works offline

In [83]:
test_dataset = UBCDataset(df, transforms=data_transforms["valid"])
test_loader = DataLoader(test_dataset, batch_size=CONFIG['test_batch_size'], 
                          num_workers=CONFIG["num_workers"], shuffle=False, pin_memory=True)

[0]


In [84]:
class_names = ["HGSC","EC","CC","LGSC","MC"]

# Function to load the model and predict on selected image
def predict_on_image(model, image_path, device=CONFIG["device"]):
    print(device)
    # Load the image and turn it into torch.float32 (same type as model)
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Preprocess the image to get it between 0 and 1
    transform = data_transforms["valid"]
    
    image = transform(image=img)["image"] # make sure image has batch dimension (shape: [batch_size, height, width, color_channels])
    image = torch.tensor(np.expand_dims(image, axis=0))
    print(image.to(device).dtype)
    
    # Predict on image
    model.eval()
    with torch.inference_mode():
        # Put image to target device
        image = image.to(device)
        print(image.device)
        
        # Get prediction logits
        pred_logits = model(image)
        print(pred_logits)
        # Get prediction probabilities
        pred_probs = torch.softmax(pred_logits, dim=1)

        # Get prediction label
        pred_label = torch.argmax(pred_probs, dim=1).detach().cpu()
        pred_label_class = class_names[pred_label]

    print(f"[INFO] Pred label: {pred_label} Pred class: {pred_label_class}, Pred prob: {pred_probs.max():.3f}")

In [85]:
CONFIG["device"]
class_names[int(traindf['label'][traindf['image_id']==10077])]

'EC'

In [86]:
if CONFIG["train"]:
    predict_on_image(model, "/kaggle/input/UBC-OCEAN/train_thumbnails/10077_thumbnail.png", device=CONFIG["device"])

In [87]:
# Plot images and make predictions for manual testing
def show_images_from_dataframe(dataframe, num_images):
    # Select 'num_images' random rows from the DataFrame
    selected_rows = dataframe.sample(num_images)
    
    for _, row in selected_rows.iterrows():
        file_path = row['file_path']
        image = cv2.imread(file_path)
        
        plt.figure(figsize=(30, 30))
        plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        plt.title(f"Image ID: {row['image_id']}")
        plt.axis('off')
        plt.show()
    return selected_rows

if CONFIG["sandbox"]:        
    selected_rows = show_images_from_dataframe(val_data, 5)

In [88]:
def predict_and_visualize_images_with_labels(selected_rows, model):
    model.eval()

    for _, row in selected_rows.iterrows():
        file_path = row['file_path']
        image = cv2.imread(file_path)

        # Apply the transform to the original image
        transformed_image = data_transforms["valid"](image=image)["image"].unsqueeze(0).to(CONFIG["device"])

        # Perform inference
        with torch.no_grad():
            output = model(transformed_image)

        # Get the predicted class label
        predicted_class = torch.argmax(output).item()

        plt.figure(figsize=(8, 8))
        plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        plt.title(f"Image ID: {row['image_id']}\nTrue Label: {class_names[row['label']]}\nPredicted Label: {class_names[predicted_class]}")
        plt.axis('off')
        plt.show()
        
if CONFIG["sandbox"]:       
    predict_and_visualize_images_with_labels(selected_rows, model)

In [89]:
def get_label(label):
    return class_names[label]

In [90]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

if CONFIG["sandbox"]:
    preds = []
    true = []
    with torch.no_grad():
        bar = tqdm(enumerate(val_dataloader), total=len(val_dataloader))
        for step, (data, y) in bar:        
            images = data.to(CONFIG["device"], dtype=torch.float)        
            batch_size = images.size(0)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            true.append(y.detach().cpu().numpy())
            preds.append(predicted.detach().cpu().numpy())
    preds = [get_label(item) for item in np.concatenate(preds).flatten()]
    true = [get_label(item) for item in np.concatenate(true).flatten()]

    # Compute the confusion matrix
    cm = confusion_matrix(true, preds)

    # Create a ConfusionMatrixDisplay with display_labels parameter
    cmd = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    cmd.plot(cmap=plt.cm.Blues)  # You can choose a different color map if needed
    plt.show()

# Create submission

In [91]:
preds = []
with torch.no_grad():
    bar = tqdm(enumerate(test_loader), total=len(test_loader))
    for step, (data, _) in bar:        
        images = data.to(CONFIG["device"], dtype=torch.float)        
        batch_size = images.size(0)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        
        preds.append( predicted.detach().cpu().numpy() )
preds = np.concatenate(preds).flatten()
preds


  0%|          | 0/1 [00:00<?, ?it/s]

array([0])

In [92]:
df_sub_2 = pd.read_csv(f"{ROOT_DIR}/test.csv")
df_sub_2

Unnamed: 0,image_id,image_width,image_height
0,41,28469,16987


In [93]:
df_sub_2 = pd.read_csv(f"{ROOT_DIR}/test.csv")
df_sub_2["label"] = preds    
df_sub_2["label"] = df_sub_2["label"].apply(get_label)
df_sub_2 = df_sub_2.drop("image_width", axis=1)
df_sub_2 = df_sub_2.drop("image_height", axis=1)
df_sub_2.to_csv("submission.csv", index=False)

## Check that submission looks reasonable

In [94]:
df_sub_2

Unnamed: 0,image_id,label
0,41,HGSC
