# Transformerの詳細

In [None]:
from transformers import AutoTokenizer

In [None]:
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
text = "time flies like an arrow"

In [None]:
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids

## スケール化ドット積アテンション

In [None]:
from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

In [None]:
# nn.Embeddingの埋め込みは文脈が考慮されない
# 表記が同じトークンは同じベクトルを出力する
# セルフアテンションを導入することで文脈を考慮したベクトルを求めるのが狙い

# (batch_size, seq_len, hidden_dim)
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size()

In [None]:
import torch
from math import sqrt

# 実際はquery, key, valueを求めるときに独立した重み行列 W_Q, W_K, W_V を適用するが省略
query = key = value = inputs_embeds
dim_k = key.size(-1)

# 各トークン間の距離を計算した行列
scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
scores.size()

In [None]:
query.shape, key.transpose(1, 2).shape

In [None]:
import torch.nn.functional as F

# attention weights
weights = F.softmax(scores, dim=1)
weights.shape

In [None]:
weights.sum(dim=-1)

In [None]:
# attention weightsをvalueにかける
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

In [None]:
weights.shape, value.shape

In [None]:
# 上記の処理を関数にまとめると
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

In [None]:
# attentionの出力は入力のベクトルと同じ形状
attn_outputs = scaled_dot_product_attention(query, key, value)
attn_outputs.shape

## マルチヘッドアテンション

In [None]:
# アテンションヘッド1つ
# head_dimは最終的に必要なembed_dimをヘッド数で割ったサイズを指定する
# BERTは特徴量次元が768で12個のヘッドがあるので768/12=64
class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)  # W_q
        self.k = nn.Linear(embed_dim, head_dim)  # W_k
        self.v = nn.Linear(embed_dim, head_dim)  # W_v

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs

In [None]:
head = AttentionHead(768, 64)
print(inputs_embeds.shape)
attn_outputs = head(inputs_embeds)
print(attn_outputs.shape)

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        # 複数のヘッドの出力をconcatする
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

In [None]:
multihead_attn = MultiHeadAttention(config)
attn_outputs = multihead_attn(inputs_embeds)
attn_outputs.size()

## 順伝播層

* 各トークンの埋め込みを独立に処理する
* 位置単位順伝播層（position-wise feed-forward layer)
* 隠れ層のサイズは埋め込みの4倍でGELUを使うのが一般的
* nn.Linearは (batch_size, input_dim) が入力のときバッチ次元の各要素に独立に作用する
* (batch_size, seq_len, hidden_dim) が入力のときはバッチと系列のすべてのトークン埋め込みに対して独立に作用する

In [None]:
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [None]:
attn_outputs.shape

In [None]:
config.hidden_size, config.intermediate_size, config.hidden_dropout_prob

In [None]:
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

## レイヤー正規化の追加

* Layer Normalization
* Skip Connection
* レイヤー正規化後置型: Transformerの論文の実装
* レイヤー正規化前置型: 学習が安定する（こちらを使う）

In [None]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        # レイヤー正規化を適用
        hidden_state = self.layer_norm_1(x)
        # スキップ接続付きのアテンションを適用
        x = x + self.attention(hidden_state)
        # スキップ接続付きの順伝播層を適用
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [None]:
encoder_layer = TransformerEncoderLayer(config)

print(inputs_embeds.shape)
outputs = encoder_layer(inputs_embeds)
print(outputs.shape)

## 位置埋め込み

* これまでのエンコーダ層はトークンの位置を考慮していない
* データが大規模なら学習可能なパターン（位置IDのEmbeddingsを使う）を用いるのが一般的
* sin/cosを組み合わせる絶対位置表現はデータが少ないときに有効
* 相対位置表現

In [None]:
class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, 
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        # 入力系列に対する位置IDを作成
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
        # トークン埋め込みと位置埋め込みを作成
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # トークン埋め込みと位置埋め込みを組み合わせる
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [None]:
# 最大系列長は512
config.max_position_embeddings

In [None]:
print(inputs.input_ids.shape)
seq_length = inputs.input_ids.size(1)
seq_length

In [None]:
position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
position_ids

In [None]:
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

In [None]:
config.num_hidden_layers

In [None]:
class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config) 
                                     for _ in range(config.num_hidden_layers)])

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

In [None]:
# 各トークンの隠れ状態を返す
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

## 分類ヘッドの追加

In [None]:
class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        
    def forward(self, x):
        # [CSL]トークン（0番目）の隠れ状態のみ使う
        x = self.encoder(x)[:, 0, :]
        x = self.dropout(x)
        x = self.classifier(x)
        return x

In [None]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()