# FomulaBEAT

変更点
- デコーダのみで学習させる
- TransformerDecoderLayerをスクラッチで書く


In [1]:
version = '02-3'
model_dir = './model/' + version
data_path = 'data/eq02.txt'

In [2]:
from pathlib import Path
import math
import time
from collections import Counter
from tqdm import tqdm
import torch
from torch.utils.data import random_split
import torch.nn as nn
from torch import Tensor
from torch.nn import (
    TransformerEncoder, TransformerDecoder,
    TransformerEncoderLayer, TransformerDecoderLayer
)
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import vocab
from torchtext.utils import download_from_url, extract_archive
import torch.nn.functional as F




パラメータの事前設定

In [3]:
%load_ext autoreload
%autoreload 2
torch.set_printoptions(linewidth=100)

In [4]:

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

model_dir_path = Path(model_dir)
if not model_dir_path.exists():
    model_dir_path.mkdir(parents=True)

データの取得

In [5]:
def read_equation_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    src_data, tgt_data = [], []
    for line in lines:
        src, tgt = line.strip().split('=')
        src_data.append(src)
        tgt_data.append(tgt)
    return src_data, tgt_data


In [6]:
# ファイルを読み込み、数式データを取得
src_data, tgt_data = read_equation_file(data_path)
print(src_data[:3], tgt_data[:3])


['8+0', '5+2', '5+1'] ['8', '7', '6']


辞書データの作成

In [7]:

SPECIALS = ['<unk>', '<pad>', '<start>', '<end>']

def build_vocab(texts):
    vocab = {}
    idx = 0
    # 数字の語彙定義
    for i in range(10):
        vocab[str(i)] = idx
        idx += 1
    # 特別語の語彙定義
    for sp in SPECIALS:
        vocab[sp] = idx
        idx += 1
    # その他の文字の語彙定義
    for text in texts:
        for char in text:
            if char not in vocab:
                vocab[char] = idx
                idx += 1
    return vocab


def convert_text_to_indexes(text, vocab):
    # <start> と <end> トークンを追加して数値化
    return [vocab['<start>']] + [vocab[char] if char in vocab else vocab['<unk>'] for char in text] + [vocab['<end>']]

# データを処理して Train と Valid に分ける関数
# データを処理して Train と Valid に分ける関数
def data_process_split(src_texts, tgt_texts, vocab_src, vocab_tgt, valid_size=0.2):
    # データを数値化
    data = []
    for (src, tgt) in zip(src_texts, tgt_texts):
        src_tensor = torch.tensor(convert_text_to_indexes(src, vocab_src), dtype=torch.long)
        tgt_tensor = torch.tensor(convert_text_to_indexes(tgt, vocab_tgt), dtype=torch.long)
        data.append((src_tensor, tgt_tensor))
    
    # データのサイズを計算して、訓練データと検証データに分割
    data_size = len(data)
    valid_size = int(valid_size * data_size)
    train_size = data_size - valid_size

    # PyTorchのrandom_splitを使って分割
    train_data, valid_data = random_split(data, [train_size, valid_size])
    
    return train_data, valid_data



In [8]:
# 辞書と逆辞書を構築
vocab_src = build_vocab(src_data)
vocab_tgt = build_vocab(tgt_data)

print(vocab_tgt)

{'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, '<unk>': 10, '<pad>': 11, '<start>': 12, '<end>': 13}


In [71]:

# データを数値化
train_data, valid_data = data_process_split(src_data, tgt_data, vocab_src, vocab_tgt)

# 結果の確認
print('インデックス化された文章')
print(f"Input: {train_data[0][0]}\nOutput: {train_data[0][1]}")

# インデックスから元の文字列に戻す
def convert_indexes_to_text(indexes:list, vocab):
    reverse_vocab = {idx: token for token, idx in vocab.items()}
    return ''.join([reverse_vocab[idx] for idx in indexes if idx in reverse_vocab and reverse_vocab[idx] not in ['<start>', '<end>', '<pad>']])

print('元に戻した文章')
print(f"Input: {convert_indexes_to_text(train_data[0][0].tolist(), vocab_src)}")
print(f"Output: {convert_indexes_to_text(train_data[0][1].tolist(), vocab_tgt)}")


インデックス化された文章
Input: tensor([12,  9, 14,  1, 13])
Output: tensor([12,  1,  0, 13])
元に戻した文章
Input: 9+1
Output: 10


In [10]:
batch_size = 128
PAD_IDX = vocab_src['<pad>']
START_IDX = vocab_src['<start>']
END_IDX = vocab_src['<end>']

def generate_batch(data_batch):
    
    batch_src, batch_tgt = [], []
    for src, tgt in data_batch:
        batch_src.append(src)
        batch_tgt.append(tgt)
        
    batch_src = pad_sequence(batch_src, padding_value=PAD_IDX)
    batch_tgt = pad_sequence(batch_tgt, padding_value=PAD_IDX)
    
    return batch_src, batch_tgt

train_iter = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=generate_batch)
valid_iter = DataLoader(valid_data, batch_size=batch_size, shuffle=True, collate_fn=generate_batch)

In [11]:
len(train_data)

8000

Transoformerの設定

In [12]:
class TokenEmbedding(nn.Module):
    
    def __init__(self, vocab_size, embedding_size):
        
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_size, padding_idx=PAD_IDX)
        self.embedding_size = embedding_size
        
    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.embedding_size)
    
    
class PositionalEncoding(nn.Module):
    
    def __init__(self, embedding_size: int, dropout: float, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        
        den = torch.exp(-torch.arange(0, embedding_size, 2) * math.log(10000) / embedding_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        embedding_pos = torch.zeros((maxlen, embedding_size))
        embedding_pos[:, 0::2] = torch.sin(pos * den)
        embedding_pos[:, 1::2] = torch.cos(pos * den)
        embedding_pos = embedding_pos.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('embedding_pos', embedding_pos)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.embedding_pos[: token_embedding.size(0), :])


In [34]:

class TransformerDecoderLayerScratch(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerDecoderLayerScratch, self).__init__()
        # Self-attention for the decoder
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Multihead attention for attending to encoder outputs (memory)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Feedforward layers
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        # Layer normalization layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        # Dropout
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        # Self-attention
        tgt2, _ = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)

        print('tgt:', tgt.shape)
        print('memory:', memory.shape)
        print('tgt_mask:', tgt_mask.shape)
        
        # Attention with the encoder outputs (memory)
        tgt2, _ = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask, key_padding_mask=memory_key_padding_mask)
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)
        
        # Feedforward network
        tgt2 = self.linear2(self.dropout(F.relu(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)

        return tgt


In [35]:

class Seq2SeqTransformer(nn.Module):
    
    def __init__(
        self, num_encoder_layers: int, num_decoder_layers: int,
        embedding_size: int, vocab_size_src: int, vocab_size_tgt: int,
        dim_feedforward:int = 512, dropout:float = 0.1, nhead:int = 8
    ):
        
        super(Seq2SeqTransformer, self).__init__()

        self.token_embedding_src = TokenEmbedding(vocab_size_src, embedding_size)
        self.positional_encoding = PositionalEncoding(embedding_size, dropout=dropout)
        # encoder_layer = TransformerEncoderLayer(
        #     d_model=embedding_size, nhead=nhead, dim_feedforward=dim_feedforward
        # )
        # self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        
        self.token_embedding_tgt = TokenEmbedding(vocab_size_tgt, embedding_size)
        self.decoder_layer = TransformerDecoderLayerScratch(
            d_model=embedding_size, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout
        )
        # self.transformer_decoder = TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
        
        self.output = nn.Linear(embedding_size, vocab_size_tgt)

    def forward(
        self, src: Tensor, tgt: Tensor,
        mask_src: Tensor, mask_tgt: Tensor,
        padding_mask_src: Tensor, padding_mask_tgt: Tensor,
        memory_key_padding_mask: Tensor
    ):
        embedding_src = self.positional_encoding(self.token_embedding_src(src))
        # memory = self.transformer_encoder(embedding_src, mask_src, padding_mask_src)
        embedding_tgt = self.positional_encoding(self.token_embedding_tgt(tgt))
        outs = self.decoder_layer(
            embedding_tgt, embedding_src, mask_tgt, None,
            padding_mask_tgt, memory_key_padding_mask
        )
        return self.output(outs)

    # def encode(self, src: Tensor, mask_src: Tensor):
    #     return self.transformer_encoder(self.positional_encoding(self.token_embedding_src(src)), mask_src)

    def decode(self, tgt: Tensor, memory: Tensor, mask_tgt: Tensor):
        return self.decoder_layer(self.positional_encoding(self.token_embedding_tgt(tgt)), memory, mask_tgt)

In [15]:
def create_mask(src, tgt, PAD_IDX):
    
    seq_len_src = src.shape[0]
    seq_len_tgt = tgt.shape[0]

    mask_src = torch.zeros((seq_len_src, seq_len_src), device=device).type(torch.bool)
    mask_tgt = generate_square_subsequent_mask(seq_len_tgt)

    padding_mask_src = (src == PAD_IDX).transpose(0, 1)
    padding_mask_tgt = (tgt == PAD_IDX).transpose(0, 1)
    
    return mask_src, mask_tgt, padding_mask_src, padding_mask_tgt


def generate_square_subsequent_mask(seq_len):
    mask = (torch.triu(torch.ones((seq_len, seq_len), device=device)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

学習の定義

In [16]:
def train(model, data, optimizer, criterion, PAD_IDX):
    
    model.train()
    losses = 0
    for src, tgt in tqdm(data):
        
        src = src.to(device)
        tgt = tgt.to(device)

        input_tgt = tgt[:-1, :]

        mask_src, mask_tgt, padding_mask_src, padding_mask_tgt = create_mask(src, input_tgt, PAD_IDX)

        logits = model(
            src=src, tgt=input_tgt,
            mask_src=mask_src, mask_tgt=mask_tgt,
            padding_mask_src=padding_mask_src, padding_mask_tgt=padding_mask_tgt,
            memory_key_padding_mask=padding_mask_src
        )

        optimizer.zero_grad()
        output_tgt = tgt[1:, :]
        loss = criterion(logits.reshape(-1, logits.shape[-1]), output_tgt.reshape(-1))
        loss.backward()

        optimizer.step()
        losses += loss.item()
        
    return losses / len(data)

In [17]:

def evaluate(model, data, criterion, PAD_IDX):
    
    model.eval()
    losses = 0
    for src, tgt in data:
        
        src = src.to(device)
        tgt = tgt.to(device)

        input_tgt = tgt[:-1, :]

        mask_src, mask_tgt, padding_mask_src, padding_mask_tgt = create_mask(src, input_tgt, PAD_IDX)

        logits = model(
            src=src, tgt=input_tgt,
            mask_src=mask_src, mask_tgt=mask_tgt,
            padding_mask_src=padding_mask_src, padding_mask_tgt=padding_mask_tgt,
            memory_key_padding_mask=padding_mask_src
        )
        
        output_tgt = tgt[1:, :]
        loss = criterion(logits.reshape(-1, logits.shape[-1]), output_tgt.reshape(-1))
        losses += loss.item()
        
    return losses / len(data)

設定

In [18]:
vocab_size_src = len(vocab_src)
vocab_size_tgt = len(vocab_tgt)
embedding_size = 4
nhead = 1
dim_feedforward = 4
num_encoder_layers = 1
num_decoder_layers = 1
dropout = 0
# vocab_size_src = len(vocab_src)
# vocab_size_tgt = len(vocab_tgt)
# embedding_size = 240
# nhead = 8
# dim_feedforward = 100
# num_encoder_layers = 2
# num_decoder_layers = 2
# dropout = 0.1

model = Seq2SeqTransformer(
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
    embedding_size=embedding_size,
    vocab_size_src=vocab_size_src, vocab_size_tgt=vocab_size_tgt,
    dim_feedforward=dim_feedforward,
    dropout=dropout, nhead=nhead
)

for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

model = model.to(device)

criterion = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = torch.optim.Adam(model.parameters())

モデルの調査

In [19]:
print(model)

Seq2SeqTransformer(
  (token_embedding_src): TokenEmbedding(
    (embedding): Embedding(15, 4, padding_idx=11)
  )
  (positional_encoding): PositionalEncoding(
    (dropout): Dropout(p=0, inplace=False)
  )
  (token_embedding_tgt): TokenEmbedding(
    (embedding): Embedding(14, 4, padding_idx=11)
  )
  (decoder_layer): TransformerDecoderLayerScratch(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=4, out_features=4, bias=True)
    )
    (multihead_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=4, out_features=4, bias=True)
    )
    (linear1): Linear(in_features=4, out_features=4, bias=True)
    (dropout): Dropout(p=0, inplace=False)
    (linear2): Linear(in_features=4, out_features=4, bias=True)
    (norm1): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
    (norm3): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
    (d

In [20]:
# モデル内の層の名前とパラメータ情報を表示
LP = list(model.named_parameters())
lp = len(LP)
print(f"{lp} 層")
for p in range(0, lp):
    print(f"\n層名: {LP[p][0]}")
    print(f"形状: {LP[p][1].shape}")
    print(f"値: {LP[p][1]}")


22 層

層名: token_embedding_src.embedding.weight
形状: torch.Size([15, 4])
値: Parameter containing:
tensor([[ 0.1285,  0.1298, -0.0098,  0.1101],
        [-0.3056,  0.4552, -0.3491,  0.1926],
        [-0.5449, -0.2922,  0.0095, -0.3097],
        [ 0.0407, -0.3994,  0.0943, -0.0571],
        [-0.1552,  0.2905, -0.4873,  0.2420],
        [-0.1616,  0.3126,  0.3157, -0.0112],
        [ 0.2244,  0.0142, -0.1428,  0.1542],
        [-0.5551,  0.0057,  0.5166, -0.0226],
        [-0.1913,  0.1071,  0.4781,  0.3912],
        [ 0.1904,  0.1718, -0.1164, -0.0400],
        [ 0.4855,  0.2643,  0.3522, -0.3119],
        [-0.3131,  0.0244,  0.4814, -0.2200],
        [-0.1714,  0.1881, -0.3099,  0.4421],
        [-0.1883,  0.4162, -0.4407,  0.4258],
        [ 0.3741,  0.3094, -0.0601,  0.0800]], device='cuda:0', requires_grad=True)

層名: token_embedding_tgt.embedding.weight
形状: torch.Size([14, 4])
値: Parameter containing:
tensor([[-0.2096,  0.5733, -0.0593,  0.0449],
        [ 0.4260,  0.3456, -0.2983, -0.

## 学習実行

In [21]:
# epoch = 100
# best_loss = float('Inf')
# best_model = None
# patience = 10
# counter = 0

# for loop in range(1, epoch + 1):
    
#     start_time = time.time()
    
#     loss_train = train(
#         model=model, data=train_iter, optimizer=optimizer,
#         criterion=criterion, PAD_IDX=PAD_IDX
#     )
    
#     elapsed_time = time.time() - start_time
    
#     loss_valid = evaluate(
#         model=model, data=valid_iter, criterion=criterion, PAD_IDX=PAD_IDX
#     )
    
#     print('[{}/{}] train loss: {:.2f}, valid loss: {:.2f}  [{}{:.0f}s] counter: {} {}'.format(
#         loop, epoch,
#         loss_train, loss_valid,
#         str(int(math.floor(elapsed_time / 60))) + 'm' if math.floor(elapsed_time / 60) > 0 else '',
#         elapsed_time % 60,
#         counter,
#         '**' if best_loss > loss_valid else ''
#     ))
    
#     if best_loss > loss_valid:
#         best_loss = loss_valid
#         best_model = model
#         counter = 0
        
#     if counter > patience:
#         break
    
#     counter += 1

学習したモデルの保存

In [22]:
# torch.save(best_model.state_dict(), model_dir_path.joinpath(version + 'translation_transfomer.pth'))

学習したモデルを使って翻訳をする

In [23]:
# def translate(
#     model, text, vocab_src, vocab_tgt, seq_len_tgt,
#     START_IDX, END_IDX
# ):
#     model.eval()
#     tokens_src = convert_text_to_indexes(text, vocab=vocab_src)
#     num_tokens_src = len(tokens_src)

#     # Tensorに変換
#     src = torch.LongTensor(tokens_src).reshape(num_tokens_src, 1).to(device)
#     mask_src = torch.zeros((num_tokens_src, num_tokens_src), device=device).type(torch.bool)

#     # デコード
#     predicts = greedy_decode(
#         model=model, src=src,
#         mask_src=mask_src, seq_len_tgt=seq_len_tgt,
#         START_IDX=START_IDX, END_IDX=END_IDX
#     ).flatten()

#     return convert_indexes_to_text(predicts, vocab=vocab_tgt)

# def greedy_decode(model, src, mask_src, seq_len_tgt, START_IDX, END_IDX):
#     src = src.to(device)
#     mask_src = mask_src.to(device)

#     # ソースの埋め込みをメモリとして利用
#     memory = model.positional_encoding(model.token_embedding_src(src))
    
#     ys = torch.ones(1, 1).fill_(START_IDX).type(torch.long).to(device)
    
#     for i in range(seq_len_tgt - 1):
#         memory = memory.to(device)
#         mask_tgt = generate_square_subsequent_mask(ys.size(0)).to(device).type(torch.bool)
        
#         output = model.decode(ys, memory, mask_tgt)
#         output = output.transpose(0, 1)
#         output = model.output(output[:, -1])
        
#         # 最も高いスコアのトークンを取得
#         _, next_word = torch.max(output, dim=1)
#         next_word = next_word.item()

#         # 生成されたトークンを追加
#         ys = torch.cat([ys, torch.ones(1, 1).fill_(next_word).type_as(src.data)], dim=0)
#         if next_word == END_IDX:
#             break
    
#     return ys


In [24]:
# seq_len_tgt = max([len(x[1]) for x in train_data])
# text = '4+3'

# # 翻訳を実行
# translation = translate(
#     model=best_model, text=text, vocab_src=vocab_src, vocab_tgt=vocab_tgt,
#     seq_len_tgt=seq_len_tgt,
#     START_IDX=START_IDX, END_IDX=END_IDX
# )

# print(f"Input: {text}")
# print(f"Output: {translation}")

## モデルの動作を分析

In [131]:
import torch

# モデルのロード
model_path = model_dir_path.joinpath(version + 'translation_transfomer.pth')
loaded_model = Seq2SeqTransformer(
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
    embedding_size=embedding_size,
    vocab_size_src=vocab_size_src, vocab_size_tgt=vocab_size_tgt,
    dim_feedforward=dim_feedforward,
    dropout=dropout, nhead=nhead
).to(device)
loaded_model.load_state_dict(torch.load(model_path))
loaded_model.eval()


Seq2SeqTransformer(
  (token_embedding_src): TokenEmbedding(
    (embedding): Embedding(15, 4, padding_idx=11)
  )
  (positional_encoding): PositionalEncoding(
    (dropout): Dropout(p=0, inplace=False)
  )
  (token_embedding_tgt): TokenEmbedding(
    (embedding): Embedding(14, 4, padding_idx=11)
  )
  (decoder_layer): TransformerDecoderLayerScratch(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=4, out_features=4, bias=True)
    )
    (multihead_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=4, out_features=4, bias=True)
    )
    (linear1): Linear(in_features=4, out_features=4, bias=True)
    (dropout): Dropout(p=0, inplace=False)
    (linear2): Linear(in_features=4, out_features=4, bias=True)
    (norm1): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
    (norm3): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
    (d

In [132]:
# パラメータを取り出す


# モデルのパラメータを取得
params = dict(loaded_model.named_parameters())
print(params.keys())

# 埋め込み行列を取得
embedding_src_weight = params['token_embedding_src.embedding.weight'].data
embedding_tgt_weight = params['token_embedding_tgt.embedding.weight'].data

# 線形層の重みとバイアス
output_weight = params['output.weight'].data
output_bias = params['output.bias'].data

# デコーダの自己注意の重みとバイアス
self_attn_in_proj_weight = params['decoder_layer.self_attn.in_proj_weight'].data
self_attn_in_proj_bias = params['decoder_layer.self_attn.in_proj_bias'].data
self_attn_out_proj_weight = params['decoder_layer.self_attn.out_proj.weight'].data
self_attn_out_proj_bias = params['decoder_layer.self_attn.out_proj.bias'].data

# メモリー注意の重みとバイアス
multihead_attn_in_proj_weight = params['decoder_layer.multihead_attn.in_proj_weight'].data
multihead_attn_in_proj_bias = params['decoder_layer.multihead_attn.in_proj_bias'].data
multihead_attn_out_proj_weight = params['decoder_layer.multihead_attn.out_proj.weight'].data
multihead_attn_out_proj_bias = params['decoder_layer.multihead_attn.out_proj.bias'].data

# フィードフォワードネットワークの重みとバイアス
linear1_weight = params['decoder_layer.linear1.weight'].data
linear1_bias = params['decoder_layer.linear1.bias'].data
linear2_weight = params['decoder_layer.linear2.weight'].data
linear2_bias = params['decoder_layer.linear2.bias'].data

# LayerNormのパラメータ
norm1_weight = params['decoder_layer.norm1.weight'].data
norm1_bias = params['decoder_layer.norm1.bias'].data
norm2_weight = params['decoder_layer.norm2.weight'].data
norm2_bias = params['decoder_layer.norm2.bias'].data
norm3_weight = params['decoder_layer.norm3.weight'].data
norm3_bias = params['decoder_layer.norm3.bias'].data


dict_keys(['token_embedding_src.embedding.weight', 'token_embedding_tgt.embedding.weight', 'decoder_layer.self_attn.in_proj_weight', 'decoder_layer.self_attn.in_proj_bias', 'decoder_layer.self_attn.out_proj.weight', 'decoder_layer.self_attn.out_proj.bias', 'decoder_layer.multihead_attn.in_proj_weight', 'decoder_layer.multihead_attn.in_proj_bias', 'decoder_layer.multihead_attn.out_proj.weight', 'decoder_layer.multihead_attn.out_proj.bias', 'decoder_layer.linear1.weight', 'decoder_layer.linear1.bias', 'decoder_layer.linear2.weight', 'decoder_layer.linear2.bias', 'decoder_layer.norm1.weight', 'decoder_layer.norm1.bias', 'decoder_layer.norm2.weight', 'decoder_layer.norm2.bias', 'decoder_layer.norm3.weight', 'decoder_layer.norm3.bias', 'output.weight', 'output.bias'])


In [133]:

# Positional Encoding
def positional_encoding(tensor: Tensor, maxlen=5000):
    embedding_size = tensor.size(-1)
    den = torch.exp(-torch.arange(0, embedding_size, 2) * math.log(10000) / embedding_size)
    pos = torch.arange(0, maxlen).reshape(maxlen, 1)
    embedding_pos = torch.zeros((maxlen, embedding_size))
    embedding_pos[:, 0::2] = torch.sin(pos * den)
    embedding_pos[:, 1::2] = torch.cos(pos * den)
    embedding_pos = embedding_pos.unsqueeze(-2)
    return tensor + embedding_pos[: tensor.size(0), :].to(tensor.device)

In [169]:

# 翻訳処理を実行
seq_len_tgt = max([len(x[1]) for x in train_data])
text = '4+0'

tokens_src = convert_text_to_indexes(text, vocab=vocab_src)
src = torch.LongTensor(tokens_src).reshape(len(tokens_src), 1).to(device)
memory = positional_encoding(embedding_src_weight[src] * math.sqrt(embedding_size))
ys = torch.ones(1, 1).fill_(START_IDX).type(torch.long).to(device)

for i in range(10):
    tgt_embed = positional_encoding(embedding_tgt_weight[ys] * math.sqrt(embedding_size))
    tgt_mask = generate_square_subsequent_mask(ys.size(0)).to(device).type(torch.bool)

    tgt2, self_attn_weight = loaded_model.decoder_layer.self_attn(tgt_embed, tgt_embed, tgt_embed)
    tgt = tgt_embed + tgt2
    tgt = loaded_model.decoder_layer.norm1(tgt)

    # Attention with the encoder outputs (memory)
    tgt2, multi_attn_weight = loaded_model.decoder_layer.multihead_attn(tgt, memory, memory)
    tgt = tgt + tgt2
    tgt = loaded_model.decoder_layer.norm2(tgt)
    
    # Feedforward network

    # decoder linear1, 2
    tgt2 = tgt.matmul(linear1_weight.T) + linear1_bias
    tgt2 = F.relu(tgt2)
    tgt2 = tgt2.matmul(linear2_weight.T) + linear2_bias
    tgt = tgt + tgt2

    # LayerNorm
    tgt = loaded_model.decoder_layer.norm3(tgt)

    print(output)
    print(output[: , -1])
    output = tgt.transpose(0, 1)
    output = loaded_model.output(output[:, -1])

    _, next_word = torch.max(output, dim=1)
    next_word = next_word.item()

    ys = torch.cat([ys, torch.ones(1, 1).fill_(next_word).type_as(src.data)], dim=0)
    print(next_word)
    
    if next_word == END_IDX:
        break


flat_indexes = [idx for sublist in ys.tolist() for idx in sublist] if isinstance(ys.tolist()[0], list) else ys.tolist()

print(f"Input: {text}")
print(f"Decoded sequence: {convert_indexes_to_text(flat_indexes, vocab_tgt)}")

tensor([[  8.0768,   8.1649, -11.4503, -21.2393, -22.3936, -15.2151,  -2.2355,   9.7388,   2.5360,
          -0.7041,   0.1431,  -0.1674,   0.4346,  18.2473]], device='cuda:0',
       grad_fn=<AddmmBackward0>)
tensor([18.2473], device='cuda:0', grad_fn=<SelectBackward0>)
4
tensor([[  1.5946,  -5.9857,   7.7078,  18.7305,  22.7645,  19.4159,   8.7318, -10.0034, -15.7477,
         -16.7061,  -0.6714,   0.3011,   1.5128, -10.4346]], device='cuda:0',
       grad_fn=<AddmmBackward0>)
tensor([-10.4346], device='cuda:0', grad_fn=<SelectBackward0>)
13
Input: 4+0
Decoded sequence: 4


In [135]:
print(type(self_attn_weight))
print(self_attn_weight.shape)
print(self_attn_weight)
print(type(multi_attn_weight))
print(multi_attn_weight.shape)
print(multi_attn_weight)

<class 'torch.Tensor'>
torch.Size([1, 2, 2])
tensor([[[0.5330, 0.4670],
         [0.4987, 0.5013]]], device='cuda:0', grad_fn=<MeanBackward1>)
<class 'torch.Tensor'>
torch.Size([1, 2, 5])
tensor([[[0.2246, 0.2003, 0.0877, 0.0921, 0.3953],
         [0.1011, 0.1382, 0.2386, 0.4479, 0.0742]]], device='cuda:0', grad_fn=<MeanBackward1>)


# Transformerの検算

スクラッチで書くための検算

## Multihead Attention

Multihead Attentionの動作をスクラッチで書きたいので、ここで検算する

参考サイト
https://blog.amedama.jp/entry/pytorch-multi-head-attention-verify

In [118]:
import torch
from torch import nn
import torch.nn.functional as F


In [120]:
edim = 4 # 埋め込み次元
num_heads = 1 # ヘッド数
model = nn.MultiheadAttention(edim, num_heads, bias=True, batch_first=True)

In [121]:
batch_size = 2
L=5
X = torch.randn(batch_size, L, edim) # 入力

Q = K = V = X # クエリ、キー、バリューは全て入力とする
print(Q.shape)
print(Q)

torch.Size([2, 5, 4])
tensor([[[ 0.1554,  1.2360,  1.0639, -0.3720],
         [-0.3173, -0.3807, -0.1323,  0.9814],
         [ 0.2685,  0.1682,  0.6541, -1.4650],
         [-0.5318,  0.3384, -0.7755, -0.8227],
         [ 1.0088, -0.0763,  1.5746, -1.5231]],

        [[-0.1761,  0.0773, -0.2156,  2.3514],
         [-1.1603,  0.2220,  0.6173, -0.0333],
         [-0.6132, -1.6530, -0.0637,  0.2078],
         [ 0.0238,  0.1894, -0.7039, -0.3044],
         [-0.3339,  0.5039,  0.1053, -1.0818]]])


In [122]:

attn_output, attn_output_weights = model(Q, K, V)

print(attn_output.shape)
print(attn_output)



torch.Size([2, 5, 4])
tensor([[[ 0.1358,  0.0877, -0.0675,  0.2377],
         [ 0.2223,  0.1380, -0.0535,  0.3154],
         [ 0.1997,  0.1280, -0.0665,  0.3035],
         [ 0.1347,  0.0820, -0.0598,  0.2552],
         [ 0.2354,  0.1519, -0.0651,  0.3224]],

        [[-0.0021, -0.0743,  0.0718, -0.0541],
         [ 0.0871,  0.0165, -0.0137,  0.0552],
         [ 0.1145,  0.0278,  0.0143,  0.0071],
         [-0.0942, -0.1198,  0.0600, -0.0701],
         [-0.0115, -0.0509,  0.0139, -0.0049]]], grad_fn=<TransposeBackward0>)


In [124]:
from pprint import pprint
pprint(list(model.named_parameters()))

[('in_proj_weight',
  Parameter containing:
tensor([[ 0.3135, -0.4200,  0.1098,  0.1657],
        [-0.3305, -0.5894,  0.2564, -0.4990],
        [ 0.3443,  0.3706, -0.5312, -0.5492],
        [-0.0969,  0.0883, -0.5059, -0.2797],
        [ 0.2573, -0.5490, -0.0139,  0.4898],
        [ 0.5063, -0.0761,  0.1915, -0.6119],
        [-0.0861,  0.3069, -0.5451,  0.4586],
        [-0.0363, -0.1677,  0.4143,  0.2315],
        [-0.0870, -0.4917, -0.1023,  0.3188],
        [-0.2304,  0.3617, -0.1264,  0.4168],
        [ 0.1843, -0.2980,  0.2385,  0.2357],
        [ 0.2304,  0.1970, -0.5439,  0.1361]], requires_grad=True)),
 ('in_proj_bias',
  Parameter containing:
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.], requires_grad=True)),
 ('out_proj.weight',
  Parameter containing:
tensor([[ 0.2063, -0.4731, -0.3537, -0.3683],
        [ 0.0500, -0.4336, -0.2108,  0.0188],
        [ 0.1191,  0.0621,  0.2418, -0.1074],
        [-0.4875,  0.1371,  0.1110, -0.4857]], requires_grad=True)),
 ('out_p

In [126]:
model_weight = {name: param.data for name, param in model.named_parameters()}
Wi = model_weight['in_proj_weight']
Wo = model_weight['out_proj.weight']
Wbi = model_weight['in_proj_bias']
Wbo = model_weight['out_proj.bias']

In [127]:
Wi_q, Wi_k, Wi_v = Wi.chunk(3, dim=0)
Wbi_q, Wbi_k, Wbi_v = Wbi.chunk(3, dim=0)
QW = torch.matmul(Q, Wi_q.T) + Wbi_q
KW = torch.matmul(K, Wi_k.T) + Wbi_k
VW = torch.matmul(V, Wi_v.T) + Wbi_v

KW_t = KW.transpose(-2, -1)
QK_t = torch.bmm(QW, KW_t)
QK_scaled = QK_t / (edim ** 0.5)
attn_weights_ = F.softmax(QK_scaled, dim=-1)

In [128]:
print(attn_weights_)
print(attn_output_weights)

tensor([[[0.2136, 0.1996, 0.1859, 0.2592, 0.1417],
         [0.1816, 0.1744, 0.2186, 0.1662, 0.2592],
         [0.2044, 0.1549, 0.2161, 0.2081, 0.2165],
         [0.2381, 0.2097, 0.1783, 0.2209, 0.1530],
         [0.1737, 0.1402, 0.2324, 0.1952, 0.2584]],

        [[0.2170, 0.2222, 0.2612, 0.1480, 0.1516],
         [0.0793, 0.2149, 0.1782, 0.2091, 0.3185],
         [0.0741, 0.1713, 0.2763, 0.1980, 0.2803],
         [0.2957, 0.1953, 0.1651, 0.1846, 0.1593],
         [0.1836, 0.2054, 0.1493, 0.2246, 0.2371]]])
tensor([[[0.2136, 0.1996, 0.1859, 0.2592, 0.1417],
         [0.1816, 0.1744, 0.2186, 0.1662, 0.2592],
         [0.2044, 0.1549, 0.2161, 0.2081, 0.2165],
         [0.2381, 0.2097, 0.1783, 0.2209, 0.1530],
         [0.1737, 0.1402, 0.2324, 0.1952, 0.2584]],

        [[0.2170, 0.2222, 0.2612, 0.1480, 0.1516],
         [0.0793, 0.2149, 0.1782, 0.2091, 0.3185],
         [0.0741, 0.1713, 0.2763, 0.1980, 0.2803],
         [0.2957, 0.1953, 0.1651, 0.1846, 0.1593],
         [0.1836, 0.2054,

In [129]:
AV = torch.matmul(attn_weights_, VW)
attn_output_ = torch.matmul(AV, Wo.T) + Wbo

In [130]:
print(attn_output_)
print(attn_output)

tensor([[[ 0.1358,  0.0877, -0.0675,  0.2377],
         [ 0.2223,  0.1380, -0.0535,  0.3154],
         [ 0.1997,  0.1280, -0.0665,  0.3035],
         [ 0.1347,  0.0820, -0.0598,  0.2552],
         [ 0.2354,  0.1519, -0.0651,  0.3224]],

        [[-0.0021, -0.0743,  0.0718, -0.0541],
         [ 0.0871,  0.0165, -0.0137,  0.0552],
         [ 0.1145,  0.0278,  0.0143,  0.0071],
         [-0.0942, -0.1198,  0.0600, -0.0701],
         [-0.0115, -0.0509,  0.0139, -0.0049]]])
tensor([[[ 0.1358,  0.0877, -0.0675,  0.2377],
         [ 0.2223,  0.1380, -0.0535,  0.3154],
         [ 0.1997,  0.1280, -0.0665,  0.3035],
         [ 0.1347,  0.0820, -0.0598,  0.2552],
         [ 0.2354,  0.1519, -0.0651,  0.3224]],

        [[-0.0021, -0.0743,  0.0718, -0.0541],
         [ 0.0871,  0.0165, -0.0137,  0.0552],
         [ 0.1145,  0.0278,  0.0143,  0.0071],
         [-0.0942, -0.1198,  0.0600, -0.0701],
         [-0.0115, -0.0509,  0.0139, -0.0049]]], grad_fn=<TransposeBackward0>)


## nn.Linear

In [142]:
model = nn.Linear(4, 4)
model

Linear(in_features=4, out_features=4, bias=True)

In [143]:
pprint(list(model.named_parameters()))

[('weight',
  Parameter containing:
tensor([[-0.3068, -0.1941, -0.0057,  0.2137],
        [ 0.2596, -0.1518, -0.3444,  0.1489],
        [ 0.3061, -0.2957, -0.3799, -0.2315],
        [ 0.2787, -0.1753, -0.1904, -0.0134]], requires_grad=True)),
 ('bias',
  Parameter containing:
tensor([-0.2181,  0.4180,  0.0857, -0.1765], requires_grad=True))]


In [145]:
model_weight = {name: param.data for name, param in model.named_parameters()}
W = model_weight['weight']
B = model_weight['bias']

X = torch.randn(4) 
print(X.shape)
print(X)
output = model(X)
print(output.shape)
print(output)


torch.Size([4])
tensor([ 0.5931,  1.4933, -2.1964, -0.0622])
torch.Size([4])
tensor([-0.6907,  1.0925,  0.6744,  0.1461], grad_fn=<ViewBackward0>)


In [146]:
output_ = X.matmul(W.T) + B
print(output_)
print(output)

tensor([-0.6907,  1.0925,  0.6744,  0.1461])
tensor([-0.6907,  1.0925,  0.6744,  0.1461], grad_fn=<ViewBackward0>)


## nn.LayerNorm

参考サイト
https://qiita.com/dl_from_scratch/items/133fe741b67ed14f1856

In [151]:
model = nn.LayerNorm(4)
model

LayerNorm((4,), eps=1e-05, elementwise_affine=True)

In [152]:
pprint(list(model.named_parameters()))

[('weight', Parameter containing:
tensor([1., 1., 1., 1.], requires_grad=True)),
 ('bias', Parameter containing:
tensor([0., 0., 0., 0.], requires_grad=True))]


In [153]:
model_weight = {name: param.data for name, param in model.named_parameters()}
W = model_weight['weight']
B = model_weight['bias']

X = torch.randn(4) 
print(X.shape)
print(X)
output = model(X)
print(output.shape)
print(output)


torch.Size([4])
tensor([1.1853, 0.6058, 0.7668, 0.0579])
torch.Size([4])
tensor([ 1.3154, -0.1192,  0.2793, -1.4755], grad_fn=<NativeLayerNormBackward0>)


In [154]:
output_ = X.matmul(W.T) + B
print(output_)
print(output)

tensor([2.6159, 2.6159, 2.6159, 2.6159])
tensor([ 1.3154, -0.1192,  0.2793, -1.4755], grad_fn=<NativeLayerNormBackward0>)
