# Download data from Kaggle

In [None]:
from google.colab import userdata
import os
os.environ['KAGGLE_USERNAME'] = userdata.get('KAGGLE_USERNAME')
os.environ['KAGGLE_KEY'] = userdata.get('KAGGLE_KEY')

!kaggle datasets download -d https://www.kaggle.com/datasets/maparla/prado-museum-pictures
!unzip /content/prado-museum-pictures.zip -d /content/prado-museum-pictures/ 

Delete useless jpgs found in the images folder 

In [None]:
images_folder = '/content/prado-museum-pictures/images/images'

u_files = [useless_file 
           for useless_file in os.listdir(images_folder)
           if '._' in useless_file]

for u_file in u_files:
    os.remove(os.path.join(images_folder, u_file))

In [None]:
import pandas as pd

In [None]:
prado = pd.read_csv('/content/prado-museum-pictures/prado.csv')

In [None]:
len(prado)

# Process supports

Drop na

In [None]:
prado = prado.dropna(subset=['technical_sheet_soporte'])

Aggregate the paper, canvas and wood supported workarts

In [None]:
prado.loc[:,'technical_sheet_soporte'] = prado.loc[:,'technical_sheet_soporte'].apply(lambda x: 'Papel' if x.startswith('Papel') else x)
prado.loc[:,'technical_sheet_soporte'] = prado.loc[:,'technical_sheet_soporte'].apply(lambda x: 'Lienzo' if x.startswith('Lienzo') else x)
prado.loc[:,'technical_sheet_soporte'] = prado.loc[:,'technical_sheet_soporte'].apply(lambda x: 'Tabla' if x.startswith('Tabla') else x)

In [None]:
prado.technical_sheet_soporte.value_counts().sort_values(ascending=False)[:10]

Filter to keep the three most frequent supports and then undersample to balance the dataset

In [None]:
# Filter the dataframe to keep only the first three supports
filtered_prado = prado[prado['technical_sheet_soporte'].isin(['Papel', 'Tabla', 'Lienzo'])]

# Calculate smaller frequency 
min_occurrences = filtered_prado.groupby('technical_sheet_soporte')['technical_sheet_soporte'].count().min()

# Undersample each support to the minimum count
undersampled_prado = filtered_prado.groupby('technical_sheet_soporte').apply(lambda x: x.sample(min_occurrences)).reset_index(drop=True)

Plot the number of samples

In [None]:
import matplotlib.pyplot as plt

# Assuming filtered_prado is your filtered DataFrame
# Count the number of observations for each class
counts_before = filtered_prado['technical_sheet_soporte'].value_counts().sort_values(ascending=False)

# Define the counts after sampling (each class has 733 samples)
classes = ['Papel', 'Lienzo', 'Tabla']
counts_after = pd.Series([733, 733, 733], index=classes)

# Create subplots
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Plot the counts before sampling
counts_before.plot(kind='bar', ax=axes[0], color=['lightcoral', 'lightskyblue', 'lightgreen'])
axes[0].set_title('Number of observations before sampling')
axes[0].set_ylabel('Count')
axes[0].set_xlabel('Support')
axes[0].spines['top'].set_visible(False)
axes[0].spines['right'].set_visible(False)
axes[0].spines['left'].set_visible(False)
axes[0].spines['bottom'].set_visible(False)

# Plot the counts after sampling
counts_after.plot(kind='bar', ax=axes[1], color=['lightcoral', 'lightskyblue', 'lightgreen'])
axes[1].set_title('Number of observations after sampling')
axes[1].set_ylabel('Count')
axes[1].set_xlabel('Support')
axes[1].spines['top'].set_visible(False)
axes[1].spines['right'].set_visible(False)
axes[1].spines['left'].set_visible(False)
axes[1].spines['bottom'].set_visible(False)

# Display the plots
plt.tight_layout()
plt.show()

# Create the dataset

Create a column for matching the filenames

In [None]:
undersampled_prado['image_file_name'] = undersampled_prado.work_image_url.apply(lambda x: os.path.basename(x))

Rename the support column and create the ultimate df with only filename and support type

In [None]:
undersampled_prado.rename({'technical_sheet_soporte': 'support'}, axis=1, inplace=True)
processed_prado = undersampled_prado[['image_file_name', 'support']]

In [None]:
f'{len(processed_prado)} images in the processed dataset'

Save a .feather file

In [None]:
processed_prado.to_feather('/content/prado-museum-pictures/processed_prado.feather')

# Images organization

Create the folders tree divided by training | validation | test and the resepctive supports

In [1]:
import shutil
from sklearn.model_selection import train_test_split

# Paths
data_path = "/content/prado-museum-pictures/images/images"
processed_prado = "/content/prado-museum-pictures/processed_prado.feather"
base_dir = "/content/prado-museum-pictures/dataset"

# Load the .feather file
df = pd.read_feather(processed_prado)

os.makedirs(base_dir, exist_ok=True)
# Define train and validation directories
train_dir = os.path.join(base_dir, 'train')
validation_dir = os.path.join(base_dir, 'validation')

# Create directories for each support type
support_types = df['support'].unique()
for support in support_types:
    os.makedirs(os.path.join(train_dir, support), exist_ok=True)
    os.makedirs(os.path.join(validation_dir, support), exist_ok=True)

# Split the dataset into training and validation sets
train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['support'], random_state=42)

# Function to move files to the respective directories
def move_files_and_return_missing(file_names, src_dir, dest_dir, support_type):
    missing_files = []
    for file_name in file_names:
        src_path = os.path.join(src_dir, file_name)
        dest_path = os.path.join(dest_dir, support_type, file_name)
        try:
            shutil.copy(src_path, dest_path)
        except:
            missing_files.append(file_name)
            print(file_name, ' missing')
    
    return missing_files

# Move training files
for support in support_types:
    train_files = train_df[train_df['support'] == support]['image_file_name'].tolist()
    missing_train_files = move_files_and_return_missing(train_files, data_path, train_dir, support)

# Move validation files
for support in support_types:
    val_files = val_df[val_df['support'] == support]['image_file_name'].tolist()
    missing_val_files = move_files_and_return_missing(val_files, data_path, validation_dir, support)

print("Dataset organized successfully.")

4230ce8a-ff28-41a7-b5c6-932b86b1a62c.jpg
28212471-a0d5-483e-80a9-b4ad009bf25b.jpg
e7573626-3a12-4acd-b7eb-1db4a6d9aa42.jpg
86d75c9b-af1c-4e52-86fe-9ee9f7987ae8.jpg
f91fbc09-e595-48fe-b8af-e3636a52acfd.jpg
Dataset organized successfully.


In [None]:
val_dir = os.path.join(base_dir, 'validation')
test_dir = os.path.join(base_dir, 'test')

# Create the test folder
os.makedirs(test_dir, exist_ok=True)

# Function to move images from a specific category
def move_to_test(support):
    src_subdir = os.path.join(val_dir, support)
    dst_subdir = os.path.join(test_dir, support)
    
    # Ensure the destination subdir exists
    os.makedirs(dst_subdir, exist_ok=True)
    
    num_of_images_to_move = 34 if support == 'Papel' else 33
    # Move 33 images from the source subdir to the destination subdir excpet for the 'Papel' category
    # Total images in the test set 100 = 33+33+34
    for filename in os.listdir(src_subdir)[:num_of_images_to_move]:
        if filename.endswith(('.jpg')):
            src_file = os.path.join(src_subdir, filename)
            dst_file = os.path.join(dst_subdir, filename)
            shutil.move(src_file, dst_file)
            print(f'Moved {filename} from {support}')

# Call the function for each support
support_types = ['Lienzo', 'Papel', 'Tabla']
for support in support_types:
    move_to_test(support)

Delete the rows with missing images to update the feather file

In [2]:
values_to_drop = missing_train_files + missing_val_files
updated_prado = processed_prado.loc[~processed_prado['image_file_name'].isin(values_to_drop)]
updated_prado = updated_prado.reset_index(drop=True)
updated_prado.to_feather('/content/prado-museum-pictures/processed_prado.feather')

# Modeling

Setup

In [None]:
!pip install dropblock optuna torchviz

In [6]:
# General
from tqdm.notebook import tqdm
from PIL import Image
from collections import defaultdict

# Deep learning
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from dropblock import DropBlock2D
from torchviz import make_dot

# HPT
import optuna
import pickle

# Image processing
from torchvision import datasets, utils
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

In [None]:
shutil.rmtree('sample_data')

## Data loading

In [10]:
IMG_SIZE = (224, 224)
BATCH_SIZE = 32

# Define image transformations
transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # Imagenet standards for RGB images
])

In [None]:
# Create directories to store the data
train_dir = "/content/prado-museum-pictures/dataset/train"
val_dir = "/content/prado-museum-pictures/dataset/validation"
test_dir = "/content/prado-museum-pictures/dataset/test"

# Load the data using the downloaded directories
train_dataset = datasets.ImageFolder(train_dir, transform=transform)
val_dataset = datasets.ImageFolder(val_dir, transform=transform)
test_dataset = datasets.ImageFolder(test_dir, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [16]:
# Create directories to store the data
test_dir = r"C:\Users\leoac\Downloads\archive\dataset_3_classes\test"

# Load the data using the downloaded directories
test_dataset = datasets.ImageFolder(test_dir, transform=transform)

test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Models

### Network architecture

In [None]:
class HeavyCNN(nn.Module):
    def __init__(self, num_classes):
        super(HeavyCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.dropblock = DropBlock2D(block_size=5, drop_prob=0.3)  # DropBlock layer
        self.fc1 = nn.Linear(128 * 28 * 28, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.dropblock(x)  # Apply DropBlock
        x = x.view(-1, 128 * 28 * 28)  # Flatten the tensor
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [4]:
class LightCNN(nn.Module):
    def __init__(self, num_classes):
        super(LightCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.dropblock = DropBlock2D(block_size=5, drop_prob=0.3)  # DropBlock layer
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(128, 128)
        self.bn_fc1 = nn.BatchNorm1d(128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.dropblock(x)  # Apply DropBlock
        x = self.global_pool(x)
        x = x.view(-1, 128)  # Flatten the tensor
        x = self.dropout(F.relu(self.bn_fc1(self.fc1(x))))
        x = self.fc2(x)
        return x

In [28]:
model = LightCNN(num_classes=3)
for inputs, labels in test_loader:
    output = model(inputs)

    # Visualize the model
    make_dot(output, params=dict(list(model.named_parameters()))).render("lightcnn", format="png")
    break

ExecutableNotFound: failed to execute WindowsPath('dot'), make sure the Graphviz executables are on your systems' PATH

## Architecture selection

Evaluate two architectures with constant hyperparameters

In [None]:
model = LightCNN(num_classes=3) # Change to HeavyCNN for the heavy model

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

num_epochs = 15

In [None]:
# Define optimizer, scheduler, and criterion
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
criterion = nn.CrossEntropyLoss()

Train

In [None]:
for epoch in tqdm(range(num_epochs)):
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'''
          Epoch [{epoch+1}/{num_epochs}]
          Loss: {running_loss/len(train_loader):.4f}''')

Evaluate

In [None]:
# Set model to evaluation mode
model.eval()
correct = 0
total = 0
incorrect_images = []

# Initialize dictionaries to count errors and accuracy per label
label_errors = defaultdict(int)
label_correct = defaultdict(int)
label_total = defaultdict(int)

# No gradient calculation in evaluation mode
with torch.no_grad():
    for inputs, labels in tqdm(val_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Track errors and correct predictions per label
        for label, prediction in zip(labels, predicted):
            label_total[label.item()] += 1
            if label == prediction:
                label_correct[label.item()] += 1
            else:
                label_errors[label.item()] += 1

In [None]:
# Print overall accuracy
accuracy = 100 * correct / total
print(f'Overall Accuracy: {accuracy:.2f}%')

labels_accuracy_dict = {}
# Print errors and accuracy per label
for label in label_total.keys():
    errors = label_errors[label]
    total_per_label = label_total[label]
    correct_per_label = label_correct[label]
    accuracy_per_label = correct_per_label / total_per_label
    labels_accuracy_dict[label] = accuracy_per_label
    print(f'Label {label}: {errors} errors out of {total_per_label}, Accuracy: {accuracy_per_label:.2f}')

In [None]:
# Data from the test
labels = ['Lienzo', 'Papel', 'Tabla']
accuracies = [labels_accuracy_dict[0], labels_accuracy_dict[1], labels_accuracy_dict[2]]

# Plotting the data
plt.figure(figsize=(8, 6))
bars = plt.bar(labels, accuracies, color='lightblue', width=0.5)

# Adding accuracy values on top of the columns
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2.0, height, f'{height:.2f}', ha='center', va='bottom')

plt.xlabel('Support type')
plt.ylabel('Accuracy')
plt.title('Accuracy per Support type - Light CNN') # Change to Heavy CNN for the heavy model
plt.ylim(0, 1)

# Removing axis lines
plt.gca().spines['top'].set_visible(False)
plt.gca().spines['right'].set_visible(False)
plt.gca().spines['left'].set_visible(False)
plt.gca().spines['bottom'].set_visible(False)

plt.show()

Label 0: Lienzo

Label 1: Papel

Label 2: Tabla

In [None]:
torch.save(model.state_dict(), 'LIGHT_prado_artwork_support_classifier.pth') # Change to HEAVY for the heavy model

LightCNN is chosen due to its small weight and its performances comparable to the heavy model

# Hyperparameters tuning

In [None]:
def train_and_evaluate(model, train_loader, val_loader, params, device):
    # Set the model to training mode
    model.train()

    # Define the optimizer and loss function
    optimizer = optim.SGD(model.parameters(), lr=params['learning_rate'], momentum=params['momentum'])
    criterion = nn.CrossEntropyLoss()

    # Training loop
    for epoch in tqdm(range(15), desc='Epochs'): # 15 epochs fixed
        for batch in tqdm(train_loader, desc='Training'):
            # Get the inputs and labels
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    # Evaluate the model
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in tqdm(val_loader, desc='Validation'):
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= len(val_loader)
    accuracy = 100 * correct / total
    return val_loss, accuracy

In [None]:
def objective(trial):
    params = {
        'learning_rate': trial.suggest_loguniform('learning_rate', 1e-5, 1e-1),
        # 'batch_size': trial.suggest_categorical('batch_size', [32, 64, 128]),
        # 'epochs': trial.suggest_categorical('epochs', [5, 10, 15]),
        'momentum': trial.suggest_uniform('momentum', 0.8, 0.99)
    }
    model = LightCNN(num_classes = 3)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    val_loss, accuracy = train_and_evaluate(model, train_loader, val_loader, params, device)
    return accuracy # optional: return val_loss


In [None]:
# Launch the optimization
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=5) # study.optimize will also print the best params

In [None]:
# Extract the best trial and the best model
best_trial = study.best_trial
print('Best accuracy: ', best_trial.values[0])
print('Params: ', best_trial.params)

# Testing

Model loading

In [None]:
best_model = LightCNN(num_classes=3)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
best_model.to(device)

num_epochs = 15

Re-train

In [None]:
# Define optimizer, and criterion
optimizer = torch.optim.SGD(best_model.parameters(), lr=best_trial.params['learning_rate'], momentum=best_trial.params['momentum'])
criterion = nn.CrossEntropyLoss()

In [None]:
for epoch in tqdm(range(num_epochs)):
    best_model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = best_model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'''
          Epoch [{epoch+1}/{num_epochs}]
          Loss: {running_loss/len(train_loader):.4f}''')

Evaluate

In [None]:
# Set model to evaluation mode
best_model.eval()
correct = 0
total = 0
incorrect_images = []

# Initialize dictionaries to count errors and accuracy per label
label_errors = defaultdict(int)
label_correct = defaultdict(int)
label_total = defaultdict(int)

# No gradient calculation in evaluation mode
with torch.no_grad():
    for inputs, labels in tqdm(test_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = best_model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Track errors and correct predictions per label
        for label, prediction in zip(labels, predicted):
            label_total[label.item()] += 1
            if label == prediction:
                label_correct[label.item()] += 1
            else:
                label_errors[label.item()] += 1

In [None]:
# Print overall accuracy
accuracy = 100 * correct / total
print(f'Overall Accuracy: {accuracy:.2f}%')

labels_accuracy_dict = {}
# Print errors and accuracy per label
for label in label_total.keys():
    errors = label_errors[label]
    total_per_label = label_total[label]
    correct_per_label = label_correct[label]
    accuracy_per_label = correct_per_label / total_per_label
    labels_accuracy_dict[label] = accuracy_per_label
    print(f'Label {label}: {errors} errors out of {total_per_label}, Accuracy: {accuracy_per_label:.2f}')

Label 0: Lienzo

Label 1: Papel

Label 2: Tabla

In [None]:
# Data from the validation
labels = ['Lienzo', 'Papel', 'Tabla']
accuracies = [labels_accuracy_dict[0], labels_accuracy_dict[1], labels_accuracy_dict[2]]

# Plotting the data
plt.figure(figsize=(8, 6))
bars = plt.bar(labels, accuracies, color='lightblue', width=0.5)

# Adding accuracy values on top of the columns
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2.0, height, f'{height:.2f}', ha='center', va='bottom')

plt.xlabel('Support type')
plt.ylabel('Accuracy')
plt.title('Accuracy per Support type - Light CNN')
plt.ylim(0, 1)  # Setting y-axis limit to 1 for percentage representation

# Removing axis lines
plt.gca().spines['top'].set_visible(False)
plt.gca().spines['right'].set_visible(False)
plt.gca().spines['left'].set_visible(False)
plt.gca().spines['bottom'].set_visible(False)

plt.show()