In [None]:
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import models
from torchvision.datasets import ImageFolder
! pip install torchsummary
import torchsummary
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision
from torchvision import datasets, models, transforms, utils
from torchvision.transforms import v2
import cv2
import os
from glob import glob
from tqdm import tqdm
import shutil
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix , accuracy_score, classification_report
import seaborn as sns

In [None]:
# Build the training table: one row per file found in the training folder.
# The label comes from the first character of the file name; files starting
# with 'H' are relabelled 'N' (both map to class 0), 'P' maps to class 1.
train_path = './PALM/Train/PALM-Training400'
# os.listdir() returns entries in arbitrary order; sort so the table (and any
# index-based sampling from it) is reproducible across machines and runs.
folder_list = [f'{train_path}/{item}' for item in sorted(os.listdir(train_path))]
label_dict = {
    "H": 0,
    "N": 0,
    "P": 1,
}

# Collect plain records and build the frame once instead of re-copying the
# whole frame with pd.concat on every iteration (quadratic in the file count).
train_records = []
for file_path in folder_list:
    prefix = file_path.split('/')[-1][0]
    train_records.append({
        "path": file_path,
        "label": 'N' if prefix == 'H' else prefix,
        # An unexpected prefix raises KeyError here, as it did before.
        "class_id": label_dict[prefix],
    })

train_df = pd.DataFrame(train_records, columns=["path", "label", "class_id"]).astype(
    {"path": str, "label": str, "class_id": int}
)

In [None]:
# Build the validation table by pairing each image file with the 'Label'
# column of the accompanying spreadsheet.
val_path = r'./PALM/Validation/Validation-400'
# Labels are matched to images BY POSITION, and os.listdir() returns entries in
# arbitrary order, so the listing is sorted to make the pairing deterministic.
# NOTE(review): assumes the spreadsheet rows are in ascending file-name order —
# TODO confirm (joining on the sheet's file-name column would be safer).
folder_list = [f'{val_path}/{item}' for item in sorted(os.listdir(val_path))]
val_labels = pd.read_excel(r'./PALM/Validation/PM_Label_and_Fovea_Location.xlsx')
val_labels_list = val_labels['Label'].values.tolist()
val_labels_name = {
    1: 'P',
    0: 'N'
}

# A count mismatch means positional pairing would silently mislabel images
# (or fail later with an opaque IndexError), so fail loudly here instead.
if len(folder_list) != len(val_labels_list):
    raise ValueError(
        f"Found {len(folder_list)} validation images but {len(val_labels_list)} labels; "
        "labels are matched to images by position, so the counts must agree."
    )

# Build the frame in one shot rather than concatenating row by row.
val_df = pd.DataFrame({
    "path": folder_list,
    "label": [val_labels_name[class_id] for class_id in val_labels_list],
    "class_id": val_labels_list,
}).astype({"path": str, "label": str, "class_id": int})

In [None]:
# Build the test table by pairing each image file with the 'Label' column of
# the accompanying spreadsheet.
test_path = r'./PALM/Test/PALM-Testing400-Images'
# Labels are matched to images BY POSITION, and os.listdir() returns entries in
# arbitrary order, so the listing is sorted to make the pairing deterministic.
# NOTE(review): assumes the spreadsheet rows are in ascending file-name order —
# TODO confirm (joining on the sheet's file-name column would be safer).
folder_list = [f'{test_path}/{item}' for item in sorted(os.listdir(test_path))]
test_labels = pd.read_excel(r'./PALM/Test/PM_Label_and_Fovea_Location.xlsx')
test_labels_list = test_labels['Label'].values.tolist()
test_labels_name = {
    1: 'P',
    0: 'N'
}

# A count mismatch means positional pairing would silently mislabel images
# (or fail later with an opaque IndexError), so fail loudly here instead.
if len(folder_list) != len(test_labels_list):
    raise ValueError(
        f"Found {len(folder_list)} test images but {len(test_labels_list)} labels; "
        "labels are matched to images by position, so the counts must agree."
    )

# Build the frame in one shot rather than concatenating row by row.
test_df = pd.DataFrame({
    "path": folder_list,
    "label": [test_labels_name[class_id] for class_id in test_labels_list],
    "class_id": test_labels_list,
}).astype({"path": str, "label": str, "class_id": int})

In [None]:
# Preview a random sample of training images in a 5-column grid, titled with
# their labels. Indices are drawn with replacement, so repeats are possible.
show_imgs = 15
idx = np.random.randint(0, len(train_df), size=show_imgs)
fig, axes = plt.subplots(show_imgs // 5, 5, figsize=(15, 10))
axes = axes.ravel()
for ax, row_label in zip(axes, idx):
    sample = train_df.loc[row_label]
    ax.imshow(plt.imread(sample['path']))
    ax.set_title(sample['label'])
    ax.set_axis_off()

In [None]:
# Preview a random sample of test images in a 5-column grid, titled with
# their labels. Indices are drawn with replacement, so repeats are possible.
show_imgs = 15
idx = np.random.randint(0, len(test_df), size=show_imgs)
fig, axes = plt.subplots(show_imgs // 5, 5, figsize=(15, 10))
axes = axes.ravel()
for ax, row_label in zip(axes, idx):
    sample = test_df.loc[row_label]
    ax.imshow(plt.imread(sample['path']))
    ax.set_title(sample['label'])
    ax.set_axis_off()

In [None]:
# Image pipelines for the three splits.
#
# v2.ToDtype only changes the dtype unless `scale=True` is passed. Without it,
# the uint8 tensor from PILToTensor becomes float32 in the range 0-255, while
# the Normalize mean/std below are expressed for inputs in [0, 1]. Passing
# `scale=True` rescales to [0, 1] before normalisation.

# Training: resize, then random crop / flips / affine / erasing for augmentation.
train_transforms = v2.Compose([
    v2.Resize(256),
    v2.RandomResizedCrop(size=(224, 224), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
    v2.RandomVerticalFlip(p=0.5),
    v2.RandomAffine(degrees=(-10, 10), translate=(0.1, 0.1), scale=(0.9, 1.1)),
    v2.RandomErasing(p=0.5, scale=(0.1, 0.15)),
    v2.PILToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation: deterministic resize only, same scaling and normalisation.
val_transforms = v2.Compose([
    v2.Resize((224, 224)),
    v2.PILToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Test: identical to the validation pipeline.
test_transforms = v2.Compose([
    v2.Resize((224, 224)),
    v2.PILToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Map-style dataset backed by a DataFrame of image paths and class ids.

    Each item is ``(transformed_image, class_id)``: the image at the row's
    ``'path'`` is opened, converted to RGB and passed through ``transforms_``;
    ``class_id`` is the row's ``'class_id'`` converted with ``.tolist()``.
    """

    def __init__(self, dataframe, transforms_):
        """Store the DataFrame (needs 'path' and 'class_id' columns) and the transform callable."""
        self.df = dataframe
        self.transforms_ = transforms_

    def __len__(self):
        """Number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(transformed_image, class_id)`` for the row at position ``index``."""
        row = self.df.iloc[index]
        # Always convert to 3-channel RGB before applying the transforms.
        rgb_image = Image.open(row['path']).convert("RGB")
        label = row['class_id'].tolist()
        return self.transforms_(rgb_image), label

In [None]:
# Pick the GPU when one is usable, otherwise fall back to the CPU.
# `is_available` must be *called*: the bare function object is always truthy,
# which would select "cuda" even on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Mini-batch size shared by all three loaders.
BATCH_SIZE = 8

# One dataset per split, each paired with its own transform pipeline.
train_dataset = MyDataset(dataframe=train_df, transforms_=train_transforms)
val_dataset = MyDataset(dataframe=val_df, transforms_=val_transforms)
test_dataset = MyDataset(dataframe=test_df, transforms_=test_transforms)

# Only the training loader shuffles; validation and test keep row order.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE)

In [None]:
class Attention(nn.Module):
    """Scale the input by softmax weights computed from a learned scalar score.

    ``self.W`` maps the last dimension (``input_dim``) to a single score; the
    scores are soft-maxed over dimension 1 and multiplied back onto the input
    (broadcast over the last dimension).

    Note: for a 2-D input of shape ``(batch, input_dim)`` the score tensor is
    ``(batch, 1)``, so the softmax over dimension 1 runs over a single element
    and every weight is 1 — the module then returns its input unchanged.
    """

    def __init__(self, input_dim):
        super().__init__()
        # Scalar score per position: input_dim -> 1.
        self.W = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return ``softmax(W(x), dim=1) * x``."""
        scores = self.W(x)
        weights = F.softmax(scores, dim=1)
        return weights * x

In [None]:
class_size = 1
# Pretrained EfficientNetV2-L whose `classifier[1]` entry is replaced by a small
# head that ends in a sigmoid, i.e. one probability per image.
model = models.efficientnet_v2_l(weights='DEFAULT')
# NOTE(review): if the classifier receives 2-D (batch, 1280) features — which
# the following Linear(1280, 64) suggests, confirm — Attention's softmax runs
# over a size-1 axis, so its weights are all 1 and it acts as an identity.
model.classifier[1] = nn.Sequential(
    Attention(1280),
    nn.Linear(1280, 64),
    nn.ReLU(),
    nn.Linear(64, 1),
    nn.Sigmoid()
)

# Smoke-test the output shape on random input. Run it in eval mode and without
# autograd: in training mode this forward pass would update the pretrained
# BatchNorm running statistics with random noise and build an unused graph.
model.eval()
with torch.no_grad():
    sanity_shape = model(torch.randn((16, 3, 256, 256))).shape
# Restore the default training mode the freshly built model started in.
model.train()
sanity_shape

In [None]:
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, matthews_corrcoef, recall_score, precision_score
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

def comp_result(y_test, y_pred, y_proba):
    """Compute, print and return six binary-classification metrics.

    Args:
        y_test: ground-truth labels.
        y_pred: hard predicted labels (used for every metric except AUC).
        y_proba: predicted scores (used only for the ROC AUC).

    Returns:
        Tuple ``(accuracy, f1, auc, mcc, recall, precision)``.
    """
    # Evaluated in this order; each entry is (printed name, value).
    report = [
        ("Accuracy", accuracy_score(y_test, y_pred)),
        ("F1 Score", f1_score(y_test, y_pred)),
        ("AUC", roc_auc_score(y_test, y_proba)),
        ("MCC", matthews_corrcoef(y_test, y_pred)),
        ("Recall", recall_score(y_test, y_pred)),
        ("Precision", precision_score(y_test, y_pred)),
    ]

    for metric_name, metric_value in report:
        print(f"{metric_name}: {metric_value:.4f}")

    return tuple(metric_value for _, metric_value in report)

In [None]:
def train(dataloader, model, loss_fn, optimizer, lr_scheduler):
    """Run one training epoch and step the learning-rate scheduler once.

    Targets are cast to float32 and reshaped to a column to match the model
    output; predictions are the outputs thresholded at 0.5. Tensors are moved
    to the module-level ``device``.

    Returns:
        ``(accuracy, mean_loss)`` — correct predictions divided by the dataset
        size, and the summed batch losses divided by the number of batches.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.train()

    running_loss = 0.0
    n_correct = 0
    for inputs, labels in dataloader:
        labels = labels.type(torch.float32)
        inputs = inputs.to(device)
        labels = labels.to(device).reshape(-1, 1)

        optimizer.zero_grad()

        # Forward pass, loss, backward pass, parameter update.
        probs = model(inputs)
        batch_loss = loss_fn(probs, labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

        hard_preds = (probs > 0.5).float()
        n_correct += (hard_preds.to('cpu') == labels.to('cpu')).sum().item()

    # The scheduler advances once per epoch, after all batches.
    lr_scheduler.step()
    return n_correct / n_samples, running_loss / n_batches


def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset) 
    num_batches = len(dataloader) 
    epoch_loss = 0.0
    correct_predictions = 0 
    predict_labels_com = []
    test_labels_com = []
    test_outputs_com = []
    with torch.no_grad():
        # This will disable backward propagation
        model.eval()
        for (data_,target_) in dataloader:
            target_ = target_.type(torch.float32)
            
            data_, target_ = data_.to(device), target_.to(device)
            target_ = target_.reshape(-1,1)
            # Forward propagation
            outputs = model(data_)
            
            # Computing loss 
            loss = loss_fn(outputs,target_)
            # Computing statistics.
            epoch_loss = epoch_loss + loss.item()
            predictions = (outputs > 0.5).float() 
            correct_predictions += (predictions.to('cpu') == target_.to('cpu')).sum().item()
            
            predict_labels_com.extend(predictions.tolist())
            test_labels_com.extend(target_.tolist())
            test_outputs_com.extend(outputs.tolist())
            
        predict_labels_com = np.array(predict_labels_com)
        test_labels_com = np.array(test_labels_com)
        test_outputs_com = np.array(test_outputs_com)
        accuracy,f1,auc,mcc,recall,precision = comp_result(test_labels_com,predict_labels_com,test_outputs_com)
            
    return  accuracy,f1,auc,mcc,recall,precision

In [None]:
import os

# Move the model to the selected device before the optimizer is built.
model.to(device)

# --- Hyperparameters ---
EPOCHS = 50
learning_rate = 0.0001
momentum = 0.9        # defined here but not passed to the optimizer below
weight_decay = 0.1    # defined here but not passed to the optimizer below
lr_milestones = [7, 14, 21, 28, 35]

# Per-epoch history, one list per tracked series.
logs = {key: [] for key in ('train_loss', 'train_acc', 'val_loss', 'val_acc')}

# Binary cross-entropy on probabilities.
criterion = nn.BCELoss()

# AdamW drives the updates; the learning rate is multiplied by 0.1 at each milestone epoch.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
multi_step_lr_scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=lr_milestones, gamma=0.1)

# Early-stopping bookkeeping.
patience = 5
counter = 0
best_acc = best_auc = -1
# Main loop: one training pass and one validation pass per epoch.
for epoch in tqdm(range(EPOCHS)):
    train_acc, train_loss = train(train_loader, model, criterion, optimizer, multi_step_lr_scheduler)
    print("val===================",epoch)
    val_accuracy, val_f1, val_auc, val_mcc, val_recall, val_precision = test(val_loader, model, criterion)
    print('=======================')
    # test() returns metrics but no loss, so report the validation AUC and
    # accuracy; the previous message referenced undefined `val_loss`/`val_acc`
    # names and raised NameError on the first epoch.
    print(
        f'EPOCH: {epoch} '
        f'    train_loss: {train_loss:.4f}, train_acc: {train_acc:.3f} '
        f'    val_auc: {val_auc:.4f}, val_acc: {val_accuracy:.3f} '
        f'    Learning Rate: {optimizer.param_groups[0]["lr"]}'
    )

    logs['train_loss'].append(train_loss)
    logs['train_acc'].append(train_acc)
    # NOTE(review): this series holds the validation AUC, not a loss (test()
    # does not return one). The key is still filled so existing consumers of
    # `logs` keep working; the same value is also stored under 'val_auc'.
    logs['val_loss'].append(val_auc)
    logs.setdefault('val_auc', []).append(val_auc)
    logs['val_acc'].append(val_accuracy)