In [17]:
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import os

import torch
from torch.utils.data import Dataset
import torch.optim as optim
import torch.nn as nn
from torchvision import transforms

from sklearn.metrics import classification_report, f1_score
from sklearn import svm, metrics

import pandas as pd

In [3]:
# Define hyperparameters
BATCH_SIZE = 128
LEARNING_RATE = 0.001

In [4]:
# Set the device
device = "cpu"
if torch.cuda.is_available():
    device =  "cuda"
elif torch.backends.mps.is_available():
    device = "mps" # Use M1 Mac GPU
print(device)

mps


In [14]:
class ExtractorClassifierDataset(Dataset):
    def __init__(self, data, resize_shape = (256,256)):
        self.data = data
        self.to_tensor = transforms.ToTensor()
        self.resize_image = transforms.Resize(resize_shape, antialias=True)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image, label = self. data[idx]
        image_tensor = self.to_tensor(image)
        image_tensor = self.resize_image(image_tensor)
        image_tensor = image_tensor/255
        label_tensor = torch.zeros(2)
        label_tensor[label] = 1
        return image_tensor, label_tensor

data_dir = "../../../datasets/extractor_classifier/"

train_list = []
validation_list = []
test_list = []

for class_name in ("relevant", "not_relevant"):
    if class_name == "relevant":
        label = 0
    else:
        label = 1

    for image in os.listdir(f"{data_dir}/train/{class_name}"):
        src_path = f"{data_dir}/train/{class_name}/{image}"
        train_list.append((Image.open(src_path), label))

    for image in os.listdir(f"{data_dir}/validation/{class_name}"):
        src_path = f"{data_dir}/validation/{class_name}/{image}"
        validation_list.append((Image.open(src_path), label))

    for image in os.listdir(f"{data_dir}/test/{class_name}"):
        src_path = f"{data_dir}/test/{class_name}/{image}"
        test_list.append((Image.open(src_path), label))

# Define the train/validation/test datasets
train_data = ExtractorClassifierDataset(train_list)
validation_data = ExtractorClassifierDataset(validation_list)
test_data = ExtractorClassifierDataset(test_list)

In [15]:
# Define the dataloaders for each dataset
train_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, )
validation_loader = torch.utils.data.DataLoader(validation_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)

In [8]:
from torchvision.models import resnet50

class Resnet50Model(nn.Module):
    def __init__(self, pretrained=True):
        super(Resnet50Model, self).__init__()
        self.resnet_model = resnet50(pretrained=pretrained)
        self.fc = nn.Linear(in_features=1000, out_features=2)

    def forward(self, x):
        x = self.resnet_model(x)
        x = self.fc(x)
        return x

In [9]:
def train(model, train_loader, valid_loader, test_loader, criterion, optimizer, epochs):
    # Move the model to the device
    model.to(device)

    # Define variables to track the best validation accuracy and the corresponding model state
    best_valid_acc = 0.0
    best_model_state = None
    train_loss_values = []
    valid_loss_values = []
    train_acc_values = []
    valid_acc_values = []

    for epoch in range(epochs):
        # Train the model for one epoch
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        model.train()
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs.data, 1)
            _, labels = torch.max(labels.data, 1)
            train_correct += (predicted == labels).sum().item()
            train_total += labels.size(0)
        train_loss = train_loss / len(train_loader.dataset)
        train_loss_values.append(train_loss)
        train_acc = train_correct / train_total
        train_acc_values.append(train_acc)

        # Evaluate the model on the validation set
        valid_loss = 0.0
        valid_correct = 0
        valid_total = 0
        model.eval()
        with torch.no_grad():
            for images, labels in valid_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                valid_loss += loss.item() * images.size(0)
                _, predicted = torch.max(outputs.data, 1)
                _, labels = torch.max(labels.data, 1)
                valid_correct += (predicted == labels).sum().item()
                valid_total += labels.size(0)
        valid_loss /= len(valid_loader.dataset)
        valid_acc = valid_correct / valid_total
        valid_loss_values.append(valid_loss)
        valid_acc_values.append(valid_acc)

        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f} - Train Acc: {train_acc:.4f} - "
              f"Valid Loss: {valid_loss:.4f} - Valid Acc: {valid_acc:.4f}")

        validate_model(model)

        # Save the model state if the current validation accuracy is better than the previous best
        if valid_acc > best_valid_acc:
            best_valid_acc = valid_acc
            best_model_state = model.state_dict()

    # Load the best model state and evaluate on the test set
    model.load_state_dict(best_model_state)
    model.eval()
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            _, labels = torch.max(labels.data, 1)
            test_correct += (predicted == labels).sum().item()
            test_total += labels.size(0)
    test_acc = test_correct / test_total

    print(f"Test Acc: {test_acc:.4f}")

    return model, train_loss_values, train_acc_values, valid_loss_values, valid_acc_values

def validate_model(model):
    model.to(device)
    y_validation = []
    y_pred = []
    model.eval()
    with torch.no_grad():
        for images, labels in validation_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            _, labels = torch.max(labels.data, 1)
            y_validation.extend(labels.detach().cpu().numpy())
            y_pred.extend(predicted.detach().cpu().numpy())
    print(f"Accuracy: {metrics.accuracy_score(y_validation, y_pred)}")
    print(classification_report(y_validation, y_pred))

In [None]:
resnet_model = Resnet50Model()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet_model.parameters(), lr=LEARNING_RATE)

if os.path.isfile("model_results/resnet50_model_50.pt"):
    resnet_model.load_state_dict(torch.load("model_results/resnet50_model_50.pt", map_location=torch.device(device)))
    validate_model(resnet_model)
else:
    print("Train model")
    model, train_loss_values, train_accuracy_values, validation_loss_values, validation_accuracy_values = train(resnet_model, train_loader, validation_loader, test_loader, criterion, optimizer, 50)
    torch.save(model.state_dict(), "model_results/resnet50_model_50.pt")
    np.save("model_results/resnet50_model_50_train_loss", train_loss_values)
    np.save("model_results/resnet50_model_50_train_acc", train_accuracy_values)
    np.save("model_results/resnet50_model_50_valid_loss", validation_loss_values)
    np.save("model_results/resnet50_model_50_valid_acc", validation_accuracy_values)