# 1. Data preparation

(Consider splitting into train/val/test based on the objects' affordances rather than at random, so that, e.g., pen, telescope and laptop are in the train set while pencil, microscope and desktop computer are in the test set.)


(Also, consider adding multiple images of each object. This way, the model can train on mapping each object to its affordances multiple times.)

In [None]:
from transformers import VisualBertModel, BertModel, BertTokenizer
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim

In [None]:
def clean_up(object_name):
    """Turn a raw object identifier into a readable object name.

    Everything from the first ``'.'`` onwards is discarded and every
    underscore in the remaining prefix is replaced by a space, e.g.
    ``clean_up('a_b.c') == 'a b'``.

    Args:
        object_name: Raw identifier string.

    Returns:
        The cleaned-up name as a new string.
    """
    # partition() keeps only the text before the first dot (the whole string
    # when there is no dot); replace() then converts the underscores.
    return object_name.partition('.')[0].replace('_', ' ')

In [None]:
# Load the object/affordance annotation table.
file = '../data/affordance_annotations.txt'
df = pd.read_csv(file).rename(columns={'Unnamed: 0': 'Object'})
# Normalise the raw object identifiers into readable names.
df['Object'] = df['Object'].apply(clean_up)
# Final header: two descriptive columns followed by the affordance columns.
df.columns = [
    'Object', 'ImageNet synset',
    'grasp', 'lift', 'throw', 'push', 'fix', 'ride', 'play', 'watch',
    'sit on', 'feed', 'row', 'pour from', 'look through', 'write with',
    'type on',
]

In [None]:
df

In [None]:
# Object names in table order, and the lower-cased names of every column
# after the first two (the affordance columns).
unique_objects = df['Object'].tolist()
unique_affordances = [column.lower() for column in df.columns[2:]]

In [None]:
unique_objects

In [None]:
unique_affordances

In [None]:
# Bidirectional vocabulary: object names first, then affordance names, each
# numbered by its position in the concatenated list (starting at 0).
vocabulary = unique_objects + unique_affordances
word_to_index = {word: i for i, word in enumerate(vocabulary)}
index_to_word = dict(enumerate(vocabulary))

# All valid word ids. Derived from the vocabulary rather than a fixed count,
# so it stays correct if the annotation table gains or loses rows/columns.
indices = list(index_to_word)

In [None]:
word_to_index['automobile engine']

In [None]:
index_to_word[0]

In [None]:
df

In [None]:
# Label statistics per object: for every object, the percentage of affordance
# columns whose annotation is set, plus the overall percentage across the
# table. Assumes the non-string cells are 0/1 flags - TODO confirm.
n_affordances = len(unique_affordances)
n_objects = len(df)

baseline_dict_objects = dict.fromkeys(unique_objects, 0)
for _, row in df.iterrows():
    # The object name sits in the first column; .iloc makes the positional
    # access explicit (plain row[0] on a label-indexed Series is deprecated).
    obj = row.iloc[0]
    for value in row:
        # String cells (object name, synset id) carry no annotation.
        if not isinstance(value, str):
            baseline_dict_objects[obj] += value

baseline_total_objects = 0
for k, v in baseline_dict_objects.items():
    baseline_dict_objects[k] = np.round((v * 100) / n_affordances, 2)
    baseline_total_objects += v

baseline_total_objects = np.round((baseline_total_objects / (n_affordances * n_objects)) * 100, 2)
# Printed value is the complement: the share of cells that are NOT set.
print(f'{100-baseline_total_objects} %')

In [None]:
baseline_dict_objects

In [None]:
# Label statistics per affordance: for every affordance column, the
# percentage of rows whose annotation is set, plus the overall percentage.
n_affordances = len(unique_affordances)
n_objects = len(df)

baseline_dict_affordances = dict.fromkeys(unique_affordances, 0)
for k in baseline_dict_affordances:
    # A column sum gives the same count as adding the cell row by row.
    baseline_dict_affordances[k] += df[k].sum()

baseline_total_affordances = 0
for k, v in baseline_dict_affordances.items():
    baseline_dict_affordances[k] = np.round((v * 100) / n_objects, 2)
    baseline_total_affordances += v

baseline_total_affordances = np.round((baseline_total_affordances / (n_affordances * n_objects)) * 100, 2)
# Report the total computed in this cell (not the per-object total).
print(f'{baseline_total_affordances} %')

In [None]:
baseline_dict_affordances

## 1.1 Pairs of objects and their affordances

In [None]:
# Shuffle all rows once with a fixed seed (reproducible order) and renumber
# the index from 0 so the table can afterwards be sliced by position.
df1 = df.sample(frac=1, random_state=42).reset_index(drop=True)

In [None]:
# Positional split of the shuffled table: rows 0-41 for training,
# rows 42-51 for validation, and everything from row 52 on for testing.
train1 = df1.iloc[:42]
val1 = df1.iloc[42:52]
test1 = df1.iloc[52:]

In [None]:
def get_gold_data_1(table):
    """Flatten an annotation table into (object, affordance, label) triples.

    For every row, each cell that is not a string yields one triple made of
    the value in the row's first column, the lower-cased name of the cell's
    column, and the cell value itself. String cells are skipped.

    Args:
        table: DataFrame whose first column holds the object name.

    Returns:
        A list of ``(object_name, affordance_name, value)`` tuples in
        row-major order.
    """
    gold_data_pairs = []
    for _, row in table.iterrows():
        # Explicit positional access; plain row[0] on a Series with string
        # labels relies on a deprecated integer fallback.
        obj = row.iloc[0]
        for column, value in row.items():
            if isinstance(value, str):
                continue
            gold_data_pairs.append((obj, column.lower(), value))
    return gold_data_pairs

In [None]:
# Flatten each split into its list of (object, affordance, label) triples.
train1_pairs, val1_pairs, test1_pairs = (
    get_gold_data_1(split) for split in (train1, val1, test1)
)

In [None]:
train1_pairs

In [None]:
val1_pairs

# 2. Extracting Embeddings

In [None]:
# Tokenizer of the 'bert-base-uncased' checkpoint; tokenize_string() and both
# embedding-extraction loops below rely on it.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
def tokenize_string(text):
    """Surround *text* with ``[CLS]`` / ``[SEP]`` markers and tokenize it.

    Uses the module-level ``tokenizer`` and returns whatever its
    ``tokenize`` method produces for the marked-up string.
    """
    marked_text = " ".join(("[CLS]", text, "[SEP]"))
    return tokenizer.tokenize(marked_text)

## 2.1 BERT Embeddings

In [None]:
# Pre-trained text-only BERT; the word embeddings of section 2.1 are read
# from its hidden states, hence output_hidden_states=True.
bert_model = BertModel.from_pretrained('bert-base-uncased',
                                  output_hidden_states = True, # Whether the model returns all hidden-states.
                                  )

In [None]:
# Put the model into evaluation mode before extracting embeddings
# (nn.Module.eval() switches layers such as dropout to inference behaviour).
bert_model.eval()

In [None]:
bert_word_to_embedding = {}  # maps every object/affordance string to its BERT embedding

with torch.no_grad():

    # A word's embedding does not depend on the split it appears in, so all
    # splits are walked together and each distinct word is embedded once.
    for obj, affordance, _ in train1_pairs + val1_pairs + test1_pairs:

        for word in (obj, affordance):

            if word in bert_word_to_embedding:
                continue

            tokenized_word = tokenize_string(word)
            indexed_word = tokenizer.convert_tokens_to_ids(tokenized_word)
            tokens_tensor = torch.tensor([indexed_word])
            # forward()'s second positional parameter is the attention mask,
            # so it is passed by keyword here: all ones = attend to every
            # token. Segment ids are left at the model's default.
            attention_mask = torch.ones_like(tokens_tensor)

            outputs = bert_model(tokens_tensor, attention_mask=attention_mask)
            # Penultimate layer, first (and only) sequence in the batch.
            token_vecs = outputs.hidden_states[-2][0]
            # Mean over the token vectors gives one vector for the whole input.
            bert_word_to_embedding[word] = torch.mean(token_vecs, dim=0)


In [None]:
bert_word_to_embedding['coffee cup']

In [None]:
bert_word_to_embedding['grasp'].size()

In [None]:
len(bert_word_to_embedding)

In [None]:
# Reverse lookup from an embedding tensor back to its word.
# NOTE(review): tensors are presumably hashed by object identity, so a lookup
# only succeeds with the very tensor object stored above - confirm.
bert_embedding_to_word = {embedding: word for word, embedding in bert_word_to_embedding.items()}

In [None]:
len(bert_embedding_to_word)

In [None]:
bert_embedding_to_word[bert_word_to_embedding['coffee cup']]

## 2.2 VisualBERT Embeddings

In [None]:
# Pre-trained VisualBERT checkpoint; all hidden states are requested so that
# an intermediate layer can be read out when embedding the words.
visual_bert_model = VisualBertModel.from_pretrained("uclanlp/visualbert-vqa-coco-pre",output_hidden_states=True)

In [None]:
visual_bert_word_to_embedding = {}  # maps every object/affordance string to its VisualBERT embedding

with torch.no_grad():

    # A word's embedding does not depend on the split it appears in, so all
    # splits are walked together and each distinct word is embedded once.
    for obj, affordance, _ in train1_pairs + val1_pairs + test1_pairs:

        for word in (obj, affordance):

            if word in visual_bert_word_to_embedding:
                continue

            tokenized_word = tokenize_string(word)
            indexed_word = tokenizer.convert_tokens_to_ids(tokenized_word)
            tokens_tensor = torch.tensor([indexed_word])
            # forward()'s second positional parameter is the attention mask,
            # so it is passed by keyword here: all ones = attend to every
            # token. Segment ids are left at the model's default, and no
            # visual features are supplied (text input only).
            attention_mask = torch.ones_like(tokens_tensor)

            outputs = visual_bert_model(tokens_tensor, attention_mask=attention_mask)
            # Penultimate layer, first (and only) sequence in the batch.
            token_vecs = outputs.hidden_states[-2][0]
            # Mean over the token vectors gives one vector for the whole input.
            visual_bert_word_to_embedding[word] = torch.mean(token_vecs, dim=0)

In [None]:
visual_bert_word_to_embedding['coffee cup']

In [None]:
len(visual_bert_word_to_embedding)

In [None]:
# Reverse lookup from an embedding tensor back to its word.
# NOTE(review): tensors are presumably hashed by object identity, so a lookup
# only succeeds with the very tensor object stored above - confirm.
visual_bert_embedding_to_word = {embedding: word for word, embedding in visual_bert_word_to_embedding.items()}

In [None]:
len(visual_bert_embedding_to_word)

In [None]:
visual_bert_embedding_to_word[visual_bert_word_to_embedding['coffee cup']]

# 3. The probe model

In [None]:
# Training configuration of the probe fed with BERT embeddings. Key order
# matters: the checkpoint file name is assembled from these items in order.
bert_hyperparameters = dict(
    model="bert_probe",
    epochs=300,
    batch_size=64,
    learning_rate=0.005,
)

In [None]:
# Training configuration of the probe fed with VisualBERT embeddings. Key
# order matters: the checkpoint file name is assembled from these items.
visual_bert_hyperparameters = dict(
    model="visual_bert_probe",
    epochs=300,
    batch_size=64,
    learning_rate=0.005,
)

In [None]:
class Probe(nn.Module):
    """Probe that classifies an (object, affordance) embedding pair.

    The two embeddings are multiplied element-wise, squashed with a sigmoid,
    sent through two linear layers and turned into log-probabilities, which
    is the input format expected by ``nn.NLLLoss``.

    Args:
        input_dim: Size of each input embedding (default 768).
        hidden_dim: Output size of the first linear layer (default 128).
        num_classes: Number of output classes (default 2).
    """

    def __init__(self, input_dim=768, hidden_dim=128, num_classes=2):
        super().__init__()
        self.sigmoid = nn.Sigmoid()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, obj, affordance):
        """Return per-class log-probabilities for each pair.

        Args:
            obj: Object embeddings; assumed shape (batch, input_dim).
            affordance: Affordance embeddings, same shape as ``obj``.

        Returns:
            Tensor of log-probabilities, normalised over dimension 1.
        """
        combined_vector = obj * affordance
        squashed = self.sigmoid(combined_vector)
        # NOTE(review): there is no non-linearity between fc1 and fc2, so the
        # two layers together are still a single affine map of `squashed`.
        hidden = self.fc1(squashed)
        logits = self.fc2(hidden)
        return self.softmax(logits)

# 4. Training

In [None]:
def _build_examples(pairs):
    """Turn (object, affordance, label) triples into 7-field training examples.

    Each example holds, in order: BERT object embedding, BERT affordance
    embedding, VisualBERT object embedding, VisualBERT affordance embedding,
    the label, the object id and the affordance id.
    """
    return [
        (
            bert_word_to_embedding[x],
            bert_word_to_embedding[y],
            visual_bert_word_to_embedding[x],
            visual_bert_word_to_embedding[y],
            z,
            word_to_index[x],
            word_to_index[y],
        )
        for x, y, z in pairs
    ]


train_data = _build_examples(train1_pairs)
val_data = _build_examples(val1_pairs)
test_data = _build_examples(test1_pairs)

# Both probes share these loaders, so the BERT batch size is used for both.
# Only the training batches are reshuffled each epoch; validation and test
# data are iterated in a fixed order so evaluation runs are reproducible.
train_dataloader = DataLoader(train_data, batch_size=bert_hyperparameters["batch_size"], shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=bert_hyperparameters["batch_size"], shuffle=False)
test_dataloader = DataLoader(test_data, batch_size=bert_hyperparameters["batch_size"], shuffle=False)

In [None]:
# The structure of each batch is:
# bert object embedding, 
# bert affordance embedding, 
# visualbert object embedding, 
# visualbert affordance embedding, 
# truth values,
# object id
# affordance id

In [None]:
next(iter(train_dataloader))

## 4.1 Training the BERT Probe

In [None]:
# Everything runs on the CPU. The commented-out line is the GPU alternative
# (the device index is specific to the machine it was written for).
#device = "cuda:3" if torch.cuda.is_available() else "cpu"
device = "cpu"

In [None]:
def plot_accuracy(epochs, train_acc, val_acc):
    """Show training and validation accuracy as two curves over the epochs.

    Args:
        epochs: X-axis values, one per epoch.
        train_acc: Training accuracy per epoch.
        val_acc: Validation accuracy per epoch.
    """
    plt.title("Training and Validation Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    curves = ((train_acc, "Training Accuracy"), (val_acc, "Validation Accuracy"))
    for values, label in curves:
        plt.plot(epochs, values, label=label)
    plt.legend()
    plt.show()

In [None]:
def plot_loss(epochs, train_loss, val_loss):
    """Show training and validation loss as two curves over the epochs.

    Args:
        epochs: X-axis values, one per epoch.
        train_loss: Training loss per epoch.
        val_loss: Validation loss per epoch.
    """
    plt.title("Training and Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    curves = ((train_loss, "Training Loss"), (val_loss, "Validation Loss"))
    for values, label in curves:
        plt.plot(epochs, values, label=label)
    plt.legend()
    plt.show()

In [None]:
# Train a fresh probe on the BERT embeddings (batch fields 0 and 1) and track
# loss/accuracy on the training and validation sets after every epoch.
bert_probe = Probe()
print(bert_probe)
bert_probe.to(device)

criterion = nn.NLLLoss()
optimizer = optim.Adam(
    bert_probe.parameters(),
    lr=bert_hyperparameters["learning_rate"]
)

epoch_list = []
val_loss_list = []
train_loss_list = []
total_loss = 0  # loss summed over every training batch of every epoch

train_accuracy_list = []
val_accuracy_list = []

for epoch in range(bert_hyperparameters["epochs"]):

    # TRAIN LOOP
    training_loss = 0
    train_correct = 0
    train_seen = 0
    bert_probe.train()

    for batch in train_dataloader:

        obj = batch[0].to(device)
        affordance = batch[1].to(device)
        truth_value = batch[4].to(device)

        optimizer.zero_grad()
        output = bert_probe(obj, affordance)
        bert_loss = criterion(output, truth_value)

        bert_loss.backward()
        optimizer.step()

        total_loss += bert_loss.item()
        training_loss += bert_loss.item()

        # Count hits per sample so that a smaller final batch does not get
        # the same weight as a full one in the epoch accuracy.
        prediction = torch.argmax(output, dim=1)
        train_correct += (prediction == truth_value).sum().item()
        train_seen += truth_value.size(0)

    # VALIDATION LOOP
    validation_loss = 0
    val_correct = 0
    val_seen = 0
    bert_probe.eval()

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch in val_dataloader:

            obj = batch[0].to(device)
            affordance = batch[1].to(device)
            truth_value = batch[4].to(device)

            output = bert_probe(obj, affordance)
            bert_loss = criterion(output, truth_value)
            validation_loss += bert_loss.item()

            prediction = torch.argmax(output, dim=1)
            val_correct += (prediction == truth_value).sum().item()
            val_seen += truth_value.size(0)

    epoch_list.append(epoch+1)
    training_loss_avg = training_loss/len(train_dataloader)
    train_loss_list.append(training_loss_avg)
    validation_loss_avg = validation_loss/len(val_dataloader)
    val_loss_list.append(validation_loss_avg)

    epoch_accuracy = train_correct/train_seen
    val_epoch_accuracy = val_correct/val_seen
    train_accuracy_list.append(epoch_accuracy)
    val_accuracy_list.append(val_epoch_accuracy)

    print("Epoch: {}".format(epoch+1))
    print("Training loss: {}".format(training_loss_avg))
    print("Validation loss: {}".format(validation_loss_avg))
    print("Training accuracy: {}".format(epoch_accuracy))
    print("Validation accuracy: {}".format(val_epoch_accuracy))

plot_loss(epoch_list, train_loss_list, val_loss_list)
plot_accuracy(epoch_list, train_accuracy_list, val_accuracy_list)

In [None]:
# The checkpoint name is built from the hyperparameters: "key_value" items
# joined by "|". The testing section loads the file under that same name.
bert_checkpoint_name = "|".join(f"{k}_{v}" for k, v in bert_hyperparameters.items())
torch.save(bert_probe.state_dict(), bert_checkpoint_name)

## 4.2 Training the VisualBERT Probe

In [None]:
# Train a fresh probe on the VisualBERT embeddings (batch fields 2 and 3) and
# track loss/accuracy on the training and validation sets after every epoch.
visual_bert_probe = Probe()
print(visual_bert_probe)
visual_bert_probe.to(device)

criterion = nn.NLLLoss()
optimizer = optim.Adam(
    visual_bert_probe.parameters(),
    lr=visual_bert_hyperparameters["learning_rate"]
)

epoch_list = []
val_loss_list = []
train_loss_list = []
total_loss = 0  # loss summed over every training batch of every epoch

train_accuracy_list = []
val_accuracy_list = []

for epoch in range(visual_bert_hyperparameters["epochs"]):

    # TRAIN LOOP
    training_loss = 0
    train_correct = 0
    train_seen = 0
    visual_bert_probe.train()

    for batch in train_dataloader:

        obj = batch[2].to(device)
        affordance = batch[3].to(device)
        truth_value = batch[4].to(device)

        optimizer.zero_grad()
        output = visual_bert_probe(obj, affordance)
        visual_bert_loss = criterion(output, truth_value)

        visual_bert_loss.backward()
        optimizer.step()

        total_loss += visual_bert_loss.item()
        training_loss += visual_bert_loss.item()

        # Count hits per sample so that a smaller final batch does not get
        # the same weight as a full one in the epoch accuracy.
        prediction = torch.argmax(output, dim=1)
        train_correct += (prediction == truth_value).sum().item()
        train_seen += truth_value.size(0)

    # VALIDATION LOOP
    validation_loss = 0
    val_correct = 0
    val_seen = 0
    visual_bert_probe.eval()

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch in val_dataloader:

            obj = batch[2].to(device)
            affordance = batch[3].to(device)
            truth_value = batch[4].to(device)

            output = visual_bert_probe(obj, affordance)
            visual_bert_loss = criterion(output, truth_value)
            validation_loss += visual_bert_loss.item()

            prediction = torch.argmax(output, dim=1)
            val_correct += (prediction == truth_value).sum().item()
            val_seen += truth_value.size(0)

    epoch_list.append(epoch+1)
    training_loss_avg = training_loss/len(train_dataloader)
    train_loss_list.append(training_loss_avg)
    validation_loss_avg = validation_loss/len(val_dataloader)
    val_loss_list.append(validation_loss_avg)

    epoch_accuracy = train_correct/train_seen
    val_epoch_accuracy = val_correct/val_seen
    train_accuracy_list.append(epoch_accuracy)
    val_accuracy_list.append(val_epoch_accuracy)

    print("Epoch: {}".format(epoch+1))
    print("Training loss: {}".format(training_loss_avg))
    print("Validation loss: {}".format(validation_loss_avg))
    print("Training accuracy: {}".format(epoch_accuracy))
    print("Validation accuracy: {}".format(val_epoch_accuracy))

plot_loss(epoch_list, train_loss_list, val_loss_list)
plot_accuracy(epoch_list, train_accuracy_list, val_accuracy_list)

In [None]:
# The checkpoint name is built from the hyperparameters: "key_value" items
# joined by "|". The testing section loads the file under that same name.
visual_bert_checkpoint_name = "|".join(f"{k}_{v}" for k, v in visual_bert_hyperparameters.items())
torch.save(visual_bert_probe.state_dict(), visual_bert_checkpoint_name)

# 5. Testing

## 5.1 Testing the BERT Probe

In [None]:
# Rebuild the checkpoint name from the hyperparameters (the same
# "key_value|..." scheme used when saving) so the two cannot drift apart,
# and map the stored weights onto the configured device.
bert_checkpoint_path = "|".join(f"{k}_{v}" for k, v in bert_hyperparameters.items())
bert_probe.load_state_dict(torch.load(bert_checkpoint_path, map_location=device))

In [None]:
# Evaluate the BERT probe on the test set: overall accuracy, accuracy per
# object and per affordance, and the confusion-matrix counts.
bert_probe = bert_probe.to(device)

test_loss = 0
bert_probe.eval()

total = 0
correct = 0

per_word_total = dict.fromkeys(bert_word_to_embedding, 0)
per_word_correct = dict.fromkeys(bert_word_to_embedding, 0)

tp_bert = 0
fp_bert = 0
tn_bert = 0
fn_bert = 0

with torch.no_grad():

    for batch_idx, batch in enumerate(test_dataloader):

        obj = batch[0].to(device)
        affordance = batch[1].to(device)
        target = batch[4].to(device)

        output = bert_probe(obj, affordance)

        bert_loss = criterion(output, target)
        test_loss += bert_loss.item()

        # Convert to plain Python lists once per batch.
        predicted_labels = torch.argmax(output, dim=1).tolist()
        gold_labels = target.tolist()

        # Calculate total accuracy
        total += len(gold_labels)
        correct += sum(p == g for p, g in zip(predicted_labels, gold_labels))

        # Words behind the object / affordance ids of this batch
        objects = [index_to_word[idx] for idx in batch[5].tolist()]
        affordances = [index_to_word[idx] for idx in batch[6].tolist()]

        for obj_word, aff_word, predicted, gold in zip(objects, affordances, predicted_labels, gold_labels):

            # Per-object and per-affordance accuracy: every sample counts
            # once for its object and once for its affordance.
            for word in (obj_word, aff_word):
                per_word_total[word] += 1
                if predicted == gold:
                    per_word_correct[word] += 1

            # Calculate tp,fp,tn,fn
            if gold == 1 and predicted == 1:
                tp_bert += 1
            elif gold == 0 and predicted == 1:
                fp_bert += 1
            elif gold == 1 and predicted == 0:
                fn_bert += 1
            elif gold == 0 and predicted == 0:
                tn_bert += 1

        # Running mean loss over the batches seen so far. A dedicated batch
        # counter is used so no inner loop variable can clobber it.
        print('>', np.round(test_loss/(batch_idx+1), 4), end='\r')

accuracy_bert_probe = correct / total
per_object_accuracy_bert_probe = {word : (per_word_correct[word] / per_word_total[word]) for word in unique_objects if per_word_total[word] > 0}
per_affordance_accuracy_bert_probe = {word : (per_word_correct[word] / per_word_total[word]) for word in unique_affordances if per_word_total[word] > 0}

print(f'Total accuracy BERT probe: {np.round(accuracy_bert_probe * 100, 2)} %')
print()

print('Per-object accuracy BERT probe:')
for k,v in per_object_accuracy_bert_probe.items():
    print(f'{k} : {np.round(v * 100, 2)} %')
print()

print('Per-affordance accuracy BERT probe:')
for k,v in per_affordance_accuracy_bert_probe.items():
    print(f'{k} : {np.round(v * 100, 2)} %')

In [None]:
# Accuracy from the confusion-matrix counts: correct decisions / all decisions.
accuracy_bert = (tp_bert + tn_bert) / sum((tp_bert, fp_bert, tn_bert, fn_bert))
accuracy_bert

In [None]:
# Recall = TP / (TP + FN). Defined as 0.0 when the test set contains no
# positive examples, instead of raising ZeroDivisionError.
actual_positives_bert = tp_bert + fn_bert
recall_bert = tp_bert / actual_positives_bert if actual_positives_bert else 0.0
recall_bert

In [None]:
# Precision = TP / (TP + FP). Defined as 0.0 when the probe never predicts
# the positive class, instead of raising ZeroDivisionError.
predicted_positives_bert = tp_bert + fp_bert
precision_bert = tp_bert / predicted_positives_bert if predicted_positives_bert else 0.0
precision_bert

In [None]:
# F1 = harmonic mean of precision and recall. Defined as 0.0 when both are 0,
# instead of raising ZeroDivisionError.
f1_denominator_bert = recall_bert + precision_bert
f1 = (2 * recall_bert * precision_bert) / f1_denominator_bert if f1_denominator_bert else 0.0
f1

## 5.2 Testing the VisualBERT Probe

In [None]:
# Rebuild the checkpoint name from the hyperparameters (the same
# "key_value|..." scheme used when saving) so the two cannot drift apart,
# and map the stored weights onto the configured device.
visual_bert_checkpoint_path = "|".join(f"{k}_{v}" for k, v in visual_bert_hyperparameters.items())
visual_bert_probe.load_state_dict(torch.load(visual_bert_checkpoint_path, map_location=device))

In [None]:
# Evaluate the VisualBERT probe on the test set: overall accuracy, accuracy
# per object and per affordance, and the confusion-matrix counts.
visual_bert_probe = visual_bert_probe.to(device)
test_loss = 0
visual_bert_probe.eval()

total = 0
correct = 0

per_word_total = dict.fromkeys(visual_bert_word_to_embedding, 0)
per_word_correct = dict.fromkeys(visual_bert_word_to_embedding, 0)

tp_visual_bert = 0
fp_visual_bert = 0
tn_visual_bert = 0
fn_visual_bert = 0

with torch.no_grad():

    for batch_idx, batch in enumerate(test_dataloader):

        obj = batch[2].to(device)
        affordance = batch[3].to(device)
        target = batch[4].to(device)

        output = visual_bert_probe(obj, affordance)

        visual_bert_loss = criterion(output, target)
        test_loss += visual_bert_loss.item()

        # Convert to plain Python lists once per batch.
        predicted_labels = torch.argmax(output, dim=1).tolist()
        gold_labels = target.tolist()

        # Calculate total accuracy
        total += len(gold_labels)
        correct += sum(p == g for p, g in zip(predicted_labels, gold_labels))

        # Words behind the object / affordance ids of this batch
        objects = [index_to_word[idx] for idx in batch[5].tolist()]
        affordances = [index_to_word[idx] for idx in batch[6].tolist()]

        for obj_word, aff_word, predicted, gold in zip(objects, affordances, predicted_labels, gold_labels):

            # Per-word accuracy: every sample counts once for its object and
            # once for its affordance.
            for word in (obj_word, aff_word):
                per_word_total[word] += 1
                if predicted == gold:
                    per_word_correct[word] += 1

            # Calculate tp,fp,tn,fn
            if gold == 1 and predicted == 1:
                tp_visual_bert += 1
            elif gold == 0 and predicted == 1:
                fp_visual_bert += 1
            elif gold == 1 and predicted == 0:
                fn_visual_bert += 1
            elif gold == 0 and predicted == 0:
                tn_visual_bert += 1

        # Running mean loss over the batches seen so far. A dedicated batch
        # counter is used so no inner loop variable can clobber it.
        print('>', np.round(test_loss/(batch_idx+1), 4), end='\r')

accuracy_visual_bert_probe = correct / total
per_object_accuracy_visual_bert_probe = {word : (per_word_correct[word] / per_word_total[word]) for word in unique_objects if per_word_total[word] > 0}
per_affordance_accuracy_visual_bert_probe = {word : (per_word_correct[word] / per_word_total[word]) for word in unique_affordances if per_word_total[word] > 0}

print(f'Total accuracy VisualBERT probe: {np.round(accuracy_visual_bert_probe * 100, 2)} %')
print()

print('Per-object accuracy VisualBERT probe:')
for k,v in per_object_accuracy_visual_bert_probe.items():
    print(f'{k} : {np.round(v * 100, 2)} %')
print()

print('Per-affordance accuracy VisualBERT probe:')
for k,v in per_affordance_accuracy_visual_bert_probe.items():
    print(f'{k} : {np.round(v * 100, 2)} %')


In [None]:
# Accuracy from the confusion-matrix counts: correct decisions / all decisions.
accuracy_visual_bert = (tp_visual_bert + tn_visual_bert) / sum(
    (tp_visual_bert, fp_visual_bert, tn_visual_bert, fn_visual_bert)
)
accuracy_visual_bert

In [None]:
# Recall = TP / (TP + FN). Defined as 0.0 when the test set contains no
# positive examples, instead of raising ZeroDivisionError.
actual_positives_visual_bert = tp_visual_bert + fn_visual_bert
recall_visual_bert = tp_visual_bert / actual_positives_visual_bert if actual_positives_visual_bert else 0.0
recall_visual_bert

In [None]:
# Precision = TP / (TP + FP). Defined as 0.0 when the probe never predicts
# the positive class, instead of raising ZeroDivisionError.
predicted_positives_visual_bert = tp_visual_bert + fp_visual_bert
precision_visual_bert = tp_visual_bert / predicted_positives_visual_bert if predicted_positives_visual_bert else 0.0
precision_visual_bert

## Check why the model is predicting only 0

In [None]:
# F1 = harmonic mean of precision and recall. Defined as 0.0 when both are 0
# (e.g. the probe predicts only class 0), instead of raising ZeroDivisionError.
f1_denominator_visual_bert = recall_visual_bert + precision_visual_bert
f1_visual_bert = (2 * recall_visual_bert * precision_visual_bert) / f1_denominator_visual_bert if f1_denominator_visual_bert else 0.0
f1_visual_bert