In [None]:
# =========================
# Standard library
# =========================
import os
import sys
import time
import copy
import warnings

warnings.filterwarnings("ignore")

# =========================
# Core scientific stack
# =========================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

# Jupyter only
%matplotlib inline

# =========================
# PyTorch + TorchVision
# =========================
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
from torchvision import datasets
from torchvision import transforms as T
from torchvision import models as tv_models
from torch.utils.data import DataLoader, sampler, random_split

# =========================
# timm
# =========================
import timm
from timm.loss import LabelSmoothingCrossEntropy

# =========================
# Metrics (scikit-learn)
# =========================
import sklearn.metrics as sk_metrics
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    cohen_kappa_score,
    confusion_matrix,
    ConfusionMatrixDisplay,
    classification_report,
    roc_curve,
    auc,
)


In [None]:
def get_classes(data_dir):
    all_data = datasets.ImageFolder(data_dir)
    return all_data.classes

In [None]:
def get_classes(data_dir):
    """Get class names from dataset directory."""
    all_data = datasets.ImageFolder(data_dir)
    return all_data.classes

def get_data_loaders(data_dir, batch_size, train=False):
    """Prepare data loaders for training, validation, and testing."""
    if train:
        # Data augmentation for training
        transform = T.Compose([
            T.RandomHorizontalFlip(),
            T.RandomVerticalFlip(),
          
            T.Resize((224, 224)),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        train_data = datasets.ImageFolder(os.path.join(data_dir, "train/"), transform=transform)
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
        return train_loader, len(train_data)
    else:
        # Validation and test data transformations
        transform = T.Compose([
            T.Resize((224, 224)),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        val_data = datasets.ImageFolder(os.path.join(data_dir, "validation/"), transform=transform)
        test_data = datasets.ImageFolder(os.path.join(data_dir, "test/"), transform=transform)
        val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=4)
        test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)
        return val_loader, test_loader, len(val_data), len(test_data)





In [None]:
dataset_path = "new_directories"

In [None]:
(train_loader, train_data_len) = get_data_loaders(dataset_path, 124, train=True)
(val_loader, test_loader, valid_data_len, test_data_len) = get_data_loaders(dataset_path, 32, train=False)

In [None]:
classes = get_classes("new_directories/train")
print(classes, len(classes))

In [None]:
dataloaders = {
    "train": train_loader,
    "val": val_loader
}
dataset_sizes = {
    "train": train_data_len,
    "val": valid_data_len
}

In [None]:
print(len(train_loader), len(val_loader), len(test_loader))

In [None]:
print(train_data_len, valid_data_len, test_data_len)

In [None]:
# now, for the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# torch.hub.load version
HUB_URL = "SharanSMenon/swin-transformer-hub:main"
MODEL_NAME = "swin_tiny_patch4_window7_224"
model = torch.hub.load(HUB_URL, MODEL_NAME, pretrained=True)

# timm.create_model version also available
# model=timm.create_model('swin_tiny_patch4_window7_224',pretrained=True)

In [None]:
n_inputs = model.head.in_features
model.head = nn.Sequential(
    nn.Linear(n_inputs, 512),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(512, len(classes))
)
model = model.to(device)
print(model.head)

In [None]:
criterion = LabelSmoothingCrossEntropy()
criterion = criterion.to(device)
optimizer = optim.AdamW(model.head.parameters(), lr=0.001)

In [None]:
# lr scheduler
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.97)

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=10, device='cuda'):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # Lists to store metrics per epoch
    train_losses, val_losses = [], []
    train_accs, val_accs = [], []
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print("-" * 10)
        
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode
            
            running_loss = 0.0
            running_corrects = 0
            
            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            if phase == 'train':
                scheduler.step()
            
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            
            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))
            
            # Save metrics
            if phase == 'train':
                train_losses.append(epoch_loss)
                train_accs.append(epoch_acc)
            else:
                val_losses.append(epoch_loss)
                val_accs.append(epoch_acc)
            
            # Deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Best Val Acc: {:.4f}".format(best_acc))

    # Load best model weights
    model.load_state_dict(best_model_wts)

   
    
    return model, train_losses,val_losses, train_accs,val_accs


In [None]:
model_ft,train_losses,val_losses, train_accs,val_accs = train_model(model, criterion, optimizer, exp_lr_scheduler, num_epochs=30) # now it is a lot faster


## Testing

In [None]:
test_loss = 0.0
class_correct = list(0 for i in range(len(classes)))
class_total = list(0 for i in range(len(classes)))
model_ft.eval()

TRUE=np.empty((0), dtype=int)
PRED=np.empty((0), dtype=int) 

for data, target in tqdm(test_loader):
    data, target = data.to(device), target.to(device)
    with torch.no_grad(): # turn off autograd for faster testing
        output = model_ft(data)
        loss = criterion(output, target)
    test_loss = loss.item() * data.size(0)
    _, pred = torch.max(output, 1)
    
    TRUE=np.concatenate([TRUE,target.data.cpu().numpy()],0) ######
    PRED=np.concatenate([PRED,pred.data.cpu().numpy()],0) ######    
        
    correct_tensor = pred.eq(target.data.view_as(pred))
    correct = np.squeeze(correct_tensor.cpu().numpy())
    if len(target) == 32:
        for i in range(32):
            label = target.data[i]
            class_correct[label] += correct[i].item()
            class_total[label] += 1

test_loss = test_loss / test_data_len
print('Test Loss: {:.4f}'.format(test_loss))
for i in range(len(classes)):
    if class_total[i] > 0:
        print("Test Accuracy of %5s: %2d%% (%2d/%2d)" % (
            classes[i], 100*class_correct[i]/class_total[i], np.sum(class_correct[i]), np.sum(class_total[i])
        ))
    else:
        print("Test accuracy of %5s: NA" % (classes[i]))
print("Test Accuracy of %2d%% (%2d/%2d)" % (
            100*np.sum(class_correct)/np.sum(class_total), np.sum(class_correct), np.sum(class_total)
        ))

In [None]:

finaltrain = pd.DataFrame([])

finaltrain = finaltrain._append({
                                        'Accuracy' : round(accuracy_score(TRUE, PRED)*100,3),
                                        'PrecisionTrain':round(precision_score(TRUE, PRED, average = 'weighted')*100,3),
                                        'RecallTrain':round(recall_score(TRUE, PRED, average = 'weighted')*100,3)  ,
                                        'F1Train':round(f1_score(TRUE, PRED, average = 'weighted')*100,3)}
                                      
                                        , ignore_index=True)
finaltrain

In [None]:

cm = confusion_matrix(TRUE, PRED)

row_sums = cm.sum(axis=1, keepdims=True)

# Divide each element in the confusion matrix by the corresponding row sum and multiply by 100 to get percentages
confusion_percentages = np.round(cm / row_sums * 100,2)

disp = ConfusionMatrixDisplay(confusion_matrix=confusion_percentages,display_labels=classes)

disp.plot(cmap="Blues", values_format='')
plt.title("Swin", fontsize=16)
plt.show()