In [1]:
try:
    from torchinfo import summary
except:
    print("[INFO] Couldn't find torchinfo... installing it.")
    !pip install -q torchinfo
    from torchinfo import summary

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import os
from pathlib import Path

import re
import random
import matplotlib.pyplot as plt
import math
import torch

from PIL import Image
from pandas import DataFrame
from typing import Tuple, Dict, List
from torch.utils.data import Dataset, DataLoader, ConcatDataset

from sklearn.model_selection import StratifiedKFold

"""
Contains functions for training and testing a PyTorch model.
"""
from torchinfo import summary
from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

from torchvision import datasets, models, transforms
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim

In [3]:
data_path = Path('/kaggle/input/oral-dataset/patches')
dir_list = os.listdir(data_path)
print("Files and directories in '", data_path, "' :")
# prints all files
print(dir_list)

Files and directories in ' /kaggle/input/oral-dataset/patches ' :
['sabpatch_parsed_test.csv', 'images', 'sabpatch_parsed_folders.csv']


In [4]:
image_dir = data_path / 'images'
train_df = pd.read_csv(data_path/'sabpatch_parsed_folders.csv')
train_ds = train_df[['path','lesion']]
test_df = pd.read_csv(data_path/'sabpatch_parsed_test.csv')
test_ds = test_df[['path','lesion']]

In [5]:
ALPHA = 0.00005 ## Learning Rate
EPOCH = 10  ## Epochs
BATCH_SIZE = 32
K_FOLDS = 5

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

In [6]:
# Write a custom dataset class (inherits from torch.utils.data.Dataset)
class ImageFolderCustom(Dataset):

    # 1. Initialize with a targ_dir and transform (optional) parameter
    def __init__(self,
                 targ_dir: str,
                 path_df: DataFrame,
                 transform=None) -> None:

        # Get all image paths, classes
        self.img_df = path_df

        # Set all images to proper path
        self.img_df['path'] = self.check_path(targ_dir)

        self.paths = list(self.img_df['path'])

        # Setup transforms
        self.transform = transform

        self.classes, self.class_to_idx = self.find_classes()

    # 2. check if its already in proper format
    def check_path(self,
                   targ_dir: str) -> DataFrame:
        if str(targ_dir) in self.img_df.iloc[0,0]:
            return self.img_df['path'].astype('string')
        else:
            return str(targ_dir)+ '/' +  self.img_df['path'].astype('string')

    # 3. Make function to load images
    def load_image(self,
                   index: int) -> Image.Image:
        "Opens an image via a path and returns it."
        image_path = self.img_df.iloc[index, 0]
        return Image.open(image_path)

    # 4. Overwrite the __len__() method (optional but recommended for subclasses of torch.utils.data.Dataset)
    def __len__(self) -> int:
        "Returns the total number of samples."
        return self.img_df.shape[0]

    # 5. Overwrite the __getitem__() method (required for subclasses of torch.utils.data.Dataset)
    def __getitem__(self,
                    index: int) -> Tuple[torch.Tensor, int]:
        "Returns one sample of data, data and label (X, y)."
        img = self.load_image(index)
        class_name  = self.img_df.iloc[index, 1] # expects path in data_folder/class_name/image.jpeg
        class_idx = self.class_to_idx[class_name]

        # Transform if necessary
        if self.transform:
            return self.transform(img), class_idx # return data, label (X, y)
        else:
            return img, class_idx # return data, label (X, y)


    def find_classes(self) -> Tuple[List[str], Dict[str, int]]:

        col = self.img_df.columns
        # 1. Get the class names by scanning the target directory
        classes = sorted(self.img_df[col[1]].unique())

        # 2. Raise an error if class names not found
        if not classes:
            raise FileNotFoundError(f"Couldn't find any classes.")

        # 3. Create a dictionary of index labels (computers prefer numerical rather than string labels)
        class_to_idx = {cls_name: i for i, cls_name in enumerate(classes)}
        return classes, class_to_idx

In [7]:
manual_transforms = transforms.Compose([
        transforms.Resize((224, 224)), # 1. Reshape all images to 224x224 (though some models may require different sizes)
        transforms.ToTensor(), # 2. Turn image values to between 0 & 1
        transforms.Normalize(mean = MEAN, # 3. A mean of [0.485, 0.456, 0.406] (across each colour channel)
                         std = STD)
    ])



    # test_transforms = transforms.Compose([
    #     #transforms.Resize((64, 64)),
    #     transforms.ToTensor(),
    # ])


train_data = ImageFolderCustom(targ_dir = image_dir,
                                          path_df = train_ds,
                                          transform= manual_transforms)

test_data = ImageFolderCustom(targ_dir = image_dir,
                                          path_df = test_ds,
                                          transform= manual_transforms)

classes, class_to_idx = train_data.find_classes()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.img_df['path'] = self.check_path(targ_dir)


In [8]:
model = models.resnet50(pretrained=True).to(device)

# for param in model.parameters():
#     param.requires_grad = False

model.fc = nn.Sequential(
               nn.Linear(2048, 128),
               nn.ReLU(inplace=True),
               nn.Linear(128, 3)).to(device)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 165MB/s]


In [9]:


def reset_weights(m):
  '''
    Try resetting model weights to avoid
    weight leakage.
  '''
  for layer in m.children():
    if hasattr(layer, 'reset_parameters'):
#         print(f'Reset trainable parameters of layer = {layer}')
        layer.reset_parameters()

In [10]:
# loss_fn = nn.CrossEntropyLoss(reduction='sum') # computes the cross entropy loss between input logits and target.

# optimizer = torch.optim.Adam(model.parameters(), lr = ALPHA)

In [11]:
# def train_epoch(model,device,dataloader,loss_fn,optimizer):
#     train_loss,train_correct=0.0,0
#     model.train()
#     for images, labels in dataloader:

#         images,labels = images.to(device),labels.to(device)
#         optimizer.zero_grad()
#         output = model(images)
#         loss = loss_fn(output,labels)
#         loss.backward()
#         optimizer.step()
#         train_loss += loss.item() * images.size(0)
#         scores, predictions = torch.max(output.data, 1)
#         train_correct += (predictions == labels).sum().item()

#     return train_loss,train_correct

# def valid_epoch(model,device,dataloader,loss_fn):
#     valid_loss, val_correct = 0.0, 0
#     model.eval()
#     with torch.no_grad():
#         for images, labels in dataloader:
#             images,labels = images.to(device),labels.to(device)
#             output = model(images)
#             loss=loss_fn(output,labels)
#             valid_loss+=loss.item()*images.size(0)
#             scores, predictions = torch.max(output.data,1)
#             val_correct+=(predictions == labels).sum().item()

#     return valid_loss,val_correct

In [12]:
# dataset = ConcatDataset([train_data, test_data])
# labels = [t[1] for t in dataset]

In [13]:
# history = {'train_loss': [], 'test_loss': [],'train_acc':[],'test_acc':[]}




# # Define the K-fold Cross Validator
# kfold =  StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

#   # Start print
# print('--------------------------------')

# # K-fold Cross Validation model evaluation
# for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset, labels)):
#     print('Fold {}'.format(fold + 1))

#     # Sample elements randomly from a given list of ids, no replacement.
#     train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
#     test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
    
#     # Define data loaders for training and testing data in this fold
#     trainloader = torch.utils.data.DataLoader(
#                       dataset, 
#                       batch_size=32, sampler=train_subsampler)
#     testloader = torch.utils.data.DataLoader(
#                       dataset,
#                       batch_size=32, sampler=test_subsampler)
    
    
#     model.to(device)
#     optimizer = optim.Adam(model.parameters(), lr=0.002)

#     for epoch in range(EPOCH):
#         train_loss, train_correct=train_epoch(model,device,trainloader, loss_fn,optimizer)
#         test_loss, test_correct=valid_epoch(model,device,testloader, loss_fn)

#         train_loss = train_loss / len(trainloader.sampler)
#         train_acc = train_correct / len(trainloader.sampler) * 100
#         test_loss = test_loss / len(testloader.sampler)
#         test_acc = test_correct / len(testloader.sampler) * 100

#         print("Epoch:{}/{} AVG Training Loss:{:.3f} AVG Test Loss:{:.3f} AVG Training Acc {:.2f} % AVG Test Acc {:.2f} %".format(epoch + 1,
#                                                                                                              EPOCH,
#                                                                                                              train_loss,
#                                                                                                              test_loss,
#                                                                                                              train_acc,
#                                                                                                              test_acc))
#         history['train_loss'].append(train_loss)
#         history['test_loss'].append(test_loss)
#         history['train_acc'].append(train_acc)
#         history['test_acc'].append(test_acc)  

In [14]:
class KFold():
#     wandb.init(
#     project="ressnet50-v1",
#     config={
#             "epochs": EPOCH,
#             "batch_size": BATCH_SIZE,
#             "lr": ALPHA,
#             "architecture": "CNN",
#             })

    def __init__(self,
                 model,
                 loss_fn,
                 optimizer,
                 device,
                 early_stopping = False):

        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.device = device

        self.early_stopping = early_stopping
        self.counter = 0
        self.early_stop = False # type: ignore
        self.best_score = None

#         # Copy your config
#         self.config = wandb.config

    def check_early_stop(self,
                   val_loss,
                   delta,
                   verbose,
                   patience,
                   epoch):

        score = -val_loss
        # print(verbose)
        if self.best_score is None:
            self.best_score = score

        elif score < self.best_score + delta and epoch < self.epochs:
            self.counter += 1

            if verbose:
                print(f"Early stopping counter: {self.counter} out of {patience}")

            if self.counter >= patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.early_stop = False
            self.counter = 0
            

    def train_epoch(self, epoch):
        
        y_tr_true, y_tr_pred= [], []
        train_loss = 0.0
        
        model.train()
        for i, data in enumerate(tqdm(self.trainloader, desc=f'Epoch {epoch + 1}/{self.epochs}', unit='batch')):
            
            images, labels = data
            y_tr_true.extend(labels) # collect all training labels
            images,labels = images.to(device),labels.to(device)
            
            optimizer.zero_grad()
            output = model(images)
            loss = loss_fn(output,labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * images.size(0)
            scores, predictions = torch.max(output.data, 1)
            
            y_tr_pred.extend(predictions.cpu())
           
        
        train_correct =  accuracy_score(y_tr_true, y_tr_pred)

        return train_loss,train_correct

    def valid_epoch(self, epoch):
        
        valid_loss = 0.0
        y_tr_true, y_tr_pred= [], []
        
        model.eval()
        with torch.no_grad():
            for i, data in enumerate(tqdm(self.testloader, desc=f'Epoch {epoch + 1}/{self.epochs}', unit='batch')):
                
                images, labels = data
                y_tr_true.extend(labels) # collect all training labels
                images,labels = images.to(device),labels.to(device)
                
                output = model(images)
                loss=loss_fn(output,labels)
                valid_loss+=loss.item()*images.size(0)
                scores, predictions = torch.max(output.data,1)
                y_tr_pred.extend(prediction.cpu())
        
        val_correct = accuracy_score(y_tr_true, y_tr_pred)

        return valid_loss,val_correct
    
    def train(self,
              train_data,
              test_data,
              epochs=1,
              k_folds=5,
              delta = 0,
              patience = 10,
              verbose = False):

        self.epochs = epochs
        self.train_dataloader = train_data
        self.test_dataloader = test_data


        # Create empty results dictionary
        results = {"epoch":[],
                "train_loss": [],
                "train_acc": [],
                "test_loss": [],
                "test_acc": []
        }

        # Define the K-fold Cross Validator
        kfold =  StratifiedKFold(n_splits= k_folds, shuffle=True, random_state=42)
        
        self.dataset = ConcatDataset([train_data, test_data])
        labels = [t[1] for t in self.dataset]

        # Make sure model on target device
        self.model.to(self.device)

        # Loop through training and testing steps for a number of epochs
        print('--------------------------------')

        # K-fold Cross Validation model evaluation
        for fold, (train_ids, test_ids) in enumerate(kfold.split(self.dataset, labels)):
            print('Fold {}'.format(fold + 1))
            
            # Sample elements randomly from a given list of ids, no replacement.
            train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
            test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)

            
            # Define data loaders for training and testing data in this fold
            self.trainloader = torch.utils.data.DataLoader(
                              self.dataset, 
                              batch_size = BATCH_SIZE,
                              sampler = train_subsampler)

            self.testloader = torch.utils.data.DataLoader(
                              self.dataset,
                              batch_size = BATCH_SIZE, 
                              sampler = test_subsampler)

            for epoch in range(self.epochs):
                train_loss, train_correct = self.train_epoch(epoch)
                test_loss, test_correct = self.valid_epoch(epoch)

                train_loss = train_loss / len(self.trainloader.sampler)
                train_acc = train_correct * 100
                test_loss = test_loss / len(self.testloader.sampler)
                test_acc = test_correct * 100
                                                                                                            

                # Print out what's happening
                print(
                f"Epoch: {epoch+1} | "
                f"train_loss: {train_loss:.4f} | "
                f"train_acc: {train_acc:.4f} | "
                f"test_loss: {test_loss:.4f} | "
                f"test_acc: {test_acc:.4f}"
                )

                # Update results dictionary
                results["epoch"].append(epoch+1)
                results["train_loss"].append(train_loss)
                results["test_loss"].append(test_loss)
                results["train_acc"].append(train_acc)
                results["test_acc"].append(test_acc)



                if self.early_stopping:
                    self.check_early_stop(test_loss, delta, verbose, patience, epoch)
                    if self.early_stop:
                        print("Early Stopping")
                        break

        # Mark the run as finished
#         wandb.finish()
        # Return the filled results at the end of the epochs
        return results


In [15]:
loss_fn = nn.CrossEntropyLoss(reduction='sum') # computes the cross entropy loss between input logits and target.

optimizer = torch.optim.Adam(model.parameters(), lr = ALPHA)

# Start the timer
from timeit import default_timer as timer
start_time = timer()

# Setup training and save the results
Engine = KFold(model=model, loss_fn=loss_fn, optimizer=optimizer, device=device, early_stopping=True)
history = Engine.train(train_data=train_data, test_data=test_data, epochs=EPOCH, verbose=True, patience = 5)

# End the timer and print out how long it took
end_time = timer()
print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")

--------------------------------
Fold 1


Epoch 1/10:   0%|          | 0/95 [00:00<?, ?batch/s]

TypeError: can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.

In [None]:

avg_train_loss = np.mean(history['train_loss'])
avg_test_loss = np.mean(history['test_loss'])
avg_train_acc = np.mean(history['train_acc'])
avg_test_acc = np.mean(history['test_acc'])

print('Performance of {} fold cross validation'.format(K_FOLDS))
print("Average Training Loss: {:.4f} \t Average Test Loss: {:.4f} \t Average Training Acc: {:.3f} \t Average Test Acc: {:.3f}".format(avg_train_loss,avg_test_loss,avg_train_acc,avg_test_acc)) 