In [1]:
import itertools
import sys
import os
import numpy as np

In [2]:
from sklearn.model_selection import train_test_split

In [41]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchsummary import summary
import torchinfo

In [4]:
sys.path.append(os.path.abspath(".."))
from utils import read_features, read_targets, print_info_features, print_info_targets, d_types_methods

## Read Features and Targets

In [5]:
# Absolute location of the chronology-prediction data, resolved relative to
# the current working directory (abspath joins with the cwd itself).
path = os.path.abspath("../../data/chronology_prediction")

In [6]:
# Names of the targets requested from read_targets below.
targets = ["StartYear", "YearRange"]

In [7]:
# Load the stored feature sets and the selected targets as torch tensors.
# Both helpers come from the project's utils module; the loaded splits are
# listed in the cell output.
X = read_features(path, f_type="tensors")
y = read_targets(path, targets, f_type="tensors")

Loaded X_train_tfidf
Loaded X_train_bert
Loaded X_train_cannyhog
Loaded X_train_resnet
Loaded X_train_vit
Loaded X_test_tfidf
Loaded X_test_bert
Loaded X_test_cannyhog
Loaded X_test_resnet
Loaded X_test_vit
Loaded y_train
Loaded y_test


In [8]:
# Show the type and shape of every feature tensor, per subset.
print_info_features(X)

{
	train: {
		tfidf: 
			<class 'torch.Tensor'>
			shape = torch.Size([1719, 300]), 
		bert: 
			<class 'torch.Tensor'>
			shape = torch.Size([1719, 768]), 
		cannyhog: 
			<class 'torch.Tensor'>
			shape = torch.Size([1719, 2917]), 
		resnet: 
			<class 'torch.Tensor'>
			shape = torch.Size([1719, 2048]), 
		vit: 
			<class 'torch.Tensor'>
			shape = torch.Size([1719, 768]), 
	},
	test: {
		tfidf: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 300]), 
		bert: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 768]), 
		cannyhog: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 2917]), 
		resnet: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 2048]), 
		vit: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 768]), 
	},
}


In [9]:
# Show the type and shape of the target tensor, per subset.
print_info_targets(y)

{
	train: 
		<class 'torch.Tensor'>
		shape   = torch.Size([1719, 2])
	test: 
		<class 'torch.Tensor'>
		shape   = torch.Size([191, 2])
}


## Train-Validation Split

In [10]:
# Row positions of the original training set, split 90/10 into train and
# validation with a fixed seed, then converted to int64 index tensors so they
# can be used to index the feature and target tensors.
indices = np.arange(len(y["train"]))

train_idx, val_idx = (
    torch.tensor(split, dtype=torch.int64)
    for split in train_test_split(indices, test_size=0.1, random_state=42)
)

In [11]:
# Carve the validation subset out of the original training features.
#
# This cell rebinds X from itself, so without the guard a second run would
# index the already-reduced "train" tensors with indices that were computed
# for the full training set. The guard makes a re-run a no-op; to re-split
# (e.g. with a different seed), re-run the cell that loads X first.
if "val" not in X:
    X = {
        "train": {method: tensors[train_idx] for method, tensors in X["train"].items()},
        "val": {method: tensors[val_idx] for method, tensors in X["train"].items()},
        "test": X["test"]
    }

print_info_features(X)

{
	train: {
		tfidf: 
			<class 'torch.Tensor'>
			shape = torch.Size([1547, 300]), 
		bert: 
			<class 'torch.Tensor'>
			shape = torch.Size([1547, 768]), 
		cannyhog: 
			<class 'torch.Tensor'>
			shape = torch.Size([1547, 2917]), 
		resnet: 
			<class 'torch.Tensor'>
			shape = torch.Size([1547, 2048]), 
		vit: 
			<class 'torch.Tensor'>
			shape = torch.Size([1547, 768]), 
	},
	val: {
		tfidf: 
			<class 'torch.Tensor'>
			shape = torch.Size([172, 300]), 
		bert: 
			<class 'torch.Tensor'>
			shape = torch.Size([172, 768]), 
		cannyhog: 
			<class 'torch.Tensor'>
			shape = torch.Size([172, 2917]), 
		resnet: 
			<class 'torch.Tensor'>
			shape = torch.Size([172, 2048]), 
		vit: 
			<class 'torch.Tensor'>
			shape = torch.Size([172, 768]), 
	},
	test: {
		tfidf: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 300]), 
		bert: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 768]), 
		cannyhog: 
			<class 'torch.Tensor'>
			shape = torch.Size([191, 2917]), 
		resnet: 
		

In [12]:
# Carve the validation subset out of the original training targets.
#
# This cell rebinds y from itself, so without the guard a second run would
# index the already-reduced "train" tensor with indices that were computed
# for the full training set. The guard makes a re-run a no-op; to re-split
# (e.g. with a different seed), re-run the cell that loads y first.
if "val" not in y:
    y = {
        "train": y["train"][train_idx],
        "val": y["train"][val_idx],
        "test": y["test"]
    }

print_info_targets(y)

{
	train: 
		<class 'torch.Tensor'>
		shape   = torch.Size([1547, 2])
	val: 
		<class 'torch.Tensor'>
		shape   = torch.Size([172, 2])
	test: 
		<class 'torch.Tensor'>
		shape   = torch.Size([191, 2])
}


## Torch Datasets and Dataloaders

In [13]:
class PotteryDataset(Dataset):
    """
    Dataset pairing one or more row-aligned feature tensors with targets.

    Sample ``idx`` is ``([X[idx] for X in X_list], y[idx])``, i.e. a list with
    one feature vector per feature set, plus the matching target row.
    """

    def __init__(self, X_list, y):
        """
        X_list: list of tensors, each [N, d] (can be 1 or more feature sets)
        y: tensor of targets [N] or [N, t] (t = number of targets)

        Raises:
            ValueError: if any feature tensor has a different number of rows
                than y (samples would otherwise be silently mis-paired or fail
                later during iteration).
        """
        X_list = list(X_list)
        n_samples = y.shape[0]

        for position, X in enumerate(X_list):
            if X.shape[0] != n_samples:
                raise ValueError(
                    f"Feature set #{position} has {X.shape[0]} rows, "
                    f"but y has {n_samples}"
                )

        self.X_list = X_list
        self.y = y

    def __len__(self):
        # Number of samples is defined by the targets.
        return self.y.shape[0]

    def __getitem__(self, idx):
        # One sample: a list of per-feature-set rows and the target at idx.
        return [X[idx] for X in self.X_list], self.y[idx]

In [14]:
# Single-feature names (text methods followed by image methods) and every
# (text, image) pairing.
feature_types = d_types_methods["text"] + d_types_methods["image"]
feature_type_combos = tuple(
    (ft_txt, ft_img)
    for ft_txt in d_types_methods["text"]
    for ft_img in d_types_methods["image"]
)

# For each subset: one single-input dataset per feature type, plus one
# two-input dataset per text/image combination keyed as "<text> + <image>".
datasets = {
    subset: {
        **{
            ft: PotteryDataset([X[subset][ft]], y[subset])
            for ft in feature_types
        },
        **{
            f"{ft_txt} + {ft_img}": PotteryDataset(
                [X[subset][ft_txt], X[subset][ft_img]], y[subset]
            )
            for ft_txt, ft_img in feature_type_combos
        },
    }
    for subset in X
}

In [15]:
# One DataLoader per dataset, mirroring the structure of `datasets`.
# Only the training subset is shuffled; batches hold 64 samples.
loaders = {
    subset: {
        name: DataLoader(dataset, batch_size=64, shuffle=subset == "train")
        for name, dataset in subset_datasets.items()
    }
    for subset, subset_datasets in datasets.items()
}

In [16]:
# Peek at the first training sample of the BERT-only dataset.
datasets["train"]["bert"][0]

([tensor([-8.6254e-01,  2.2365e-02, -2.1014e-01, -1.2335e-02, -2.8407e-01,
           7.0454e-01, -1.7067e-01,  1.1946e+00, -5.3572e-01, -5.8065e-02,
          -1.8781e-01, -2.9218e-01, -5.4114e-01,  5.1873e-01, -2.3959e-01,
           4.4317e-01, -4.1814e-03,  1.0288e-01,  3.6834e-01,  4.6326e-01,
           2.9681e-01, -7.2920e-01,  9.1575e-02, -2.3819e-01, -3.1571e-01,
          -2.7823e-01, -4.8755e-01, -2.1281e-01, -1.8366e-01, -2.2698e-01,
          -1.8676e-01,  8.1518e-01, -2.9955e-01, -1.2661e+00,  7.4185e-01,
          -3.0797e-01,  5.2518e-01,  1.5000e-01,  9.3622e-02,  3.7078e-01,
          -1.2061e-01,  4.2290e-01,  4.4087e-01, -3.8067e-01, -4.4775e-01,
           5.3737e-02, -4.0323e+00, -3.2970e-01, -9.8180e-01, -5.4768e-01,
           5.4878e-01, -3.0711e-01,  1.1223e-02,  5.3643e-01,  4.5783e-01,
           9.5549e-01, -8.7916e-01,  8.3941e-02,  1.0419e+00,  8.1601e-01,
           1.6511e-01,  5.0036e-02, -3.4703e-01,  4.4456e-02,  5.2390e-02,
           6.2310e-01, -2

## MLP NN Architecture

Hidden Layer Block:

Linear → Activation → Dropout



In [66]:
class PotteryChronologyPredictor(nn.Module):
    """
    Multi-input MLP: one encoder per feature set followed by a fusion network.

    Every input goes through its own Linear → Activation → Dropout encoder.
    The encoded inputs are concatenated along the feature dimension and fed to
    `blocks` hidden blocks (Linear → Activation → Dropout each) and a final
    Linear output layer.

    On construction the module moves itself to CUDA when available, otherwise
    to the CPU. `forward` does not move its inputs, so callers have to pass
    tensors that already live on `self.device`.
    """

    def __init__(self,
                 input_sizes,
                 hidden_size,
                 output_size,
                 activation=nn.ReLU,
                 dropout=0.3,
                 blocks=3,
                 hidden_size_pattern="decreasing",
                 output_type="years"
                 ):
        """
        input_sizes: sequence with the feature dimension of each input
        hidden_size: output width of every encoder and of the first hidden block
        output_size: width of the final output layer
        activation: module class instantiated (without arguments) after each Linear
        dropout: dropout probability used in encoders and hidden blocks
        blocks: number of hidden blocks in the fusion network
        hidden_size_pattern: "decreasing" halves the block width after each
            block for as long as it is above 8; any other value keeps the
            width constant
        output_type: stored on the instance; not used by this class itself

        Raises:
            ValueError: if input_sizes is empty.
        """
        super().__init__()

        n_inputs = len(input_sizes)
        if n_inputs == 0:
            raise ValueError("input_sizes must contain at least one feature size")

        self.input_sizes = input_sizes
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.output_type = output_type

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Per-modality encoders
        self.encoders = nn.ModuleList([
            nn.Sequential(
                nn.Linear(input_size, hidden_size),
                activation(),
                nn.Dropout(dropout),
            ) for input_size in input_sizes
        ])

        # Fusion network: consumes the concatenation of all encoder outputs.
        self.model_input_size = hidden_size * n_inputs

        block_input_size = self.model_input_size
        block_output_size = hidden_size

        self.model = nn.Sequential()

        for i in range(blocks):
            self.model.add_module(f"dense{i+1}", nn.Linear(block_input_size, block_output_size))
            self.model.add_module(f"activation{i+1}", activation())
            self.model.add_module(f"dropout{i+1}", nn.Dropout(dropout))

            # The next block reads what this one produced; optionally shrink
            # the width for the following block.
            block_input_size = block_output_size
            if hidden_size_pattern == "decreasing" and block_output_size > 8:
                block_output_size //= 2

        self.model.add_module("output", nn.Linear(block_input_size, output_size))

        self.to(self.device)

    def forward(self, inputs, *extra_inputs):
        """
        inputs: list (or tuple) of tensors, one per modality
                e.g. [X_text, X_image, ...]
                each of shape [batch_size, input_size_i].
                A single tensor is accepted for single-input models, and the
                tensors may also be passed as separate positional arguments
                (forward(X_text, X_image)), which is how torchinfo calls the
                model when several input sizes are given.

        Returns the output of the fusion network, shape [batch_size, output_size].

        Raises:
            TypeError: if a list/tuple is combined with extra positional inputs.
            ValueError: if the number of inputs differs from the number of
                encoders (zip would otherwise silently drop the surplus).
        """
        if isinstance(inputs, torch.Tensor):
            inputs = [inputs, *extra_inputs]
        elif extra_inputs:
            raise TypeError(
                "Pass either one list of tensors or the tensors as separate "
                "positional arguments, not both"
            )
        else:
            inputs = list(inputs)

        if len(inputs) != len(self.encoders):
            raise ValueError(
                f"Expected {len(self.encoders)} input tensor(s), got {len(inputs)}"
            )

        # Pass each modality through its encoder
        encoded_inputs = [encoder(X) for X, encoder in zip(inputs, self.encoders)]

        # Concatenate encoded modalities
        X = torch.cat(encoded_inputs, dim=1)  # [batch_size, hidden_size * n_inputs]

        # Pass through fusion network
        y = self.model(X)

        return y

    def summary(self):
        """Print a torchinfo summary of the full model, one input per entry of input_sizes."""
        print(
            torchinfo.summary(
                self,
                input_size=[(input_size,) for input_size in self.input_sizes],
                batch_dim=0,
                device=self.device,
                col_names=("input_size", "output_size", "num_params", "mult_adds"),
            )
        )

In [71]:
# Single-input model on the BERT features, predicting every target column.
test = PotteryChronologyPredictor(
    input_sizes=[X["train"]["bert"].shape[1]],
    hidden_size=512,
    output_size=y["train"].shape[1],
    output_type="years",
)

In [72]:
# Print the layer-by-layer torchinfo summary of the model built above.
test.summary()

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #                   Mult-Adds
PotteryChronologyPredictor               [1, 768]                  [1, 2]                    --                        --
├─ModuleList: 1-1                        --                        --                        --                        --
│    └─Sequential: 2-1                   [1, 768]                  [1, 512]                  --                        --
│    │    └─Linear: 3-1                  [1, 768]                  [1, 512]                  393,728                   393,728
│    │    └─ReLU: 3-2                    [1, 512]                  [1, 512]                  --                        --
│    │    └─Dropout: 3-3                 [1, 512]                  [1, 512]                  --                        --
├─Sequential: 1-2                        [1, 512]                  [1, 2]                    --                        --
│    └─Linea