## 权重可视化

### 导入依赖

In [21]:
import torch
import torch.nn as nn
from model import Transformer
from config.config import get_config,latest_weights_file_path
from train import get_model,get_dataset,greedy_decode
import altair as alt
import pandas as pd
import numpy as np
import warnings 
warnings.filterwarnings("ignore")

### 获取设备

In [22]:
# Prefer the GPU when one is visible to PyTorch; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

Using device: cuda


### 加载数据集和预训练模型

In [23]:
# Build the configuration, the dataloaders and both tokenizers.
config = get_config()
train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_dataset(config)
print('👌 数据集加载成功！')

# Size the model from the two vocabularies and move it to the selected device.
model = get_model(config, tokenizer_src.get_vocab_size(), tokenizer_tgt.get_vocab_size()).to(device)

# Load the pretrained weights
model_filename = latest_weights_file_path(config)
print(f'😁 从{model_filename}加载预训练权重!')
# map_location remaps the stored tensors onto `device`, so a checkpoint saved
# on a GPU can still be loaded on a CPU-only machine.
# NOTE: torch.load unpickles the file; only load checkpoints you trust.
state = torch.load(model_filename, map_location=device)
model.load_state_dict(state['model_state_dict'])
# This notebook only runs inference: eval mode disables dropout (if the model
# has any), so the attention maps are not randomly perturbed between runs.
model.eval()
print('❤️ 预训练权重加载成功！')

Using the latest cached version of the dataset since opus_books couldn't be found on the Hugging Face Hub
Found the latest cached dataset configuration 'en-it' at C:\Users\18135\.cache\huggingface\datasets\opus_books\en-it\0.0.0\1f9f6191d0e91a3c539c2595e2fe48fc1420de9b (last modified on Wed Apr  9 14:21:22 2025).


Max length of source sentence: 309 Max length of target sentence: 274
👌 数据集加载成功！
😜 初始化权重完成！
😁 从weights\tmodel_49.pt加载预训练权重!
❤️ 预训练权重加载成功！


### 加载样本并进行预测

In [24]:
def load_next_batch():
    '''
    Fetch one validation batch, run greedy decoding on it and return its tokens.

    Only the first sample of the batch is converted back to tokens; the
    validation loader is expected to yield batches holding a single sample.

    Returns:
        tuple: ``(batch, encoder_input_tokens, decoder_input_tokens)`` where
        ``batch`` is the raw batch produced by ``val_dataloader`` and the two
        token lists are the first sample's encoder / decoder input ids mapped
        back to strings with the source / target tokenizer.
    '''
    # NOTE: iter() builds a fresh iterator on every call, so this returns the
    # first batch that iterator yields instead of advancing through the loader.
    batch = next(iter(val_dataloader))
    encoder_input = batch["encoder_input"].to(device)   # shape: (batch_size, seq_len)
    encoder_mask = batch["encoder_mask"].to(device)     # shape: (batch_size, 1, 1, seq_len)
    decoder_input = batch["decoder_input"].to(device)   # shape: (batch_size, seq_len)

    # Map the id sequences of the only sample back to text tokens.
    # tolist() hands plain Python ints to the tokenizer instead of 0-dim tensors.
    encoder_input_tokens = [tokenizer_src.id_to_token(idx) for idx in encoder_input[0].tolist()]
    decoder_input_tokens = [tokenizer_tgt.id_to_token(idx) for idx in decoder_input[0].tolist()]

    # Greedy decoding. The decoded ids are not used here; the forward passes
    # are presumably what makes the attention blocks record the weights that
    # get_attn_map reads afterwards (confirm against model.py).
    # no_grad: inference only, so no autograd graph needs to be built.
    with torch.no_grad():
        greedy_decode(
            model,
            encoder_input,
            encoder_mask,
            tokenizer_src,
            tokenizer_tgt,
            max_len=config['seq_len'],
            device=device,
        )
    return batch, encoder_input_tokens, decoder_input_tokens

### 注意力可视化

In [28]:
def mtx2df(m, max_row, max_col, row_tokens, col_tokens):
    """Flatten the top-left corner of a 2-D matrix into a long-format DataFrame.

    Args:
        m: 2-D object exposing ``.shape`` and ``m[r, c]`` indexing.
        max_row: Exclusive upper bound on the row indices that are kept.
        max_col: Exclusive upper bound on the column indices that are kept.
        row_tokens: Labels for the rows; rows past its end are labelled "<blank>".
        col_tokens: Labels for the columns; columns past its end are labelled "<blank>".

    Returns:
        pandas.DataFrame with columns ``row``, ``column``, ``value``,
        ``row_token`` and ``col_token``, one record per kept cell in row-major
        order. Each token label is prefixed with its zero-padded index.
    """
    # Apply the bounds once per axis instead of visiting every cell of the
    # full matrix and discarding most of them afterwards.
    kept_rows = [r for r in range(m.shape[0]) if r < max_row]
    kept_cols = [c for c in range(m.shape[1]) if c < max_col]
    records = [
        (
            r,
            c,
            float(m[r, c]),
            "%.3d %s" % (r, row_tokens[r] if len(row_tokens) > r else "<blank>"),
            "%.3d %s" % (c, col_tokens[c] if len(col_tokens) > c else "<blank>"),
        )
        for r in kept_rows
        for c in kept_cols
    ]
    return pd.DataFrame(
        records,
        columns=["row", "column", "value", "row_token", "col_token"],
    )

def get_attn_map(attn_type: str, layer: int, head: int):
    """Return the stored attention weights of one head for the first sample.

    Args:
        attn_type: "encoder" (encoder self-attention), "decoder" (decoder
            self-attention) or "encoder-decoder" (decoder cross-attention).
        layer: Index of the encoder / decoder layer to read from.
        head: Index of the attention head.

    Returns:
        The ``[0, head]`` slice of the selected block's ``attention_weights``,
        detached from autograd via ``.data``.

    Raises:
        ValueError: If ``attn_type`` is not one of the three supported names.
    """
    if attn_type == "encoder":
        attn = model.encoder.layers[layer].self_attention_block.attention_weights
    elif attn_type == "decoder":
        attn = model.decoder.layers[layer].self_attention_block.attention_weights
    elif attn_type == "encoder-decoder":
        attn = model.decoder.layers[layer].cross_attention_block.attention_weights
    else:
        # Without this branch an unknown name fell through to the return
        # statement and surfaced as an opaque UnboundLocalError.
        raise ValueError(
            f"Unknown attn_type {attn_type!r}; expected 'encoder', 'decoder' or 'encoder-decoder'"
        )
    # Index 0 selects the first (and, in this notebook, only) sample of the batch.
    return attn[0, head].data

def attn_map(attn_type, layer, head, row_tokens, col_tokens, max_sentence_len):
    """Build an interactive Altair heatmap for one attention head.

    The attention matrix selected by ``attn_type`` / ``layer`` / ``head`` is
    cropped to ``max_sentence_len`` rows and columns, converted to a
    long-format DataFrame and drawn as coloured rectangles, with column tokens
    on the x axis and row tokens on the y axis.
    """
    weights = get_attn_map(attn_type, layer, head)
    cells = mtx2df(weights, max_sentence_len, max_sentence_len, row_tokens, col_tokens)

    heatmap = alt.Chart(data=cells).mark_rect().encode(
        x=alt.X("col_token", axis=alt.Axis(title="")),
        y=alt.Y("row_token", axis=alt.Axis(title="")),
        color="value",
        tooltip=["row", "column", "value", "row_token", "col_token"],
    )
    sized = heatmap.properties(height=400, width=400, title=f"Layer {layer} Head {head}")
    return sized.interactive()

def get_all_attention_maps(attn_type: str, layers: list[int], heads: list[int], row_tokens: list, col_tokens, max_sentence_len: int):
    """Arrange the attention heatmaps of several layers and heads in a grid.

    Each requested layer becomes one horizontal strip holding one chart per
    requested head; the strips are then stacked vertically.
    """
    layer_strips = [
        alt.hconcat(
            *(
                attn_map(attn_type, layer, head, row_tokens, col_tokens, max_sentence_len)
                for head in heads
            )
        )
        for layer in layers
    ]
    return alt.vconcat(*layer_strips)

#### 进行一次预测

In [None]:
batch, encoder_input_tokens, decoder_input_tokens = load_next_batch()
print(f'Source: {batch["src_text"][0]}')
print(f'Target: {batch["tgt_text"][0]}')
# sentence_len: number of source tokens that precede the first [PAD] token.
# A sequence without any padding would make .index() raise ValueError, so
# fall back to the full token count in that case.
sentence_len = (
    encoder_input_tokens.index("[PAD]")
    if "[PAD]" in encoder_input_tokens
    else len(encoder_input_tokens)
)

Source: You were from home last night?"
Target: La notte scorsa non eravate a casa?


In [None]:
# Encoder / decoder layers to visualize: indices 0 through 2.
layers = list(range(3))

# Attention heads to visualize: indices 0 through 7.
heads = list(range(8))

#### 编码器的自注意力权重可视化

In [34]:
# Encoder self-attention: source tokens label both axes; at most 20 positions are shown.
get_all_attention_maps(
    "encoder",
    layers,
    heads,
    encoder_input_tokens,
    encoder_input_tokens,
    min(20, sentence_len),
)

#### 解码器的自注意力可视化

In [None]:
# Decoder self-attention: target tokens label both axes; at most 20 positions are shown.
get_all_attention_maps(
    "decoder",
    layers,
    heads,
    decoder_input_tokens,
    decoder_input_tokens,
    min(20, sentence_len),
)

#### 编码器-解码器交叉注意力可视化

In [None]:
# Encoder-decoder cross-attention visualization.
# In cross-attention the queries come from the decoder and the keys from the
# encoder, so rows are labelled with decoder tokens and columns with encoder
# tokens (assumes the stored weights are laid out as
# (batch, head, query, key) -- TODO confirm against model.py).
get_all_attention_maps("encoder-decoder", layers, heads, decoder_input_tokens, encoder_input_tokens, min(20, sentence_len))