In [None]:
import torch
import torchvision
import copy
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import timm

from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from torchvision import models
from torch.optim import lr_scheduler
from tqdm import tqdm
from sklearn.metrics import f1_score, roc_auc_score

In [24]:
class AddNoise(object):
    def __init__(self, noise_level):
        self.noise_level = noise_level

    def __call__(self, img):
        img_tensor = transforms.functional.to_tensor(img)
        noise = torch.rand_like(img_tensor) * self.noise_level
        noisy_img = img_tensor + noise
        return transforms.functional.to_pil_image(noisy_img)

In [25]:
SIZE = 224
COLOR_DEVIATION = 0.01
image_transforms = {
    'train': transforms.Compose([
        transforms.Resize(size=SIZE),
        AddNoise(0.01),
        transforms.ColorJitter(brightness=(1.0-COLOR_DEVIATION,1.0+COLOR_DEVIATION),contrast=(1.0-COLOR_DEVIATION,1.0+COLOR_DEVIATION),saturation=(1.0-COLOR_DEVIATION,1.0+COLOR_DEVIATION),hue=(-1*COLOR_DEVIATION,COLOR_DEVIATION)),
        transforms.RandomRotation(degrees=0),
        transforms.RandomAffine(degrees=0),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'validation': transforms.Compose([
        transforms.Resize(size=SIZE),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize(size=SIZE),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
}

In [26]:
root = 'eyepac-light-v2-512-jpg/'
train_directory = root + 'train'
test_directory = root + 'test'
validation_directory = root + 'validation'

# batch size
batch_size = 4

# Load Data from folders
data = {
    'train': datasets.ImageFolder(root=train_directory, transform=image_transforms['train']),
    'validation': datasets.ImageFolder(root=validation_directory, transform=image_transforms['validation']),
    'test': datasets.ImageFolder(root=test_directory, transform=image_transforms['test'])
}

In [27]:
train_data_size = len(data['train'])
validation_data_size = len(data['validation'])
test_data_size = len(data['test'])

train_data = DataLoader(data['train'], batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
validation_data = DataLoader(data['validation'], batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)
test_data = DataLoader(data['test'], batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)

In [28]:
print("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

cuda:0


In [None]:
def get_model(model_name: str):
    match model_name:
        case "mobilenetv3-large": # 8 min/epoch
            model_ft = models.mobilenet_v3_large(weights=torchvision.models.MobileNet_V3_Large_Weights.DEFAULT, progress=True)
            model_ft.classifier[-1] = nn.Linear(1280, 2)
        case "mobilenetv3-small": # 8
            model_ft = models.mobilenet_v3_small(weights=torchvision.models.MobileNet_V3_Small_Weights.DEFAULT, progress=True)
            model_ft.classifier[-1] = nn.Linear(1024, 2)
        case "efficientnet-b0": # 8
            model_ft = models.efficientnet_b0(weights='IMAGENET1K_V1', progress=True)
            model_ft.classifier[-1] = nn.Linear(1280, 2)
        case "resnet18": # 8
            model_ft = models.resnet18(weights='IMAGENET1K_V1', progress=True)
            model_ft.fc = nn.Linear(512, 2)
        case "efficientnet-b3":  # 20
            model_ft = models.efficientnet_b3(weights='IMAGENET1K_V1', progress=True)
            model_ft.classifier[-1] = nn.Linear(1536, 2)
        case "squeezenet": # 6.3
            model_ft = models.squeezenet1_0(weights='IMAGENET1K_V1', progress=True)
            model_ft.classifier[1] = nn.Conv2d(512, 2, kernel_size=(1,1), stride=(1,1))
            model_ft.num_classes = 2
        case "vit-tiny": # 2.3
            model_ft = timm.create_model('vit_tiny_patch16_224', pretrained=True, num_classes=2)
        case "densenet121":
            model_ft = models.densenet121(weights='IMAGENET1K_V1', progress=True)
            model_ft.classifier = nn.Linear(1024, 2)
        case "convnext-tiny": # 18
            model_ft = timm.create_model('convnext_tiny', pretrained=True, num_classes=2)
        case _:
            raise ValueError(f"Unknown model: {model_name}")
    
    return model_ft

In [30]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=20):
    best_acc = 0.0
    
    train_metrics = []

    for epoch in range(num_epochs):
        # Create a progress bar for the epoch
        epoch_progress = tqdm(total=len(train_data), desc=f'Epoch {epoch + 1}/{num_epochs}', position=0, leave=False)

        for phase in ['train', 'validation']:
            running_loss = 0.0
            running_corrects = 0
            all_labels, all_preds = [], []

            # Set model mode
            if phase == 'train':
                model.train()
                dataloader = train_data
            else:
                model.eval()
                dataloader = validation_data
            
            # Iterate over data
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0) / len(data[phase])
                running_corrects += torch.sum(preds == labels.data).double() / len(data[phase])
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

                # Update progress bar
                epoch_progress.set_postfix(phase=phase, loss=loss.item(), acc=running_corrects.item())
                epoch_progress.update()

            # Adjust learning rate
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss
            epoch_acc = running_corrects
            epoch_f1 = f1_score(all_labels, all_preds, average='binary')
            epoch_auc = roc_auc_score(all_labels, all_preds)
            epoch_vram = torch.cuda.memory_allocated() / 1024**2
            
            train_metrics.append({'Loss': epoch_loss.real, 'Acc': epoch_acc.item(), 'F1': epoch_f1, 'Auc': epoch_auc, 'VRAM': epoch_vram})

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # Update best accuracy
            if phase == 'validation' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        epoch_progress.close()

    # Test the model
    model.load_state_dict(best_model_wts)
    model.eval()
    test_corrects = 0

    for inputs, labels in test_data:
        inputs = inputs.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            test_corrects += torch.sum(preds == labels.data)

    test_acc = test_corrects.double() / test_data_size
    print('Test Accuracy: {:.4f}'.format(test_acc))

    return model, train_metrics

In [None]:
models_names = ["mobilenetv3-large", "mobilenetv3-small", "efficientnet-b0", "resnet18",
                "efficientnet-b3", "squeezenet", "densenet121", "vit-tiny", "convnext-tiny"]
model_name = models_names[6]

# load pretrained model
model_ft = get_model(model_name)

model_ft = model_ft.to(device)
criterion = nn.CrossEntropyLoss()

optimizer_ft = optim.Adam(model_ft.parameters(), lr=0.001)

# Decay LR by a factor of 0.1 every 2 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=3, gamma=0.1)

# Calculate the number of parameters
total_params = sum(p.numel() for p in model_ft.parameters())
print("Total number of parameters: ", total_params)

# Train the model
model_ft, train_metrics = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=5)
train_results = pd.DataFrame(columns=['Loss', 'Acc', 'F1', 'Auc', 'VRAM'], data=train_metrics)

Total number of parameters:  6955906


Epoch 1/5: 2004it [04:13, 10.50it/s, acc=0.0247, loss=0.61, phase=validation]                            

train Loss: 0.7071 Acc: 0.5152


                                                                              

validation Loss: 1.0868 Acc: 0.5338


Epoch 2/5: 2003it [04:35, 12.50it/s, acc=0.0208, loss=0.691, phase=validation]                           

train Loss: 0.6953 Acc: 0.5164


                                                                              

validation Loss: 0.7676 Acc: 0.5000


Epoch 3/5:  40%|████      | 808/2000 [01:41<02:29,  7.95it/s, acc=0.204, loss=0.706, phase=train] 

KeyboardInterrupt: 

In [None]:
torch.save(model_ft.state_dict(), f'{model_name}_weiths.pth')
train_results.to_csv(f"{model_name}_train_metrics.csv")