In [1]:
import pandas as pd

# Loading dataset
ind_df = pd.read_csv('data/indigenous_collection_processed.csv', index_col='id')
print(f'Dataframe columns: \n{ind_df.columns}')

Dataframe columns: 
Index(['url', 'thumbnail', 'creation_date', 'modification_date',
       'numero_do_item', 'tripticos', 'categoria', 'nome_do_item',
       'nome_do_item_dic', 'colecao', 'coletor', 'doador', 'modo_de_aquisicao',
       'data_de_aquisicao', 'ano_de_aquisicao', 'data_de_confeccao', 'autoria',
       'nome_etnico', 'descricao', 'dimensoes', 'funcao', 'materia_prima',
       'tecnica_confeccao', 'descritor_tematico', 'descritor_comum',
       'numero_de_pecas', 'itens_relacionados', 'responsavel_guarda',
       'inst_detentora', 'povo', 'autoidentificacao', 'lingua',
       'estado_de_origem', 'geolocalizacao', 'pais_de_origem', 'exposicao',
       'referencias', 'disponibilidade', 'qualificacao', 'historia_adm',
       'notas_gerais', 'observacao', 'conservacao', 'image_path'],
      dtype='object')


In [2]:
from IPython.core.magic import register_cell_magic

# Creating skip cell command
@register_cell_magic
def skip(line, cell):
    return

# Image Clustering

Clustering experiments with image feature extractors. The idea is to fine-tune some pre-trained models on our dataset and then remove the last layer of the model to cluster on the embedding space projections.

## Dataset Preparation

For fine-tuning the model on our dataset, we are going to try a few different labels and study how they affect the generated emebdding space. For now, we focus *povo* and *categoria*.

In [3]:
from PIL import Image

# Filtering out corrupted images
corrupted_images = []
for index, row in ind_df.loc[ind_df['image_path'].notna()].iterrows():
    try:
        Image.open(row['image_path'])
    except Exception as e:
        corrupted_images.append(row['image_path'])
        ind_df.loc[index, 'image_path'] = pd.NA
print(f'{len(corrupted_images)} corrupted images')

# Creating 'image_path_br' column
ind_df['image_path_br'] = ind_df['image_path'].values
ind_df.loc[ind_df['image_path_br'].notna(), 'image_path_br'] = \
    ind_df.loc[ind_df['image_path_br'].notna(), \
               'image_path'].apply(lambda path: \
                                   f"data/br_images/{path.split('/')[-1].split('.')[0]}.png")

1 corrupted images


## ViT Base Patch-16

### Pre-trained Embedding Space

In [4]:
import torch
from torchvision import transforms
from training_utils import preparing_image_labels, ImageDataset

# Getting the proper device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Building dataset for column 'povo' (though no specific column is used on off-the-shelf model)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
labels, name_to_num, num_to_name = preparing_image_labels(ind_df, 'povo')
dataset = ImageDataset(labels, transform=transform, augment=False)

In [None]:
# Projecting data onto the off-the-shelf pre-trained embedding space from ViT
from tqdm import tqdm
import numpy as np
from torch.utils.data import DataLoader
from transformers import ViTImageProcessor, ViTModel
from training_utils import get_vit_embeddings, data_projections

# Loading model
model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
model.to(device)

# Getting data
dataloader = DataLoader(dataset, batch_size=512, shuffle=True, num_workers=0, pin_memory=True)

# Computing image embeddings
image_embeddings = np.concatenate(get_vit_embeddings(model, dataloader, device), axis=0)

# Computing data projection
vanilla_vit_trimap, vanilla_vit_tsne, vanilla_vit_umap = data_projections(image_embeddings)

Computing embeddings: 100%|████| 23/23 [02:40<00:00,  6.98s/it]


In [None]:
from training_utils import clean_mem

# Cleaning up memory
clean_mem([model, image_embeddings])

### Fine-tuning Embedding Space

In [None]:
# Creating our own ViT classifier head for fine-tuning
import torch.nn as nn

class ViTClassifier(nn.Module):
    def __init__(self, num_classes):
        super(ViTClassifier, self).__init__()
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        self.classifier = nn.Linear(self.vit.config.hidden_size, num_classes)

    def forward(self, x):
        outputs = self.vit(x)
        
        # Do I get the last_hidden_state of CLS token or the pooler_output?
        embeddings = outputs['last_hidden_state'][:, 0, :]
        # embeddings = outputs['pooler_output']

        logits = self.classifier(embeddings)
        return logits

#### *povo* Column

In [None]:
# Counting categories
categories = {}
for l in labels.values():
    try:
        categories[l] += 1
    except:
        categories[l] = 1
categories = dict(sorted(categories.items()))
categories_keys = list(categories.keys())
categories_freq = np.array(list(categories.values()))

# Studying data distribution to filter out rare classes
total_data = categories_freq.sum()
q_25, q_50, q_75, q_90 = np.quantile(categories_freq, 0.25), \
np.quantile(categories_freq, 0.50), np.quantile(categories_freq, 0.75), \
np.quantile(categories_freq, 0.90)
mask_25, mask_50, mask_75, mask_90 = np.where(categories_freq > q_25), \
np.where(categories_freq > q_50), np.where(categories_freq > q_75), \
np.where(categories_freq > q_90)

print('Quantile X Data Percentage:')
print(f'''Q-25: {q_25:.2f}, {categories_freq[mask_25].sum()/total_data*100:.2f}% of data''')
print(f'''Q-50: {q_50:.2f}, {categories_freq[mask_50].sum()/total_data*100:.2f}% of data''')
print(f'''Q-75: {q_75:.2f}, {categories_freq[mask_75].sum()/total_data*100:.2f}% of data''')
print(f'''Q-90: {q_90:.2f}, {categories_freq[mask_90].sum()/total_data*100:.2f}% of data\n''')

# Filtering classes so that we retain around 85% of data
filtered_categories = {}
filtered_categories_names = {}
for c in mask_75[0]:
    filtered_categories[categories_keys[c]] = categories[categories_keys[c]]
    filtered_categories_names[num_to_name[categories_keys[c]]] = categories[categories_keys[c]]

In [None]:
from torch.utils.data import ConcatDataset

# Filtering dataframe for selected categories
filtered_povo_ind_df = ind_df[ind_df['povo'].isin(list(filtered_categories_names.keys())) & \
                              ind_df['image_path'].notna()]

# Selecting minority and majority classes
filtered_categories_freq = np.array(list(filtered_categories_names.values()))
threshold = 2*np.median(filtered_categories_freq)

minority_classes = []
majority_classes = []
for k, v in filtered_categories_names.items():
    if v <= threshold:
        minority_classes.append(k)
    else:
        majority_classes.append(k)

minority_povo_ind_df=filtered_povo_ind_df[filtered_povo_ind_df['povo'].isin(minority_classes)]
majority_povo_ind_df=filtered_povo_ind_df[filtered_povo_ind_df['povo'].isin(majority_classes)]

# Undersampling majority classes
undersampled_majority_povo_ind_df = (
    majority_povo_ind_df
    .groupby('povo', group_keys=False)
    .apply(lambda x: x.sample(n=min(300, len(x)), replace=False))
)

# Creating augmented dataset for training
labels_minority, _, _ = preparing_image_labels(minority_povo_ind_df, 'povo')
labels_majority, _, _ = preparing_image_labels(undersampled_majority_povo_ind_df, 'povo')

minority_multiplier = 2
minority_datasets = [ImageDataset(labels_minority, transform=transform, augment=True) \
                     for i in range(minority_multiplier)]
minority_datasets.append(ImageDataset(labels_minority, transform=transform, augment=False))

majority_multiplier = 1
majority_datasets = [ImageDataset(labels_majority, transform=transform, augment=True) \
                     for i in range(majority_multiplier)]
majority_datasets.append(ImageDataset(labels_majority, transform=transform, augment=False))

augmented_dataset = ConcatDataset(minority_datasets + majority_datasets)

In [None]:
# labels_majority

In [None]:
import matplotlib.pyplot as plt
dataloader = DataLoader(augmented_dataset, batch_size=16, shuffle=True, \
                        num_workers=0, pin_memory=True)
print(len(dataloader))
for batch_images, batch_labels, batch_idx in dataloader:
    print(batch_idx[0])
    # print()
    mean=torch.tensor([0.5, 0.5, 0.5]).view(3,1,1)
    std=torch.tensor([0.5, 0.5, 0.5]).view(3,1,1)
    image = batch_images[0]*std + mean
    plt.imshow(image.permute(1, 2, 0).numpy())
    plt.axis('off')
    plt.show
    break

In [None]:
from sklearn.utils.class_weight import compute_class_weight

# Class weights in loss function
num_classes = len(filtered_categories)
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique())

In [11]:
from training_utils import get_train_val_test_split, train_loop
from training_utils import plot_train_curves, evaluate_model
from torch.utils.data import random_split
import torch.optim as optim

# Creating training, validation and test datasets
train_size = int(0.75*len(dataset))
val_size = int(0.15*len(dataset))
batch_size = 32
train_dataloader, val_dataloader, \
test_dataloader = get_train_val_test_split(dataset, train_size, val_size, batch_size)

# Training set-up and execution for 'povo'
num_classes = ind_df['povo'].nunique()
model = ViTClassifier(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=5e-5, weight_decay=0)
epochs = 30

losses, accuracies, class_precisions, class_recalls = train_loop(model, num_classes, \
                                                                 train_dataloader, \
                                                                 val_dataloader, device, \
                                                                 criterion, opt, \
                                                                 'vit_povo', epochs)
plot_train_curves(losses, accuracies, "ViT Fine-Tuned on 'povo'")
print(f'Per class precision: {class_precisions[-1]}')
print(f'Per class recall: {class_recalls[-1]}')

# Evaluating model on test dataset
test_acc, test_prec, test_rec = evaluate_model(model, 'vit_povo', num_classes, \
                                               test_dataloader, device)
print(f'Test accuracy: {test_acc}')
print(f'Test per class precisions: {test_prec}')
print(f'Test per class recalls: {test_rec}')

# Computing image embeddings
model.classifier = nn.Identity()
image_embeddings = np.concatenate(get_vit_embeddings(model, dataloader, device, True), axis=0)

# Computing data projection
povo_vit_trimap, povo_vit_tsne, povo_vit_umap = data_projections(image_embeddings)

Training model:   0%|                   | 0/30 [04:10<?, ?it/s]


ValueError: not enough values to unpack (expected 3, got 2)

In [None]:
# Retraining model with augmented dataset to see the difference in the results
train_size = int(0.75*len(augmented_dataset))
val_size = int(0.15*len(augmented_dataset))
batch_size = 32
train_dataloader, val_dataloader, \
test_dataloader = get_train_val_test_split(augmented_dataset, train_size, val_size, batch_size)

# Training set-up and execution for 'povo'
num_classes = ind_df['povo'].nunique()
model = ViTClassifier(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=5e-5, weight_decay=0)
epochs = 30

losses, accuracies, class_precisions, class_recalls = train_loop(model, num_classes, \
                                                                 train_dataloader, \
                                                                 val_dataloader, device, \
                                                                 criterion, opt, \
                                                                 'balanced_vit_povo', epochs)
plot_train_curves(losses, accuracies, "ViT Fine-Tuned on 'povo'")
print(f'Per class precision: {class_precisions[-1]}')
print(f'Per class recall: {class_recalls[-1]}')

# Evaluating model on test dataset
test_acc, test_prec, test_rec = evaluate_model(model, 'vit_povo', num_classes, \
                                               test_dataloader, device)
print(f'Test accuracy: {test_acc}')
print(f'Test per class precisions: {test_prec}')
print(f'Test per class recalls: {test_rec}')

# Computing image embeddings
model.classifier = nn.Identity()
image_embeddings = np.concatenate(get_vit_embeddings(model, dataloader, device, True), axis=0)

# Computing data projection
povo_vit_trimap, povo_vit_tsne, povo_vit_umap = data_projections(image_embeddings)

#### *categoria* Column

In [None]:
# Cleaning up memory
clean_mem([model, image_embeddings])

# Preparing dataset for next training process
labels, name_to_num, num_to_name = preparing_image_labels(ind_df, 'categoria')
dataset = ImageDataset(labels, transform=transform)

train_size = int(0.75*len(dataset))
val_size = int(0.15*len(dataset))
batch_size = 32
train_dataloader, val_dataloader, \
test_dataloader = get_train_val_test_split(dataset, train_size, val_size, batch_size)

# Training set-up and execution for 'categoria'
num_classes = ind_df['categoria'].nunique()
model = ViTClassifier(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=2e-5, weight_decay=0)
epochs = 30

losses, accuracies, class_precisions, class_recalls = train_loop(model, num_classes, \
                                                                 train_dataloader, \
                                                                 val_dataloader, device, \
                                                                 criterion, opt, \
                                                                 'vit_categoria', epochs)
plot_train_curves(losses, accuracies, "ViT Fine-Tuned on 'categoria'")
print(f'Per class precision: {class_precisions[-1]}')
print(f'Per class recall: {class_recalls[-1]}')

# Evaluating model on test dataset
test_acc, test_prec, test_rec = evaluate_model(model, 'vit_categoria', num_classes, \
                                               test_dataloader, device)
print(f'Test accuracy: {test_acc}')
print(f'Test per class precisions: {test_prec}')
print(f'Test per class recalls: {test_rec}')

# Computing image embeddings
model.classifier = nn.Identity()
image_embeddings = np.concatenate(get_vit_embeddings(model, dataloader, device, True), axis=0)

# Computing data projection
categoria_vit_trimap, categoria_vit_tsne, \
categoria_vit_umap = data_projections(image_embeddings)

In [None]:
# Cleaning up memory
clean_mem([model, image_embeddings])

### Visualizing and Comparing Projections

In [None]:
from training_utils import normalize

# Normalizing data for later plot on tool
norm_factor = 12
vanilla_vit_trimap = normalize(vanilla_vit_trimap, norm_factor)
vanilla_vit_tsne = normalize(vanilla_vit_tsne, norm_factor)
vanilla_vit_umap = normalize(vanilla_vit_umap, norm_factor)

povo_vit_trimap = normalize(povo_vit_trimap, norm_factor)
povo_vit_tsne = normalize(povo_vit_tsne, norm_factor)
povo_vit_umap = normalize(povo_vit_umap, norm_factor)

categoria_vit_trimap = normalize(categoria_vit_trimap, norm_factor)
categoria_vit_tsne = normalize(categoria_vit_tsne, norm_factor)
categoria_vit_umap = normalize(categoria_vit_umap, norm_factor)

In [None]:
# Visualizing resulting projections
plt.figure(figsize=(12,8))
plt.suptitle('Comparing Projections of ViT Models')

# Plotting vanilla ViT projections
for i, (vanilla_vit, proj_name) in enumerate(zip([vanilla_vit_trimap, \
                                                  vanilla_vit_tsne, vanilla_vit_umap], \
                                                 ['TriMap', 't-SNE', 'UMAP'])):
    plt.subplot(3, 3, i+1)
    plt.scatter(vanilla_vit[:, 0], vanilla_vit[:, 1], c='b')
    plt.title("Vanilla ViT with " + proj_name)
    plt.xlabel("")
    plt.ylabel("")
    plt.xticks([])
    plt.yticks([])

# Plotting ViT fine-tuned on 'povo' projections
for i, (povo_vit, proj_name) in enumerate(zip([povo_vit_trimap, \
                                               povo_vit_tsne, povo_vit_umap], \
                                              ['TriMap', 't-SNE', 'UMAP'])):
    plt.subplot(3, 3, i+4)
    plt.scatter(povo_vit[:, 0], povo_vit[:, 1], c='r')
    plt.title("ViT Fine-Tuned on 'povo' with " + proj_name)
    plt.xlabel("")
    plt.ylabel("")
    plt.xticks([])
    plt.yticks([])

# Plotting ViT fine-tuned on 'categoria' projections
for i, (categoria_vit, proj_name) in enumerate(zip([categoria_vit_trimap, \
                                                    categoria_vit_tsne, categoria_vit_umap], \
                                                   ['TriMap', 't-SNE', 'UMAP'])):
    plt.subplot(3, 3, i+7)
    plt.scatter(categoria_vit[:, 0], categoria_vit[:, 1], c='g')
    plt.title("ViT Fine-Tuned on 'categoria' with " + proj_name)
    plt.xlabel("")
    plt.ylabel("")
    plt.xticks([])
    plt.yticks([])

plt.tight_layout()
plt.show()

### Visualizing Clusters

In [None]:
# Filtering dataframe to get only the part that contains images
filtered_df = ind_df.loc[ind_df['image_path'].notna()]

# Building colormap for cluster visualization
column = 'categoria' # 'povo', 'categoria', 'ano_de_aquisicao'
unique_values = filtered_df[column].unique()
colors = plt.cm.gnuplot(np.linspace(0, 1, len(unique_values)))
color_dict = {cluster: colors[i] for i, cluster in enumerate(unique_values)}

# Plotting projections with clusters
plt.figure(figsize=(10,4))

for cluster in unique_values:
    mask = filtered_df.index[filtered_df[column] == cluster].tolist()
    sequential_indices = np.array([filtered_df.index.get_loc(idx) for idx in mask])
    plt.scatter(categoria_vit_umap[sequential_indices, 0], \
                categoria_vit_umap[sequential_indices, 1], 
                color=color_dict[cluster], label=f"{cluster.title()}", alpha=0.7)

plt.title(f"Visualizing Clusters for Categoria on UMAP Projection")
plt.xlabel("")
plt.ylabel("")
plt.xticks([])
plt.yticks([])
plt.legend(title="Clusters", bbox_to_anchor=(1.05, 1), loc="upper left", \
           fontsize=8, frameon=True)

plt.tight_layout()
plt.show()

In [44]:
for a, b, c in dataloader:
    print(c)
    break

ValueError: not enough values to unpack (expected 3, got 2)

In [40]:
# Saving outputs for visualization tool
# print(labels)
mask1 = filtered_df.index[filtered_df[column] == unique_values[5]].tolist()
mask2 = filtered_df.index[filtered_df[column] == unique_values[8]].tolist()

sequence1 = np.array([filtered_df.index.get_loc(idx) for idx in mask1])
sequence2 = np.array([filtered_df.index.get_loc(idx) for idx in mask2])

print(len(set(sequence1).union(set(sequence2))) == len(sequence1)+len(sequence2))

True
