In [1]:
import time
import sys
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from dataset import ImageDataset
import torch.nn.functional as F
from torchmetrics import Precision, Recall
from torchvision.models import resnet18
import warnings
from collections import defaultdict
import wandb
import datetime
import os
from vit.vit import VisionTransformer
warnings.filterwarnings('ignore')
torch.set_float32_matmul_precision('high')


run_name = f'VIT_{datetime.datetime.now()}'
run_path = f'training_checkpoints/{run_name}'

wandb.init(project="cells", 
           entity="adamsoja",
          name=run_name)

import random
random.seed(2233)
torch.manual_seed(2233)

from albumentations import (
    Compose,
    Resize,
    OneOf,
    RandomBrightness,
    RandomContrast,
    MotionBlur,
    MedianBlur,
    GaussianBlur,
    VerticalFlip,
    HorizontalFlip,
    ShiftScaleRotate,
    Normalize,
)

transform = Compose(
    [
        OneOf([RandomBrightness(limit=0.4, p=1), RandomContrast(limit=0.4, p=1)]),
        OneOf([MotionBlur(blur_limit=3), MedianBlur(blur_limit=3), GaussianBlur(blur_limit=3),], p=0.7,),
        VerticalFlip(p=0.5),
        HorizontalFlip(p=0.5),]
)

class MyModel(nn.Module):
    def __init__(self, model, learning_rate, weight_decay):
        super(MyModel, self).__init__()
        self.model = model
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.criterion = nn.CrossEntropyLoss()
        self.metric_precision = Precision(task="multiclass", num_classes=4, average=None).to('cuda')
        self.metric_recall = Recall(task="multiclass", num_classes=4, average=None).to('cuda')
        self.train_loss = []
        self.valid_loss = []
        self.precision_per_epochs = []
        self.recall_per_epochs = []

        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode="min", factor=0.1, patience=2, min_lr=5e-6, verbose=True)
        self.step = 0

    
    def forward(self, x):
        return self.model(x)

    def train_one_epoch(self, trainloader):
        self.step += 1
        self.train()
        for batch_idx, (inputs, labels) in enumerate(trainloader):
            inputs, labels = inputs.to('cuda'), labels.to('cuda')
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()
            _, preds = torch.max(outputs, 1)
            _, labels = torch.max(labels, 1)
            self.metric_precision(preds, labels)
            self.metric_recall(preds, labels)
            self.train_loss.append(loss.item())


        

        
        avg_loss = np.mean(self.train_loss)
        self.train_loss.clear()
        precision = self.metric_precision.compute()
        recall = self.metric_recall.compute()
        self.precision_per_epochs.append(precision)
        self.recall_per_epochs.append(recall)
        print(f'train_loss: {avg_loss}')
        print(f'train_precision: {precision}')
        print(f'train_recall: {recall}')

        wandb.log({'loss': avg_loss},step=self.step)
        wandb.log({'Normal precision': precision[0].item()},step=self.step)
        wandb.log({'Inflamatory precision': precision[1].item()},step=self.step)
        wandb.log({'Tumor precision': precision[2].item()},step=self.step)
        wandb.log({'Other precision': precision[3].item()},step=self.step)


        wandb.log({'Normal recall': recall[0].item()},step=self.step)
        wandb.log({'Inflamatory recall': recall[1].item()},step=self.step)
        wandb.log({'Tumor recall': recall[2].item()},step=self.step)
        wandb.log({'Other recall': recall[3].item()},step=self.step)
        
        
        self.metric_precision.reset()
        self.metric_recall.reset()


    

    def evaluate(self, testloader):
        self.eval()
        with torch.no_grad():
            for batch_idx, (inputs, labels) in enumerate(testloader):
                inputs, labels = inputs.to('cuda'), labels.to('cuda')
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)
                _, labels = torch.max(labels, 1)
                self.metric_precision(preds, labels)
                self.metric_recall(preds, labels)
                self.valid_loss.append(loss.item())
    
        avg_loss = np.mean(self.valid_loss)
        self.scheduler.step(avg_loss)
        self.valid_loss.clear()
        precision = self.metric_precision.compute()
        recall = self.metric_recall.compute()
        print(f'val_loss: {avg_loss}')
        print(f'val_precision: {precision}')
        print(f'val_recall: {recall}')
        self.metric_precision.reset()
        self.metric_recall.reset()

        wandb.log({'val_loss': avg_loss}, step=self.step)
        
        wandb.log({'val_Normal precision': precision[0].item()},step=self.step)
        wandb.log({'val_Inflamatory precision': precision[1].item()},step=self.step)
        wandb.log({'val_Tumor precision': precision[2].item()},step=self.step)
        wandb.log({'val_Other precision': precision[3].item()},step=self.step)


        wandb.log({'val_Normal recall': recall[0].item()},step=self.step)
        wandb.log({'val_Inflamatory recall': recall[1].item()},step=self.step)
        wandb.log({'val_Tumor recall': recall[2].item()},step=self.step)
        wandb.log({'val_Other recall': recall[3].item()},step=self.step)


        for param_group in self.optimizer.param_groups:
            print(f"Learning rate: {param_group['lr']}")
        return avg_loss

torch.cuda.empty_cache()
batch_size = 256
trainset = ImageDataset(data_path='train_data', transform=transform)
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=5)

testset = ImageDataset(data_path='validation_data')
testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=3)

learning_rate = 0.001
weight_decay = 0.00001

model = VisionTransformer(image_size=32, in_channels=3, num_classes=4, hidden_dims=[128, 128], dropout_rate=0.6)
model = model.to('cuda')


my_model = MyModel(model=model, learning_rate=learning_rate, weight_decay=weight_decay)
my_model = my_model.to('cuda')

num_epochs = 100
early_stop_patience = 10
best_val_loss = float('inf')
best_model_state_dict = None

for epoch in range(num_epochs):
    print('========================================')
    print(f'EPOCH: {epoch}') 
    time_start = time.perf_counter()
    my_model.train_one_epoch(trainloader)
    val_loss = my_model.evaluate(testloader)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model_state_dict = my_model.state_dict()
        torch.save(best_model_state_dict, f'{run_path}.pth')
        
        patience_counter = 0
    else:
        patience_counter += 1

    if patience_counter >= early_stop_patience:
        print(f"Early stopping at epoch {epoch} with best validation loss {best_val_loss}")
        break
    time_epoch = time.perf_counter() - time_start
    print(f'epoch {epoch} time:  {time_epoch/60}')
    print('--------------------------------')

# Load the best model state dict
print(f'{run_path}.pth')
my_model.load_state_dict(torch.load(f'{run_path}.pth'))
print(f'{run_path}.pth')

ModuleNotFoundError: No module named 'dataset'

In [None]:
from sklearn.metrics import confusion_matrix, classification_report
from torch.utils.data import DataLoader
import numpy as np
import torch 
from torchvision.models import resnet18
import torch.nn as nn


def test_report(model, dataloader):
    """Prints confusion matrix for testing dataset
    dataloader should be of batch_size=1."""

    y_pred = []
    y_test = []
    model.eval()
    with torch.no_grad():
        for data, label in dataloader:
            output = model(data)
            label = label.numpy()
            output = output.numpy()
            y_pred.append(np.argmax(output))
            y_test.append(np.argmax(label))
        print(confusion_matrix(y_test, y_pred))
        print(classification_report(y_test, y_pred))

testset =ImageDataset(data_path='test_data')
dataloader = DataLoader(testset, batch_size=1, shuffle=True)

test_report(my_model.to('cpu'), dataloader)

In [None]:
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchmetrics import Precision, Recall
from dataset import ImageDataset
import numpy as np
import datetime
import random
import time
random.seed(2233)
torch.manual_seed(2233)

#After /255 so in loading dataset there are no division by 255 just this normalization
mean = [0.5006, 0.3526, 0.5495]
std = [0.1493, 0.1341, 0.1124]

from albumentations import (
    Compose,
    Resize,
    OneOf,
    RandomBrightness,
    RandomContrast,
    MotionBlur,
    MedianBlur,
    GaussianBlur,
    VerticalFlip,
    HorizontalFlip,
    ShiftScaleRotate,
    Normalize,
)

transform = Compose(
    [
        Normalize(mean=mean, std=std),
        OneOf([RandomBrightness(limit=0.1, p=1), RandomContrast(limit=0.1, p=0.8)]),
        OneOf([MotionBlur(blur_limit=3), MedianBlur(blur_limit=3), GaussianBlur(blur_limit=3),], p=0.7,),
        VerticalFlip(p=0.5),
        HorizontalFlip(p=0.5),
    ]
)

transform_test = Compose(
    [Normalize(mean=mean, std=std)]
)



def objective(trial):
    # Hyperparameters to be tuned
    batch_size = trial.suggest_int('batch_size', 32, 768)
    learning_rate = trial.suggest_loguniform('learning_rate', 1e-6, 1e-2)
    dropout_rate = trial.suggest_uniform('dropout_rate', 0.1, 0.8)
    num_heads = trial.suggest_int('num_heads', 2, 16)
    patch_size =trial.suggest_categorical('patch_size', [4, 8])
    embedding_dim = trial.suggest_int('embedding_dim', 32, 256)
    num_layers = trial.suggest_int('num_layers', 3, 6)
    
    # Data loaders
    trainset = ImageDataset(data_path='train_data', transform=transform)
    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=3)
    
    testset = ImageDataset(data_path='validation_data', transform=transform_test)
    testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

    # Model setup
    model = VisionTransformer(image_size=32, 
                          in_channels=3, 
                          num_classes=4, 
                          hidden_dims=[32], 
                          dropout_rate=dropout_rate,
                          embedding_dim=embedding_dim,
                          patch_size=patch_size,
                          num_layers=3,
                          num_heads=num_heads,
                          use_linear_patch=False,
                          use_conv_stem=True,
                          use_conv_patch=False)
    model = model.to('cuda')

    # Custom model class
    class MyModel(nn.Module):
        def __init__(self, model, learning_rate):
            super(MyModel, self).__init__()
            self.model = model
            self.criterion = nn.CrossEntropyLoss()
            self.optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
            self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, mode="min", factor=0.1, patience=7, min_lr=5e-6, verbose=True)
            self.step = 0
            self.metric_precision = Precision(task="multiclass", num_classes=num_classes, average=None).to('cuda')
            self.metric_recall = Recall(task="multiclass", num_classes=num_classes, average=None).to('cuda')
            self.train_loss = []
            self.valid_loss = []

        def forward(self, x):
            return self.model(x)

        def train_one_epoch(self, trainloader):
            self.train()
            for inputs, labels in trainloader:
                inputs, labels = inputs.to('cuda'), labels.to('cuda')
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                self.train_loss.append(loss.item())
            avg_loss = np.mean(self.train_loss)
            self.train_loss.clear()
            return avg_loss

        def evaluate(self, testloader):
            self.eval()
            with torch.no_grad():
                for inputs, labels in testloader:
                    inputs, labels = inputs.to('cuda'), labels.to('cuda')
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels)
                    self.valid_loss.append(loss.item())
            avg_loss = np.mean(self.valid_loss)
            self.valid_loss.clear()
            self.scheduler.step(avg_loss)
            return avg_loss

    my_model = MyModel(model=model, learning_rate=learning_rate)
    my_model = my_model.to('cuda')
    early_stop_patience = 15
    num_epochs = 100
    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        my_model.train_one_epoch(trainloader)
        val_loss = my_model.evaluate(testloader)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1


        if patience_counter >= early_stop_patience:
            print(f"Early stopping at epoch {epoch} with best validation loss {best_val_loss}")
            break
            
    return best_val_loss

# Start the optimization
study = optuna.create_study(direction='minimize',
                            storage="sqlite:///db.sqlite3",  
                            study_name="vit_finetune_test",
                           load_if_exists=True)
start = time.perf_counter()
study.optimize(objective, n_trials=2)
stop = time.perf_counter()
print(f"Best trial: {study.best_trial.value}")
print(f"Best hyperparameters: {study.best_trial.params}")
