In [None]:
import torch
from torch import nn
import numpy as np
import scipy.spatial.distance 

from tqdm import tqdm
from sklearn.preprocessing import StandardScaler

In [None]:
# Mount Google Drive when running on Colab. On any other machine the import
# fails and the step is skipped, so "Restart & Run All" still works.
try:
    from google.colab import drive
except ImportError:
    drive = None  # not running on Colab: nothing to mount
else:
    drive.mount('/content/drive')

In [None]:
# Show the selected GPU. torch.cuda.get_device_name() raises when no CUDA device
# is usable, so fall back to the string 'cpu' (use 'cpu' instead of 'cuda' then).
torch.cuda.get_device_name() if torch.cuda.is_available() else 'cpu'

In [None]:
class TripletCosineLoss(torch.nn.Module):
    """Margin-based triplet loss computed on cosine similarities.

    For every triplet the loss is ``max(0, cos(a, n) - cos(a, p) + margin)``;
    the per-triplet values are averaged into a single scalar.
    """

    def __init__(self, margin=0.2, triplet=True):
        """Store the margin and the mode flag.

        Args:
            margin: value added to ``cos(a, n) - cos(a, p)`` before clamping at 0.
            triplet: only ``True`` is supported; with ``False`` the forward
                pass raises ``NotImplementedError``.
        """
        super().__init__()
        self.margin = margin
        self.triplet = triplet
        self.similarity = torch.nn.functional.cosine_similarity

    def forward(self, anchor, positive, negative):
        """Return the mean hinge loss over the given triplets."""
        sim_pos = self.similarity(anchor, positive)
        sim_neg = self.similarity(anchor, negative)

        if not self.triplet:
            raise NotImplementedError

        # Penalise triplets whose negative is not at least `margin` less
        # similar to the anchor than the positive.
        hinge = (sim_neg - sim_pos + self.margin).clamp(min=0.0)
        return hinge.mean()

In [None]:
class Imagedescription(torch.nn.Module):
    """Two-layer perceptron whose tanh output is added to half of its input.

    Because the input is added to the output, ``output_size`` has to be
    compatible with ``input_size`` (in this notebook both are equal).
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the layers: Linear -> ReLU -> Linear -> Tanh."""
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)   # input -> hidden
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)  # hidden -> output code
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Return ``0.5 * x + tanh(fc2(relu(fc1(x))))``."""
        hidden = self.relu(self.fc1(x))
        correction = self.tanh(self.fc2(hidden))
        return 0.5*x + correction

In [None]:
# Read the training records: one comma-separated list of names per line.
# Downstream code treats field 0 as the anchor, field 1 as the positive and the
# remaining fields as negatives. The `with` block closes the file again, and
# blank lines are skipped so they do not turn into [''] records.
with open('/path/to/file/trainingdata.txt') as annotation_file:
    annotations = [line.strip().split(',') for line in annotation_file if line.strip()]

In [None]:
feature_file = '/path/to/file/BiT-m-r152x4_feature.npz'

# Read all arrays while the archive is open; the context manager closes the
# underlying file afterwards (np.load on an .npz keeps it open otherwise).
with np.load(feature_file) as feature_archive:
    # Keys may be wrapped in single quotes; strip them so they can be used as plain names.
    feature_names = [name.strip("'") for name in feature_archive.keys()]
    features = np.array(list(feature_archive.values()))

# Standardise every feature dimension to zero mean / unit variance.
scaler = StandardScaler()
features = scaler.fit_transform(features)

# Map each name to its standardised feature vector.
feature_dict = dict(zip(feature_names, features))

In [None]:
class KuratorDataset(torch.utils.data.Dataset):
    """Dataset of (anchor, positive, negative) feature triplets.

    Each entry of ``kuration_list`` is a sequence of keys into
    ``feature_dict``: the anchor, the positive, then one or more negatives.
    """

    def __init__(self, kuration_list, feature_dict):
        """Keep references to the record list and the key -> feature mapping."""
        self.kuration_list = kuration_list
        self.feature_dict = feature_dict

    def __len__(self):
        """Number of records."""
        return len(self.kuration_list)

    def __getitem__(self, idx):
        """Return the features of anchor, positive and one randomly drawn negative."""
        record = self.kuration_list[idx]
        anchor_key = record[0]
        positive_key = record[1]
        # A different negative may be drawn every time the same index is requested.
        negative_key = np.random.choice(record[2:])
        lookup = self.feature_dict
        return lookup[anchor_key], lookup[positive_key], lookup[negative_key]

In [None]:
# Number of annotation records that were loaded.
len(annotations)

In [None]:
ds = KuratorDataset(annotations, feature_dict)

# Split into training and validation set. The sizes are derived from the dataset
# length (25 % validation) because random_split requires them to add up to len(ds);
# for 1000 records this gives the former fixed 750 / 250 split.
n_val = len(ds) // 4
n_train = len(ds) - n_val
train_set, val_set = torch.utils.data.random_split(ds, [n_train, n_val])

In [None]:
# Both loaders use the same settings: shuffled batches of 50 samples, loaded by 4 workers.
_loader_options = dict(shuffle=True, batch_size=50, num_workers=4)

dataloader = torch.utils.data.DataLoader(train_set, **_loader_options)
dataloader_val = torch.utils.data.DataLoader(val_set, **_loader_options)

In [None]:
# Triplet loss on cosine similarities, with the class defaults spelled out.
loss_func = TripletCosineLoss(margin=0.2, triplet=True)

In [None]:
# Shape of the standardised feature matrix (one row per array in the .npz file).
# The second entry is the length of a single feature vector.
features.shape

In [None]:
def weights_init(m):
    """Initialise a module whose class name contains 'Linear'.

    Weights are drawn from N(0, 0.01^2) and the bias, if the layer has one,
    is set to zero. Other modules are left untouched. Intended to be passed to
    ``Module.apply``.

    Args:
        m: the module to initialise (modified in place).
    """
    if 'Linear' not in m.__class__.__name__:
        return
    # In-place initialisation must not be tracked by autograd.
    with torch.no_grad():
        m.weight.normal_(0.0, 0.01)
        # Layers created with bias=False carry `bias = None`.
        if getattr(m, 'bias', None) is not None:
            m.bias.fill_(0)

In [None]:
# The network adds its input to its output, so input and output width must both
# equal the length of one feature vector. Take it from the data instead of
# hard-coding it (8192 for the features this notebook was written for).
n_features = features.shape[1]
model = Imagedescription(n_features, 8, n_features)
model.apply(weights_init)

model.to('cpu')

# Per-epoch loss curves, filled by the training loop.
train_history = []
val_history = []

In [None]:
# Adam over all model parameters; a larger weight_decay makes the updates more conservative.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.01)

In [None]:
def mean_rank():
    """Mean rank of the positive among the candidates of each validation record.

    For every validation record the anchor is compared (cosine similarity of
    the model outputs) with the positive and all negatives. The rank of the
    positive is the number of negatives that are strictly more similar to the
    anchor than the positive is, so 0 means the positive is ranked first.

    Uses the notebook globals ``val_set``, ``annotations``, ``feature_dict``
    and ``model``.

    Returns:
        The mean of these ranks over all validation records.
    """
    ranks = []
    with torch.no_grad():
        for index in val_set.indices:
            anno = annotations[index]
            stacked = np.vstack([feature_dict[k] for k in anno])
            codes = model(torch.tensor(stacked).to('cpu')).cpu().numpy()
            # simil[0] belongs to the positive, simil[1:] to the negatives.
            simil = np.array([
                1 - scipy.spatial.distance.cosine(codes[0], codes[i])
                for i in range(1, len(anno))
            ])
            # The previous `np.argsort(simil)[0]` returned the index of the least
            # similar candidate, which is not the rank of the positive.
            ranks.append(int(np.sum(simil[1:] > simil[0])))
    return np.mean(ranks)

def val_loss():
    """Average validation loss over several passes through ``dataloader_val``.

    The validation loader is iterated ``nk`` times and the batch losses are
    averaged; with ``KuratorDataset`` every pass draws new random negatives.
    Runs under ``torch.no_grad()`` because no gradients are needed here.

    Uses the notebook globals ``dataloader_val``, ``model`` and ``loss_func``.

    Returns:
        Mean batch loss as a Python float.
    """
    total = 0
    nk = 10  # number of passes over the validation loader
    with torch.no_grad():
        for _ in range(nk):
            for a, p, n in dataloader_val:

                code_a = model(a.to('cpu'))
                code_p = model(p.to('cpu'))
                code_n = model(n.to('cpu'))

                total += loss_func(code_a, code_p, code_n).item()
    return total/nk/len(dataloader_val)

In [None]:
# Train for 300 epochs; after each epoch record the mean training loss and the
# validation loss, and show both together with the validation rank.
for epoch in range(300):

    # Label every bar with its epoch number (the label used to be a constant 'Epoch').
    with tqdm(total=len(dataloader), desc=f'Epoch {epoch + 1}') as pbar:
        model.train()

        train_loss = 0
        for a, p, n in dataloader:

            optimizer.zero_grad()

            code_a = model(a.to('cpu'))
            code_p = model(p.to('cpu'))
            code_n = model(n.to('cpu'))

            loss = loss_func(code_a, code_p, code_n)
            train_loss += loss.item()

            pbar.set_postfix_str(f'Loss: {loss.item():.3f}')
            pbar.update(1)

            loss.backward()
            optimizer.step()

        # Evaluation happens with the model in eval mode.
        model.eval()
        train_history.append(train_loss/len(dataloader))
        val_history.append(val_loss())
        val_rank = mean_rank()
        pbar.set_postfix_str(f'Loss: {train_history[-1]:.3f}, Val Loss: {val_history[-1]:.3f}, Val Rank: {val_rank:.3f}')


In [None]:
import matplotlib.pyplot as plt

# Training and validation loss per epoch; labels and legend make the two
# curves distinguishable.
plt.plot(train_history, label='train loss')
plt.plot(val_history, label='validation loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend()

plt.grid()

In [None]:
# Pass every stored feature vector through the trained model.
new_features = {}

model.eval()
# Pure inference: no autograd graph is needed for the exported arrays.
with torch.no_grad():
    for k, v in feature_dict.items():
        new_features[k] = model(torch.tensor(v).to('cpu')).cpu().numpy()

In [None]:
# Number of re-embedded feature vectors (one per key of feature_dict).
len(new_features)

In [None]:
# Write the re-learned features to an .npz archive, one array per key of new_features.
# Note: this rebinds `feature_file`, which earlier held the path of the input archive.
feature_file = '/create/a/path/to/save/file/relearned_BiT-M152x4.npz'
np.savez(feature_file, **new_features)

In [None]:
#review the scores of the network to evaluate the success of the training

In [None]:
# Imports and settings used only by the visual inspection cells.
import os
from tensorflow.keras.preprocessing import image
# Folder in which vis() looks up image files by name.
scraped_images_folder = '/set/the/path/to/your/scraped/images/'

In [None]:
def vis(image_name, feature2compare):
    """Draw one image into the current axes, titled with its similarity score.

    The image is loaded from ``scraped_images_folder`` (surrounding single
    quotes are stripped from the name) and resized to 224x224. Its stored
    feature is passed through ``model`` and compared with ``feature2compare``
    by cosine similarity.

    Args:
        image_name: key into ``feature_dict`` and file name of the image.
        feature2compare: 1-D tensor holding the reference model output.
    """
    im_name = os.path.join(scraped_images_folder, image_name.strip("'"))
    im = image.load_img(im_name, target_size=(224,224))
    plt.imshow(im)
    # Inference only: skip autograd bookkeeping and read the score as a plain float.
    with torch.no_grad():
        feature = model(torch.tensor(feature_dict[image_name]).to('cpu'))  # use 'cuda' for GPU
        score = torch.nn.functional.cosine_similarity(feature, feature2compare, dim=0).item()
    plt.title(f'score: {score:.02f}')
    plt.axis(False)

In [None]:
# Sorted positions (into `annotations`) of the records in the validation split.
# NOTE(review): the original comment here read "untrained" - presumably meaning the
# records the model was not trained on; confirm.
np.sort(val_set.indices)

In [None]:
# Record to inspect. NOTE(review): index 25 is hard-coded - presumably it should be
# picked from the validation indices listed in the previous cell; confirm.
to_show = annotations[25]

a, b = to_show[:2]
main_feature = model(torch.tensor(feature_dict[a]).to('cpu'))

# Anchor (left) and positive (right), each scored against the anchor.
plt.figure(figsize=(5,10))
plt.subplot(121)
vis(a, main_feature)
plt.subplot(122)
vis(b, main_feature)

# Negatives in one row. The grid keeps at least 8 columns and grows when a record
# has more negatives, which a fixed 8-column grid could not hold.
negatives = to_show[2:]
n_cols = max(8, len(negatives))
plt.figure(figsize=(20,3))
for i, c in enumerate(negatives):
    plt.subplot(1, n_cols, i+1)
    vis(c, main_feature)