In [1]:
import random
import numpy as np
import torch

def set_seed(seed=42):
    """Seed every random number generator this notebook touches.

    Calls ``random.seed``, ``np.random.seed``, ``torch.manual_seed`` and
    ``torch.cuda.manual_seed`` with the same value, then puts cuDNN into
    deterministic mode with benchmark autotuning disabled.

    Args:
        seed: Integer passed unchanged to each seeding function. Defaults to 42.
    """
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed)
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all RNGs once, up front, before any dataset, sampler or model is created below.
set_seed(42)

In [2]:
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from PIL import Image
from sklearn.metrics import f1_score
from tqdm import tqdm

In [3]:
class ICAODataset(Dataset):
    """Image/label dataset driven by a CSV index file.

    The CSV is read with pandas; column 0 holds the image path relative to
    ``root_dir`` and column 1 holds the integer label. Every image is converted
    to RGB, resized to 224x224, turned into a tensor and normalised with the
    mean/std constants below. When ``augment`` is true, samples whose label is 1
    additionally go through a random horizontal flip (p=0.5) before the resize.

    Args:
        csv_file: Path of the CSV index.
        root_dir: Directory the image paths are relative to.
        augment: Enable the random flip for label-1 samples. Defaults to False.
    """

    def __init__(self, csv_file, root_dir, augment=False):
        self.df = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.augment = augment

        def base_steps():
            # Fresh transform objects per pipeline, so the two Compose instances share nothing.
            return [
                T.Resize((224, 224)),
                T.ToTensor(),
                T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ]

        self.transform_common = T.Compose(base_steps())
        self.transform_aug = T.Compose([T.RandomHorizontalFlip(p=0.5), *base_steps()])

    def __len__(self):
        """Return the number of rows in the CSV index."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(transformed_image, label)``."""
        img_path = os.path.join(self.root_dir, self.df.iloc[idx, 0])
        label = int(self.df.iloc[idx, 1])
        image = Image.open(img_path).convert("RGB")

        # Only label-1 samples are ever flipped, and only when augmentation is on.
        flip_allowed = self.augment and label == 1
        pipeline = self.transform_aug if flip_allowed else self.transform_common
        return pipeline(image), label

In [4]:
# Training data gets the label-conditional flip; the held-out split is left untouched.
# Note that `test_dataset` / `test_loader` are built from the *val* CSV and folder.
train_dataset = ICAODataset('dataset/train.csv', root_dir='dataset/train', augment=True)
test_dataset = ICAODataset('dataset/val.csv', root_dir='dataset/val', augment=False)

# Inverse-frequency sampling weights: every sample of class c gets 1 / count(c),
# so each class has the same total probability mass in the sampler.
labels = train_dataset.df['label'].values
classes, class_sample_count = np.unique(labels, return_counts=True)
weight = 1. / class_sample_count
# Look weights up by the label's position among the sorted unique classes rather
# than by the raw label value, so labels need not be exactly 0..K-1.
samples_weight = weight[np.searchsorted(classes, labels)]

# Draw one epoch's worth of samples, with replacement, according to those weights.
sampler = WeightedRandomSampler(samples_weight, len(samples_weight), replacement=True)

# `sampler` replaces shuffling for the training loader; evaluation order is fixed.
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
# ImageNet-pretrained ResNet-50 whose classifier head is replaced by a single
# logit for binary classification. `IMAGENET1K_V1` is the weight set the
# deprecated `pretrained=True` flag used to select.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
model.fc = nn.Linear(model.fc.in_features, 1)
model = model.to(device)

# Read the labels straight from the training dataframe instead of the global
# `labels`: the batch loops further down rebind that name to a tensor, which
# would silently corrupt the counts if this cell were re-run afterwards.
train_labels = train_dataset.df['label'].values
num_neg = np.sum(train_labels == 0)
num_pos = np.sum(train_labels == 1)
# Explicit float32 so the weight matches the dtype of the model's logits.
pos_weight = torch.tensor([num_neg / num_pos], dtype=torch.float32).to(device)

# NOTE(review): train_loader already rebalances classes via WeightedRandomSampler;
# adding pos_weight = num_neg / num_pos on top compensates for the imbalance a
# second time. Confirm this is intended, or drop one of the two mechanisms.
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)



In [6]:
def evaluate_model(model, data_loader, device, threshold=0.5):
    """Compute the binary F1 score of ``model`` over ``data_loader``.

    The model is switched to eval mode (and left there) and run without
    gradient tracking. Each batch's logits are squeezed on dim 1, passed through
    a sigmoid and compared against ``threshold`` to obtain 0/1 predictions.

    Args:
        model: Module producing one logit per sample, shaped so that
            ``squeeze(1)`` yields a 1-D tensor.
        data_loader: Iterable of ``(images, labels)`` tensor batches.
        device: Device the images are moved to before the forward pass.
        threshold: Probability cut-off above which a sample is predicted
            positive. Defaults to 0.5, the previously hard-coded value.

    Returns:
        The value of ``f1_score(targets, preds)`` over all batches.
    """
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for images, labels in data_loader:
            probs = torch.sigmoid(model(images.to(device)).squeeze(1))
            preds.extend((probs > threshold).int().cpu().tolist())
            # Labels are only needed on the host, so skip the device round-trip.
            targets.extend(labels.int().cpu().tolist())
    return f1_score(targets, preds)

In [7]:
# Start below any attainable F1 so the first epoch always writes a checkpoint.
# With a starting value of 0, a run whose F1 never rose above 0 would save
# nothing, and the reload step would fail or pick up a stale file from an
# earlier run.
best_f1 = float("-inf")
# mode='max' because the monitored quantity (F1) should increase; the learning
# rate is halved after 3 epochs without improvement.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, factor=0.5)

for epoch in range(10):
    model.train()  # evaluate_model() leaves the model in eval mode, so re-enable each epoch
    running_loss = 0
    # The loop variable is `targets`, not `labels`, so the module-level `labels`
    # array of training labels is not overwritten by a batch tensor.
    for images, targets in tqdm(train_loader):
        images = images.to(device)
        targets = targets.float().to(device)  # BCEWithLogitsLoss expects float targets

        optimizer.zero_grad()
        outputs = model(images).squeeze(1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    avg_loss = running_loss / len(train_loader)
    # NOTE(review): test_loader wraps the validation split, and the same split
    # drives both the LR schedule and checkpoint selection below.
    f1 = evaluate_model(model, test_loader, device)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}, F1: {f1:.4f}")

    scheduler.step(f1)

    # Keep only the weights of the best-scoring epoch so far.
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), "resnet50_best_f1.pth")
        print(f"Saved new best model at Epoch {epoch+1} with F1: {f1:.4f}")

100%|██████████| 125/125 [01:05<00:00,  1.90it/s]


Epoch 1, Loss: 0.9889, F1: 0.2615
Saved new best model at Epoch 1 with F1: 0.2615


100%|██████████| 125/125 [01:05<00:00,  1.91it/s]


Epoch 2, Loss: 0.5002, F1: 0.2987
Saved new best model at Epoch 2 with F1: 0.2987


100%|██████████| 125/125 [01:05<00:00,  1.92it/s]


Epoch 3, Loss: 0.1860, F1: 0.3710
Saved new best model at Epoch 3 with F1: 0.3710


100%|██████████| 125/125 [01:05<00:00,  1.90it/s]


Epoch 4, Loss: 0.2407, F1: 0.2609


100%|██████████| 125/125 [01:05<00:00,  1.90it/s]


Epoch 5, Loss: 0.2312, F1: 0.2643


100%|██████████| 125/125 [01:04<00:00,  1.93it/s]


Epoch 6, Loss: 0.3078, F1: 0.3810
Saved new best model at Epoch 6 with F1: 0.3810


100%|██████████| 125/125 [01:06<00:00,  1.89it/s]


Epoch 7, Loss: 0.1384, F1: 0.2594


100%|██████████| 125/125 [01:05<00:00,  1.91it/s]


Epoch 8, Loss: 0.1883, F1: 0.3750


100%|██████████| 125/125 [01:06<00:00,  1.88it/s]


Epoch 9, Loss: 0.0837, F1: 0.4094
Saved new best model at Epoch 9 with F1: 0.4094


100%|██████████| 125/125 [01:05<00:00,  1.90it/s]


Epoch 10, Loss: 0.0306, F1: 0.3636


In [10]:
# Reload model
# Rebuild the architecture without downloading pretrained weights; the
# checkpoint supplies every parameter. `weights=None` replaces the deprecated
# `pretrained=False`.
model = models.resnet50(weights=None)
model.fc = nn.Linear(model.fc.in_features, 1)
# map_location lets a checkpoint saved on GPU load on a CPU-only machine;
# weights_only=True restricts unpickling to tensors/containers, which is all a
# state_dict holds.
state_dict = torch.load("resnet50_best_f1.pth", map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model = model.to(device)
model.eval()

# Evaluate
# Same decision rule as evaluate_model (sigmoid > 0.5), but the raw predictions
# and labels are kept in all_preds / all_labels.
all_preds = []
all_labels = []

with torch.no_grad():
    # `batch_labels` rather than `labels`, so the module-level array of training
    # labels is not overwritten by a batch tensor.
    for images, batch_labels in test_loader:
        images = images.to(device)
        outputs = model(images).squeeze(1)
        preds = (torch.sigmoid(outputs) > .5).int().cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(batch_labels.numpy())

f1 = f1_score(all_labels, all_preds)
print(f"Test F1 Score (from saved model): {f1:.4f}")

  model.load_state_dict(torch.load("resnet50_best_f1.pth"))


Test F1 Score (from saved model): 0.4094
