In [None]:
import json

import matplotlib.pyplot as plt
import numpy as np
import torch

from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import cosine_distances, euclidean_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC

from torch import Tensor, nn

from Museum import Museum
from params.collections import MUSEUMS

# Filename prefix shared by both input files (presumably a YYYYMMDD snapshot date — confirm).
PREFIX = "20250705"
# Directory that holds the JSON metadata files.
DATA_DIR = "./metadata/json"
# Per-item records; the data-loading cell reads "id" and "year" from each entry.
DATA_FILE = f"{DATA_DIR}/{PREFIX}_processed.json"
# Clustering results; the data-loading cell reads the ["8"]["images"] sub-tree.
CLUSTER_FILE = f"{DATA_DIR}/{PREFIX}_clusters.json"

## Dataset

In [None]:
class YearClass:
  """Two-way mapping between calendar years and fixed-width year-bin class indices.

  With ``n = int((max_year - min_year) / gran)`` the layout is:

  * class ``0``      -- every year below ``min_year``
  * classes ``1..n`` -- consecutive ``gran``-wide bins starting at ``min_year``
  * class ``n + 1``  -- every year at or above ``max_year``

  When ``max_year - min_year`` is not a multiple of ``gran`` the trailing partial
  bin also maps to index ``n + 1``, i.e. it shares the overflow class.

  Args:
    min_year: first year of class 1.
    max_year: first year of the upper overflow class.
    gran: bin width in years. A value of 0 raises ``ZeroDivisionError``.

  Attributes:
    year2class: callable ``year -> class index``.
    class2year: callable ``class index -> first year of that bin``; returns the
      sentinel 9999 for both overflow classes (0 and ``n + 1``).
  """
  def __init__(self, min_year, max_year, gran):
    # Index of the upper overflow class; also the largest index year2class can return.
    last_class = int((max_year - min_year) / gran) + 1

    def year2class(year):
      if year < min_year:
        return 0
      if year >= max_year:
        return last_class
      return int((year - min_year) / gran) + 1

    def class2year(cls):
      # Overflow classes have no representative year, so they map to the sentinel.
      if cls == 0 or cls == last_class:
        return 9999
      return ((cls - 1) * gran) + min_year

    self.year2class = year2class
    self.class2year = class2year

In [None]:
# Year bins: 50-year classes from 1700 up to 2000, plus one overflow class on each side.
mYC = YearClass(1700, 2000, 50)

embedding_data = Museum.combine_all_data(MUSEUMS, "embeddings")

with open(DATA_FILE, "r", encoding="utf-8") as ifp:
  all_data = json.load(ifp)

with open(CLUSTER_FILE, "r", encoding="utf-8") as ifp:
  cluster_data_all = json.load(ifp)
  cluster_data = cluster_data_all["8"]["images"]

# One record per item whose year is below 2030. Built with a comprehension so that no
# loop variable (previously `id`, which shadowed the builtin) leaks into the notebook
# namespace.
class_data = [
  {
    "id": str(item["id"]),
    "year": item["year"],
    "cluster": cluster_data[item_id]["cluster"],
    "class": mYC.year2class(item["year"]),
    "embedding": embedding_data[item_id]["siglip2"]
  }
  for item_id, item in all_data.items()
  if item["year"] < 2030
]

# Fixed random_state keeps the 75/25 split identical across runs.
class_data_train, class_data_test = train_test_split(class_data, test_size=0.25, random_state=101010)

classes_train = np.array([x["class"] for x in class_data_train])
classes_test = np.array([x["class"] for x in class_data_test])

## Classifiers

In [None]:
class AverageClassify:
  """Nearest-centre classifier that keeps several k-means centres per class.

  ``fit`` runs k-means separately on the embeddings of every class and stores up
  to ``n_averages`` centres per class. ``predict`` ranks all stored centres by
  Euclidean distance to each query embedding.

  Input records are dicts with at least the keys ``"class"`` and ``"embedding"``.
  """

  @classmethod
  def top_k_accuracy(cls, labels, preds, k=1):
    """Return the fraction of samples whose label is among the first ``k`` ranking entries.

    Args:
      labels: sequence of true class labels.
      preds: one ranking per sample, best first (e.g. the output of ``predict``).
      k: how many leading entries of each ranking to inspect.

    NOTE(review): rankings from ``predict`` contain one entry per *centre*, so the
    first ``k`` entries may repeat the same class; this is "top-k centres", not
    necessarily "top-k distinct classes".
    """
    hits = sum(1 for label, ranking in zip(labels, preds) if label in ranking[:k])
    return hits / len(labels)

  @classmethod
  def dist_accuracy(cls, labels, preds):
    """Print and plot the score distribution of correct vs. wrong predictions.

    Args:
      labels: sequence of true class labels.
      preds: 2-D array whose rows are ``[predicted class, score]`` (distance or
        probability, depending on the producer).

    Returns:
      Top-1 accuracy, i.e. the share of rows whose predicted class equals the label.
    """
    is_correct = preds[:, 0] == np.asarray(labels)
    correct_dists = preds[is_correct, 1]
    wrong_dists = preds[~is_correct, 1]

    # min()/max() raise on empty arrays, so summarise only non-empty groups.
    if len(correct_dists) > 0:
      print("Correct:", correct_dists.min(), correct_dists.max(), correct_dists.mean())
    if len(wrong_dists) > 0:
      print("Wrong:", wrong_dists.min(), wrong_dists.max(), wrong_dists.mean())

    plt.hist(correct_dists, bins=30)
    plt.title("Correct")
    plt.show()

    plt.hist(wrong_dists, bins=30)
    plt.title("Wrong")
    plt.show()

    return int(is_correct.sum()) / len(labels)

  def __init__(self, n_averages):
    """Store the maximum number of k-means centres to keep per class."""
    self.n_averages = n_averages
    self.average_centers = []
    self.idx2class = []

  def fit(self, data):
    """Fit per-class k-means and store every centre together with its class."""
    classes = np.array([x["class"] for x in data])
    embeddings = np.array([x["embedding"] for x in data])

    centers = []
    center_classes = []

    # np.unique returns the classes sorted, so centres are stored in class order.
    for mclass in np.unique(classes):
      mclass_embs = embeddings[classes == mclass]
      # A class with fewer samples than n_averages gets one centre per sample.
      mKMeans = KMeans(n_clusters=min(len(mclass_embs), self.n_averages), random_state=1010)
      mKMeans.fit(mclass_embs)

      centers.extend(mKMeans.cluster_centers_)
      center_classes.extend([mclass] * len(mKMeans.cluster_centers_))

    self.average_centers = np.array(centers)
    self.idx2class = np.array(center_classes)

  def predict(self, data):
    """Return, per sample, the classes of all centres ordered nearest first."""
    embeddings = np.array([x["embedding"] for x in data])
    dists = euclidean_distances(embeddings, self.average_centers)
    return self.idx2class[np.argsort(dists, axis=1)]

  def predict_dist(self, data):
    """Return ``[class, distance]`` of the nearest centre for every sample."""
    embeddings = np.array([x["embedding"] for x in data])
    dists = euclidean_distances(embeddings, self.average_centers)
    # Only the nearest centre is needed, so argmin replaces a full per-row sort.
    nearest = dists.argmin(axis=1)
    nearest_dists = dists[np.arange(len(dists)), nearest]
    return np.column_stack((self.idx2class[nearest], nearest_dists))


class ClusterClassify:
  """Nearest-centre classifier whose centres are fitted per (cluster, class) pair.

  Records are dicts with the keys ``"cluster"``, ``"class"`` and ``"embedding"``.
  ``fit`` walks the clusters in sorted order and, inside each cluster, runs
  k-means on every class separately. ``predict`` ranks all resulting centres by
  Euclidean distance; the query's own ``"cluster"`` value is not consulted.
  """
  def __init__(self, n_averages, n_clusters=8):
    # n_clusters is only stored; none of the methods below read it.
    self.n_averages, self.n_clusters = n_averages, n_clusters
    self.cluster_average_centers = []
    self.idx2class = []

  def fit(self, data):
    """Fit up to ``n_averages`` k-means centres for each class within each cluster."""
    cluster_ids = np.array([record["cluster"] for record in data])

    self.cluster_average_centers = []
    self.idx2class = []

    for cluster_id in np.unique(cluster_ids):
      members = [record for record, cid in zip(data, cluster_ids) if cid == cluster_id]
      member_classes = np.array([record["class"] for record in members])
      member_embeddings = np.array([record["embedding"] for record in members])

      for class_id in np.unique(member_classes):
        class_embeddings = member_embeddings[member_classes == class_id]
        n_centers = min(len(class_embeddings), self.n_averages)
        centers = KMeans(n_clusters=n_centers, random_state=1010).fit(class_embeddings).cluster_centers_

        self.cluster_average_centers.extend(centers)
        self.idx2class.extend([class_id] * len(centers))

    self.cluster_average_centers = np.array(self.cluster_average_centers)
    self.idx2class = np.array(self.idx2class)

  def predict(self, data):
    """Return, per sample, the classes of all centres ordered nearest first."""
    queries = np.array([record["embedding"] for record in data])
    center_dists = euclidean_distances(queries, self.cluster_average_centers)
    nearest_first = np.argsort(center_dists, axis=1)
    return self.idx2class[nearest_first]


class SKClassify:
  """Shared fit/predict plumbing for the scikit-learn based classifiers.

  Subclasses must set two attributes in their ``__init__``:

  * ``mCC`` -- the scikit-learn estimator doing the classification
  * ``pca`` -- a PCA instance applied to the embeddings first, or ``None``

  Input records are dicts with at least the keys ``"class"`` and ``"embedding"``.
  """
  def _features(self, data, fit_pca=False):
    """Stack the embeddings of ``data`` and push them through PCA when one is configured."""
    embeddings = np.array([x["embedding"] for x in data])
    if self.pca is None:
      return embeddings
    # The projection is learned on the training data only and reused afterwards.
    return self.pca.fit_transform(embeddings) if fit_pca else self.pca.transform(embeddings)

  def fit(self, data):
    """Fit the optional PCA and the estimator on ``data``."""
    classes = np.array([x["class"] for x in data])
    self.mCC.fit(self._features(data, fit_pca=True), classes)

  def predict(self, data):
    """Return the estimator's predicted class for every record."""
    return self.mCC.predict(self._features(data))

  def predict_prob(self, data):
    """Return ``[class label, probability]`` of the most probable class per record.

    The columns of ``predict_proba`` follow ``mCC.classes_``, so the winning column
    index is translated to its class label. Returning the raw index is only
    correct when the training labels happen to be exactly ``0..n-1``.
    """
    probs = self.mCC.predict_proba(self._features(data))
    best = probs.argmax(axis=1)
    best_labels = np.asarray(self.mCC.classes_)[best]
    best_probs = probs[np.arange(len(probs)), best]
    return np.column_stack((best_labels, best_probs))

class RFClassify(SKClassify):
  """SKClassify backed by a default-configured RandomForestClassifier.

  Args:
    n_components: PCA dimensionality applied before the forest; a falsy value
      (the default ``None``) disables PCA.
  """
  def __init__(self, n_components=None):
    self.pca = None
    if n_components:
      self.pca = PCA(n_components=n_components)
    self.mCC = RandomForestClassifier()

class KNNClassify(SKClassify):
  """SKClassify backed by a KNeighborsClassifier.

  Args:
    n_neighbors: number of neighbours that vote on the class.
    n_components: PCA dimensionality applied before the classifier; a falsy
      value (the default ``None``) disables PCA.
  """
  def __init__(self, n_neighbors=5, n_components=None):
    self.pca = None
    if n_components:
      self.pca = PCA(n_components=n_components)
    self.mCC = KNeighborsClassifier(n_neighbors=n_neighbors)

class SVClassify(SKClassify):
  """SKClassify backed by a support-vector classifier (``SVC``).

  Args:
    C: forwarded to ``SVC`` as ``C``.
    probability: forwarded to ``SVC``; must be True for ``predict_prob`` to work,
      since that method calls ``predict_proba`` on the estimator.
    n_components: PCA dimensionality applied before the classifier; a falsy
      value (the default ``None``) disables PCA.
  """
  def __init__(self, C=1.0, probability=False, n_components=None):
    self.pca = None
    if n_components:
      self.pca = PCA(n_components=n_components)
    self.mCC = SVC(C=C, probability=probability)

class MLPClassify(SKClassify):
  """SKClassify backed by an MLPClassifier with a single 128-unit hidden layer.

  Args:
    n_components: PCA dimensionality applied before the network; a falsy value
      (the default ``None``) disables PCA.
  """
  def __init__(self, n_components=None):
    # Written as a real 1-tuple: "(128)" is just the int 128, which hides the
    # fact that this parameter lists one size per hidden layer.
    self.mCC = MLPClassifier(hidden_layer_sizes=(128,))
    self.pca = PCA(n_components=n_components) if n_components else None


class TorchClassify:
  """One-hidden-layer network trained full-batch with SGD (momentum 0.9) in PyTorch.

  Input records are dicts with at least the keys ``"class"`` and ``"embedding"``.

  Args:
    lr: SGD learning rate.
    epochs: number of full-batch optimisation steps.
  """
  def __init__(self, lr=1e-6, epochs=32):
    self.learning_rate = lr
    self.epochs = epochs
    self.loss_fn = nn.CrossEntropyLoss()

  def fit(self, data):
    """Build a fresh model sized to ``data`` and train it for ``self.epochs`` steps."""
    classes = Tensor([x["class"] for x in data]).long()
    embeddings = Tensor([x["embedding"] for x in data])

    in_dim = embeddings.shape[1]
    hidden_dim = in_dim // 2
    # CrossEntropyLoss needs every target in [0, n_outputs). Sizing the output by
    # the largest label (not by the number of distinct labels) keeps training
    # working when some class index is missing from the training data.
    n_outputs = int(classes.max().item()) + 1

    self.model =  nn.Sequential(
      nn.Dropout(0.35),
      nn.Linear(in_dim, hidden_dim),
      # NOTE(review): BatchNorm directly followed by LayerNorm looks redundant — confirm intent.
      nn.BatchNorm1d(hidden_dim),
      nn.LayerNorm(hidden_dim),
      nn.ReLU(),

      nn.Dropout(0.35),
      nn.Linear(hidden_dim, n_outputs),
    )

    optim = torch.optim.SGD(self.model.parameters(), lr=self.learning_rate, momentum=0.9)

    # Log roughly eight times per run; max() avoids a modulo-by-zero when epochs < 8.
    log_every = max(1, self.epochs // 8)

    for e in range(self.epochs):
      optim.zero_grad()
      classes_pred = self.model(embeddings)
      loss = self.loss_fn(classes_pred, classes)
      loss.backward()
      optim.step()
      if e % log_every == 0:
        print(f"Epoch: {e} loss: {loss.item():.4f}")

  def predict(self, data):
    """Return the arg-max class index for every record as a list of Python ints."""
    embeddings = Tensor([x["embedding"] for x in data])
    # eval() switches dropout off and makes batch norm use its running statistics.
    self.model.eval()
    with torch.no_grad():
      class_pred = self.model(embeddings).argmax(dim=1)
    return [l.item() for l in class_pred]

In [None]:
# AverageClassify with up to 24 k-means centres per class, scored on both splits.
mCC = AverageClassify(24)
mCC.fit(class_data_train)

# Each row ranks the classes of all stored centres, nearest centre first.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

# Top-1: class of the single nearest centre.
print(f"train: {accuracy_score(classes_train, preds_train[:, 0])}")
print(f"test: {accuracy_score(classes_test, preds_test[:, 0])}")
print()
# "Top-2": the label matches the class of either of the two nearest centres.
print(f"train: {AverageClassify.top_k_accuracy(classes_train, preds_train, 2)}")
print(f"test: {AverageClassify.top_k_accuracy(classes_test, preds_test, 2)}")

In [None]:
# Same classifier as above, but keeping the distance to the nearest centre so the
# distance distributions of correct and wrong predictions can be compared.
mCC = AverageClassify(24)
mCC.fit(class_data_train)

# Rows are [predicted class, distance to the nearest centre].
preds_train = mCC.predict_dist(class_data_train)
preds_test = mCC.predict_dist(class_data_test)

# dist_accuracy prints min/max/mean per group, draws two histograms and returns top-1 accuracy.
print(f"train: {AverageClassify.dist_accuracy(classes_train, preds_train)}")
print(f"test: {AverageClassify.dist_accuracy(classes_test, preds_test)}")

# thold (threshold): distance < 10 — presumably the cut-off read off the histograms
# below which a prediction is considered trustworthy; confirm.

In [None]:
# ClusterClassify with up to 20 k-means centres per (cluster, class) pair.
mCC = ClusterClassify(20)
mCC.fit(class_data_train)

# Each row ranks the classes of all stored centres, nearest centre first.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

# Top-1: class of the single nearest centre.
print(f"train: {accuracy_score(classes_train, preds_train[:, 0])}")
print(f"test: {accuracy_score(classes_test, preds_test[:, 0])}")
print()
# "Top-2": the label matches the class of either of the two nearest centres.
print(f"train: {AverageClassify.top_k_accuracy(classes_train, preds_train, 2)}")
print(f"test: {AverageClassify.top_k_accuracy(classes_test, preds_test, 2)}")

In [None]:
# k-nearest-neighbours with 7 neighbours on the raw embeddings (no PCA).
mCC = KNNClassify(7)
mCC.fit(class_data_train)

# One predicted class per record.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

print(f"train: {accuracy_score(classes_train, preds_train)}")
print(f"test: {accuracy_score(classes_test, preds_test)}")

In [None]:
# Same k-NN model, but keeping the probability of the top prediction.
mCC = KNNClassify(7)
mCC.fit(class_data_train)

# Rows pair the top prediction with its predicted probability.
preds_train = mCC.predict_prob(class_data_train)
preds_test = mCC.predict_prob(class_data_test)

# dist_accuracy is reused here, so the statistics it prints are probabilities, not distances.
print(f"train: {AverageClassify.dist_accuracy(classes_train, preds_train)}")
print(f"test: {AverageClassify.dist_accuracy(classes_test, preds_test)}")

# thold (threshold): probability > 0.8, taken from the mean of the correct
# predictions — presumably the confidence cut-off to use; confirm.

In [None]:
# Random forest on embeddings reduced to 32 PCA components.
mCC = RFClassify(32)
mCC.fit(class_data_train)

# One predicted class per record.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

print(f"train: {accuracy_score(classes_train, preds_train)}")
print(f"test: {accuracy_score(classes_test, preds_test)}")

In [None]:
# Same random-forest setup, but keeping the probability of the top prediction.
# The forest is constructed without a random_state, so this refit need not match the cell above.
mCC = RFClassify(32)
mCC.fit(class_data_train)

# Rows pair the top prediction with its predicted probability.
preds_train = mCC.predict_prob(class_data_train)
preds_test = mCC.predict_prob(class_data_test)

# dist_accuracy is reused here, so the statistics it prints are probabilities, not distances.
print(f"train: {AverageClassify.dist_accuracy(classes_train, preds_train)}")
print(f"test: {AverageClassify.dist_accuracy(classes_test, preds_test)}")

# thold (threshold): probability > 0.65, taken from the mean of the correct
# predictions — presumably the confidence cut-off to use; confirm.

In [28]:
# Support-vector classifier (C=8.0) on embeddings reduced to 128 PCA components.
mCC = SVClassify(C=8.0, n_components=128)
mCC.fit(class_data_train)

# One predicted class per record.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

print(f"train: {accuracy_score(classes_train, preds_train)}")
print(f"test: {accuracy_score(classes_test, preds_test)}")

In [None]:
# Same SVC setup with probability=True, which predict_prob needs in order to call predict_proba.
mCC = SVClassify(C=8.0, probability=True, n_components=128)
mCC.fit(class_data_train)

# Rows pair the top prediction with its predicted probability.
preds_train = mCC.predict_prob(class_data_train)
preds_test = mCC.predict_prob(class_data_test)

# dist_accuracy is reused here, so the statistics it prints are probabilities, not distances.
print(f"train: {AverageClassify.dist_accuracy(classes_train, preds_train)}")
print(f"test: {AverageClassify.dist_accuracy(classes_test, preds_test)}")

# thold (threshold): probability > 0.8 — presumably the confidence cut-off read
# off the histograms; confirm.

In [26]:
# scikit-learn MLP on embeddings reduced to 128 PCA components.
mCC = MLPClassify(128)
mCC.fit(class_data_train)

# One predicted class per record.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

print(f"train: {accuracy_score(classes_train, preds_train)}")
print(f"test: {accuracy_score(classes_test, preds_test)}")

In [None]:
# Same MLP setup, but keeping the probability of the top prediction.
# The MLP is constructed without a random_state, so this refit need not match the cell above.
mCC = MLPClassify(128)
mCC.fit(class_data_train)

# Rows pair the top prediction with its predicted probability.
preds_train = mCC.predict_prob(class_data_train)
preds_test = mCC.predict_prob(class_data_test)

# dist_accuracy is reused here, so the statistics it prints are probabilities, not distances.
print(f"train: {AverageClassify.dist_accuracy(classes_train, preds_train)}")
print(f"test: {AverageClassify.dist_accuracy(classes_test, preds_test)}")

# thold (threshold): TODO — no probability cut-off has been chosen for the MLP yet.

In [None]:
# PyTorch network trained full-batch for 192 steps with learning rate 0.1
# (overriding the class default of 1e-6). No torch seed is set in this notebook,
# so results vary between runs.
mCC = TorchClassify(lr=1e-1, epochs=192)
mCC.fit(class_data_train)

# One predicted class index per record, as a list of Python ints.
preds_train = mCC.predict(class_data_train)
preds_test = mCC.predict(class_data_test)

print(f"train: {accuracy_score(classes_train, preds_train)}")
print(f"test: {accuracy_score(classes_test, preds_test)}")