In [None]:
# Author: Yuki Rivera
# This notebook contains the code to evaluate the basic transformer model built and trained in PyTorch

In [None]:
# from google.colab import drive
# drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


### Sets the file paths

In [None]:
# Project root and the sub-folder of the training run being evaluated.
path = "/content/drive/path/to/project/"
trial = "3rd_10k/"

# The vocabulary and the model weights are read from "<path><trial>results".
output_dir = f"{path}{trial}results"
vocab_path = f"{output_dir}/vocab.pkl"
model_path = f"{output_dir}/transformer_model.pt"

In [23]:
import sys

# Lets Python import modules stored under `path`. The membership check keeps
# repeated runs of this cell from piling up duplicate sys.path entries.
if path not in sys.path:
    sys.path.append(path)

### Imports necessary libraries

In [24]:
import os
import pickle
import zipfile

import pandas as pd
import torch
from sentence_transformers import SentenceTransformer, util

from tokenizer_utils import encode
from transformer_model import TransformerModel

In [25]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

### Global variables & hyperparameters

In [26]:
# Special tokens (pad, unknown, begin-of-sequence and end-of-sequence markers)
PAD_TOKEN, UNK_TOKEN, BOS_TOKEN, EOS_TOKEN = "<pad>", "<unk>", "<bos>", "<eos>"

In [None]:
data_size = 500

# Sequence-length limits
input_len = 50    # max prompt length
output_len = 150  # max synopsis length

vocab_length = 8000

# Transformer architecture settings
d_model_size = 128              # embedding + hidden dimension size
num_head = 2                    # number of attention heads
layers = 2                      # number of encoder-decoder layers
forward_dim = d_model_size * 2  # size of feedforward hidden layer
dropout_prob = 0.1              # regularization (dropout probability)


### Loads the test data

In [None]:
# Read the test split and keep five hand-picked rows, renumbered from 0.
testing_df = (
    pd.read_csv(path + "/prompt_synopsis_test.csv")
    .iloc[[0, 4, 16, 25, 47], 0:]
    .reset_index(drop=True)
)

# Model inputs and the reference texts they are compared against, as plain lists.
prompts, references = (
    testing_df[column].tolist() for column in ("prompt", "synopsis")
)

### Loads the vocab

In [31]:
# NOTE: unpickling can execute arbitrary code, so vocab_path must point to a
# file from a trusted source.
with open(vocab_path, mode="rb") as vocab_file:
    vocab = pickle.load(vocab_file)

### Rebuilds the id-to-token mapping

In [32]:
# Inverse of vocab: maps each token id back to its token.
id_to_token = dict(zip(vocab.values(), vocab.keys()))

### Encodes Prompts

In [33]:
# Encode each prompt with the project's encode() helper (called with the vocab
# and input_len), then stack the per-prompt tensors into a single tensor on the
# selected device.
input_tensors = torch.stack(
    tuple(torch.tensor(encode(text, vocab, input_len)) for text in prompts)
).to(device)

### Loads the model

In [34]:
# Architecture settings for the model instance.
# NOTE(review): presumably these must equal the values used at training time so
# the saved weights fit the layers — confirm against the training notebook.
model_config = dict(
    vocab_size=len(vocab),
    d_model=d_model_size,
    nhead=num_head,
    num_layers=layers,
    dim_feedforward=forward_dim,
    dropout=dropout_prob,
    max_len=max(input_len, output_len),
)

model = TransformerModel(**model_config).to(device)

In [35]:
# weights_only=True limits unpickling to tensors and plain containers, so a
# tampered checkpoint file cannot run arbitrary code while it is being loaded.
state_dict = torch.load(model_path, map_location=device, weights_only=True)

# Kept as the last expression so the key-matching report is displayed.
model.load_state_dict(state_dict)

<All keys matched successfully>

### Function to generate a text sequence

In [None]:

def _special_token_id(vocab, *names):
    """Return the id of the first of ``names`` found in ``vocab``, or ``None``.

    Presence is tested with ``is not None`` rather than truthiness so that a
    token whose id is 0 is still recognised.
    """
    for name in names:
        token_id = vocab.get(name)
        if token_id is not None:
            return token_id
    return None


def decode(model, src_tensor, max_len, vocab, temperature=1.2):
    """Sample a token-id sequence for one encoded prompt.

    Args:
        model: object exposing ``embedding``, ``positional_encoding``,
            ``encoder``, ``decoder`` and ``fc_out``; it is switched to eval mode.
        src_tensor: encoded prompt; dimension 1 is used as the sequence length.
            The target sequence is created with a batch of one, so this
            presumably must be shaped (1, seq_len) — TODO confirm.
        max_len: maximum number of tokens to sample after the BOS token.
        vocab: token -> id mapping containing ``<bos>``/``<eos>`` (or the
            upper-case ``<BOS>``/``<EOS>`` spellings).
        temperature: divisor applied to the logits before the softmax; values
            above 1 flatten the sampling distribution.

    Returns:
        list[int]: the sampled ids, starting with the BOS id and ending with
        the EOS id when generation stopped early.

    Raises:
        ValueError: if the BOS or EOS token is missing from ``vocab`` or
            ``temperature`` is not positive.
    """
    # sets the evaluation mode for the model
    model.eval()

    if temperature <= 0:
        raise ValueError("temperature must be positive.")

    eos_id = _special_token_id(vocab, "<eos>", "<EOS>")
    bos_id = _special_token_id(vocab, "<bos>", "<BOS>")

    if eos_id is None or bos_id is None:
        raise ValueError("Missing <bos> or <eos> in vocab.")

    # disables gradient tracking
    with torch.no_grad():
        # moved to the input's device once instead of on every decoding step
        positions = model.positional_encoding.to(src_tensor.device)

        # encodes the input prompt
        memory = model.encoder(
            model.embedding(src_tensor) + positions[:, :src_tensor.size(1), :]
        )

        # Start with BOS token
        tgt = torch.tensor([[bos_id]], device=src_tensor.device)

        for _ in range(max_len):
            # decodes using all tokens generated so far
            # NOTE(review): no target mask is passed to model.decoder; if training
            # used a causal mask, confirm the decoder applies it internally.
            tgt_embed = model.embedding(tgt) + positions[:, :tgt.size(1), :]
            output = model.decoder(tgt_embed, memory)
            logits = model.fc_out(output)

            # samples the next token from the temperature-scaled distribution
            # over the last position
            probs = torch.softmax(logits[:, -1, :] / temperature, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

            # appends the newly generated token to the sequence
            tgt = torch.cat([tgt, next_token], dim=1)

            # if the model predicts the eos token, stop decoding early.
            if next_token[0, 0].item() == eos_id:
                break

    # returns the generated token ids
    return tgt[0].tolist()


### Function to generate output

In [None]:
def evaluate_and_print_samples(model, input_tensors, prompts, references, vocab, id_to_token, print_interval=100, print_count=10, max_len=None):
    """Generate one text per input tensor and print sample groups along the way.

    Args:
        model: model handed to ``decode``.
        input_tensors: iterable of encoded prompts; each item gets a leading
            dimension added (``unsqueeze(0)``) before decoding.
        prompts: prompt texts, index-aligned with ``input_tensors``.
        references: reference texts, index-aligned with ``input_tensors``.
        vocab: token -> id mapping; ids of the pad/bos/eos tokens are dropped
            from the generated text.
        id_to_token: id -> token mapping used to turn ids back into words.
        print_interval: a sample group is printed whenever the index is a
            multiple of this value (and at the last index when there are fewer
            inputs than ``print_interval``).
        print_count: maximum number of most recent samples shown per group.
        max_len: maximum number of generated tokens per sample; ``None`` falls
            back to the notebook-level ``output_len``.

    Returns:
        list[str]: exactly one entry per input, in input order. A sample whose
        generation failed is represented by ``"[Error during generation]"`` so
        the list stays aligned with ``prompts`` and ``references``.
    """
    generation_limit = output_len if max_len is None else max_len

    # Built once instead of once per generated token. The upper-case spellings
    # mirror the fallback accepted by decode().
    special_ids = {
        vocab[name]
        for name in ("<pad>", "<bos>", "<eos>", "<PAD>", "<BOS>", "<EOS>")
        if name in vocab
    }

    generated_texts = []
    total = len(input_tensors)

    for i, input_tensor in enumerate(input_tensors):
        try:
            # adds a leading dimension and gets the list of generated token ids
            output_ids = decode(model, input_tensor.unsqueeze(0), max_len=generation_limit, vocab=vocab)

            # converts the output token ids back into words
            tokens = [id_to_token[idx] for idx in output_ids if idx not in special_ids]
            generated = " ".join(tokens)

        except Exception as e:
            # best effort: report the failure and keep going with a placeholder
            print(f"[Warning] Skipped sample {i} due to error: {e}")
            generated = "[Error during generation]"

        # Appended on success AND failure so that index j always refers to the
        # same sample in generated_texts, prompts and references.
        generated_texts.append(generated)

        # Print samples at the given interval
        if i % print_interval == 0 or (i == total - 1 and total < print_interval):
            print(f"\n--- Sample group at index {i} ---")
            start = max(0, len(generated_texts) - print_count)
            for j in range(start, len(generated_texts)):
                print(f"[{j}] Prompt     : {prompts[j]}")
                print(f"     Reference : {references[j]}")
                print(f"     Generated : {generated_texts[j]}\n")

    return generated_texts


### Generates sample output

In [49]:
# Token sampling during generation is random; fixing the seed makes the
# generated texts (and the scores computed from them) repeatable across runs.
torch.manual_seed(42)

generated_texts = evaluate_and_print_samples(
    model=model,
    input_tensors=input_tensors,
    prompts=prompts,
    references=references,
    vocab=vocab,
    id_to_token=id_to_token
)


--- Sample group at index 0 ---
[0] Prompt     : Write an anime synopsis that matches 'Fantasy and Sci-Fi'.
     Reference : armored beings called garm engage the formless monsters called seal who are destroying the planet annwn.
     Generated : productions rena masters masters masters masters masters earning earning earning earning quality animals animals animals animals animals animals animals animals animals lone masters animals cards cards cards lone lone become animals animals animals animals animals diverse realm music animals music metal music music music music music music animals music music music music music music music music music music music music classical music music music music music music music music music music music music music music yet music music music music music music and classical and and and and dodging and music music music music music music music music music music instrument music music music music music music music music music music music music music music 

### Computes Cosine Similarity

In [None]:
# Sentence-embedding model used to compare generated texts with the references.
embedding_model_name = "all-MiniLM-L6-v2"
embedder = SentenceTransformer(embedding_model_name)

In [None]:
# Scores are computed pair by pair, so both lists must describe the same
# samples. Taking the diagonal of a non-square similarity matrix would silently
# drop the surplus rows instead of reporting the mismatch.
if len(generated_texts) != len(references):
    raise ValueError(
        f"Got {len(generated_texts)} generated texts for {len(references)} references."
    )

gen_embeddings = embedder.encode(generated_texts, convert_to_tensor=True)
ref_embeddings = embedder.encode(references, convert_to_tensor=True)

# Only the matching (generated_i, reference_i) pairs are needed, so compare the
# rows directly rather than building the full N x N matrix and keeping its diagonal.
cosine_scores = torch.nn.functional.cosine_similarity(gen_embeddings, ref_embeddings, dim=1)

### Saves the result to a csv file

In [None]:
# One row per evaluated sample: inputs, reference, model output and its score.
df_out = pd.DataFrame({
    "prompt": prompts,
    "reference": references,
    "generated": generated_texts,
    "cosine_similarity": cosine_scores.cpu().numpy()
})

# DataFrame.to_csv does not create missing parent folders, so make sure the
# target folder exists before writing.
evaluation_dir = output_dir + "/evaluation"
os.makedirs(evaluation_dir, exist_ok=True)

df_out.to_csv(evaluation_dir + "/evaluation_results.csv", index=False)