In [None]:
# Reload edited project modules automatically before each cell runs.
%load_ext autoreload
%autoreload 2
# Load the jupyter_black extension (Black formatting for code cells).
%load_ext jupyter_black

In [None]:
import os

# Move the working directory one level up so the relative paths used below
# ("data/...", "models/...") resolve from there.
# NOTE(review): the path is relative to the current directory, so re-running
# this cell moves up one more level each time — run it once per kernel session.
os.chdir("../")

In [None]:
### SETTINGS ###

# Processed dataset directories, keyed by dataset name (paths keep their trailing slash).
DATASETS = {
    "FB15k-237-DECODE-ONLY-LABEL": "data/data_processed/FB15k-237/decode_only_label/",
}
# Base checkpoint identifiers, keyed by short model name.
MODELS = {
    "bart-small": "lucadiliello/bart-small",
    "bart-base": "facebook/bart-base",
    "bart-large": "facebook/bart-large",
}

# Active dataset / model selection and the names derived from it.
DATASET = "FB15k-237-DECODE-ONLY-LABEL"
MODEL = "bart-small"
MODEL_NAME = f"{MODEL}_{DATASET}"
MODEL_PATH = "models/" + MODEL_NAME
MAX_LENGTH = 50

# Training arguments, grouped by concern.
# NOTE(review): "logging_dir" points at "model/logs" while checkpoints live under
# "models/" — confirm the singular directory name is intended.
params = dict(
    # Dir
    output_dir=MODEL_PATH + "/",
    # Batch
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    # Learning rate
    learning_rate=5e-5,
    seed=42,
    # Epochs
    num_train_epochs=50,
    # Logging
    logging_dir="model/logs",
    logging_strategy="epoch",
    logging_steps=10,
    # Evaluation
    evaluation_strategy="epoch",
    eval_steps=1,
    # Checkpoint
    save_strategy="epoch",
    save_steps=2,
    save_total_limit=2,
    ddp_find_unused_parameters=False,
    warmup_steps=2,
)

### Load the model

In [None]:
from transformers import (
    BartForConditionalGeneration,
    BartTokenizer,
)

import torch

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the fine-tuned checkpoint stored at MODEL_PATH, cast it to float32 and
# place it on `device`. Placement goes through `.to(device)` only: an
# unconditional `.cuda()` raises on a CPU-only machine, which would defeat the
# fallback selected above.
model = BartForConditionalGeneration.from_pretrained(MODEL_PATH).float().to(device)
# The tokenizer is loaded from the base checkpoint name registered in MODELS.
tokenizer = BartTokenizer.from_pretrained(MODELS[MODEL])

### Load Data

In [None]:
from src.datasetkgc import DatasetKGC

In [None]:
import torch

# Load the serialized train / validation splits from the selected dataset directory.
# NOTE(review): torch.load unpickles arbitrary objects — only load files you trust.
train_ds, valid_ds = (
    torch.load(DATASETS[DATASET] + file_name)
    for file_name in ("train_ds.pth", "valid_ds.pth")
)

In [None]:
from torch.utils.data import DataLoader

# Wrap each split in a non-shuffling DataLoader using its configured batch size.
# Note that `train_ds` / `valid_ds` are rebound to the loaders here, so from this
# point on they yield batches and can no longer be indexed or sliced directly.
train_ds, valid_ds = (
    DataLoader(split_ds, batch_size=params[batch_size_key], shuffle=False)
    for split_ds, batch_size_key in (
        (train_ds, "per_device_train_batch_size"),
        (valid_ds, "per_device_eval_batch_size"),
    )
)

### Evaluation - Iterative Playground

Define the `idx` variable below to pick a sample from the first validation batch; the prediction cell can then be run standalone.

In [None]:
# Sample: fetch the first validation batch and move element `idx` of each
# field onto the model's device.
idx = 0
_valid_ds = next(iter(valid_ds))
input_ids, attention_mask, labels = (
    _valid_ds[field][idx].to(device)
    for field in ("input_ids", "attention_mask", "labels")
)

In [None]:
with torch.no_grad():
    # Input as the model sees it, with only the padding markers removed.
    print("Sample input: ")
    decoded_input = tokenizer.decode(input_ids).replace("<pad>", "")
    print(decoded_input, "\n")

    # Reference answer, special tokens stripped.
    print("Expected output: ")
    decoded_label = tokenizer.decode(labels, skip_special_tokens=True)
    print(decoded_label, "\n")

    # Generate from the single sample, reshaped into a batch of one; the raw
    # decoding (special tokens included) of the first sequence is printed.
    print("Model Output: ")
    generated_ids = model.generate(input_ids.reshape(1, -1), max_length=MAX_LENGTH)
    print(tokenizer.decode(generated_ids[0]))

### Hits

This metric measures the cosine similarity between the encoder representations of the label text and of the generated output text.

In [None]:
import pandas as pd


def cosine_similarity(embd_i, embd_j):
    """Return the cosine similarity of two embedding batches along dimension 1.

    Dimension 1 is reduced, and ``eps=1e-6`` guards against division by zero
    for (near-)zero vectors.
    """
    return torch.nn.functional.cosine_similarity(embd_i, embd_j, dim=1, eps=1e-6)


def _mean_encoder_embedding(model, tokenizer, text, max_length=128):
    """Encode ``text`` with the model and mean-pool the encoder's last hidden state."""
    token_ids = (
        tokenizer.encode(
            text, padding="max_length", max_length=max_length, return_tensors="pt"
        )
        .reshape(1, -1)
        .to(device)
    )
    # NOTE(review): no attention mask is passed and the mean runs over every
    # position, so padding positions contribute to the pooled vector. Left
    # unchanged to keep scores comparable with earlier runs — confirm intended.
    hidden_states = model(token_ids).encoder_last_hidden_state
    return torch.mean(hidden_states, dim=1)


def hits_sim(model, tokenizer, valid_ds: dict):
    """Score each example by the similarity of its label and its generated output.

    For every row of ``valid_ds`` the label is decoded to text, an output is
    generated from the row's ``input_ids`` (at most ``MAX_LENGTH`` tokens), and
    both texts are re-encoded by the model and mean-pooled. The score is the
    cosine similarity of the two pooled vectors.

    Args:
        model: model providing ``generate`` and a forward pass exposing
            ``encoder_last_hidden_state``.
        tokenizer: tokenizer providing ``encode`` and ``decode``.
        valid_ds: mapping with ``"input_ids"`` and ``"labels"`` entries indexed
            by row; ``valid_ds["input_ids"].shape[0]`` gives the row count.

    Returns:
        A ``pandas.DataFrame`` with columns ``label``, ``output`` and
        ``similarity``, one row per example.
    """
    results = []
    data_size = valid_ds["input_ids"].shape[0]

    # Evaluation only: disabling autograd avoids recording a graph (and keeping
    # activations alive) for every forward pass.
    with torch.no_grad():
        for i in range(data_size):
            text_label = tokenizer.decode(
                valid_ds["labels"][i], skip_special_tokens=True
            )
            embedding_label = _mean_encoder_embedding(model, tokenizer, text_label)

            token_ids_output = model.generate(
                valid_ds["input_ids"][i].to(device).reshape(1, -1),
                max_length=MAX_LENGTH,
            )[0]
            text_output = tokenizer.decode(token_ids_output, skip_special_tokens=True)
            embeddings_output = _mean_encoder_embedding(model, tokenizer, text_output)

            similarity = cosine_similarity(embedding_label, embeddings_output)[
                0
            ].item()
            results.append((text_label, text_output, similarity))

    return pd.DataFrame(results, columns=["label", "output", "similarity"])

#### Analysis of cosine similarity of label and output encoded by model.

In [None]:
# Sample: element `idx` of the first validation batch, moved to the model's device.
idx = 0
_valid_ds = next(iter(valid_ds))
input_ids, attention_mask, labels = (
    _valid_ds[name][idx].to(device)
    for name in ("input_ids", "attention_mask", "labels")
)

In [None]:
# Decode the reference label of the selected sample and show it.
text_label = tokenizer.decode(labels, skip_special_tokens=True)
print(text_label)

# Re-encode the label text and mean-pool the encoder's last hidden state.
# The text is padded to 128 tokens — the same length used for the generated
# output and inside hits_sim — so both pooled vectors are computed over equally
# sized inputs (padding to MAX_LENGTH here made the two sides inconsistent).
# Inference only, so autograd is disabled.
with torch.no_grad():
    embedding_label = model(
        tokenizer.encode(
            text_label, padding="max_length", max_length=128, return_tensors="pt"
        )
        .reshape(1, -1)
        .to(device)
    ).encoder_last_hidden_state
    embedding_label = torch.mean(embedding_label, dim=1)

In [None]:
# Generate a prediction for the sample (as a batch of one) and decode it to plain text.
token_ids_output = model.generate(input_ids.reshape(1, -1), max_length=MAX_LENGTH)[0]
text_output = tokenizer.decode(token_ids_output, skip_special_tokens=True)
print(text_output)

# Re-encode the generated text and mean-pool the encoder's last hidden state.
# Inference only: without no_grad the forward pass would record an autograd
# graph that is never used.
with torch.no_grad():
    embeddings_output = model(
        tokenizer.encode(
            text_output, padding="max_length", max_length=128, return_tensors="pt"
        )
        .to(device)
        .reshape(1, -1)
    ).encoder_last_hidden_state
    embeddings_output = torch.mean(embeddings_output, dim=1)

In [None]:
# Cosine similarity between the mean-pooled label and output embeddings
# computed in the two cells above (shown as the cell output).
cosine_similarity(embedding_label, embeddings_output)

In [None]:
validation_batch = 50
# `valid_ds` was rebound to a DataLoader earlier in the notebook, and a
# DataLoader cannot be sliced; slice the dataset it wraps instead (or
# `valid_ds` itself when it has been reloaded as a raw dataset).
# NOTE(review): assumes slicing the dataset yields a mapping with "input_ids"
# and "labels" tensors, as hits_sim expects — confirm against DatasetKGC.
_valid_source = valid_ds.dataset if isinstance(valid_ds, DataLoader) else valid_ds
# The similarity metric defined above is named `hits_sim`; no function called
# `hits` exists, so the original call raised NameError.
results = hits_sim(model, tokenizer, _valid_source[:validation_batch])

In [None]:
# Decode the first validation example with special tokens kept, for inspection.
# `valid_ds` may be a DataLoader at this point (not indexable), so index the
# dataset it wraps in that case.
_valid_source = valid_ds.dataset if isinstance(valid_ds, DataLoader) else valid_ds
_t = tokenizer.decode(_valid_source[0]["input_ids"], skip_special_tokens=False)

### Original Hits Metric

In [None]:
def training_data_to_text(input_ids, tokenizer=tokenizer):
    """Recover the masked prompt text from one encoded example.

    The ids are decoded with special tokens kept, the leading BOS token is
    dropped, and the text is cut right after the mask token plus the single
    character that follows it (the closing "." in prompts such as
    "My name is <mask>.").

    Args:
        input_ids: token ids of one example, as accepted by ``tokenizer.decode``.
        tokenizer: tokenizer used to decode; defaults to the notebook tokenizer.

    Returns:
        The prompt text ending with the mask token and its trailing character.

    Raises:
        ValueError: if the decoded text contains no mask token. Previously this
            case silently returned a meaningless three-character slice.
    """
    text = tokenizer.decode(input_ids, skip_special_tokens=False)

    mask_token = tokenizer.mask_token
    mask_start = text.find(mask_token)
    if mask_start == -1:
        raise ValueError(f"No mask token {mask_token!r} found in: {text!r}")

    # Offsets are derived from the tokenizer instead of hard-coded lengths
    # (3 == len("<s>"), 7 == len("<mask>.") for BART).
    bos_token = tokenizer.bos_token or ""
    start = len(bos_token) if text.startswith(bos_token) else 0
    end = mask_start + len(mask_token) + 1  # +1 keeps the character after the mask

    return text[start:end]


def generate_beam_search(model, tokenizer, text, beam_size=5, max_length=128):
    """Generate ``beam_size`` beam-search completions for ``text``.

    The prompt (e.g. "My name is <mask>.") is encoded and padded to
    ``max_length``, then decoded with ``beam_size`` beams, returning all of
    them. The result is a list of decoded strings with special tokens removed.
    """
    # Text Example: My name is <mask>.
    encoded_prompt = tokenizer.encode(
        text, padding="max_length", max_length=max_length, return_tensors="pt"
    )

    generation_options = dict(
        max_length=max_length,
        num_beams=beam_size,
        num_return_sequences=beam_size,
        early_stopping=True,
    )
    sequences = model.generate(encoded_prompt.to(device), **generation_options)

    return tokenizer.batch_decode(sequences, skip_special_tokens=True)


def compute_hits(model, tokenizer, valid_ds, beam_size=5, max_length=128, debug=False):
    """Count examples whose label appears among the beam-search outputs.

    For each row of ``valid_ds`` the prompt text is rebuilt from ``input_ids``,
    ``beam_size`` candidates are generated, and the row counts as a hit when the
    decoded label equals any candidate, ignoring case.

    Returns:
        A tuple ``(frame, hits)`` where ``frame`` is a ``pandas.DataFrame`` with
        columns ``text``, ``label`` and ``output_list`` (one row per example)
        and ``hits`` is the number of matching examples.
    """
    hits = 0
    records = []

    for row in range(valid_ds["input_ids"].shape[0]):
        prompt = training_data_to_text(valid_ds["input_ids"][row], tokenizer=tokenizer)
        target = tokenizer.decode(valid_ds["labels"][row], skip_special_tokens=True)
        candidates = generate_beam_search(
            model=model,
            tokenizer=tokenizer,
            text=prompt,
            beam_size=beam_size,
            max_length=max_length,
        )

        # Case-insensitive exact match against any returned beam.
        if any(target.lower() == candidate.lower() for candidate in candidates):
            hits += 1

        records.append((prompt, target, candidates))

        if debug:
            print("Text: ", prompt)
            print("Label: ", target)
            print("Output: ", candidates)
            print()

    return pd.DataFrame(records, columns=["text", "label", "output_list"]), hits

In [None]:
# Reload the raw (sliceable) splits from disk; the cells below slice `valid_ds`.
# NOTE(review): torch.load unpickles arbitrary objects — only load files you trust.
train_ds, valid_ds = (
    torch.load(DATASETS[DATASET] + split_file)
    for split_file in ("train_ds.pth", "valid_ds.pth")
)

In [None]:
# Hits on the `valid_ds[:batch]` slice when a single beam is returned.
batch = 1000  # Max -> len(valid_ds)
results, hits = compute_hits(
    model=model,
    tokenizer=tokenizer,
    valid_ds=valid_ds[:batch],
    beam_size=1,
    debug=False,
)

print("Hits: ", hits)

In [None]:
# Hits on the `valid_ds[:batch]` slice when the top 3 beams are returned.
batch = 1000  # Max -> len(valid_ds)
results, hits = compute_hits(
    model=model,
    tokenizer=tokenizer,
    valid_ds=valid_ds[:batch],
    beam_size=3,
    debug=False,
)

print("Hits: ", hits)

In [None]:
# Hits on the `valid_ds[:batch]` slice when the top 10 beams are returned.
batch = 1000  # Max -> len(valid_ds)
results, hits = compute_hits(
    model=model,
    tokenizer=tokenizer,
    valid_ds=valid_ds[:batch],
    beam_size=10,
    debug=False,
)

print("Hits: ", hits)

### Beam Search Standalone

In [None]:
from transformers import BartTokenizer, BartForConditionalGeneration

# Mask-filling probe: the 10 most probable vocabulary entries at the masked position.
TXT = "Brazil capital is São <mask>."

input_ids = tokenizer([TXT], return_tensors="pt")["input_ids"]
logits = model(input_ids.to(device)).logits

# `.item()` requires exactly one mask token in the prompt.
masked_index = torch.nonzero(input_ids[0] == tokenizer.mask_token_id).item()
probs = torch.softmax(logits[0, masked_index], dim=0)
values, predictions = torch.topk(probs, 10)

# Decode the top ids together and split the text on whitespace.
tokenizer.decode(predictions).split()

In [None]:
import itertools

from transformers import BartTokenizer, BartForConditionalGeneration

TXT = "Piano is a type of <mask>."

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_ids = tokenizer([TXT], return_tensors="pt")["input_ids"]
logits = model(input_ids.to(device)).logits

# `.item()` requires exactly one mask token in the prompt.
masked_index = (input_ids[0] == tokenizer.mask_token_id).nonzero().item()
probs = logits[0, masked_index].softmax(dim=0)
values, predictions = probs.topk(5)

predicted_tokens = tokenizer.batch_decode(predictions)

suggested_words = tokenizer.decode(predictions).split()

# Build every unordered pair of suggested words as candidate two-word fillings.
# Decoding the top ids together and splitting on whitespace does not always
# yield exactly 5 words, so the pairs are taken from however many words there
# are instead of indexing positions 0..4 (which could raise IndexError).
combinations = list(itertools.combinations(suggested_words, 2))

print(combinations)

In [None]:
# Beam-search hyperparameters: number of beams kept / returned, and how many
# tokens beyond the prompt length generation may produce.
num_beams = 5
num_tokens = 10

TXT = "Piano is a type of <mask>."

# Tokenize the prompt.
input_ids = tokenizer([TXT], return_tensors="pt")["input_ids"]

# Generate fillings with deterministic beam search (sampling disabled),
# returning every beam.
output = model.generate(
    input_ids=input_ids.to(device),
    max_length=input_ids.size(-1) + num_tokens,
    num_beams=num_beams,
    num_return_sequences=num_beams,
    pad_token_id=tokenizer.pad_token_id,
    eos_token_id=tokenizer.eos_token_id,
    do_sample=False,
)

# Decode the generated sequences, dropping special tokens.
decoded_outputs = tokenizer.batch_decode(output, skip_special_tokens=True)

# Show the decoded sequences.
print(decoded_outputs)