In [None]:
# !pip install faiss-gpu-cu12

In [None]:
from datasets import load_dataset, load_from_disk

from transformers import AutoTokenizer, AutoModel
from sentence_transformers import CrossEncoder, InputExample, SentenceTransformer, losses
from sentence_transformers.cross_encoder.evaluation import CrossEncoderCorrelationEvaluator
from huggingface_hub import notebook_login

from torch.utils.data import DataLoader
import torch

from sklearn.model_selection import train_test_split
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import PCA

import matplotlib.pyplot as plt
import numpy as np
import math

from itertools import combinations
import random
from collections import defaultdict


In [None]:
# Load the train split of the `artworks` dataset from the Hugging Face Hub.
artworks_ds = load_dataset("anna-bozhenko/artworks", split="train")
# Bare expression: the notebook shows the dataset summary as the cell output.
artworks_ds

In [None]:
def mean_pooling(model_output, attention_mask):
    """Average token embeddings, counting only positions enabled in the mask.

    Args:
        model_output: Indexable model output whose element 0 holds all token
            embeddings.
        attention_mask: Mask tensor; it is given a trailing axis and expanded to
            the shape of the token embeddings, so masked positions contribute
            zero to the sum.

    Returns:
        The masked sum over dimension 1 divided by the number of unmasked
        positions (clamped to at least 1e-9 so fully masked rows do not divide
        by zero).
    """
    hidden_states = model_output[0]
    mask = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
    masked_sum = (hidden_states * mask).sum(dim=1)
    token_counts = mask.sum(dim=1).clamp(min=1e-9)
    return masked_sum / token_counts


In [None]:
def get_embeddings(text_list, tokenizer, model, device="cuda"):
    """Encode text(s) into mean-pooled embeddings (inference only).

    Args:
        text_list: Text or list of texts, passed unchanged to `tokenizer`.
        tokenizer: Callable invoked with padding=True, truncation=True and
            return_tensors="pt"; its result must be a mapping of tensors that
            contains an 'attention_mask' entry.
        model: Callable that receives the tokenized tensors as keyword
            arguments; its output is handed to `mean_pooling`.
        device: Device the tokenized tensors are moved to before the forward
            pass. Presumably this has to be the device `model` lives on.

    Returns:
        The tensor produced by `mean_pooling` for the given texts. It carries
        no autograd graph.
    """
    encoded_input = tokenizer(
        text_list, padding=True, truncation=True, return_tensors="pt"
    )
    encoded_input = {k: v.to(device) for k, v in encoded_input.items()}
    # Embedding extraction never back-propagates, so skip autograd bookkeeping:
    # no graph or intermediate activations are retained for each call.
    with torch.no_grad():
        model_output = model(**encoded_input)
    return mean_pooling(model_output, encoded_input['attention_mask'])


# Embed with sentence-transformers/multi-qa-MiniLM-L6-cos-v1


In [None]:
# Alternative checkpoint (not used here): 'sentence-transformers/multi-qa-mpnet-base-cos-v1'
model_checkpoint = 'sentence-transformers/multi-qa-MiniLM-L6-cos-v1'
# Tokenizer and encoder come from the same checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
# Move the encoder to the GPU; get_embeddings sends its inputs to "cuda" by default.
bi_encoder = AutoModel.from_pretrained(model_checkpoint).cuda()



In [None]:
# Embed `full_info` one mini-batch per forward pass.
# Calling get_embeddings once per text (as before) returned a (1, dim) tensor for
# every row, so the stored "embeddings" were nested one level too deep for the
# PCA / clustering cells below. Encoding the whole batch yields a (batch, dim)
# array, i.e. exactly one flat vector per row, and uses the GPU more efficiently.
artworks_ds = artworks_ds.map(
    lambda batch: {
        "embeddings": get_embeddings(batch["full_info"], tokenizer, bi_encoder)
        .detach()   # drop any autograd graph before leaving torch
        .cpu()
        .numpy()
    },
    batched=True,
    batch_size=10,
)


# Perform bi-encoder augmentation

## Create a dataset by clustering the most closely related `full_info` descriptions

In [None]:
# Load the previously saved Chicago dataset from disk.
# NOTE(review): absolute path under /content/drive — the cell only runs where that mount exists.
chicago_ds = load_from_disk("/content/drive/MyDrive/artistic_styles/paintings/chicago_ds")

In [None]:
def _paint_draw_marker(row, idx):
    """Return the row's own index if it is classified as a drawing/painting, else -1.

    The row's 'classification' entries are joined with spaces and lower-cased;
    the row matches when that text contains "drawing" or "painting".
    """
    classification_text = ' '.join(row['classification']).lower()
    is_match = any(keyword in classification_text for keyword in ("drawing", "painting"))
    return {"is_paint_draw": idx if is_match else -1}


# Adds an 'is_paint_draw' column: the row position for matches, -1 otherwise.
paintings_drawings_chicago_ds = chicago_ds.map(_paint_draw_marker, with_indices=True)

In [None]:
# Keep only the flagged rows; their 'is_paint_draw' values are the rows'
# positions inside chicago_ds (non-matching rows were marked with -1).
painting_drawings_idxs = paintings_drawings_chicago_ds.filter(lambda x: x['is_paint_draw'] >= 0)['is_paint_draw']

In [None]:
# Work out which rows of artworks_ds to keep: every row that precedes the Chicago
# block, plus the Chicago drawings/paintings selected above (shifted by the size
# of that leading block).
# NOTE(review): assumes the Chicago rows are the last len(chicago_ds) rows of
# artworks_ds, in the same order as chicago_ds — confirm against how the
# dataset was assembled.
chicago_ds_len = len(chicago_ds)
louvre_paintings = len(artworks_ds) - chicago_ds_len
chicago_draw_paint_idxs_within_artworks_ds = [
    louvre_paintings + chicago_idx for chicago_idx in painting_drawings_idxs
]
embedding_idxs = [*range(louvre_paintings), *chicago_draw_paint_idxs_within_artworks_ds]


### Finally, extract the necessary "Chicago" drawings and paintings from the collective `artworks` dataset

In [None]:
# Dataset used for creating labels: the rows of artworks_ds listed in embedding_idxs.
ds0 = artworks_ds.select(embedding_idxs)

# Perform dimensionality reduction ([paper](https://arxiv.org/pdf/1708.03629))

In [None]:
# Number of PCA components = length of the first row's embedding, so the
# variance curve plotted below covers every component.
n_components = len(ds0[0]["embeddings"])
n_components

In [None]:
# Pull the whole 'embeddings' column of ds0.
embeddings = ds0["embeddings"]
# Exploratory PCA keeping n_components components; its explained-variance
# ratios are plotted in the next cell.
pca = PCA(n_components=n_components)
pca.fit(embeddings)


In [None]:
# Visualise the cumulative variance explained by the first k principal components.
explained_variance = np.cumsum(pca.explained_variance_ratio_)
# Derive the x-axis from the data instead of hard-coding 384 components, so the
# plot stays valid when the embedding size (i.e. the checkpoint) changes.
plt.plot(range(1, len(explained_variance) + 1), explained_variance)
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Explained Variance')
plt.grid(True)
plt.show()

In [None]:
# Let's choose 150 PCs

# perform dimensionality reduction

In [None]:
N = 150 # number of principal components kept by the PCA reduction step
D = 3 # number of top principal components removed in each post-processing (PPA) pass
# Convert the embeddings to a NumPy array for the matrix operations below.
embeddings = np.array(embeddings)

In [None]:
def post_process(X: np.ndarray, d: int) -> np.ndarray:
  """PPA: subtract the mean and remove the top-d principal components.

  Args:
    X: 2-D array with one embedding per row.
    d: Number of leading principal components to project out. Values <= 0
      leave the centred data untouched.

  Returns:
    A new array with the shape of X: mean-centred, with its projection onto
    each of the top-d principal directions subtracted. X itself is not
    modified.
  """
  X_centered = X - X.mean(axis=0)

  # Nothing to remove: skip the PCA fit altogether.
  if d <= 0:
    return X_centered

  # Fit with the caller-supplied `d`, not the notebook-level constant `D`, so
  # the argument actually controls how many components are removed.
  pca = PCA(n_components=d)
  pca.fit(X_centered)

  # Subtract, direction by direction, the part of every row that lies along it.
  for u in pca.components_:
    X_centered -= np.outer(X_centered @ u, u)

  return X_centered


In [None]:
def reduce_embeddings(X: np.ndarray, n_components: int, d: int = 1, plot_variance: bool = True) -> np.ndarray:
    """Reduce embedding dimensionality with a PPA -> PCA -> PPA pipeline.

    Args:
        X: array with the initial dimensions, one embedding per row.
        n_components: number of PCs to retain in the PCA step.
        d: number of top PCs to remove in each PPA pass (these PCs bring
            "noisy" data).
        plot_variance: when True, plot the cumulative explained variance of the
            PCA step and print its value at every 10th component.

    Returns:
        The reduced embeddings after the second PPA pass.
    """
    # Step 1: PPA on original embeddings
    X_purified = post_process(X, d)

    # Step 2: PCA dimensionality reduction
    pca = PCA(n_components=n_components)
    X_reduced = pca.fit_transform(X_purified)

    if plot_variance:
        explained = np.cumsum(pca.explained_variance_ratio_) * 100
        n_kept = len(explained)  # number of components actually retained

        plt.figure(figsize=(10, 4))
        plt.plot(range(1, n_kept + 1), explained, marker='o')
        plt.xlabel('Number of Components')
        plt.ylabel('Cumulative Explained Variance (%)')
        plt.title('PCA Explained Variance (After First PPA)')
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        # Report components 10, 20, 30, ... The stop value must be n_kept (the
        # last valid index is n_kept - 1); a stop of n_components + 1 indexed
        # past the end whenever n_components ended in 9 (9, 19, 29, ...).
        for i in range(9, n_kept, 10):
            print(f"{i+1} componets: {explained[i]:.2f}%")

    # Step 3: PPA on reduced embeddings
    X_final = post_process(X_reduced, d)

    return X_final

In [None]:
# Run the reduction: N components are retained and d=D is passed to each PPA pass.
reduced_embeddings = reduce_embeddings(X=embeddings,
                                       n_components=N,
                                       d=D,
                                       plot_variance=True)


# Cluster the reduced-dimension embeddings


In [None]:
N_clusters = 1000
# Seed the clustering: MiniBatchKMeans is stochastic (initialisation and
# mini-batch sampling), and every later step — pair generation, silver labels,
# fine-tuning — is derived from these cluster labels.
kmeans = MiniBatchKMeans(n_clusters=N_clusters, batch_size=512, random_state=42)
# One cluster id per row of reduced_embeddings.
labels = kmeans.fit_predict(reduced_embeddings)

In [None]:
# Attach each row's cluster id as a "label" column.
# NOTE(review): ds0 is rebound to the extended dataset, so re-running this cell
# presumably fails because the column already exists — confirm.
ds0 = ds0.add_column("label", labels)

# Let's form positive and negative pairs!

In [None]:
def generate_pairs(dataset, cluster_assignments, max_pos_per_cluster=100, num_negatives=100000):
    """Build labelled text pairs from a clustering of `dataset`.

    A positive pair (label 1.0) joins the 'full_info' texts of two rows from the
    same cluster; a negative pair (label 0.0) joins two rows from different
    clusters. Negative pairs are drawn independently, so duplicates can occur.

    Args:
        dataset: HuggingFace Dataset (anything supporting len() and
            dataset[i]['full_info'] works).
        cluster_assignments: List[int]; the cluster id of row i at position i.
        max_pos_per_cluster: Upper bound on the positive pairs taken from each
            cluster (chosen at random among all pairs of that cluster).
        num_negatives: Number of negative pairs to draw.

    Returns:
        dict with keys "positive" and "negative", each a list of
        (text_a, text_b, label) tuples.

    Raises:
        ValueError: if negative pairs are requested but the rows fall into
            fewer than two clusters (the sampling loop could never finish).
    """
    cluster_to_indices = defaultdict(list)
    for idx, cluster_id in enumerate(cluster_assignments):
        cluster_to_indices[cluster_id].append(idx)

    # Fail fast: rejection sampling below only terminates when at least two
    # different clusters exist.
    if num_negatives > 0 and len(cluster_to_indices) < 2:
        raise ValueError(
            "Cannot sample negative pairs: rows must belong to at least two different clusters"
        )

    positive_pairs = []
    for indices in cluster_to_indices.values():
        if len(indices) < 2:
            continue  # a single-row cluster cannot form a pair
        comb = list(combinations(indices, 2))
        random.shuffle(comb)
        for i, j in comb[:max_pos_per_cluster]:
            positive_pairs.append((dataset[i]['full_info'], dataset[j]['full_info'], 1.0))

    print(f"Generated {len(positive_pairs)} positive pairs")

    # Negative pairs: draw two distinct rows and keep them only when their
    # clusters differ.
    all_indices = list(range(len(dataset)))
    negative_pairs = []
    while len(negative_pairs) < num_negatives:
        i, j = random.sample(all_indices, 2)
        if cluster_assignments[i] != cluster_assignments[j]:
            negative_pairs.append((dataset[i]['full_info'], dataset[j]['full_info'], 0.0))

    print(f"Generated {len(negative_pairs)} negative pairs")

    return {
        "positive": positive_pairs,
        "negative": negative_pairs,
    }


In [None]:
# Cluster-derived training pairs for the cross-encoder: at most 4 positives per
# cluster and 5,000 negatives.
pairs = generate_pairs(ds0, cluster_assignments=labels,
                       max_pos_per_cluster=4,
                       num_negatives=5_000)

In [None]:
def truncate_text(text, max_tokens=512):
    """Keep at most `max_tokens` whitespace-separated words of `text`.

    "Tokens" here are plain words obtained with str.split(), not model tokens.
    The kept words are re-joined with single spaces, so runs of whitespace are
    collapsed and leading/trailing whitespace is dropped.
    """
    words = text.split()
    del words[max_tokens:]
    return " ".join(words)

In [None]:
# Merge both classes into one list, then randomise the order in place.
train_samples = [*pairs.get("positive"), *pairs.get("negative")]

random.shuffle(train_samples)

In [None]:
# The cross-encoder expects a tokenised input of at most 512 tokens, so roughly
# pre-truncate each text to its first 400 whitespace-separated words.
for i, (sen_1, sen_2, label) in enumerate(train_samples):
  train_samples[i] = (truncate_text(sen_1, 400), truncate_text(sen_2, 400), label)


In [None]:
# Wrap every (text, text, label) triple in an InputExample for training.
train_inputs = [
  InputExample(texts=[first, second], label=target)
  for first, second, target in train_samples
]

In [None]:
# Hold out 10% of the labelled pairs for evaluation during cross-encoder training.
# NOTE(review): no random_state is passed, so the split presumably changes
# between runs unless NumPy's global RNG is seeded elsewhere.
train_data, val_data = train_test_split(train_inputs, test_size=0.1)
# Shuffled batches of 16 training examples.
train_dataloader = DataLoader(train_data, shuffle=True, batch_size=16)


In [None]:
# Evaluator built from the held-out pairs, registered under the name "dev-set".
evaluator = CrossEncoderCorrelationEvaluator.from_input_examples(val_data, name="dev-set")

In [None]:
# Pretrained cross-encoder to fine-tune on the cluster-derived pairs.
# NOTE: this rebinds model_checkpoint, which earlier named the bi-encoder checkpoint.
model_checkpoint = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
# num_labels=1 is passed to the CrossEncoder constructor; the model is then moved to the GPU.
cross_encoder = CrossEncoder(model_checkpoint, num_labels=1).cuda()
num_epochs = 3
# Warm-up length: 10% of the total number of batches over all epochs, rounded up.
warmup_steps = math.ceil(len(train_dataloader) * num_epochs * 0.1)
# Output directory for the fine-tuned cross-encoder (same location as the fit call's output_path).
out_cross_encoder_path = "/content/drive/MyDrive/artistic_styles/paintings/augmenting_model/cross-encoder-artworks"

In [None]:
# Fine-tune the cross-encoder on the cluster-derived pairs, running the
# evaluator every 250 steps.
cross_encoder.fit(
    train_dataloader=train_dataloader,
    evaluator=evaluator,
    epochs=num_epochs,
    evaluation_steps=250,
    warmup_steps=warmup_steps,
    # Reuse the path defined with the other training settings instead of
    # repeating the literal, so the two cannot drift apart.
    output_path=out_cross_encoder_path,
)


# Labeling the silver dataset with cross-encoder predictions

In [None]:
# Draw a second pool of candidate pairs (up to 10 positives per cluster and
# 10,000 negatives); it is scored by the cross-encoder further down.
silver_pairs = generate_pairs(ds0, cluster_assignments=labels,
                       max_pos_per_cluster=10,
                       num_negatives=10_000)

In [None]:
# Balance the pool: 2,000 randomly chosen pairs per class.
# NOTE(review): random.sample raises ValueError when a class holds fewer than 2,000 pairs.
pos_pairs = random.sample(silver_pairs["positive"], 2000)
neg_pairs = random.sample(silver_pairs["negative"], 2000)

# NOTE(review): silver_pairs is rebound from a dict to a flat list here, so this
# cell cannot be re-run on its own — the string-key lookups above would then
# be applied to a list.
silver_pairs = pos_pairs + neg_pairs

In [None]:
# Keep only the two texts of every pair; the cluster-derived score is dropped.
silver_inputs = [[text_a, text_b] for text_a, text_b, _ in silver_pairs]

In [None]:
# Score every candidate pair with the fine-tuned cross-encoder.
scores = cross_encoder.predict(silver_inputs)

In [None]:
# Cross-encoder scores above this cut-off become label 1.0, all others 0.0.
threshold = 0.7

silver_data = [
    InputExample(texts=[text_a, text_b], label=float(score > threshold))
    for (text_a, text_b), score in zip(silver_inputs, scores)
]

## Training bi-encoder on silver data

In [None]:
# Training setup for the bi-encoder on the cross-encoder-labelled (silver) pairs.
train_dataloader = DataLoader(silver_data, shuffle=True, batch_size=16)  # or 32, depending on the GPU
checkpoint = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
# NOTE: rebinds bi_encoder, which earlier held the AutoModel used for the initial embeddings.
bi_encoder = SentenceTransformer(checkpoint).cuda()
train_loss = losses.CosineSimilarityLoss(model=bi_encoder)
num_epochs = 3  # or 2-3, depending on the size of the silver dataset
# Warm-up length: 10% of the total number of batches over all epochs, truncated to an int.
warmup_steps = int(len(train_dataloader) * num_epochs * 0.1)


In [None]:
# Fine-tune the bi-encoder on the silver pairs; the result is written to output_path.
bi_encoder.fit(
    train_objectives=[(train_dataloader, train_loss)],
    epochs=num_epochs,
    warmup_steps=warmup_steps,
    output_path="/content/drive/MyDrive/artistic_styles/paintings/augmenting_model/bi-encoder-art-silver"
)

In [None]:
# Git identity commands, left disabled:
# !git config --global user.email "ms.anna.bozhenko.03@gmail.com"
# !git config --global user.name "Anna Bozhenko"

# Log in to the Hugging Face Hub interactively, then publish the fine-tuned bi-encoder.
notebook_login()
bi_encoder.push_to_hub("anna-bozhenko/artworks-search-MiniLM-L6-cos-v1")