# Librerías y configuración del dispositivo

In [None]:
from PIL import Image
import requests
from transformers import CLIPImageProcessor
import os
import pandas as pd
import torch
import numpy as np
import time
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from io import BytesIO
import base64
import gc
#from IPython.display import HTML

In [None]:
print("CUDA disponible:", torch.cuda.is_available())
# mostrar que dispositivo se esta usando
torch.cuda.get_device_name(0)

device = "cuda:0" if torch.cuda.is_available() else "cpu" # If using GPU then use mixed precision training.
device

# Implicación de lenguaje visual (VLE)

Contiene las funciones necesarias para la experimentación de VLE con las funciones de evaluación flexible y estrica


## Evaluación VLE estricta

In [None]:
def obtainUniqueImages(images):
    unique_images = []
    for img in images:
        if img not in unique_images:
            unique_images.append(img)

    return unique_images

def format_VE_strict(dataframe, premises):
    new_dataframe = dataframe.copy()

    for premise in premises:
        premise_data = new_dataframe[new_dataframe['Premise'] == premise]
        # Obtener los los labels de la premisa
        premise_labels = premise_data['Label'].unique()

        if len(premise_labels) == 1:
            # Si hay solo un tipo de etiqueta, eliminamos todas las filas con esa premisa
            new_dataframe = new_dataframe[new_dataframe['Premise'] != premise]
        elif len(premise_labels) == 3:
            # Quedarme únicamente con los ejemplos de "entailment", "neutral" y "contradiction" una única vez
            for label in ["entailment", "neutral", "contradiction"]:
                label_data = premise_data[premise_data['Label'] == label]
                indexes_data = label_data["Index"].unique()
                if len(indexes_data) > 1:
                    for i in range(1, len(indexes_data)):
                        new_dataframe.drop(new_dataframe[new_dataframe['Index'] == indexes_data[i]].index, inplace=True)
        else:
            # Si no cumple con ninguno de los criterios anteriores, eliminamos todas las filas con esa premisa
            new_dataframe = new_dataframe[new_dataframe['Premise'] != premise]

    return new_dataframe

# Evaluación estrica
def evaluateStrictVE(unique_imgs, df):
    preprocess = CLIPImageProcessor()
    correct = 0
    total = 0
    with torch.no_grad():
        for img in unique_imgs:
            preprocessed_img = preprocess(img)
            image = torch.tensor(np.array(preprocessed_img['pixel_values'])).to(device)
            hypothesis = df['Hypothesis'][df['Image'] == img]
            labels = df['Label'][df['Image'] == img]
            image_features = model.encode_image(image).float()
            tokenized_hypothesis = clip.tokenize(hypothesis).to(device)
            text_features = model.encode_text(tokenized_hypothesis).float()
            similarities = compute_similarities(text_features, image_features)

            entailment_similarity = similarities[labels == 'entailment']
            neutral_similarity = similarities[labels == 'neutral']
            contradiction_similarity = similarities[labels == 'contradiction']

            # Se tiene que cumplir que la similitud de entailment sea mayor que la de neutral y que la neutral sea mayor que la de contradicción
            if entailment_similarity > neutral_similarity and neutral_similarity > contradiction_similarity:
                correct += len(hypothesis) # Correcto para el número de hipótesis de la imagen (son 3 hipótesis, entailment, neutral y contradicción)
            total += len(hypothesis)

    print('total:', total)
    print('Aciertos:', correct)
    return correct / total

## Evaluación VLE flexible

In [None]:
def format_VE_flexible(dataframe, premises):
    new_dataframe = dataframe.copy()

    for premise in premises:
        premise_data = new_dataframe[new_dataframe['Premise'] == premise]
        # Obtener los labels únicos de la premisa
        premise_labels = premise_data['Label'].unique()

        if len(premise_labels) == 1:
            # Si hay solo un tipo de etiqueta, eliminamos todas las filas con esa premisa
            new_dataframe = new_dataframe[new_dataframe['Premise'] != premise]
        elif len(premise_labels) >= 2 and "entailment" in premise_labels:
            # Conservar los ejemplos si hay al menos dos etiquetas y una de ellas es "entailment"
            for label in premise_labels:
                label_data = premise_data[premise_data['Label'] == label]
                indexes_data = label_data["Index"].unique()
                if len(indexes_data) > 1:
                    for i in range(1, len(indexes_data)):
                        new_dataframe.drop(new_dataframe[new_dataframe['Index'] == indexes_data[i]].index, inplace=True)
        else:
            # Si no cumple con ninguno de los criterios anteriores, eliminamos todas las filas con esa premisa
            new_dataframe = new_dataframe[new_dataframe['Premise'] != premise]

    return new_dataframe


def evaluateFlexibleVE(unique_imgs, df):
    preprocess = CLIPImageProcessor()
    correct = 0
    total = 0
    with torch.no_grad():
        for img in unique_imgs:
            preprocessed_img = preprocess(img)
            image = torch.tensor(np.array(preprocessed_img['pixel_values'])).to(device)
            hypothesis = df['Hypothesis'][df['Image'] == img]
            labels = df['Label'][df['Image'] == img]
            image_features = model.encode_image(image).float()
            tokenized_hypothesis = clip.tokenize(hypothesis).to(device)
            text_features = model.encode_text(tokenized_hypothesis).float()
            similarities = compute_similarities(text_features, image_features)

            # La imagen puede tener 3 hipótesis cada una con su respectiva etiqueta (entailment, neutral, contradiction) o solo dos hipótesis (entailment y neutral o entailment y contradiction)
            if len(labels) == 3:
                entailment_similarity = similarities[labels == 'entailment']
                neutral_similarity = similarities[labels == 'neutral']
                contradiction_similarity = similarities[labels == 'contradiction']
                """
                max_similarity = max(neutral_similarity, contradiction_similarity)

                if entailment_similarity > max_similarity:
                    correct += len(hypothesis)
                """
                if entailment_similarity > neutral_similarity and entailment_similarity > contradiction_similarity:
                    correct += len(hypothesis) # Correcto para el número de hipótesis de la imagen (son 3 hipótesis, entailment, neutral y contradicción)

            elif len(labels) == 2:
                entailment_similarity = similarities[labels == 'entailment']
                other_similarity = similarities[(labels == 'neutral') | (labels == 'contradiction')]

                if entailment_similarity > other_similarity:
                    correct += len(hypothesis)

            total += len(hypothesis)

    print('total:', total)
    print('Aciertos:', correct)
    return correct / total

# Lectura de las particiones del conjunto de datos y preprocesamiento

Contiene el código implementado para la lectura de las particiones del conjunto de datos que se va a utilizar y la obtención de sus imágenes para la experimentación y su depuración (La ejecución de este código será necesaria para la obtención de las imágenes descodificadas y su almacenamiento de una manera local para cada una de las particiones, proceso que requiere de tiempo y de espacio en memoria, 12GB).

Lectura de los archivos TSV

In [None]:
# Lista para almacenar los fragmentos de datos
data_chunks = []

# Lee los datos en fragmentos y los almacena en la lista. El nombre del archivo deberá ser cambiado dependiendo de que partición se quiera leer y obtener sus imágenes
# Posibles nombres de archivos: snli_ve_train.tsv, snli_ve_dev.tsv, snli_ve_test.tsv
for chunk in pd.read_csv('Datos/snli_ve_test.tsv', chunksize=1000, sep='\t', header=None):
    data_chunks.append(chunk)

# Combina todos los fragmentos en un solo DataFrame si es necesario
tsv_data_test = pd.concat(data_chunks)

# Definir las columnas para cada uno de los archivos

tsv_data_test.columns = ['Index', 'ID', 'Image_Base64', 'Hypothesis', 'Premise', 'Label']

"""
images = []
for i in range(len(tsv_data_train)):
    base64_str = tsv_data_train['Image_Base64'][i]
    img_data = base64.b64decode(base64_str)
    img = Image.open(BytesIO(img_data))
    images.append(img)

tsv_data_train['Image'] = images

del images
"""


In [None]:
print(len(tsv_data_test))

 Código para guardar las imágenes en local (primera vez que se lee de los TSV)

In [None]:
def saveTSVImages(tsv_data, dir):
    # guardar las imágenes en un directorio
    img_dir = dir
    os.makedirs(img_dir, exist_ok=True)
    images = tsv_data['Image']

    for i, img in enumerate(images):
        img.save(os.path.join(img_dir, f'image_{i}.jpg'))

# Guardar las imágenes en un directorio local de nombre "dir" (las imágenes ocupan mucho espacio y es un proceso que tarda tiempo). Al final se crea un archivo csv con las metáforas 
# y la ruta de la imagen al directorio local, para su posterior uso en la experimentación.
def createNewDF(tsv_data, dir, name):

    saveTSVImages(tsv_data, dir)
    # Guardar las premisas
    metaphors = tsv_data['Premise']

    # Liberar la memoria
    del tsv_data
    gc.collect()

    new_data = pd.DataFrame({'Metaphor': metaphors, 'Image': [os.path.join(dir, f'image_{i}.jpg') for i in range(len(metaphors))]})
    new_data.to_csv(f'Datos/{name}.csv', index=False)


In [None]:

# Para la obtención de las tres particiones de datos (train, dev, test) se debe ejecutar el siguiente código, cambiando el nombre del archivo y el directorio de guardado de las imágenes
createNewDF(tsv_data_test, 'Datos/ImagesTest', 'testData')

# FINE-TUNING

Contiene las funciones necesarias para la codificación de imágenes, de texto, cálculo de similitudes y una clase Dataset personalizada para este contexto. Adicionalmente, se encuentra el código necesario para el entrenamiento sobre HAIVMet y la obtención del mejor modelo. Por último, también se encuentra implementado el código necesario para la evaluación de recuperación de imágenes basada en texto.

In [None]:
import torch
import torch.nn as nn
import time
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
!pip install git+https://github.com/openai/CLIP.git --quiet
import clip
import random
import matplotlib.pyplot as plt
import torch.optim.lr_scheduler as lr_scheduler
import optuna

Utils

In [None]:
class image_metaphor_dataset(Dataset):
    def __init__(self, df):

        self.images = df["Image"].tolist()
        self.metaphors = df["Metaphor"].tolist()

    def __len__(self):
        return len(self.metaphors)

    def __getitem__(self, idx):

        image = preprocess(Image.open(self.images[idx]))
        metaphor = self.metaphors[idx]

        return image, metaphor

# Codifica todas las imágenes del dataloader y devuelve un tensor
def encode_images(dataloader, model):
    image_features = None
    with torch.no_grad():
        for i, batch in enumerate(dataloader):
            images, _ = batch
            images = torch.squeeze(images).to(device)
            encoded_images = model.encode_image(images).float()
            if image_features is None:
                image_features = encoded_images
            else:
                image_features = torch.cat((image_features, encoded_images))
            #print("batch:", i, "de", len(dataloader) - 1, "batches procesados.")

    return image_features

# Normaliza los embeddings y calcula las similitudes coseno
def compute_similarities(text_features, image_features):
    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)
    similarities = text_features.cpu().numpy() @ image_features.cpu().numpy().T

    return similarities


# Evaluación de recuperación de imágenes basa en texto
def evaluate(model, dataloader, df):
    predicted = False
    correct_predicted = []
    incorrect_predicted = []
    correct = 0
    total = 0
    image_features = encode_images(dataloader, model)
    model.eval()
    with torch.no_grad():
        for batch in dataloader:
            _, metaphors = batch
            tokenized_metaphors = clip.tokenize(metaphors).to(device)
            text_features = model.encode_text(tokenized_metaphors).float()
            similarities = compute_similarities(text_features, image_features)
            for i, metaphor in enumerate(metaphors):
                # Obtener el índice de la imagen con mayor similitud
                index = np.argmax(similarities[i])
                met, _ = df.iloc[index]

                # Si la metáfora que consigo con el índice de la imagen de mayour similitud es igual a la metáfora que estoy procesando, entonces acierto
                if met == metaphor:
                    correct += 1
                    predicted = True
                    if metaphor not in correct_predicted:
                        correct_predicted.append(metaphor)
                total += 1
                if not predicted and metaphor not in incorrect_predicted:
                    incorrect_predicted.append(metaphor)
                predicted = False


    #print('total de metáforas procesadas:', total)

    return correct / total, correct_predicted, incorrect_predicted


#https://github.com/openai/CLIP/issues/57
def convert_models_to_fp32(model):
    for p in model.parameters():
        p.data = p.data.float()
        p.grad.data = p.grad.data.float()

# Función para evaluar el loss en el conjunto de validación
def evaluate_loss(model, dataloader, loss_img, loss_txt):
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for batch in dataloader:
            images, texts = batch
            images = images.to(device)
            texts = clip.tokenize(texts).to(device)

            logits_per_image, logits_per_text = model(images, texts)

            ground_truth = torch.arange(len(images), dtype=torch.long, device=device)

            loss = (loss_img(logits_per_image, ground_truth) + loss_txt(logits_per_text, ground_truth)) / 2
            total_loss += loss.item()

    return total_loss / len(dataloader)


Bucle de entrenamiento

In [None]:
def train(model, epochs, df_train, df_dev):
    
    # establecer seed
    torch.manual_seed(0)
    np.random.seed(0)
    random.seed(0)

    training_dataset = image_metaphor_dataset(df_train)
    training_dataloader = DataLoader(training_dataset, batch_size=64, shuffle=True)

    dev_dataset = image_metaphor_dataset(df_dev)
    dev_dataloader = DataLoader(dev_dataset, batch_size=64, shuffle=True)

    if device == "cpu":
      model.float()
    else :
      clip.model.convert_weights(model) # Actually this line is unnecessary since clip by default already on float16

    loss_img = nn.CrossEntropyLoss()
    loss_txt = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-6, betas=(0.9,0.98), eps=1e-6, weight_decay=0.4)

    #scheduler = lr_scheduler.CosineAnnealingLR(optimizer, epochs, eta_min=0)

    train_losses = []
    dev_losses = []
    
    losses_epoch = 0.0

    best_dev_loss = float('inf')
    best_dev_epoch = 0
    start_time=time.time()
    model.train()
    for epoch in range(epochs):
        losses_epoch = 0.0
        pbar = tqdm(training_dataloader, total=len(training_dataloader))
        for batch in pbar:
            optimizer.zero_grad()

            images,texts = batch

            images= images.to(device)
            texts = clip.tokenize(texts).to(device)

            logits_per_image, logits_per_text = model(images, texts)

            ground_truth = torch.arange(len(images),dtype=torch.long,device=device)

            total_loss = (loss_img(logits_per_image,ground_truth) + loss_txt(logits_per_text,ground_truth))/2
            losses_epoch += total_loss.item()
            total_loss.backward()

            if device == "cpu":
              optimizer.step()
            else :
              convert_models_to_fp32(model)
              optimizer.step()
              clip.model.convert_weights(model)

            pbar.set_description(f"Epoch {epoch}/{epochs-1}")

        #scheduler.step()
        
        # print el loss promedio de cada epoch
        print("el loss promedio de la epoch", epoch, "es:", losses_epoch/len(training_dataloader))
        train_losses.append(losses_epoch/len(training_dataloader))

        dev_loss = evaluate_loss(model, dev_dataloader, loss_img, loss_txt)
        print("el loss de dev en la epoch", epoch, "es:", dev_loss)
        print("\n")
        dev_losses.append(dev_loss)
        
        if dev_loss < best_dev_loss:
            best_dev_loss = dev_loss
            best_dev_epoch = epoch
            torch.save(model.state_dict(), 'best_model.pth')

    print("\nBest Performing Model achieves dev loss of : {:.4f} in epoch: {:.1f}".format(best_dev_loss, best_dev_epoch))
    print("Time: {:.3f} seconds".format(time.time() - start_time))

  
    # Plot los valores de loss de train y dev por épocas
    plt.plot(train_losses, label='train')
    plt.plot(dev_losses, label='dev')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    

In [None]:
test_data = pd.read_csv('Datos/testData.csv')
dev_data = pd.read_csv('Datos/devData.csv')
train_data = pd.read_csv('Datos/trainData.csv')

print("test data:", len(test_data))
print("dev data:", len(dev_data))
print("train data:", len(train_data))

In [None]:
model, preprocess = clip.load('ViT-B/32', device=device, jit=False) #Must set jit=False for training

# Train
train(model=model, epochs=20, df_train=train_data, df_dev=dev_data)

Evaluación del mejor modelo obtenido

In [None]:
model.load_state_dict(torch.load('best_model.pth'))

test_dataset = image_metaphor_dataset(test_data)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

train_dataset = image_metaphor_dataset(train_data)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=False)

dev_dataset = image_metaphor_dataset(dev_data)
dev_dataloader = DataLoader(dev_dataset, batch_size=64, shuffle=False)

train_acc = evaluate(model, train_dataloader, train_data)
print('Train accuracy: {:.2f}'.format(train_acc))

dev_acc = evaluate(model, dev_dataloader, dev_data)
print('Dev accuracy: {:.2f}'.format(dev_acc))

test_acc = evaluate(model, test_dataloader, test_data)
print('Test accuracy: {:.2f}'.format(test_acc))

# Paráfrasis

Generación de las paráfrasis literales sobre las metáforas del conjunto de test, comprende la lectura, generación de paráfrasis y creación de un TSV con los pares metáforas-paráfrasis.

## Lectura de las metáforas textuales del conjunto de prueba

In [None]:
test_data = pd.read_csv('Datos/testData.csv')
print("test data:", len(test_data))
test_data.head(171)

In [None]:
unique_test_metaphors = test_data['Metaphor'].unique()
print("Número de metáforas únicas en el conjunto de test:", len(unique_test_metaphors))
#print(unique_test_metaphors)

## Cohere

Proceso de generación de las paráfrasis literal para cada una de las metáforas textuales del conjunto de test utilizando Command R-Plus de Cohere(aquí no ejecutar nada, se utilizará el tsv ya creado con las metáforas-paráfrasis en su lugar).

In [None]:
# Para los values de las metáforas nos conectamos a la API de Coral para obtener las paráfrasis de las metáforas únicas
%pip install cohere --quiet

Código para conectarse con la API de Cohere y obtener las paráfrasis de las metáforas de test

In [None]:
import cohere
co = cohere.Client('placeholder') # Pon tu API key de Coral

test_paraphrases = {}
chat_history = []

for met in unique_test_metaphors:
    #met = 'We were sinking in an ocean of grass'
    # Mensaje al bot de Coral
    message = 'Please provide a short literal paraphrase of the following metaphor: ' + met + ". The answer that you provide me with must only contain the literal paraphrase that you generate, nothing else."

    # Generar una respuesta con el historial de chat

    response = co.chat(
        message = message,
        model="command",
        temperature=0.1,
    )

    # Obtener la respuesta del bot
    answer = response.text

    test_paraphrases[met] = answer

    time.sleep(3)


Genero un TSV con las columnas de Metaphor y Paraphrase

In [None]:
# generar un archivo CSV con las metáforas únicas y sus paráfrasis
test_paraphrases_df = pd.DataFrame(test_paraphrases.items(), columns=['Metaphor', 'Paraphrase'])
# Guardar como TSV
test_paraphrases_df.to_csv('Datos/testParaphrases.tsv', sep='\t', index=False)

## Lectura TSV

Lectura del TSV de pares metáforas-paráfrasis. Sobre estos también se encuentra realiza el análisis manual de objetos relevantes.

In [None]:
test_paraphrases_df = pd.read_csv('Datos/testParaphrases.tsv', sep='\t')
print(len(test_paraphrases_df))
test_paraphrases_df.head()

In [None]:
objetos_niguno = test_paraphrases_df[test_paraphrases_df['Ninguno'] == 'X']
objetos_al_menos_1 = test_paraphrases_df[test_paraphrases_df['al menos 1'] == 'X']
objetos_todos = test_paraphrases_df[test_paraphrases_df['Todos'] == 'X']

print("Número de paráfrasis con ningún objeto:", len(objetos_niguno))
print("Número de paráfrasis con al menos 1 objeto:", len(objetos_al_menos_1))
print("Número de paráfrasis con todos los objetos:", len(objetos_todos))

print("Porcentaje de paráfrasis con ningún objeto:", round((len(objetos_niguno) / len(test_paraphrases_df)) * 100, 2))
print("Porcentaje de paráfrasis con al menos 1 objeto:", round((len(objetos_al_menos_1) / len(test_paraphrases_df)) * 100, 2))
print("Porcentaje de paráfrasis con todos los objetos:", round((len(objetos_todos) / len(test_paraphrases_df)) * 100, 2))

Sustituyo las metáforas por las paráfrasis en un nuevo dataframe constituido de paráfrasis-imagen para la experimentación de recuperación de imágenes basada en texto con paráfrasis.

In [None]:

# Obtener las paráfrasis
test_paraphrases = test_paraphrases_df['Paraphrase'].tolist()
test_metaphors = test_paraphrases_df['Metaphor'].tolist()

# Leo del TSV
test_data_paraphrased = pd.read_csv('Datos/testParaphrasesPrueba.tsv', sep='\t')

test_data_paraphrased.head()

# Experimentación

Incluye las pruebas realizadas para recuperación de imágenes basada en texto con los pares metáfora-imagen y paráfrasis-imagen. También contiene las pruebas para VLE y sus diferentes tipos de evaluación. Toda la experimentación se realiza sobre los modelos CLIP zero-shot y fine-tuned en HAIVMet. adicionalmente, comprende la generación del subconjunto diferencia y su análisis.

In [None]:
%pip install git+https://github.com/openai/CLIP.git --quiet
import clip

## Evaluación de recuperación de imágenes basada en texto

Función para la obtención del conjunto diferencia

In [None]:
def diferenciaConjuntos(correct_metaphors, correct_paraphrases, test_paraphrases_df):
    diferencia = []
    paraphrases_to_metaphors = []

    # Obtener las metáforas correspondientes de correct_paraphrases
    for i, paraphrase in enumerate(correct_paraphrases):
        metafora = test_paraphrases_df[test_paraphrases_df['Paraphrase'] == paraphrase].values[0][0]
        paraphrases_to_metaphors.append(metafora)
        #print(metafora)
        

    # obtener la diferencia de conjuntos entre las listas correct_metaphors y paraphrases_to_metaphors
    correct_metaphors = set(correct_metaphors)
    paraphrases_to_metaphors = set(paraphrases_to_metaphors)
    diferencia = correct_metaphors.difference(paraphrases_to_metaphors)

    return diferencia


Evaluación zero-shot y de los conjuntos metáforasTextuales/imágenes originales y después paráfrasis/imágenes

In [None]:
# Cargar un CLIP preentrenado
model, preprocess = clip.load('ViT-B/32', device=device)

test_data = pd.read_csv('Datos/testData.csv')

test_dataset = image_metaphor_dataset(test_data)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# evaluate test data
test_acc, correct_metaphors, incorrect_metaphors = evaluate(model, test_dataloader, test_data)
print("Zero-shot Test accuracy pares originales metáforas/imágenes: ", test_acc)

In [None]:
# Cargar un CLIP preentrenado
model, preprocess = clip.load('ViT-B/32', device=device)

# Crear un dataset con las metáforas de test_data_paraphrased
test_dataset_paraphrased = image_metaphor_dataset(test_data_paraphrased)

test_dataloader_paraphrased = DataLoader(test_dataset_paraphrased, batch_size=64, shuffle=False)

# Evaluar el modelo con las metáforas de test_data_paraphrased
test_acc_paraphrased, correct_paraphrases, incorrect_paraphrases = evaluate(model, test_dataloader_paraphrased, test_data_paraphrased)
print("Zero-shot Test accuracy con las paráfrasis: ", test_acc_paraphrased)

Obtención del subconjunto diferencia para el enfoque zero-shot

In [None]:
diferencia = diferenciaConjuntos(correct_metaphors, correct_paraphrases, test_paraphrases_df)
print("Diferencia entre las metáforas correctas y las paráfrasis correctas:", diferencia)
print(len(diferencia))


# Crear un TSV con las columnas correct metaphors, correct paraphrases y diferencia
diferencia_df = pd.DataFrame(diferencia ,columns=['Diferencia de Metáforas'])
diferencia_df.head(30)

Evaluación de los conjuntos metáforasTextuales/imágenes originales y después paráfrasis/imágenes con el modelo Fine-tuned

In [None]:
# Cargar el CLIP finetuneado
model, _ = clip.load('ViT-B/32', device=device)
model.load_state_dict(torch.load('best_model.pth'))

# Evaluación con el modelo finetuneado con las metáforas de test_data
test_acc_finetuned = evaluate(model, test_dataloader, test_data)
print("Test accuracy con los pares originales metáforas/imágenes después del fine-tuning: ", test_acc_finetuned)

In [None]:
# Evaluación con el modelo finetuneado con las paráfrasis de test_data_paraphrased
model, _ = clip.load('ViT-B/32', device=device)
model.load_state_dict(torch.load('best_model.pth'))

test_acc_finetuned = evaluate(model, test_dataloader_paraphrased, test_data_paraphrased)
print("Test accuracy con las paráfrasis después del fine-tuning: ", test_acc_finetuned)

## VLE Evaluation

Preprocesamiento de los datos de VLE

In [None]:
# Lectura del archivo TSV
tsv_data_test = pd.read_csv('Datos/snli_ve_test.tsv', sep='\t', header=None)

In [None]:
# Definir los nombres de las columnas
tsv_data_test.columns = ['Index', 'ID', 'Image_Base64', 'Hypothesis', 'Premise', 'Label']

# Decodificar las imágenes base64 y guardarlas en una lista
images = []
for i in range(len(tsv_data_test)):
    base64_str = tsv_data_test['Image_Base64'][i]
    img_data = base64.b64decode(base64_str)
    img = Image.open(BytesIO(img_data))
    images.append(img)

# Agregar la lista de imágenes al DataFrame
tsv_data_test['Image'] = images

# Eliminar la columna 'Image_Base64'
tsv_data_test.drop(columns=['Image_Base64'], inplace=True)

print(len(tsv_data_test))

Evaluación Zero-shot primero de manera Estricta

In [None]:
# Conseguir premisas únicas
premises = tsv_data_test['Premise'].unique()

# Obtain the strict VE data
strict_VE_data = format_VE_strict(tsv_data_test, premises)
print("pares texto-imagen: ", len(strict_VE_data))
strict_VE_data_premises = strict_VE_data['Premise'].unique()
print("premisas textuales únicas: ", len(strict_VE_data_premises))

# obtain unique images
images = strict_VE_data['Image']
images = images.tolist()
unique_imgs_strict = obtainUniqueImages(images)
print(len(unique_imgs_strict))

In [None]:
# Cargar el modelo CLIP
model, preprocess = clip.load('ViT-B/32', device=device)

# evaluate test data
strict_test_acc = evaluateStrictVE(unique_imgs_strict, strict_VE_data)
print(f"Zero-shot Test accuracy VLE Estricto: {strict_test_acc:.2f}")

Evaluación Zero-shot de manera Flexible

In [None]:
# Obtain the flexible VE data
flexible_VE_data = format_VE_flexible(tsv_data_test, premises)
print("pares texto-imagen: ", len(flexible_VE_data))
flexible_VE_data_premises = flexible_VE_data['Premise'].unique()
print("premisas textuales únicas: ", len(flexible_VE_data_premises))

# Obtain unique images
images = flexible_VE_data['Image']
images = images.tolist()
unique_imgs_flexible = obtainUniqueImages(images)
print(len(unique_imgs_flexible))

In [None]:
# Cargar el modelo CLIP
model, preprocess = clip.load('ViT-B/32', device=device)

flexible_test_acc = evaluateFlexibleVE(unique_imgs_flexible, flexible_VE_data)
print(f"Zero-shot Test accuracy VLE Flexible: {flexible_test_acc:.2f}")

Evaluación con el modelo Fine-tuned, primero de manera estricta y luego flexible

In [None]:
# Cargar el modelo fine-tuneado
model, preprocess = clip.load('ViT-B/32', device=device)
model.load_state_dict(torch.load('best_model.pth'))


# Evaluación con el modelo finetuneado Estricta
strict_test_acc_finetuned = evaluateStrictVE(unique_imgs_strict, strict_VE_data)
print(f"Test accuracy VLE Estricto después del fine-tuning: {strict_test_acc_finetuned:.2f}")


# Evaluación con el modelo finetuneado Flexible
flexible_test_acc_finetuned = evaluateFlexibleVE(unique_imgs_flexible, flexible_VE_data)
print(f"Test accuracy VLE Flexible después del fine-tuning: {flexible_test_acc_finetuned:.2f}")