In [1]:
from google.colab import drive
drive.mount('/content/drive')
drive_folder = '/content/drive/MyDrive/Transformer/'
%cd /content/drive/MyDrive/Transformer/


Mounted at /content/drive
/content/drive/MyDrive/Transformer


In [2]:
%%capture
!pip install datasets
!pip install tokenizers
!pip install torchmetrics

In [3]:
import torch
import torch.nn as nn
from model import Transformer
from config import get_config, get_weights_file_path
from train import get_model, get_ds, greedy_decode
import altair as alt # visualization for chart
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")

In [4]:
# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print("Using device:", device)

Using device: cpu


In [5]:
# Build the configuration and point it at the weights / tokenizer files on Drive.
config = get_config()
config['model_folder'] = drive_folder + 'weights/'
config['tokenizer_file'] = drive_folder + 'vocab/tokenizer_{0}.json'

train_dataloader, val_dataloader, vocab_src, vocab_tgt = get_ds(config)
model = get_model(config, vocab_src.get_vocab_size(), vocab_tgt.get_vocab_size()).to(device)

# This notebook only runs inference: put the module in evaluation mode so that
# train-only layers such as dropout do not perturb the visualised attention.
model.eval()

# Load the pretrained weights from the checkpoint identified by "21".
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
model_filename = get_weights_file_path(config, "21")
# Map the tensors straight onto the device the model lives on.
state = torch.load(model_filename, map_location=device)
model.load_state_dict(state['model_state_dict'])

Downloading readme:   0%|          | 0.00/25.2k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/5.73M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/32332 [00:00<?, ? examples/s]

Max length of source sentence: 309
Max length of target sentence: 274


<All keys matched successfully>

In [6]:
def load_next_batch():
    """Fetch one validation example, decode it greedily and return its tokens.

    A fresh iterator over ``val_dataloader`` is created on every call, so which
    batch is returned depends on how that dataloader orders its data.

    The greedy decode is executed only for its side effect: the visualisation
    helpers in this notebook read ``attention_scores`` from the model's
    attention blocks (presumably populated during the forward pass - TODO
    confirm in model.py). Its return value is not needed here.

    Returns:
        tuple: ``(batch, encoder_input_tokens, decoder_input_tokens)`` where
        ``batch`` is the raw item yielded by the dataloader and the two token
        lists are the string tokens of the first (and only) example.

    Raises:
        AssertionError: if the batch does not contain exactly one example.
    """
    # Load a sample batch from the validation set
    batch = next(iter(val_dataloader))
    encoder_input = batch["encoder_input"].to(device)
    encoder_mask = batch["encoder_mask"].to(device)
    decoder_input = batch["decoder_input"].to(device)

    # Validate the batch size before anything indexes element 0.
    assert encoder_input.size(0) == 1, "Batch size must be 1 for validation"

    encoder_input_tokens = [vocab_src.id_to_token(idx) for idx in encoder_input[0].cpu().numpy()]
    decoder_input_tokens = [vocab_tgt.id_to_token(idx) for idx in decoder_input[0].cpu().numpy()]

    # Inference only: disable train-time layers (e.g. dropout) and autograd
    # bookkeeping so the stored attention reflects a deterministic forward pass.
    model.eval()
    with torch.no_grad():
        greedy_decode(
            model, encoder_input, encoder_mask, vocab_src, vocab_tgt, config['seq_len'], device)

    return batch, encoder_input_tokens, decoder_input_tokens

In [7]:
def mtx2df(m, max_row, max_col, row_tokens, col_tokens):
    """Flatten the top-left corner of a 2-D matrix into a long-format DataFrame.

    Args:
        m: 2-D matrix supporting ``m.shape`` and ``m[r, c]`` indexing.
        max_row: maximum number of rows to keep.
        max_col: maximum number of columns to keep.
        row_tokens: token strings labelling the rows; indices beyond its length
            are labelled ``"<blank>"``.
        col_tokens: token strings labelling the columns; same fallback.

    Returns:
        pandas.DataFrame with one record per kept cell and the columns
        ``row``, ``column``, ``value``, ``row_token``, ``col_token``. The token
        columns are prefixed with the zero-padded index so labels stay unique
        and sort in positional order.
    """
    # Bound the loops directly instead of walking the whole matrix and
    # filtering; a non-positive limit simply yields an empty range.
    n_rows = min(max_row, m.shape[0])
    n_cols = min(max_col, m.shape[1])

    def _label(idx, tokens):
        # "<blank>" marks positions that have no corresponding token.
        return "%.3d %s" % (idx, tokens[idx] if len(tokens) > idx else "<blank>")

    # Column labels do not depend on the row, so build them once.
    col_labels = [_label(c, col_tokens) for c in range(n_cols)]

    records = []
    for r in range(n_rows):
        row_label = _label(r, row_tokens)
        for c in range(n_cols):
            records.append((r, c, float(m[r, c]), row_label, col_labels[c]))

    return pd.DataFrame(
        records,
        columns=["row", "column", "value", "row_token", "col_token"],
    )

def get_attn_map(attn_type: str, layer: int, head: int):
    """Return the stored attention matrix of one head of one layer.

    Args:
        attn_type: ``"encoder"`` (encoder self-attention), ``"decoder"``
            (decoder self-attention) or ``"encoder-decoder"`` (cross attention).
        layer: index into ``model.encoder.layers`` / ``model.decoder.layers``.
        head: index of the attention head.

    Returns:
        The 2-D slice ``attention_scores[0, head]`` of the selected attention
        block, taken from the first element of the batch.

    Raises:
        ValueError: if ``attn_type`` is not one of the three supported values.
    """
    if attn_type == "encoder":
        block = model.encoder.layers[layer].self_attention_block
    elif attn_type == "decoder":
        block = model.decoder.layers[layer].self_attention_block
    elif attn_type == "encoder-decoder":  # Cross attention
        block = model.decoder.layers[layer].cross_attention_block
    else:
        # Previously an unknown type fell through to an unbound local variable.
        raise ValueError(
            f"Unknown attn_type {attn_type!r}; expected 'encoder', 'decoder' or 'encoder-decoder'"
        )
    return block.attention_scores[0, head].data  # this is a matrix

def attn_map(attn_type, layer, head, row_tokens, col_tokens, max_sentence_len):
    """Build an interactive Altair heat map for a single attention head.

    The attention matrix selected by ``attn_type``/``layer``/``head`` is cropped
    to ``max_sentence_len`` rows and columns, converted to long format and drawn
    as a 400x400 rectangle chart titled ``"Layer {layer} Head {head}"``.
    """
    scores = get_attn_map(attn_type, layer, head)
    frame = mtx2df(scores, max_sentence_len, max_sentence_len, row_tokens, col_tokens)

    # Axis titles are blanked: the token labels already describe each axis.
    x_axis = alt.X("col_token", axis=alt.Axis(title=""))
    y_axis = alt.Y("row_token", axis=alt.Axis(title=""))
    hover_fields = ["row", "column", "value", "row_token", "col_token"]

    heatmap = alt.Chart(data=frame).mark_rect().encode(
        x=x_axis,
        y=y_axis,
        color="value",
        tooltip=hover_fields,
    )
    sized = heatmap.properties(height=400, width=400, title=f"Layer {layer} Head {head}")
    return sized.interactive()

def get_all_attention_maps(attn_type: str, layers: list[int], heads: list[int], row_tokens: list, col_tokens, max_sentence_len: int):
    """Lay out attention heat maps as a grid: one row per layer, one column per head."""
    layer_rows = [
        alt.hconcat(
            *[
                attn_map(attn_type, layer, head, row_tokens, col_tokens, max_sentence_len)
                for head in heads
            ]
        )
        for layer in layers
    ]
    # Stack the per-layer rows vertically to form the final grid.
    return alt.vconcat(*layer_rows)

In [9]:
batch, encoder_input_tokens, decoder_input_tokens = load_next_batch()
print(f'Source: {batch["src_text"][0]}')
print(f'Target: {batch["tgt_text"][0]}')

# Number of source tokens before the first padding token. A sentence that
# fills the whole sequence has no "[PAD]" at all, in which case every token
# counts (list.index would otherwise raise ValueError).
if "[PAD]" in encoder_input_tokens:
    sentence_len = encoder_input_tokens.index("[PAD]")
else:
    sentence_len = len(encoder_input_tokens)

Source: Dolly had just entered the study and suggested that the doctor should lie down.
Target: Dolly era stata allora allora nello studio e aveva proposto al dottore di coricarsi un po’.


In [10]:
# Only the first layer is plotted (six layers are available in total)
layers = [0]
# Every attention head: 0..7
heads = list(range(8))

# Encoder self-attention, cropped to at most 20 tokens
get_all_attention_maps(
    "encoder",
    layers,
    heads,
    encoder_input_tokens,
    encoder_input_tokens,
    min(20, sentence_len),
)


In [None]:
# Decoder Self-Attention
# Crop by the length of the *target* sentence: `sentence_len` was measured on
# the source tokens and says nothing about how long the decoder input is.
decoder_sentence_len = (
    decoder_input_tokens.index("[PAD]")
    if "[PAD]" in decoder_input_tokens
    else len(decoder_input_tokens)
)
get_all_attention_maps("decoder", layers, heads, decoder_input_tokens, decoder_input_tokens, min(20, decoder_sentence_len))

In [13]:
import numpy as np

def get_attn_weights(attn_type: str, layer: int, heads: list):
    """Collect the attention matrices of several heads as NumPy arrays.

    Delegates the lookup to ``get_attn_map`` so every attention type that
    helper understands is supported here as well (previously only
    ``"encoder"`` worked and any other value hit an unbound variable).

    Args:
        attn_type: attention type accepted by ``get_attn_map``.
        layer: index of the layer to read from.
        heads: indices of the heads to collect.

    Returns:
        list: one 2-D NumPy array per entry of ``heads``, in the same order.
        Errors raised by ``get_attn_map`` propagate unchanged.
    """
    return [
        # .cpu() makes the conversion work when the model runs on a GPU;
        # it is a no-op for tensors that already live on the CPU.
        get_attn_map(attn_type, layer, head).cpu().numpy()
        for head in heads
    ]

def compute_norm_and_rank(attn_type, layer, heads):
    """Compute the norm and the matrix rank of each requested head's attention.

    Returns:
        tuple: ``(norms, ranks)`` - two lists aligned with ``heads`` holding
        ``np.linalg.norm`` and ``np.linalg.matrix_rank`` of every head's matrix.
    """
    per_head_weights = get_attn_weights(attn_type, layer, heads)
    norms = [np.linalg.norm(weights) for weights in per_head_weights]
    ranks = [np.linalg.matrix_rank(weights) for weights in per_head_weights]
    return norms, ranks

# Report norm and rank of the encoder self-attention for every head of layer 0.
layer = 0
heads = list(range(8))
for head in heads:
    # A single-element head list is passed, so both results are one-item lists.
    norm, rank = compute_norm_and_rank("encoder", layer, [head])
    print(
        f"Norm of attention weights for head {head}: {norm}",
        f"Rank of attention weights for head {head}: {rank}",
        sep="\n",
    )

Norm of attention weights for head 0: [9.32351]
Rank of attention weights for head 0: [17]
Norm of attention weights for head 1: [9.479674]
Rank of attention weights for head 1: [17]
Norm of attention weights for head 2: [9.330517]
Rank of attention weights for head 2: [17]
Norm of attention weights for head 3: [8.143506]
Rank of attention weights for head 3: [17]
Norm of attention weights for head 4: [7.180328]
Rank of attention weights for head 4: [17]
Norm of attention weights for head 5: [7.9106765]
Rank of attention weights for head 5: [17]
Norm of attention weights for head 6: [8.424968]
Rank of attention weights for head 6: [17]
Norm of attention weights for head 7: [9.648959]
Rank of attention weights for head 7: [17]


In [None]:
# Cross attention
# Rows of the heat map are labelled with `row_tokens` and columns with
# `col_tokens`. Assuming the model stores cross-attention scores in the usual
# (target position, source position) layout - queries from the decoder, keys
# from the encoder; TODO confirm in model.py - the rows must carry the decoder
# tokens and the columns the encoder tokens (they were swapped before).
get_all_attention_maps("encoder-decoder", layers, heads, decoder_input_tokens, encoder_input_tokens, min(20, sentence_len))