# BERTモデルの実装

## 目標
- BERTのEmbeddingsモジュールの動作を理解し、実装できる
- BERTのSelf-Attentionを活用したTransformer部分であるBertLayerモジュールの動作を理解し、実装できる
- BERTのPoolerモジュールの動作を理解し、実装できる



- BERTの学習済みモデルを自分の実装モデルにロードできる
- BERT用の単語分割クラスなど、言語データの前処理部分を実装できる
- BERTで単語ベクトルを取り出して確認する内容を実装できる

## Library

In [1]:
import math
import numpy as np
import torch
from torch import nn

## BERTの実装

## BERT_Baseのネットワークの設定ファイルの読み込み


In [3]:
# config.jsonから読み込み、JSON の辞書変数をオブジェクト変数に変換
import json

config_file = './weights/bert_config.json'

# ファイルを開きJSONとして読み込む
json_file = open(config_file, 'r')
config = json.load(json_file)

# 出力確認
config

{'attention_probs_dropout_prob': 0.1,
 'hidden_act': 'gelu',
 'hidden_dropout_prob': 0.1,
 'hidden_size': 768,
 'initializer_range': 0.02,
 'intermediate_size': 3072,
 'max_position_embeddings': 512,
 'num_attention_heads': 12,
 'num_hidden_layers': 12,
 'type_vocab_size': 2,
 'vocab_size': 30522}

In [5]:
# 辞書変数をオブジェクト変数に変換するライブラリを使う
from attrdict import AttrDict

config = AttrDict(config)
config.hidden_size

768

## BERT用にLayerNormalization層を定義

In [7]:
# bert用にlayernormalization層を定義

class BertLayerNorm(nn.Module):
    """
    LayerNormalization層
    """
    
    def __init__(self, hidden_size, eps=1e-12):
        super(BertLayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(hidden_size))
        self.beta = nn.Parameter(torch.zeros(hidden_size))
        self.bariance_epsilon = eps
        
    def forward(self, x):
        u = x.mean(-1, keepdim=True)
        s = (x-u).pow(2).mean(-1, keepdim=True)
        x = (x-u)/torch.sqrt(s+self.variance_epsilon)
        return self.gamma * x + self.beta
    

## Embeddingsモジュールの実装

In [10]:
class BertEmbeddings(nn.Module):
    """
    文章の単語ID列と1文目か2文目かの情報を埋め込みベクトルに変換する
    """
    
    def __init__(self, config):
        super(BertEmbeddings, self).__init__()
        
        # 3つのベクトル表現の埋め込み
        # Token Embedding : 単語IDを単語ベクトルに変換
        # vocab_size = 30522でBERTの学習済みモデルで使用したボキャブラリーの量
        # hidden_size = 768 で特徴量ベクトルの長さは768
        self.word_embeddings = nn.Embedding(
            config.vocab_size, config.hidden_size, padding_idx=0)
        # padding_idx=0はidx=0の単語ベクトルは0にする。BERTのボキャブラリーのidx=0が[PAD]
        
        # Transformer Positional  Embedding : 位置情報テンソルをベクトルに変換
        # Transformerの場合はsin, cosからなる固定値だったがBERTでは学習させる
        # max_position_embeddings=512で文の長さは512単語
        self.position_embeddings = nn.Embedding(
            config.max_position_embeddings, config.hidden_size)
        
        # Sentence Embedding:文章の1文目、2文目の情報をベクトルに変換
        # type_vocab_size = 2
        self.token_type_embeddings = nn.Embedding(
            config.type_vocab_size, config.hidden_size)
        
        # 上で作成したlayernormalization層
        self.LayerNorm = BertLayerNorm(config.hidden_size, eps=1e-12)
        
        # Dropout 'hidden_dropout_prob' : 0.1
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, input_ids, token_type_ids=None):
        """
        input_ids : [batch_size, seq_len]の文章の単語IDの羅列
        token_types_ids : [batch_size, seq_len]の各単語が1文目なのか2文目なのかを示すid
        """
        
        # 1. token Embeddings
        # 単語IDを単語ベクトルに変換
        words_embeddings = self.word_embeddings(input_ids)
        
        # 2. sentence embedding
        # token_type_idsがない場合は文章の全単語を1文目として0にする
        # input_idsと同じサイズのゼロテンソルを作成
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids)
        
        # 3. Transformer Positional Embedding:
        # [0, 1, 2, ...]と文章の長さだけ連番が続いた[batch_size, seq_len]のテンソルposition_idsを作成
        # position_idsを入力してposition_embeddings層から768次元のテンソルを取り出す
        seq_length = input_ids.size(1)  # 文の長さ
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        
        # 3つの埋め込みテンソルを足し合わせる[batch_size, seq_len, hidden_size]
        embeddings =words_embeddings + position_embeddings + token_type_embeddings
        
        # layernormalizationとDropout
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        
        return embeddings

## BertLayerモジュール

In [12]:
class BertLayer(nn.Module):
    """
    BERTのBertLayerモジュール。Transformerと同じ。
    BERTでは12回繰り返される
    """
    
    def __init__(self, config):
        super(BertLayer, self).__init__()
        
        # self-attentionの部分
        self.attention = BertAttention(config)
        
        # self-attentionの出力を処理する全結合層
        self.intermediate = BertIntermediate(config)
        
        # self-attentionによる特徴量とBertLayerへの元の入力を足し算する層
        self.output = BertOutput(config)
        
    def forward(self, hidden_states, attention_mask, attention_show_flg=False):
        """
        hidden_states: Embedderモジュールの出力テンソル[batch_size, seq_len, hidden_size]
        attention_mask: Transformerのマスクと同じ働きのマスキング
        attention_show_flg: self-attentionの重みを返すかのフラグ
        """
        if attention_show_flg == True:
            """attention_probsもリターンする"""
            attention_output, attention_probs = self.attention(
                hidden_states, attention_mask, attention_show_flg)
            intermediate_output = self.intermediate(attention_output)
            layer_output = self.output(intermediate_output, attention_output)
            
            return layer_output, attention_probs
        
        elif attention_show_flg == False:
            attention_output = self.attention(
                hidden_states, attention_mask, attention_show_flg)
            intermediate_output = self.intermediate(attention_output)
            layer_output = self.output(intermediate_output, attention_output)
            
            return layer_output

In [17]:
class BertAttention(nn.Module):
    """
    BertLayerモジュールのself-attention部分
    """
    def __init__(self, config):
        super(BertAttention, self).__init__()
        self.selfattn = BertSelfAttention(config)
        self.output = BertSelfOutput(config)
        
    def forward(self, input_tensor, attention_mask, attention_show_flg=False):
        """
        input_tensor : Embeddingsモジュールもしくは前段のBertLayerからの出力
        attention_mask : Transformerのマスクと同じ働き padの部分を無効化
        attention_show_flg : self-attentionの重みを返すかのフラグ
        """
        
        if attention_show_flg == True:
            """attention_showのときはattention_probsもリターンする"""
            self_output, attention_probs = self.self.selfattn(
                input_tensor, attention_mask, attention_show_flg)
            attention_output = self.output(self_output, input_tensor)
           
            return attention_outuput, attention_probs
        
        elif attention_show_flg == False:
            self_output = self.self.selfattn(
                input_tensor, attention_mask, attention_show_flg)
            attention_output = self.output(self_output, input_tensor)
            
            return attention_outuput

In [18]:
class BertSelfAttention(nn.Module):
    """
    BertAttentionのSelf-Attention
    """
    
    def __init__(self, config):
        super(BertSelfAttention, self).__init__()
        
        # num_attention_heads = 12
        self.num_attention_heads = config.num_attention_heads
        
        self.attention_head_size = int(
            config.hidden_size / config.num_attention_heads) # 768/12  =  64
        self.all_head_size = self.num_attention_heads *self.attention_head_size
        
        # self-attentionの特徴量を作成する全結合
        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        
        # dropout
        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
        
    def transpose_for_scores(self, x):
        """
        multi-head attention用にテンソルの形を変換する
        [batch_size, seq_len, hidden] -> [batch_size, 12, seq_len, hidden/12]
        """
        
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_x_shape)  # 可変長引数として入れる
        return x.permute(0, 2, 1, 3)
    
    def forward(self, hidden_states, attention_mask, attention_show_flg=False):
        """
        hidden_states : Embeddingsモジュールもしくは前段のBertLayerからの出力
        attention_mask : Transformerのマスクと同じ働き padの部分を無効化
        attention_show_flg : self-attentionの重みを返すかのフラグ
        """
        
        # 入力を全結合層で特徴量変換(multi-head Attentionの全部をまとめて変換)
        mixed_query_layer = self.query(hidden_states)
        mixed_key_layer = self.key(hidden_states)
        mixed_value_layer = self.value(hidden_states)
        
        # multi-head attention用にテンソルの形を変換
        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)
        
        # 特徴量同士を掛け算して似ている度合いをAttention_scoreとして求める
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        
        # マスク部分にマスクを掛ける
        attention_scores = attention_scores + attention_mask
        # この後softmaxで正規化する際に0になればいいので、
        # maskされた部分は-infにする。
        # attention_maskは0, -infで構成されている
        
        # attentionを正規化
        attention_probs = nn.Sequential(dim=-1)(attention_scores)
        
        # dropout
        attention_probs = self.dropout(attention_probs)
        
        # attention mapを掛け算
        context_layer = torch.matmul(attention_probs, value_layer)
        
        # multi-head Attetnionのテンソルの形をもとに戻す
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size, )
        context_layer = context_layer.view(*new_context_layer_shape)
        
        # attention_showのときはattention_probsもリターン
        if attention_show_flg == True:
            return context_layer, attention_probs
        elif attention_show_flg == False:
            return context_layer
    
        

In [20]:
class BertSelfoutput(nn.Module):
    """
    BertSelfAttentionの出力を処理する全結合
    """
    
    def __init__(self, config):
        super(BertSelfoutput, self).__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.LayerNorm = BertLayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, hidden_states, input_tensor):
        """
        hidden_states：BertSelfAttentionの出力テンソル
        input_tensor：Embeddingsモジュールもしくは前段のBertLayerからの出力
        """
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        
        return hidden_states


In [25]:
def gelu(x):
    """
    Gaussian Error Linear Unit　（活性化関数）
    ReLUが０で微分不可能なので、なめらかにした形のReLU
    """
    
    return x*0.5*(1.0+torch.erf(x/math.sqrt(2.0)))

class BertIntermediate(nn.Module):
    """
    BERTのTransformerBlockモジュールのFeedForward
    """
    
    def __init__(self, config):
        super(BertIntermediate, self).__init__()
        
        # 全結合層　　hidden_size: 768,  intermediate_size: 3072
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        
        # 活性化関数gelu
        self.intermediated_act_fn = gelu
        
    def forward(self, hidden_states):
        """
        hidden_states: BertAttentionの出力テンソル
        """
        hidden_states = self.dense(hidden_states)  # 全結合層
        hidden_states = self.intermediated_act_fn(hidden_states)  # 活性化関数gelu
        
        return hidden_states
        

In [26]:
class BertOutput(nn.Module):
    """
    BERTのTransformerBlockモジュールのFeedForward
    """
    def __init__(self, config):
        super(BertOutput, self).__init__()
        
        # 全結合層 
        self.dense  = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = BertLayerNorm(config.hidden_size, eps=1e-12)
        
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, hidden_states, input_tensor):
        """
        hidden_states : BertOntermediatedの出力テンソル
        input_tensor : BertAttentionの出力テンソル
        """
        
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states =self.LayerNorm(hidden_states + input_tensor)
        return hidden_states

## BertLayerモジュールの繰り返し部分

In [27]:
class BertEncoder(nn.Module):
    def __init__(self, conifg):
        """
        BertLayerモジュールの繰り返し部分モジュールの繰り返し部分
        """
        
        super(BertEncoder, self).__init__()
        
        # config.num_hidden_layersの値、12個のBertLayerモジュールを作る
        self.layer = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])
        
    def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True, attention_show_flg=False):
        """
        hidden_states: Embeddingsモジュールの出力
        attention_mask: Transformerのマスクと同じ働き
        output_all_encoded_layers : 返り値を全transformerBlockモジュールの出力にするか、最終層だけにするかのフラグ
        attention_show_flg : self-attentionの重みを返すかのフラグ
        """
        
        # 返り値として使うリスト
        all_encoder_layers = []
        
        # bertlayerモジュールの処理を繰り返す
        for layer_module in self.layer:
            if attention_show_flg == True:
                hidden_states, attention_probs = layer_module(
                    hidden_states, attention_mask, attention_show_flg)
            elif attention_show_flg == False:
                hidden_states = layer_module(
                    hidden_states, attention_mask, attention_show_flg)
            
            # 返り値にBertLayerから出力された特徴量を12層分、全て使用する場合の処理
            if output_all_encoded_layers:
                all_encoder_layers.append(hidden_states)
                
        # 返り値に最後のBertLayerから出力された特徴量のみ使う場合の処理
        if not output_all_encoded_layers:
            all_encoder_layers.append(hidden_states)
            
        # attention_showのときはattention_probsのリターン
        if attention_show_flg == True:
            return all_encoder_layers, attention_probs
        elif attention_show_flg == False:
            return all_encoder_layers
        

## BertPoolerモジュール

In [28]:
class BertPooler(nn.Module):
    """
    入力文章の1単語目[cls]の特徴量を変換して保持するためのモジュール
    """
    
    def __init__(self, config):
        super(BertPooler, self).__init__()
        
        # 全結合層
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()
        
    def forward(self, hidden_states):
        # 1単語目の特徴量を取得
        first_token_tensor = hidden_states[:, 0]
        # 全結合層で特徴量変換
        pooled_output = self.dense(first_token_tensor)
        # 活性化関数Tanh
        pooled_output = self.activation(pooled_output)
        
        return pooled_output

## 動作確認

In [29]:
# 入力の単語ID列　
input_ids = torch.LongTensor([[31, 51, 12, 23, 99], [15, 5, 1, 0, 0]])
print('入力単語ID列のテンソルサイズ：', input_ids.shape)

# マスク
attention_mask = torch.LongTensor([[1,1,1,1,1], [1,1,1,0,0]])
print('入力マスクのテンソルサイズ：', attention_mask.shape)

# 文章のID
token_type_ids = torch.LongTensor([[0,0,1,1,1], [0,1,1,1,1]])
print('入力の文章IDのテンソルサイズ: ', token_type_ids.shape)


# bertの各モジュールのインスタンス生成
# 引数全部config  DNAみたいに設計図の役割を果たしている、必要なところだけ使ってる
embeddings = BertEmbeddings(config)
encoder = BertEncoder(config)
pooler = BertPooler(config)

# マスクの変形　[batch_size, 1, 1, seq_len]に変形
# Attentionかけない部分はマイナス無限にしてsigmoidで0にしたいので代わりに-10000を掛け算
extended_attention_mask = attention_mask.unsqeeze(1).unsqeeze(2)
extended_attention_mask = extended_attention_mask.to(dtype=torch.float32)
extended_attention_mask = (1.0-extended_attention_mask) * -10000.0
print('拡張したてマスクのテンソルサイズ：', extended_attention_mask.shape)

# 順伝播
out1 = embeddings(input_ids, token_type_ids)
print('BertEmbeddingsの出力テンソルサイズ：', out1.shape)

out2 = encoder(out1, extended_attention_mask)
print('BertEncorderの最終層の出力テンソルサイズ：', out2.shape)
# [minibatch, seq_length, embedding_dim]が12このリスト

out3 = pooler(out2[-1])
print('BertPoolerの出力テンソルサイズ：', out3.shape)
# out2は12層の特徴量のリストになっているので一番最後を使用する


入力単語ID列のテンソルサイズ： torch.Size([2, 5])
入力マスクのテンソルサイズ： torch.Size([2, 5])
入力の文章IDのテンソルサイズ:  torch.Size([2, 5])


NameError: name 'BertSelfOutput' is not defined

## 全部つなげてBERTモデルにする

In [32]:
class BertModel(nn.Module):
    '''上で作成したモジュールを全部つなげたBERTモデル'''
    
    def __init__(self, config):
        super(BertModel, self).__init__()
        
        # 3つのモジュールのインスタンスをインスタンス変数として保持
        self.embeddings = BertEmbeddings(config)
        self.encoder = BertEncoder(config)
        self.pooler = BertPooler(config)
        
        def forward(self, input_ids, token_type_ids=None, attention_mask=None, output_all_encoded_layers=True, attention_show_flg=False):
            '''
            input_ids : [batch_size, sequence_length]の文章の単語IDの羅列
            token_type_ids : [batch_size, sequence_length]の各単語が1文目なのか2文目なのかを表すid
            attention_mask : paddingに対して-10000をかけmask
            output_all_encoded_layers : 最終出力に12段のTransformerの全部をリストで返すか最後だけかを指定する
            attention_show_flg : self-attentionの重みを返すかのフラグ
            '''
            
            # attentionのマスクと文の1文目、2文目のidがなければ作成する
            if attention_mask == None:
                attention_mask = torch.ones_like(input_ids)  # maskするものはない
            if token_type_ids == None:
                token_type_ids = torch.zeros_like(input_ids)  # 全部1文目
                
            # マスクの変形[minibatch, 1, 1, seq_length]
            # 後でmulti-head Attentionに使用できる形にするために
            extended_attention_mask = attention_mask.unsqueeze(1).unssqueeze(2)
            
            # maskされているところを-10000かける
            extended_attention_mask = extended_attention_mask.to(dtype=torch.float32)
            extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0
            
            # 順伝播
            # BertEmbeddingsモジュール
            embedding_output = self.embeddings(input_ids, token_type_ids)
            
            # BertLayerモジュール(Transform)を繰り返すBertEncoderモジュール
            if attention_show_flg == True:
                '''attention_showのと言いはattention_probもリターンする'''
                
                encoded_layers, attention_probs = self.encoder(embedding_output, extended_attention_mask, output_all_encoded_layers, attention_show_flg)
                
            elif attention_show_flg == False:
                encoded_layers = self.encoder(embedding_output, extended_attention_mask, output_all_encoded_layers, attention_show_flg)
                
            # BertPoolerモジュール
            # encoderの一番最後のBertLayerから出力された特徴量を使う
            pooled_output = self.pooler(encoded_layers[-1])
            
            # output_all_encoded_layersがFalseの場合はリストでなくテンソルを返す
            if not output_all_encoded_layers:
                encoded_layers = encoded_layers[-1]
                
            # attention_showのときは（1番最後の）attention_probもリターンする
            if attention_show_flg == True:
                return encoded_layers, pooled_output, attention_probs
            elif attention_show_flg == False:
                return encoded_layers, pooled_outupt
            
            

In [None]:
# 動作確認
input_ids = torch.LongTensor([[31, 51, 12, 23, 99], [15, 5, 1, 0, 0]])
attention_mask = torch.LongTensor([[1, 1, 1, 1, 1], [1, 1, 1, 0, 0]])
token_type_ids = torch.LongTensor([[0, 0, 1, 1, 1], [0, 1, 1, 1, 1]])

# bertモデルの作成
net = BertModel(config)

# 順伝播
encoded_layers, pooled_output, attention_probs = net(
input_ids, token_type_ids, attention_mask, output_all_encoded_layers=False, attention_show_flg=True)

# 確認
print('encoded_layersのテンソルサイズ : ', encoded_layers.shape)
print('encoded_layersのテンソルサイズ : ', pooled_output.shape)
print('attention_probsのテンソルサイズ : ', attention_probs.shape)
