In [None]:
# Library
# Mount Google Drive at /content/drive (the google.colab module is Colab-specific).
from google.colab import drive
drive.mount('/content/drive')

import os
import pandas as pd
import numpy as np
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
import copy
from sklearn.metrics import mean_absolute_error
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torchvision.models import resnet50, ResNet50_Weights
import time

In [None]:
# -------------------------
# Data locations
# -------------------------
root_dir = "Datasets"
cleaned_data_path = os.path.join(root_dir, "df_postwar.pkl")

# -------------------------
# Local image folder (created if it does not exist yet; nothing is copied here)
# -------------------------
local_image_dir = "Datasets/images_resized"
os.makedirs(local_image_dir, exist_ok=True)

# -------------------------
# Tabular data + log-transformed target
# -------------------------
# NOTE(review): unpickling can execute arbitrary code; only load pickles from a trusted source.
df_postwar = pd.read_pickle(cleaned_data_path)
log_price = np.log1p(df_postwar['Price Sold USD'])  # log(1 + price)
df_postwar['Log Price'] = log_price
print("Data loaded. Columns:", df_postwar.columns.tolist())

# -------------------------
# Resolve the local image file for a row index
# -------------------------
def get_image_path(idx):
    """Return the path of ``art_<idx, zero-padded to 5 digits>.jpg`` inside
    ``local_image_dir``, or ``None`` when no such path exists on disk."""
    candidate = os.path.join(local_image_dir, "art_{:05d}.jpg".format(idx))
    if not os.path.exists(candidate):
        return None
    return candidate

# Attach each row's image path (None when the file is missing) and keep only the
# rows that have one. The explicit .copy() makes the filtered frame independent
# of the original, so adding columns to it later does not raise pandas'
# SettingWithCopyWarning.
df_postwar['Image Path'] = df_postwar.index.to_series().apply(get_image_path)
df_postwar = df_postwar[df_postwar['Image Path'].notna()].copy()
print(f"{len(df_postwar)} artworks with valid image files in local disk")

In [None]:
# -------------------------
# STEP 1: VARIABLE TYPES
# -------------------------
target_col = 'Log Price'
# 'Sale Year' and 'Artist Ordered Median Price' do not exist yet at this point;
# they are created in STEP 3 before these columns are used.
numerical_cols = [
    'Area',
    'Sale Year',
    'CPI_US',
    'Artist Ordered Median Price']
cat_cols = [
    'Paint Final Imputed Collapsed',
    'Material Final Imputed Collapsed',
    'Artist Name',
    'Auction House',
    'Country',
    'Birth Period',
    'Alive Status'
]

# -------------------------
# STEP 2: ENCODE CATEGORICAL VARIABLES
# -------------------------
# Each categorical column gets an integer '<col>_idx' column for the embedding layers.
# pandas encodes missing values as code -1; shifting every code by +1 reserves
# index 0 for "missing" and maps real categories to 1..n. (Clipping -1 to 0 would
# silently merge missing values with the first real category.)
# cat_vocab_sizes[col] is therefore n_categories + 1 = number of embedding rows needed.
cat_vocab_sizes = {}
for col in cat_cols:
    df_postwar[col] = df_postwar[col].astype('category')
    df_postwar[col + '_idx'] = df_postwar[col].cat.codes.astype('int64') + 1
    cat_vocab_sizes[col] = len(df_postwar[col].cat.categories) + 1

In [None]:
# -------------------------
# STEP 3: SPLIT + NORMALIZE + FEATURE ADD
# -------------------------
# Put the rows in sale-date order with a fresh 0..n-1 index, then derive the
# calendar year that drives the train/val/test split.
df_postwar = df_postwar.sort_values(by='Sale Date Cleaned').reset_index(drop=True)
sale_dates = pd.to_datetime(df_postwar['Sale Date Cleaned'])
df_postwar['Sale Year'] = sale_dates.dt.year

def add_ordered_artist_median(df):
    """Add leak-free per-artist history features and drop rows without history.

    Works on a date-sorted copy of ``df`` (the input is not modified) and adds:

    * ``'Artist Ordered Median Price'`` - median of the artist's *earlier*
      ``'Price Sold USD'`` values within ``df`` (the current sale is excluded
      via ``shift()``).
    * ``'Artist Sale Count'`` - number of earlier sales of the same artist
      within ``df`` (0 for the first one).

    Rows whose median is missing (e.g. an artist's first sale in ``df``) are
    removed from the returned frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'Sale Date Cleaned', 'Artist Name' and 'Price Sold USD'.

    Returns
    -------
    pandas.DataFrame
        A new frame, sorted by sale date, containing only rows with history.
    """
    # A stable sort keeps same-day sales in their incoming order, so the
    # "earlier sales" of an artist are deterministic.
    df = df.sort_values('Sale Date Cleaned', kind='mergesort').copy()

    # observed=True: 'Artist Name' may be a categorical column; only group on
    # artists that actually occur in this frame (and avoid the pandas
    # FutureWarning about the changing default).
    by_artist = df.groupby('Artist Name', observed=True)

    prior_median = by_artist['Price Sold USD'].transform(
        lambda prices: prices.shift().expanding().median())
    prior_sales = by_artist.cumcount()

    df['Artist Ordered Median Price'] = prior_median
    # New Variable: Artist Sale Count
    df['Artist Sale Count'] = prior_sales

    return df[df['Artist Ordered Median Price'].notna()].copy()

# Year-based chronological split: <= 2014 train, 2015-2018 validation, > 2018 test.
# The artist-history features are computed separately inside each split.
is_train = df_postwar['Sale Year'] <= 2014
is_val = (df_postwar['Sale Year'] > 2014) & (df_postwar['Sale Year'] <= 2018)
is_test = df_postwar['Sale Year'] > 2018

train_df = add_ordered_artist_median(df_postwar[is_train])
val_df = add_ordered_artist_median(df_postwar[is_val])
test_df = add_ordered_artist_median(df_postwar[is_test])

print("Numerical columns:", numerical_cols)

# Standardize numerical features: statistics are fitted on the training split
# only and then re-used for the held-out splits.
scaler = StandardScaler()
train_df[numerical_cols] = scaler.fit_transform(train_df[numerical_cols])
for held_out_df in (val_df, test_df):
    held_out_df[numerical_cols] = scaler.transform(held_out_df[numerical_cols])

In [None]:
# -------------------------
# STEP 4: DATASET & DATALOADER
# -------------------------
# Wraps the tabular data and the images in a PyTorch Dataset so they can be fed to the network.

class ArtPriceDataset(Dataset):
    """Per-artwork samples combining categorical indices, numerical features,
    an image tensor and the regression target.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``'<col>_idx'`` for every name in ``cat_cols``, every
        column in ``num_cols``, ``target_col`` and ``'Image Path'``.
    cat_cols : list of str
        Base names of the categorical columns (the ``'_idx'`` suffix is added here).
    num_cols : list of str
        Names of the numerical feature columns.
    target_col : str
        Name of the target column.
    transform : callable, optional
        Applied to the RGB PIL image. When omitted, images are resized to
        224x224, converted to a tensor and normalized with the ImageNet
        mean/std.
    """

    def __init__(self, df, cat_cols, num_cols, target_col, transform = None):
        print("📦 Initializing ArtPriceDataset...")

        # Categorical data is stored as integer indices, numerical data and targets as float32.
        idx_cols = [col + '_idx' for col in cat_cols]
        self.cat_data = df[idx_cols].values.astype('int64')
        self.num_data = df[num_cols].values.astype('float32')
        self.targets = df[target_col].values.astype('float32')
        self.image_paths = df['Image Path'].values
        print(f"✅ Dataset contains {len(self.targets)} samples.")

        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Number of samples (rows of the source frame)."""
        return len(self.targets)

    def __getitem__(self, idx):
        """Return ``{'cat': ..., 'num': ..., 'img': ..., 'target': ...}`` for row ``idx``.

        If the image cannot be loaded or transformed, the error is printed and
        an all-zero ``(3, 224, 224)`` tensor is used instead, so one bad file
        does not abort a whole epoch.
        """
        # Small sanity log for the first two items only.
        if idx < 2:
            print(f"📸 Loading item {idx}: {self.image_paths[idx]}")

        cat_tensor = torch.tensor(self.cat_data[idx], dtype=torch.long)
        num_tensor = torch.tensor(self.num_data[idx], dtype=torch.float32)
        target_tensor = torch.tensor(self.targets[idx], dtype=torch.float32)

        # Load and transform image
        image_path = self.image_paths[idx]

        try:
            # The context manager closes the file handle as soon as the pixels
            # have been decoded; convert() returns an independent in-memory image.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
            image_tensor = self.transform(image)
        except Exception as e:
            print(f"❌ Error loading image {image_path}: {e}")
            image_tensor = torch.zeros(3, 224, 224)

        return {
            'cat': cat_tensor,
            'num': num_tensor,
            'img': image_tensor,
            'target': target_tensor
        }

# Shared image transform:
#   1. resize every image to 224x224,
#   2. convert it to a tensor,
#   3. normalize with the ImageNet mean/std. The image encoder is a ResNet50 with
#      pretrained ImageNet weights, which expects inputs normalized this way.
# A tensor is a data container like an array, but with any number of dimensions,
# e.g. (64, 3, 224, 224) = Batch x Channels x Height x Width.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# Create datasets using the transform
train_ds = ArtPriceDataset(train_df, cat_cols, numerical_cols, target_col, transform=image_transform)
val_ds = ArtPriceDataset(val_df, cat_cols, numerical_cols, target_col, transform=image_transform)
test_ds = ArtPriceDataset(test_df, cat_cols, numerical_cols, target_col, transform=image_transform)

# Create DataLoaders
# Wrap the data in batches and feed it to the model. Only the training data is
# shuffled; validation/test keep their order. All loaders use the same worker
# settings so image decoding does not run in the main process during evaluation.
train_loader = DataLoader(train_ds, batch_size=64, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_ds, batch_size=64, shuffle=False, num_workers=2, pin_memory=True)

In [None]:
# -------------------------
# STEP 5: MAIN MULTIMODAL MODEL
# -------------------------
# Defining the multimodal neural network

class ArtPriceMultimodalNN(nn.Module):
    """Regression network combining categorical embeddings, numerical features
    and ResNet50 image features.

    Parameters
    ----------
    cat_vocab_sizes : dict
        Maps each categorical variable to its vocabulary size. One embedding
        layer is created per entry, in the dict's iteration order, so column
        ``i`` of ``cat_data`` must correspond to the ``i``-th entry.
    num_numerical : int
        Number of numerical input features.
    """

    def __init__(self, cat_vocab_sizes, num_numerical):
        super().__init__()

        # 1. Categorical embeddings: one layer per categorical variable.
        self.embeddings = nn.ModuleList()
        self.embedding_dims = []  # output size of each embedding layer

        for vocab_size in cat_vocab_sizes.values():
            # Roughly half the vocabulary size, capped at 50 dimensions.
            emb_dim = min(50, (vocab_size + 1) // 2)
            self.embeddings.append(nn.Embedding(vocab_size, emb_dim))
            self.embedding_dims.append(emb_dim)
        self.total_emb_dim = sum(self.embedding_dims)

        # 2. Pretrained ResNet50 image encoder (without the final fc layer)
        base_resnet = resnet50(weights=ResNet50_Weights.DEFAULT)

        # Freeze all layers ...
        for param in base_resnet.parameters():
            param.requires_grad = False

        # ... then unfreeze only the last residual stage (layer4) for fine-tuning.
        # NOTE(review): BatchNorm layers in the frozen stages still update their
        # running statistics while the model is in train() mode - confirm this is intended.
        for param in base_resnet.layer4.parameters():
            param.requires_grad = True

        # Dropping the last child (fc) leaves a 2048-dim feature vector per image.
        self.image_encoder = nn.Sequential(*list(base_resnet.children())[:-1])

        # 3. Fully connected head (tabular + image features)
        # Three linear layers on top of the concatenated embeddings, numerical
        # features and image features.
        self.fc = nn.Sequential(
            nn.Linear(self.total_emb_dim + num_numerical + 2048, 2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048),
            nn.Dropout(0.3),
            nn.Linear(2048, 1000),
            nn.ReLU(),
            nn.BatchNorm1d(1000),
            nn.Dropout(0.3),
            nn.Linear(1000, 1)
        )

    def forward(self, cat_data, num_data, image_data):
        """Predict one value per sample.

        ``cat_data`` holds one integer column per embedding layer, ``num_data``
        the numerical features and ``image_data`` the image batch. Returns a
        1-D tensor of shape ``[batch]``.
        """
        # Embeddings: look up column i in embedding layer i, then concatenate.
        embedded = [emb(cat_data[:, i]) for i, emb in enumerate(self.embeddings)]
        cat_out = torch.cat(embedded, dim=1)

        # Image encoding: flatten the encoder output to [batch, features].
        img_features = self.image_encoder(image_data)
        img_out = torch.flatten(img_features, start_dim=1)

        # Combine all features: categorical embeddings + numerical + image vector
        x = torch.cat([cat_out, num_data, img_out], dim=1)

        # Drop only the trailing size-1 dimension: [batch, 1] -> [batch].
        # A bare .squeeze() would also remove the batch dimension when the batch
        # holds a single sample, producing a 0-d tensor.
        return self.fc(x).squeeze(-1)

# Pick the GPU when one is available, build the network and move it onto that device.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = ArtPriceMultimodalNN(num_numerical=len(numerical_cols), cat_vocab_sizes=cat_vocab_sizes)
model.to(device)


In [None]:
# -------------------------
# STEP 6: TRAINING WITH EARLY STOPPING
# -------------------------
def train_model(model, train_loader, val_loader, criterion, optimizer, n_epochs=50, patience=10):
    """Train ``model`` with early stopping on the validation MAE.

    Parameters
    ----------
    model : torch.nn.Module
        Called as ``model(cat, num, img)``; expected to return one value per sample.
    train_loader, val_loader : iterable of dict
        Each batch is a dict with the keys 'cat', 'num', 'img' and 'target'.
    criterion : callable
        Loss function ``criterion(output, target)``.
    optimizer : torch.optim.Optimizer
        Optimizer over the model's parameters.
    n_epochs : int
        Maximum number of epochs.
    patience : int
        Stop after this many consecutive epochs without a better validation MAE.

    Returns
    -------
    (model, history)
        The model with the weights of the best validation epoch loaded, and a
        dict with per-epoch lists 'train_loss', 'val_loss', 'train_mae', 'val_mae'.
        The loss entries are sample-weighted averages of the per-batch criterion values.
    """
    # Run on whatever device the model already lives on instead of relying on a
    # notebook-level global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    best_model_wts = copy.deepcopy(model.state_dict())
    best_val_mae = float("inf")
    epochs_no_improve = 0
    history = {"train_loss": [], "val_loss": [], "train_mae": [], "val_mae": []}

    for epoch in range(n_epochs):
        print(f"\n Starting Epoch {epoch+1}/{n_epochs}...")

        # ---- training pass ----
        model.train()
        y_train_true = []
        y_train_pred = []
        train_loss_sum = 0.0

        train_start = time.time()
        for i, batch in enumerate(train_loader):
            if i % 5 == 0:
                print(f"Train batch {i+1}/{len(train_loader)}")

            cat = batch['cat'].to(device)
            num = batch['num'].to(device)
            img = batch['img'].to(device)
            target = batch['target'].to(device)

            optimizer.zero_grad()
            # Align the prediction with the target's shape so a single-sample
            # batch (0-d model output) cannot break the loss or the bookkeeping.
            output = model(cat, num, img).reshape(target.shape)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            train_loss_sum += loss.item() * target.numel()
            y_train_true.extend(target.detach().cpu().numpy())
            y_train_pred.extend(output.detach().cpu().numpy())

        train_time = time.time() - train_start
        train_mae = mean_absolute_error(y_train_true, y_train_pred)
        print(f"Finished training loop in {train_time:.2f} sec. Train MAE: {train_mae:.4f}")

        # ---- validation pass ----
        model.eval()
        y_val_true = []
        y_val_pred = []
        val_loss_sum = 0.0

        with torch.no_grad():
            for batch in val_loader:
                cat = batch['cat'].to(device)
                num = batch['num'].to(device)
                img = batch['img'].to(device)
                target = batch['target'].to(device)

                output = model(cat, num, img).reshape(target.shape)
                val_loss_sum += criterion(output, target).item() * target.numel()

                y_val_true.extend(target.cpu().numpy())
                y_val_pred.extend(output.cpu().numpy())

        val_mae = mean_absolute_error(y_val_true, y_val_pred)
        history['train_loss'].append(train_loss_sum / max(len(y_train_true), 1))
        history['val_loss'].append(val_loss_sum / max(len(y_val_true), 1))
        history['train_mae'].append(train_mae)
        history['val_mae'].append(val_mae)

        print(f"Epoch {epoch+1:02d}: Train MAE = {train_mae:.4f}, Val MAE = {val_mae:.4f}")

        # Early stopping: remember the best weights, stop after `patience`
        # consecutive epochs without improvement.
        if val_mae < best_val_mae:
            best_val_mae = val_mae
            best_model_wts = copy.deepcopy(model.state_dict())
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print("Early stopping")
                break

    model.load_state_dict(best_model_wts)
    return model, history

In [None]:
# -------------------------
# STEP 7: RUN TRAINING
# -------------------------
# Mean squared error as the objective, Adam over the model's parameters.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
trained_model, history = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
)
print("Training complete")

In [None]:
# -------------------------
# STEP 8: VISUALIZE TRAINING CURVES (MAE per epoch)
# -------------------------
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 5))
for history_key, curve_label in (('train_mae', 'Train MAE'), ('val_mae', 'Validation MAE')):
    ax.plot(history[history_key], label=curve_label)
ax.set_title('MAE per Epoch')
ax.set_xlabel('Epoch')
ax.set_ylabel('Mean Absolute Error')
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# -------------------------
# STEP 9: PREDICTIONS & RESIDUALS
# -------------------------
# Collect targets and predictions of the trained model over the validation set.
y_true = []
y_pred = []

trained_model.eval()
with torch.no_grad():
    for batch in val_loader:
        cat = batch['cat'].to(device)
        num = batch['num'].to(device)
        img = batch['img'].to(device)
        target = batch['target'].to(device)

        output = trained_model(cat, num, img)

        # reshape(-1) guarantees 1-D arrays: if the last batch holds a single
        # sample the model output can be 0-dimensional, which extend() cannot iterate.
        y_true.extend(target.cpu().numpy().reshape(-1))
        y_pred.extend(output.cpu().numpy().reshape(-1))

# Convert to arrays
y_true = np.array(y_true)
y_pred = np.array(y_pred)
residuals = y_true - y_pred  # positive residual = model under-predicted
mae_val = np.mean(np.abs(residuals))
print(f"Final Validation MAE: {mae_val:.4f}")

# Plot: predicted vs actual, with the dashed identity line as the "perfect prediction" reference
fig_scatter, ax_scatter = plt.subplots(figsize=(6, 6))
ax_scatter.scatter(y_true, y_pred, alpha=0.5)
lowest, highest = y_true.min(), y_true.max()
ax_scatter.plot([lowest, highest], [lowest, highest], '--', color='gray')
ax_scatter.set_title('Predicted vs Actual Log Prices')
ax_scatter.set_xlabel('Actual Log Price')
ax_scatter.set_ylabel('Predicted Log Price')
ax_scatter.grid(True)
plt.show()

# Plot: distribution of the residuals
fig_resid, ax_resid = plt.subplots(figsize=(8, 4))
ax_resid.hist(residuals, bins=50, alpha=0.7)
ax_resid.set_title('Residuals (Actual - Predicted)')
ax_resid.set_xlabel('Residual')
ax_resid.set_ylabel('Frequency')
ax_resid.grid(True)
plt.show()
