In [None]:
pip install transformers

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import VisualBertForPreTraining, BertTokenizer, VisualBertModel
from transformers import VisualBertConfig

import pickle
import pandas as pd


Définition des classes dont on a besoin 

In [None]:
class Model(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.visual_bert = VisualBertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.linear = nn.Linear(config.hidden_size, 1)
        self.attentions = None  # Add an `attentions` attribute to the class

        
    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        visual_embeds=None,
        visual_attention_mask=None,
        visual_token_type_ids=None,
    ):
        outputs  = self.visual_bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            visual_embeds=visual_embeds,
            visual_attention_mask=visual_attention_mask,
            visual_token_type_ids=visual_token_type_ids,
            output_attentions=True
        )
        pooled_output = self.dropout(outputs[1])
        linear_output = self.linear(pooled_output)
        output=torch.sigmoid(linear_output)
        attentions = outputs.attentions # extract attention scores from the model outputs

        return output, attentions


In [None]:
# Define the training dataset
class MyDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.indices = list(range(len(data)))  # set indices attribute
        print(self.data.keys())  
        print(f"Number of indices: {len(self.indices)}")


    def __getitem__(self, index):
        index = self.indices[index]  # get the actual index from self.indices
        text = self.data['text'][index]
        label = self.data['label'][index]
        embedded = self.data['embedded'][index]
        
        return text, label, embedded

    def __len__(self):
        return len(self.data)



Chargement du modèle pré entrainé

In [None]:
# Define the pre-trained Visual-Bert model

config = VisualBertConfig.from_pretrained('uclanlp/visualbert-nlvr2-coco-pre')
model = Model(config)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

Downloading (…)lve/main/config.json:   0%|          | 0.00/631 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


Définition des fonctions d'entrainement et de test

In [None]:
# Define the training loop

def train(model, tokenizer, train_dataset, optimizer, criterion, device, batch_size, epochs):
    model.train()
    model.to(device)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, drop_last=False)
    for epoch in range(epochs):
        running_loss = 0.0
        running_accuracy = 0.0
        for batch in train_loader:
            try : 
              text, label, embedded = batch
              optimizer.zero_grad()
              text_encoded = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
              text_encoded = {k: v.to(device) for k, v in text_encoded.items()}
              label = label.float().unsqueeze(1).to(device)
              inputs_ids=text_encoded['input_ids'].to(device)
              visual_embeds = embedded.to(device)
              attention_mask = text_encoded['attention_mask'].to(device)
              outputs , attentions = model(input_ids=inputs_ids, attention_mask=attention_mask, visual_embeds=visual_embeds)
              loss = criterion(outputs, label)
              loss.backward()
              optimizer.step()
              pred_labels = torch.round(outputs) # round the probabilities to obtain predicted labels
              correct_preds = (pred_labels == label).sum().item() # count the number of correct predictions
              accuracy = correct_preds / batch_size # calculate accuracy
              running_loss += loss.item()
              running_accuracy += accuracy
            except :
              continue
           
        
            
        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = running_accuracy / len(train_loader)
        print('Epoch [%d] - loss: %.4f - accuracy: %.4f' % (epoch+1, epoch_loss, epoch_accuracy))
            



In [None]:

def test(model, tokenizer, test_dataset, criterion, device, batch_size):
    model.eval()
    model.to(device)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)
    running_loss = 0.0
    running_accuracy = 0.0
    with torch.no_grad():
        for batch in test_loader:
            try:
                text, label, embedded = batch
                text_encoded = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
                text_encoded = {k: v.to(device) for k, v in text_encoded.items()}
                label = label.float().unsqueeze(1).to(device)
                inputs_ids=text_encoded['input_ids'].to(device)
                visual_embeds = embedded.to(device)
                attention_mask = text_encoded['attention_mask'].to(device)
                outputs , attentions = model(input_ids=inputs_ids, attention_mask=attention_mask, visual_embeds=visual_embeds)
                loss = criterion(outputs, label)
                pred_labels = torch.round(outputs) # round the probabilities to obtain predicted labels
                correct_preds = (pred_labels == label).sum().item() # count the number of correct predictions
                accuracy = correct_preds / batch_size # calculate accuracy
                running_loss += loss.item()
                running_accuracy += accuracy
            except:
                continue

    test_loss = running_loss / len(test_loader)
    test_accuracy = running_accuracy / len(test_loader)
    print('Test loss: %.4f - Test accuracy: %.4f' % (test_loss, test_accuracy))


In [None]:

# Define the optimizer and loss function
optimizer = optim.AdamW(model.parameters(), lr=0.00001)
criterion = nn.BCELoss()


Chunk pour google colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Création des bases de données

In [None]:
df_test=pd.read_json("/content/drive/MyDrive/data/dev.jsonl",lines=True)

m = []

with open(r"/content/drive/MyDrive/data/file_val.pkl","rb") as g:
    m = pickle.load(g)
g.close()
df_test.loc[:, 'embedded'] = m

In [None]:
df_train=pd.read_json("/content/drive/MyDrive/data/train.jsonl",lines=True)

l = []

with open(r"/content/drive/MyDrive/data/file.pkl","rb") as f:
    l = pickle.load(f)
f.close()
df_train.loc[:, 'embedded'] = l

Suppresion des lignes posant problème

In [None]:
mask_train = pd.to_numeric(df_train['embedded'], errors='coerce').isna()
df_train = df_train[mask_train]
print(len(df_train))

8464


In [None]:
df_train = df_train.reset_index(drop=True)


In [None]:
train_dataset = MyDataset(df_train)
print(len(train_dataset))  # should print the length of your train data

Index(['id', 'img', 'label', 'text', 'embedded'], dtype='object')
Number of indices: 8464
8464


In [None]:
train(model, tokenizer, train_dataset, optimizer, criterion, device, batch_size=16, epochs=10)

Epoch [1] - loss: 0.6446 - accuracy: 0.6508
Epoch [2] - loss: 0.6635 - accuracy: 0.6412
Epoch [3] - loss: 0.6518 - accuracy: 0.6412
Epoch [4] - loss: 0.6342 - accuracy: 0.6581
Epoch [5] - loss: 0.5999 - accuracy: 0.6935
Epoch [6] - loss: 0.5579 - accuracy: 0.7325
Epoch [7] - loss: 0.5366 - accuracy: 0.7461
Epoch [8] - loss: 0.5106 - accuracy: 0.7551
Epoch [9] - loss: 0.5024 - accuracy: 0.7693
Epoch [10] - loss: 0.4867 - accuracy: 0.7766


In [None]:
mask_test = pd.to_numeric(df_test['embedded'], errors='coerce').isna()
df_test = df_test[mask_test]
print(len(df_test))

498


In [None]:
df_test = df_test.reset_index(drop=True)


In [None]:
test_dataset = MyDataset(df_test)
print(len(test_dataset))  # should print the length of your train data

Index(['id', 'img', 'label', 'text', 'embedded'], dtype='object')
Number of indices: 498
498


In [None]:
test(model, tokenizer, test_dataset, criterion, device, batch_size=16)

Test loss: 1.0460 - Test accuracy: 0.5293


Calcul et affichage de l'Auroc

In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, roc_auc_score


test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, drop_last=False)

model.eval()
with torch.no_grad():
    y_pred = []
    y_true = []
   
    for batch in test_loader:
        try : 
              text, label, embedded = batch
              text_encoded = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
              text_encoded = {k: v.to(device) for k, v in text_encoded.items()}
              label = label.float().unsqueeze(1).to(device)
              inputs_ids=text_encoded['input_ids'].to(device)
              visual_embeds = embedded.to(device)
              attention_mask = text_encoded['attention_mask'].to(device)
              outputs, attentions = model(input_ids=inputs_ids, attention_mask=attention_mask, visual_embeds=visual_embeds)
              outputs_cpu=outputs[0:].cpu()
              label_cpu=label[0:].cpu()
              #print(outputs[0:])
              y_pred.extend(outputs_cpu.numpy())
              y_true.extend(label_cpu.numpy())
              pred_labels = torch.round(outputs[0]) # round the probabilities to obtain predicted labels
              correct_preds = (pred_labels == label).sum().item() # count the number of correct predictions
    
        except :
              continue
print(len(y_pred))
print(len(y_true))
# Calculer l'AUC-ROC

auc_roc = roc_auc_score(y_true, y_pred)
print("AUC-ROC : {:.4f}".format(auc_roc))

# Calculer la courbe ROC
fpr, tpr, _ = roc_curve(y_true, y_pred)
roc_auc = auc(fpr, tpr)

# Tracer la courbe ROC
plt.figure(figsize=(4,4))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.4f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic curve')
plt.legend(loc="lower right")
plt.show()

NameError: ignored

Extraction de l'attention

In [None]:
  model.to(device)
  with torch.no_grad():
  
    for batch in test_loader:
    
              text, label, embedded = batch
              text_encoded = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
              text_encoded = {k: v.to(device) for k, v in text_encoded.items()}
              label = label.float().unsqueeze(1).to(device)
              inputs_ids=text_encoded['input_ids'].to(device)
              visual_embeds = embedded.to(device)
              attention_mask = text_encoded['attention_mask'].to(device)
              outputs, attentions = model(input_ids=inputs_ids, attention_mask=attention_mask, visual_embeds=visual_embeds)
     

              # Extraire les scores d'attention par couche
              #attentions = outputs.attentions
              for layer, attention in enumerate(attentions):
                print(f"Layer {layer+1} attention shape: {attention.shape}")
              # Extraire les scores d'attention par tête et par couche
              #multi_head_attention = outputs.multi_head_attention_outputs
              #for layer, attention_layer in enumerate(multi_head_attention):
                 #print(f"Layer {layer+1} attention shape: {[attention.shape for attention in attention_layer]}")
              



Visualisation de l'attention

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import cv2


test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, drop_last=False)
for batch in test_loader  : 
    text, label, embedded = batch

  # Load the image

 
    outputs, attentions = model(input_ids=inputs_ids, attention_mask=attention_mask, visual_embeds=embedded)
    break
  #Resize the image to match the input size of the model
  #img = cv2.resize(img, (224, 224))
  #img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

img_path = df_test['img'][1]
img = cv2.imread('/content/drive/MyDrive/ENSAE/Informatique/Statapp/data/'+img_path)
# Get the attention scores for the image
attention_scores = attentions[0][0][0].detach().cpu().numpy()

attention_scores = cv2.resize(attention_scores, (img.shape[1], img.shape[0]))
attention_scores = (attention_scores - attention_scores.min()) / (attention_scores.max() - attention_scores.min())
heatmap = cv2.applyColorMap(np.uint8(255*attention_scores), cv2.COLORMAP_JET)

result = cv2.addWeighted(img, 0.6, heatmap, 0.4, 0)

plt.imshow(result)
plt.axis('off')
plt.show()


RuntimeError: ignored