In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, Subset
import matplotlib.pyplot as plt
from torchvision import transforms
import torchvision
import pandas as pd
from PIL import Image
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import fbeta_score

In [None]:
# Names of every model builder registered with torchvision.
all_models = torchvision.models.list_models()
# Same listing, filtered to builders defined in the top-level
# `torchvision.models` module.
# NOTE(review): presumably these are the image-classification builders — confirm.
classification_models = torchvision.models.list_models(module=torchvision.models)
# print(f"all models: \n {all_models}")
print(f"classif models: \n {classification_models}")

In [None]:
# Preprocessing applied to every image before it is fed to the network.
# Exactly one of the two pipelines below should be active; it has to match the
# backbone that is instantiated further down in the notebook.

# ResNet & DenseNet (224x224 centre crop) — kept for switching backbones.
# transform = transforms.Compose([
#     transforms.Resize(256),
#     transforms.CenterCrop(224),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
# ])

# EfficientNet_b1 (240x240 centre crop).
# NOTE(review): the mean/std values look like the usual ImageNet channel
# statistics expected by the pretrained weights — confirm.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(240),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# Load the training annotations. The `tags` column is later split on
# whitespace, i.e. it holds a space-separated list of labels per image.
df = pd.read_csv("data/train_classes.csv")
df

In [None]:
# Collect every distinct tag that appears anywhere in the annotations.
all_tags = {
    tag
    for tag_list in df['tags'].str.split()
    for tag in tag_list
}

In [None]:
# Alphabetical ordering gives a deterministic tag <-> index mapping.
idx_to_tag = dict(enumerate(sorted(all_tags)))
tag_to_idx = {tag: idx for idx, tag in idx_to_tag.items()}
print(tag_to_idx)
print(len(tag_to_idx))

In [None]:
class MultiLabelImageDataset(Dataset):
    """Image dataset whose targets are multi-hot vectors built from tag strings.

    Rows of the CSV are read positionally: column 0 holds the image name
    (without the ``.jpg`` extension) and column 1 holds the whitespace-separated
    tags of that image.

    Args:
        csv_file: Path of the annotation CSV.
        img_dir: Directory that contains the ``<image name>.jpg`` files.
        transform: Optional callable applied to the loaded RGB PIL image.
        tag_to_idx: Optional mapping from tag to its position in the target
            vector. When omitted, the mapping is built from the tags found in
            ``csv_file``, sorted alphabetically, so the dataset no longer
            depends on a notebook-global dictionary being defined beforehand.
    """

    def __init__(self, csv_file, img_dir, transform=None, tag_to_idx=None):
        self.df = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

        if tag_to_idx is None:
            tag_column = self.df.iloc[:, 1].dropna()
            vocabulary = sorted(
                {tag for tag_list in tag_column.str.split() for tag in tag_list}
            )
            tag_to_idx = {tag: idx for idx, tag in enumerate(vocabulary)}
        # Copy so later mutation of the caller's dict cannot change the targets.
        self.tag_to_idx = dict(tag_to_idx)

    def __len__(self):
        return len(self.df)

    def _encode_tags(self, tags):
        """Return a float multi-hot vector with a 1 at the index of every tag."""
        labels = torch.zeros(len(self.tag_to_idx))
        for tag in tags:
            labels[self.tag_to_idx[tag]] = 1
        return labels

    def __getitem__(self, idx):
        """Return ``(image, labels)`` for row ``idx``.

        Raises:
            KeyError: if the row contains a tag missing from ``tag_to_idx``.
        """
        img_name = self.df.iloc[idx, 0]
        img_path = os.path.join(self.img_dir, f"{img_name}.jpg")
        # The context manager closes the file handle as soon as the pixels have
        # been decoded by convert().
        with Image.open(img_path) as img_file:
            image = img_file.convert('RGB')

        if self.transform:
            image = self.transform(image)

        labels = self._encode_tags(self.df.iloc[idx, 1].split())
        return image, labels

In [None]:
def _find_normalize(dataset):
    """Return the ``transforms.Normalize`` step used by ``dataset``, or ``None``."""
    # Wrappers such as Subset keep the real dataset in `.dataset`.
    while not hasattr(dataset, "transform") and hasattr(dataset, "dataset"):
        dataset = dataset.dataset

    pipeline = getattr(dataset, "transform", None)
    if isinstance(pipeline, transforms.Normalize):
        return pipeline
    # transforms.Compose stores its steps in `.transforms`.
    for step in getattr(pipeline, "transforms", ()):
        if isinstance(step, transforms.Normalize):
            return step
    return None


def visualize_sample(dataset, idx):
    """Display sample ``idx`` of ``dataset`` with its decoded tags.

    Args:
        dataset: Indexable returning ``(image, labels)``, where ``labels`` is a
            multi-hot vector decoded through the module-level ``idx_to_tag``.
        idx: Position of the sample to show.

    If the image is a tensor and the dataset's transform contains a
    ``Normalize`` step, the normalisation is undone before display. Without
    that, ``ToPILImage`` receives values outside ``[0, 1]``, which wrap around
    when cast to bytes and produce garbled colours.
    """
    image, labels = dataset[idx]

    # convert the image tensor to a PIL Image for display
    if isinstance(image, torch.Tensor):
        normalize = _find_normalize(dataset)
        if normalize is not None:
            mean = torch.as_tensor(normalize.mean, dtype=image.dtype).view(-1, 1, 1)
            std = torch.as_tensor(normalize.std, dtype=image.dtype).view(-1, 1, 1)
            # Invert (x - mean) / std, then clamp away rounding overshoot.
            image = (image * std + mean).clamp(0, 1)
        image = transforms.ToPILImage()(image)

    # plot the image
    plt.figure(figsize=(10, 6))
    plt.imshow(image)
    plt.axis('off')

    # get the labels
    present_labels = [idx_to_tag[i] for i, label in enumerate(labels) if label == 1]

    # set the title with the labels
    plt.title(f"Labels: {', '.join(present_labels)}")
    plt.show()

    print(f"Image labels: {', '.join(present_labels)}")

In [None]:
# Full labelled training set, preprocessed with the active `transform`.
dataset = MultiLabelImageDataset(
    csv_file="data/train_classes.csv",
    img_dir="data/train-jpg",
    transform=transform,
)

In [None]:
# Sanity check: show the sample at index 4 together with its decoded tags.
visualize_sample(dataset, 4)

In [None]:
# Hold out 10% of the samples for evaluation; the fixed seed keeps the split
# reproducible across kernel restarts.
all_indices = list(range(len(dataset)))
train_idx, test_idx = train_test_split(all_indices, test_size=0.1, random_state=42)

# Both subsets are index views onto the same underlying dataset object.
train_dataset, test_dataset = Subset(dataset, train_idx), Subset(dataset, test_idx)

In [None]:
# Mini-batch size shared by the training and evaluation loaders.
batch_size = 64
# Training batches are reshuffled on each pass; evaluation order is kept fixed.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Pick the compute backend in order of preference: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
import torchvision.models as models
from torch import nn

# One output logit per tag. Derived from the tag vocabulary instead of being
# hard-coded, so the model head always has the same width as the label vectors
# produced by the dataset.
num_classes = len(tag_to_idx)

def ResNetClassifier(num_classes, freeze_backbone=False):
    """Build a pretrained ResNet-50 whose final layer outputs ``num_classes`` logits.

    Args:
        num_classes: Width of the new final fully connected layer.
        freeze_backbone: When True, every pretrained parameter gets
            ``requires_grad = False`` so only the new head is trained.
            Defaults to False (the whole network is fine-tuned).

    Returns:
        The torchvision ResNet-50 module with its ``fc`` layer replaced.
    """
    # load a pre-trained model
    backbone = models.resnet50(weights='DEFAULT')

    if freeze_backbone:
        for param in backbone.parameters():
            param.requires_grad = False

    # The replacement layer is created after freezing, so its parameters keep
    # the default requires_grad=True and stay trainable.
    backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
    return backbone

def DenseNetClassifier(num_classes, freeze_backbone=False):
    """Build a pretrained DenseNet-121 whose classifier outputs ``num_classes`` logits.

    Args:
        num_classes: Width of the new classifier layer.
        freeze_backbone: When True, every pretrained parameter gets
            ``requires_grad = False`` so only the new head is trained.
            Defaults to False (the whole network is fine-tuned).

    Returns:
        The torchvision DenseNet-121 module with its ``classifier`` replaced.
    """
    # load a pre-trained model
    backbone = models.densenet121(weights='DEFAULT')

    if freeze_backbone:
        for param in backbone.parameters():
            param.requires_grad = False

    # The replacement layer is created after freezing, so its parameters keep
    # the default requires_grad=True and stay trainable.
    backbone.classifier = nn.Linear(backbone.classifier.in_features, num_classes)
    return backbone

def EfficientNetClassifier(num_classes, freeze_backbone=False):
    """Build a pretrained EfficientNet-B1 whose classifier outputs ``num_classes`` logits.

    Args:
        num_classes: Width of the new classifier layer.
        freeze_backbone: When True, every pretrained parameter gets
            ``requires_grad = False`` so only the new head is trained.
            Defaults to False (the whole network is fine-tuned).

    Returns:
        The torchvision EfficientNet-B1 module with its ``classifier`` replaced
        by a single linear layer.
    """
    # load a pre-trained model
    backbone = models.efficientnet_b1(weights='DEFAULT')

    if freeze_backbone:
        for param in backbone.parameters():
            param.requires_grad = False

    # torchvision's EfficientNet head is a Sequential ending in a Linear layer,
    # so the feature width is read from that last layer instead of hard-coding
    # 1280 (this keeps the function correct if the variant is changed).
    num_ftrs = backbone.classifier[-1].in_features

    # NOTE(review): assigning a bare Linear replaces the whole head, which
    # also removes the Dropout layer torchvision places before the Linear.
    # Assign to `classifier[-1]` instead if dropout should be kept.
    backbone.classifier = nn.Linear(num_ftrs, num_classes)
    return backbone

In [None]:
# Choose the backbone by leaving exactly one of these lines active. The
# `transform` pipeline defined near the top of the notebook must be switched
# to the matching variant.
# model = ResNetClassifier(num_classes)
# model = DenseNetClassifier(num_classes)
model = EfficientNetClassifier(num_classes)

# Move the parameters to the selected device, then display the architecture.
model.to(device)
model

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, device=None, log_every=64):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: Yields ``(inputs, targets)`` batches; ``dataloader.dataset``
            must support ``len``.
        model: Module to optimise; it is switched to training mode.
        loss_fn: Callable mapping ``(predictions, targets)`` to a scalar loss.
        optimizer: Optimiser holding the model's parameters.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters), so
            the function does not depend on a notebook-global ``device``.
        log_every: Print a progress line every this many batches.

    Returns:
        Mean of the per-batch losses as a float (``nan`` if the loader is empty).
    """
    size = len(dataloader.dataset)
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    seen = 0            # samples processed so far, correct for any batch size
    loss_sum = 0.0      # kept as a tensor after the first batch to avoid a sync per step
    num_batches = 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        seen += len(X)
        loss_sum = loss_sum + loss.detach()
        num_batches += 1

        if batch % log_every == 0:
            print(f"loss: {loss.item():>7f}  [{seen:>5d}/{size:>5d}]")

    if num_batches == 0:
        return float("nan")
    return float(loss_sum) / num_batches


def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, f2 = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()

            # calculate f2 score
            pred_tags = torch.sigmoid(pred).cpu().numpy() > 0.24
            true_tags = y.cpu().numpy()
            f2 += fbeta_score(true_tags, pred_tags, beta=2, average='micro')

    test_loss /= num_batches
    f2 /= num_batches
    
    print(f"Test Error: \n f2 score: {f2:.5f}, avg loss: {test_loss:>8f} \n")
    return f2, test_loss


In [None]:
# Confirm which device the training run below will use.
print(device)

In [None]:
# Optimisation hyper-parameters used by the training cell below.
learning_rate = 0.001  # step size passed to the Adam optimiser
epochs = 5             # number of full passes over the training set

In [None]:
# BCEWithLogitsLoss takes raw logits, so the model needs no final sigmoid.
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Held-out loss after each epoch; plotted in a later cell.
all_loss = []
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    # Evaluate on the held-out split after every epoch.
    f2, test_loss = test_loop(test_dataloader, model, loss_fn)
    all_loss.append(test_loss)
print("Done!")

In [None]:
print(all_loss)

# One x position per recorded epoch, counted from 1.
epochs_list = list(range(1, len(all_loss) + 1))

# Held-out loss per epoch, drawn with the explicit figure/axes interface.
fig, ax = plt.subplots()
ax.plot(epochs_list, all_loss, marker='o', color='b', label='Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Loss over Epochs')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
def predict_image(model, image_path, transform, idx_to_tag, threshold=0.24, device=None):
    """Predict the tags of a single image file.

    Args:
        model: Module producing one logit per tag; it is switched to eval mode.
        image_path: Path of the image to classify.
        transform: Callable turning an RGB PIL image into a model input tensor.
        idx_to_tag: Mapping from output index to tag name.
        threshold: A tag is predicted when its sigmoid probability exceeds
            this value.
        device: Device the input is moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters), so
            the function does not depend on a notebook-global ``device``.

    Returns:
        ``(predicted_labels, probabilities)``: the list of predicted tag names
        and the 1-D tensor of per-tag probabilities for this image.
    """
    model.eval()
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # The context manager closes the file once the pixels have been decoded.
    with Image.open(image_path) as img_file:
        image = img_file.convert('RGB')
    # Add the batch dimension the model expects.
    batch = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        probabilities = torch.sigmoid(model(batch))[0]

    predicted_labels = [
        idx_to_tag[i] for i, is_present in enumerate(probabilities > threshold) if is_present
    ]
    return predicted_labels, probabilities

In [None]:
# Run the trained model on one image from the test directory.
image_path = "data/test-jpg/test_5689.jpg"
predicted_labels, probabilities = predict_image(model, image_path, transform, idx_to_tag)

print("Predicted labels:", predicted_labels)
print("Probabilities:")
# List only the tags whose probability exceeds 0.24, the same cut-off that
# predict_image and test_loop use.
for i, prob in enumerate(probabilities):
    if prob > 0.24:
        print(f"{idx_to_tag[i]}: {prob.item():.4f}")