In [1]:
import os
import warnings
from pathlib import Path

import altair as alt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn

from config.core import config
from modules.TransformerModule import TransformerModule
warnings.filterwarnings('ignore')

In [2]:
# Select the compute device: the GPU when CUDA is usable, the CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [3]:
# Show the `TR_model` settings of the project config (vocabulary sizes, sequence lengths,
# model dimensions, training options) so the run is self-describing.
print(config.model_transformer.TR_model)

{'src_vocab_size': 1024, 'tgt_vocab_size': 100, 'src_seq_len': 350, 'tgt_seq_len': 500, 'seq_len': 350, 'lang_src': 'en', 'lang_tgt': 'it', 'd_model': 512, 'num_layer': 6, 'num_heads': 8, 'dropout': 0.0, 'd_ff': 2048, 'lr': 0.0001, 'batch_size': 16, 'epochs': 20, 'ckpt_file': '', 'tokenizer_file': 'tokenizer_{}.json'}


In [4]:
# Load the trained model from its checkpoint.
# The checkpoint location can be overridden through the TRANSFORMER_CKPT_FILE
# environment variable; the default below is the path originally hard-coded here.
DEFAULT_CKPT_FILE = r"D:\ML_AI_DL_Projects\projects_repo\transformer\transformer_app\test_dir\test_01\version_1\checkpoints\model-epoch=09-val_loss=4.56244.ckpt"
model_ckpt_file = Path(os.environ.get("TRANSFORMER_CKPT_FILE", DEFAULT_CKPT_FILE))
if not model_ckpt_file.is_file():
    raise FileNotFoundError(
        f"Checkpoint not found: {model_ckpt_file} "
        "(set TRANSFORMER_CKPT_FILE to point at a valid .ckpt file)"
    )

# map_location restores the weights directly onto the selected device, so a checkpoint
# saved on a GPU can also be opened on a CPU-only machine.
# NOTE(review): assumes TransformerModule uses Lightning's standard
# `load_from_checkpoint` signature — confirm.
model = TransformerModule.load_from_checkpoint(model_ckpt_file, map_location=device)
# The notebook only runs inference, so switch to evaluation mode.
model.eval()
model.to(device)

DS SIZE: 32332
Max lengh of src seq: 309
Max lengh of src tgt: 274
--------------------------------------------------------------------------------
src_vocab_size:15698
tgt_vocab_size:22463
--------------------------------------------------------------------------------


TransformerModule(
  (src_embeded): InputEmbeddings(
    (embedding): Embedding(15698, 512)
  )
  (tgt_embeded): InputEmbeddings(
    (embedding): Embedding(22463, 512)
  )
  (src_pos_enc): PositionalEncoding(
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (tgt_pos_enc): PositionalEncoding(
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (encoder_blocks): ModuleList(
    (0-5): 6 x EncoderBlock(
      (self_attn_block): MultiHeadAttentionBlock(
        (w_q): Linear(in_features=512, out_features=512, bias=False)
        (w_k): Linear(in_features=512, out_features=512, bias=False)
        (w_v): Linear(in_features=512, out_features=512, bias=False)
        (w_o): Linear(in_features=512, out_features=512, bias=False)
        (dropout): Dropout(p=0.0, inplace=False)
      )
      (feed_forward_net): PositionWiseFFN(
        (linear_1): Linear(in_features=512, out_features=2048, bias=True)
        (dropout): Dropout(p=0.0, inplace=False)
        (linear_2): Linear(in_features=2048, 

In [6]:
# Validation dataloader supplied by the loaded module; `load_next_batch` draws its sample from it.
val_ds = model.val_dataloader()

In [23]:
def load_next_batch():
    """Fetch one validation sample and run greedy decoding on it.

    The first element of a batch drawn from ``val_ds`` is reshaped to a batch of
    one, its encoder/decoder token ids are converted to token strings, and
    ``model._greedy_decode`` is run on the encoder input. The decoded output is
    not used here; presumably the forward pass is what fills the ``attn_scores``
    that ``get_attn_map`` reads — TODO confirm.

    Returns:
        tuple: ``(batch, encoder_input_tokens, decoder_input_tokens)`` where
        ``batch`` is the raw batch and the two lists hold the source and target
        tokens (as strings) of its first element.
    """
    # A fresh iterator is created on every call, so which batch is returned
    # depends on the loader's ordering / shuffling.
    batch = next(iter(val_ds))

    # Keep only the first sample and restore a leading batch dimension of 1.
    encoder_input = batch["encoder_input"][0]
    encoder_mask = batch["encoder_mask"][0]
    decoder_input = batch["decoder_input"][0]
    encoder_input = encoder_input.view(1, encoder_input.shape[0])
    decoder_input = decoder_input.view(1, decoder_input.shape[0])
    print(f"encoder_input:{encoder_input.shape}")

    # Sanity check before the tensors are used (holds by construction after the view above).
    assert encoder_input.size(
        0) == 1, "Batch size must be 1 for validation"

    encoder_input_tokens = [model.tokenizer_src.id_to_token(idx) for idx in encoder_input[0].tolist()]
    decoder_input_tokens = [model.tokenizer_tgt.id_to_token(idx) for idx in decoder_input[0].tolist()]

    # Inference only: disable autograd so no graph is built during decoding.
    with torch.no_grad():
        model._greedy_decode(
            encoder_input.to(device),
            encoder_mask.to(device),
            model.tokenizer_src,
            model.tokenizer_tgt,
            config.model_transformer.TR_model['seq_len'],
            device,
        )

    return batch, encoder_input_tokens, decoder_input_tokens

In [8]:
# Visualisation helper: flatten a score matrix into a long-form table
def mtx2df(m, max_row, max_col, row_tokens, col_tokens):
    """Convert the top-left corner of a 2-D matrix into a long-form DataFrame.

    Args:
        m: 2-D indexable object exposing ``.shape`` and ``m[r, c]`` (e.g. a
            NumPy array or a tensor); each cell is converted with ``float``.
        max_row: Maximum number of rows to include (int).
        max_col: Maximum number of columns to include (int).
        row_tokens: Labels for the rows; positions beyond its length are
            labelled ``"<blank>"``.
        col_tokens: Labels for the columns; same fallback as ``row_tokens``.

    Returns:
        pandas.DataFrame with one record per included cell, in row-major order,
        and columns ``row``, ``column``, ``value``, ``row_token``, ``col_token``.
        Token labels are prefixed with the zero-padded position (``"003 word"``).
    """
    # Only visit the cells that are kept instead of scanning the whole matrix
    # and filtering afterwards.
    n_rows = min(m.shape[0], max_row)
    n_cols = min(m.shape[1], max_col)

    def _label(position, tokens):
        # Zero-padded index keeps labels unique and sortable by position.
        token = tokens[position] if len(tokens) > position else "<blank>"
        return "%.3d %s" % (position, token)

    # Labels depend on one index only, so build each of them once.
    row_labels = [_label(r, row_tokens) for r in range(n_rows)]
    col_labels = [_label(c, col_tokens) for c in range(n_cols)]

    return pd.DataFrame(
        [
            (r, c, float(m[r, c]), row_labels[r], col_labels[c])
            for r in range(n_rows)
            for c in range(n_cols)
        ],
        columns=["row", "column", "value", "row_token", "col_token"],
    )

In [30]:
# Three attention types: encoder self-attention, decoder self-attention, encoder-decoder (cross) attention
def get_attn_map(attn_type: str, layer: int, head: int):
    """Return the stored attention scores of one head of one layer.

    Args:
        attn_type: ``"encoder"``, ``"decoder"`` or ``"encoder-decoder"``.
        layer: Index into the encoder/decoder layer list.
        head: Index on the second axis of the stored ``attn_scores``.

    Returns:
        ``attn_scores[0, head].data`` of the selected attention block, i.e. the
        entry at index 0 on the first axis (presumably the batch dimension —
        TODO confirm) for the requested head.

    Raises:
        ValueError: If ``attn_type`` is not one of the three supported values.
    """
    # NOTE(review): the module repr printed after loading lists `encoder_blocks`;
    # confirm that `model.encoder.layers` / `model.decoder.layers` resolve on this model.
    if attn_type == "encoder":
        attn = model.encoder.layers[layer].self_attn_block.attn_scores
    elif attn_type == "decoder":
        attn = model.decoder.layers[layer].self_attn_block.attn_scores
    elif attn_type == "encoder-decoder":
        attn = model.decoder.layers[layer].cross_attn_block.attn_scores
    else:
        # Previously an unknown value fell through to an UnboundLocalError.
        raise ValueError(
            f"Unknown attn_type {attn_type!r}; expected 'encoder', 'decoder' or 'encoder-decoder'"
        )
    return attn[0, head].data

In [10]:
# Visualize one attention map as an interactive heat-map
def attn_map(attn_type, layer, head, row_tokens, col_tokens, max_sentence_len):
    """Build an Altair heat-map for a single attention head.

    The scores returned by ``get_attn_map(attn_type, layer, head)`` are
    flattened with ``mtx2df``, limited to ``max_sentence_len`` rows and columns,
    and drawn as a 400x400 rectangle chart titled ``"Layer <layer> Head <head>"``
    with ``col_token`` on the x axis and ``row_token`` on the y axis.

    Returns:
        The interactive Altair chart.
    """
    scores = get_attn_map(attn_type, layer, head)
    cells = mtx2df(scores, max_sentence_len, max_sentence_len, row_tokens, col_tokens)

    # Axis titles are blanked; the token labels themselves identify the axes.
    x_axis = alt.X("col_token", axis=alt.Axis(title=""))
    y_axis = alt.Y("row_token", axis=alt.Axis(title=""))
    hover_fields = ["row", "column", "value", "row_token", "col_token"]

    heatmap = alt.Chart(data=cells).mark_rect()
    heatmap = heatmap.encode(x=x_axis, y=y_axis, color="value", tooltip=hover_fields)
    heatmap = heatmap.properties(height=400, width=400, title=f"Layer {layer} Head {head}")
    return heatmap.interactive()

In [11]:
def get_all_attention_maps(attn_type: str, layers: list[int], heads: list[int], row_tokens: list, col_tokens, max_sentence_len: int):
    """Arrange the attention heat-maps of several layers and heads in a grid.

    One ``attn_map`` chart is created per (layer, head) pair. The charts of a
    layer are concatenated horizontally (one per head), and the resulting rows
    are stacked vertically (one per layer).

    Returns:
        The combined Altair chart.
    """
    def _layer_row(layer):
        # One horizontal strip holding every requested head of this layer.
        per_head = [
            attn_map(attn_type, layer, head, row_tokens, col_tokens, max_sentence_len)
            for head in heads
        ]
        return alt.hconcat(*per_head)

    grid_rows = [_layer_row(layer) for layer in layers]
    return alt.vconcat(*grid_rows)

In [26]:
batch, encoder_input_tokens, decoder_input_tokens = load_next_batch()
print(f'Source: {batch["src_text"][0]}')
print(f'Target: {batch["tgt_text"][0]}')
# Length of the source sentence = position of the first padding token.
# A sentence that fills the whole sequence contains no "[PAD]", and list.index
# would raise ValueError, so fall back to the full token count in that case.
sentence_len = (
    encoder_input_tokens.index("[PAD]")
    if "[PAD]" in encoder_input_tokens
    else len(encoder_input_tokens)
)

encoder_input:torch.Size([1, 350])
Source: But before he had finished writing she read it under his hand, finished the sentence herself, and wrote the answer: 'Yes.'
Target: Ma non aveva ancora finito di scriverle e lei già leggeva dietro il suo braccio e terminava lei stessa e scriveva la risposta: “Sì”.


In [31]:
# Layers and heads to plot: layers 0-2 and heads 0-7
layers = list(range(3))
heads = list(range(8))

# Encoder Self-Attention (at most the first 20 tokens are shown)
get_all_attention_maps(
    attn_type="encoder",
    layers=layers,
    heads=heads,
    row_tokens=encoder_input_tokens,
    col_tokens=encoder_input_tokens,
    max_sentence_len=min(20, sentence_len),
)


In [32]:
# Decoder Self-Attention
# NOTE(review): the size cap uses `sentence_len`, which was measured on the source
# tokens — confirm that is intended for the target side.
get_all_attention_maps(
    attn_type="decoder",
    layers=layers,
    heads=heads,
    row_tokens=decoder_input_tokens,
    col_tokens=decoder_input_tokens,
    max_sentence_len=min(20, sentence_len),
)

In [33]:
# Encoder-Decoder (cross) attention
# NOTE(review): rows are labelled with the source tokens and columns with the target
# tokens; if the stored scores are laid out as (query = decoder, key = encoder) the
# two label lists should be swapped — verify against the cross-attention block.
get_all_attention_maps(
    attn_type="encoder-decoder",
    layers=layers,
    heads=heads,
    row_tokens=encoder_input_tokens,
    col_tokens=decoder_input_tokens,
    max_sentence_len=min(20, sentence_len),
)