# Genre classification using ResNet152

In [None]:
from copy import deepcopy
from PIL import Image
from time import time
from typing import Tuple

import pandas as pd
import torch
import torch.nn as nn
from torch.nn import Module
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.dataset import random_split
from torchvision import transforms
from torchvision.models import resnet152

# Seed PyTorch's global random number generator so that runs are repeatable.
torch.manual_seed(7)

In [None]:
# Base directory from which the CSV files and the images folder are read.
DIR_TRAIN = "../data"

## Preprocessing

In [None]:
class MultimodalDataset(Dataset):
    """Dataset yielding ``(title, cover image, genre)`` triples.

    The ``Title`` and ``Cover_image_name`` columns are read from the feature
    CSV and the ``Genre`` column from the label CSV. The ``Id`` column of
    both files is dropped when they are loaded.
    """

    def __init__(self, path_x, path_y, img_dir) -> None:
        """Load the feature and label tables.

        Args:
            path_x: CSV file (path or file-like object) holding the features.
            path_y: CSV file (path or file-like object) holding the labels.
            img_dir: Directory that contains the cover image files.
        """
        super().__init__()
        self.img_dir = img_dir
        self.df = pd.read_csv(path_x).drop(["Id"], axis=1)
        self.labels = pd.read_csv(path_y).drop(["Id"], axis=1)

    def __len__(self) -> int:
        """Return the number of rows in the label table."""
        return self.labels.shape[0]

    def __getitem__(self, index) -> "Tuple[str, Image.Image, int]":
        """Return the title, the RGB cover image and the genre of row ``index``."""
        text = self.df["Title"].iloc[index]
        image_path = f"{self.img_dir}/{self.df['Cover_image_name'].iloc[index]}"
        # Image.open is lazy and keeps the file open. Converting inside the
        # ``with`` block loads the pixels and releases the file handle, and
        # guarantees three channels for the 3-channel normalisation applied
        # later, even for grayscale / RGBA / CMYK covers.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        label = self.labels["Genre"].iloc[index]
        return (text, image, label)

In [None]:
def pipeline_label(x):
    """Convert a raw label value to a built-in ``int``."""
    return int(x)

def pipeline_image(image):
    """Turn an image into a normalised tensor.

    The image is first converted with ``ToTensor`` and then normalised
    channel-wise with the mean / std values that torchvision documents for
    its ImageNet-pretrained models.
    """
    to_tensor = transforms.ToTensor()
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    )
    return normalize(to_tensor(image))

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def collate_batch(batch):
    """Collate ``(text, image, label)`` samples into image and label tensors.

    The text element of every sample is ignored. Images go through
    ``pipeline_image`` and are stacked along a new first dimension; labels go
    through ``pipeline_label`` and become a ``torch.long`` tensor. Both
    tensors are moved to ``device`` before being returned.
    """
    # Process each sample's image and label together, in batch order.
    processed = [
        (pipeline_image(image), pipeline_label(label))
        for (__, image, label) in batch
    ]
    images = torch.stack([image for image, __ in processed])
    labels = torch.tensor([label for __, label in processed], dtype=torch.long)
    return images.to(device), labels.to(device)

## Model

In [None]:
class Book_genre(Module):
    """Image classifier: frozen ResNet152 features followed by a small MLP.

    The backbone is the ImageNet-pretrained ResNet152 without its last child
    module, with all of its parameters frozen. Two trainable linear layers
    (2048 -> 128 -> 30) map the features to 30 output logits.
    """

    def __init__(self) -> None:
        super().__init__()
        # Image
        backbone = resnet152(weights="ResNet152_Weights.IMAGENET1K_V1")
        # Keep every child module except the last one (the classification head).
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])
        for param in self.resnet.parameters():
            param.requires_grad = False
        # NN
        self.fc1 = nn.Linear(2048, 128)
        self.fc2 = nn.Linear(128, 30)

    def forward(self, images):
        """Return the logits for a batch of images, shape ``(batch, 30)``."""
        x = self.resnet(images)
        # Flatten everything except the batch dimension. A bare ``squeeze()``
        # would also remove the batch dimension when the batch holds a single
        # sample, producing a 1-D output that breaks ``argmax(1)`` and the loss.
        x = x.flatten(start_dim=1)
        x = self.fc1(x).relu()
        return self.fc2(x)

In [None]:
def evaluate(model, dataloader):
    """Compute the accuracy and the predictions of ``model`` on ``dataloader``.

    The model is switched to evaluation mode and is left in that mode.

    Args:
        model: Module mapping a batch of images to per-class scores.
        dataloader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        A tuple ``(accuracy, predictions)`` where ``accuracy`` is the fraction
        of correctly classified samples and ``predictions`` is a 1-D tensor of
        predicted class indices in iteration order. For an empty dataloader
        the result is ``(0.0, <empty long tensor>)``.
    """
    model.eval()
    correct, total = 0, 0
    predicted_batches = []
    with torch.no_grad():
        for images, labels in dataloader:
            predicted = model(images).argmax(1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            predicted_batches.append(predicted)
    if total == 0:
        # Nothing was evaluated: avoid dividing by zero / concatenating nothing.
        return 0.0, torch.empty(0, dtype=torch.long)
    return correct / total, torch.cat(predicted_batches)

In [None]:
def train(
    model,
    dataloader_train,
    dataloader_val,
    optimizer,
    criterion,
    max_epochs: int = 10,
    log_interval: int = 100,
    patience: int = 10,
    max_grad_norm: float = 0.1,
):
    """Train ``model`` with early stopping on validation accuracy.

    Every ``log_interval`` batches the model is evaluated on
    ``dataloader_val``; a deep copy of the model is kept whenever the
    validation accuracy is at least as good as the best one seen so far.
    Training stops once ``patience`` consecutive checks bring no improvement,
    or after ``max_epochs`` epochs.

    Args:
        model: Module to optimise.
        dataloader_train: Iterable of ``(images, labels)`` training batches.
        dataloader_val: Iterable of ``(images, labels)`` validation batches.
        optimizer: Optimizer updating the parameters of ``model``.
        criterion: Loss called as ``criterion(predictions, labels)``.
        max_epochs: Maximum number of passes over ``dataloader_train``.
        log_interval: Number of batches between two validation checks.
        patience: Number of consecutive non-improving checks tolerated.
        max_grad_norm: Gradient norm threshold used for clipping.

    Returns:
        The copy of the model with the best validation accuracy, or ``None``
        if no validation check was ever run (e.g. no training batches).

    Raises:
        ValueError: If ``log_interval`` is smaller than 1.
    """
    if log_interval < 1:
        raise ValueError("log_interval must be a positive integer")

    # Running totals; they are NOT reset between epochs, so the reported
    # training accuracy is cumulative over the whole run.
    seen = 0
    train_correct = 0
    stale_checks = 0
    best_val_acc = 0
    best_model = None

    for epoch in range(1, max_epochs + 1):
        for index, (images, labels) in enumerate(dataloader_train):
            # evaluate() leaves the model in eval mode, so re-enable training
            # mode at every step.
            model.train()
            optimizer.zero_grad()
            predictions = model(images)
            loss = criterion(predictions, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()
            train_correct += (predictions.argmax(1) == labels).sum().item()
            seen += labels.size(0)

            if index % log_interval != 0:
                continue

            val_acc, __ = evaluate(model, dataloader_val)
            print(
                f"Epoch: {epoch}/{max_epochs:2d}"
                f" | Batch: {index:4d}/{len(dataloader_train)}"
                f" | Train Acc: {100 * train_correct / seen:4.5f} %"
                f" | Val Acc: {100 * val_acc:4.5f} %"
            )
            if val_acc >= best_val_acc:
                best_val_acc = val_acc
                stale_checks = 0
                best_model = deepcopy(model)
            else:
                stale_checks += 1
            if stale_checks >= patience:
                print("[STOP] Patience expired.")
                return best_model
        print("#" * 75)
    print("[STOP] Maximum epochs reached.")
    return best_model

## Training

In [None]:
# Hyperparameters.
EPOCHS = 50
LR = 1e-4
BATCH_SIZE = 64

# Model, loss and optimizer.
model = Book_genre().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

# Full training data, split 95 % / 5 % into training and validation subsets.
dataset_train = MultimodalDataset(
    f"{DIR_TRAIN}/train_x.csv",
    f"{DIR_TRAIN}/train_y.csv",
    f"{DIR_TRAIN}/images/",
)
val_split_idx = int(0.95 * len(dataset_train))
split_lengths = [val_split_idx, len(dataset_train) - val_split_idx]
split_train_, split_val_ = random_split(dataset_train, split_lengths)

# Both loaders share the same batching options.
loader_options = {
    "batch_size": BATCH_SIZE,
    "shuffle": True,
    "collate_fn": collate_batch,
}
dataloader_train = DataLoader(split_train_, **loader_options)
dataloader_val = DataLoader(split_val_, **loader_options)

In [None]:
# Run the training loop and keep the model it returns.
best_model = train(
    model,
    dataloader_train,
    dataloader_val,
    optimizer,
    criterion,
    max_epochs=EPOCHS,
)
# Save the returned model under a name stamped with the current Unix time.
checkpoint_path = f"resnet_{int(time())}.pt"
torch.save(best_model, checkpoint_path)

## Test

In [None]:
dataset_test = MultimodalDataset(
    path_x=f"{DIR_TRAIN}/non_comp_test_x.csv",
    # NOTE(review): the label file is read from an extra "data/" sub-folder,
    # unlike the feature file above - confirm this path is intended.
    path_y=f"{DIR_TRAIN}/data/non_comp_test_y.csv",
    img_dir=f"{DIR_TRAIN}/images"
)

dataloader_test = DataLoader(
    dataset=dataset_test,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch
)
# Evaluate the model returned by train() (the one that was saved to disk),
# not the last training state left in `model`.
acc_test, predictions_test = evaluate(model=best_model, dataloader=dataloader_test)
print(f"\nTest accuracy: {100 * acc_test:.4f} %")

# Save output to disk.
predicted_genres = predictions_test.cpu().numpy()
df = pd.DataFrame({
    # Integer ids; a float tensor here would be written as 0.0, 1.0, ...
    "Id": range(len(predicted_genres)),
    "Genre": predicted_genres,
})
df.to_csv("resnet_non_comp_test_pred_y.csv", index=False)