# インポート

In [18]:
import os
import io
from glob import glob
import string
import re
import random
import math

import torchtext
from torchtext.vocab import Vectors

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# IMDbのデータを作成

In [19]:
# 訓練データの作成
with open('./data/IMDb_train.tsv', 'w') as f:

    path = './data/aclImdb/train/pos/'
    for fname in glob(os.path.join(path, '*.txt')):
        with io.open(fname, 'r', encoding='utf-8') as ff:
            text = ff.readline()
            text = text.replace('\t', ' ')
            text = text+'\t'+'1'+'\t'+'\n'
            f.write(text)


    path = './data/aclImdb/train/neg/'
    for fname in glob(os.path.join(path, '*.txt')):
        with io.open(fname, 'r', encoding='utf-8') as ff:
            text = ff.readline()
            text = text.replace('\t', ' ')
            text = text+'\t'+'0'+'\t'+'\n'
            f.write(text)

# Datasetの作成

In [20]:
def get_IMDb_DataLoaders_and_TEXT(max_length=256, batch_size=24):
    """IMDbのDataLoaderとTEXTオブジェクトを取得する。 """

    # 訓練データのtsvファイルを作成します
    f = open('./data/IMDb_train.tsv', 'w')

    path = './data/aclImdb/train/pos/'
    for fname in glob(os.path.join(path, '*.txt')):
        with io.open(fname, 'r', encoding="utf-8") as ff:
            text = ff.readline()

            # タブがあれば消しておきます
            text = text.replace('\t', " ")

            text = text+'\t'+'1'+'\t'+'\n'
            f.write(text)

    path = './data/aclImdb/train/neg/'
    for fname in glob(os.path.join(path, '*.txt')):
        with io.open(fname, 'r', encoding="utf-8") as ff:
            text = ff.readline()

            # タブがあれば消しておきます
            text = text.replace('\t', " ")

            text = text+'\t'+'0'+'\t'+'\n'
            f.write(text)

    f.close()

   # テストデータの作成
    f = open('./data/IMDb_test.tsv', 'w')

    path = './data/aclImdb/test/pos/'
    for fname in glob(os.path.join(path, '*.txt')):
        with io.open(fname, 'r', encoding="utf-8") as ff:
            text = ff.readline()

            # タブがあれば消しておきます
            text = text.replace('\t', " ")

            text = text+'\t'+'1'+'\t'+'\n'
            f.write(text)

    path = './data/aclImdb/test/neg/'
    for fname in glob(os.path.join(path, '*.txt')):
        with io.open(fname, 'r', encoding="utf-8") as ff:
            text = ff.readline()

            # タブがあれば消しておきます
            text = text.replace('\t', " ")

            text = text+'\t'+'0'+'\t'+'\n'
            f.write(text)
    f.close()

    def preprocessing_text(text):
        # 改行コードを消去
        text = re.sub('<br />', '', text)

        # カンマ、ピリオド以外の記号をスペースに置換
        for p in string.punctuation:
            if (p == ".") or (p == ","):
                continue
            else:
                text = text.replace(p, " ")

        # ピリオドなどの前後にはスペースを入れておく
        text = text.replace(".", " . ")
        text = text.replace(",", " , ")
        return text

    # 分かち書き（今回はデータが英語で、簡易的にスペースで区切る）
    def tokenizer_punctuation(text):
        return text.strip().split()


    # 前処理と分かち書きをまとめた関数を定義
    def tokenizer_with_preprocessing(text):
        text = preprocessing_text(text)
        ret = tokenizer_punctuation(text)
        return ret


    # 読み込んだ内容に対して行う処理を定義
    TEXT = torchtext.data.Field(sequential=True, tokenize=tokenizer_with_preprocessing, use_vocab=True,
                                lower=True, include_lengths=True, batch_first=True, fix_length=max_length, 
                                init_token="<cls>", eos_token="<eos>")
    LABEL = torchtext.data.Field(sequential=False, use_vocab=False)

    # フォルダ「data」から各tsvファイルを読み込み
    train_val_ds, test_ds = torchtext.data.TabularDataset.splits(
        path='./data/', train='IMDb_train.tsv',
        test='IMDb_test.tsv', format='tsv',
        fields=[('Text', TEXT), ('Label', LABEL)])

    # 訓練データとvalidationデータを分ける
    train_ds, val_ds = train_val_ds.split(
        split_ratio=0.8, random_state=random.seed(1234))

    # torchtextで単語ベクトルとして英語学習済みモデルを読み込みます
    english_fasttext_vectors = Vectors(name='data/wiki-news-300d-1M.vec')

    # ボキャブラリー
    TEXT.build_vocab(train_ds, vectors=english_fasttext_vectors, min_freq=10)

    # DataLoaderを作成
    train_dl = torchtext.data.Iterator(
        train_ds, batch_size=batch_size, train=True)

    val_dl = torchtext.data.Iterator(
        val_ds, batch_size=batch_size, train=False, sort=False)

    test_dl = torchtext.data.Iterator(
        test_ds, batch_size=batch_size, train=False, sort=False)

    return train_dl, val_dl, test_dl, TEXT

In [21]:
train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(
    max_length=256, batch_size=24)

# Transformerの作成

## Embedder

In [22]:
class Embedder(nn.Module):
    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()
        
        self.embeddings = nn.Embedding.from_pretrained(
        embeddings=text_embedding_vectors, freeze=True)
        
    def forward(self, x):
        y = self.embeddings(x)
        return y

## PositionalEncoder

In [23]:
class PositionalEncoder(nn.Module):
    def __init__(self, d_model=300, max_seq_len=256):
        super(PositionalEncoder, self).__init__()
        
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # 単語ベクトルの次元数
        self.d_model = d_model
        
        # 単語の順番posとベクトルの次元位置iの(p, i)によって一意に定まる表を作成する
        pe = torch.zeros(max_seq_len, d_model)
        
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(
                                        pos / (10000 ** ((2*i)/d_model)))
                pe[pos, i+1] = math.cos(
                                        pos / (10000 ** ((2*(i+1))/d_model)))
        
        self.pe = pe.to(device).unsqueeze(0)
        
        # 勾配を計算しないようにする
        self.pe.requires_grad = False
        
    def forward(self, x):
        # 入力xとPositional Encoderを足し算する
        ret = math.sqrt(self.d_model)*x + self.pe
        return ret

## Attention

In [24]:
# Attentionの作成
class Attention(nn.Module):
    def __init__(self, d_model=300):
        super().__init__()
        
        # 特徴量の作成
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        
        # 出力の全結合層
        self.out = nn.Linear(d_model, d_model)
        
        # Attentionの大きさ調整の変数
        self.d_k = d_model
        
    def forward(self, q, k, v, mask):
        q = self.q_linear(q)
        k = self.k_linear(k)
        v = self.v_linear(v)
        
        # Attentionの値を計算する
        weights = torch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)
        
        # maskを計算
        mask = mask.unsqueeze(1)
        weights = weights.masked_fill(mask==0, -1e9)
        
        # softmaxで規格化する
        normalized_weights = F.softmax(weights, dim=-1)
        
        # AttentionをValueと掛け算
        output = torch.matmul(normalized_weights, v)
        
        # 特徴量を変換
        output = self.out(output)
        
        return output, normalized_weights

## Feedward

In [25]:
# FeedForwardの作成
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=1024, dropout=0.1):
        super().__init__()
        
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.dropout(x)
        x = self.linear_2(x)
        return x

## TransformerBlock

In [26]:
# Transformer Blockの作成
class TransformerBlock(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        
        # LayerNorm層
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)
        
        # Attention層
        self.attn = Attention(d_model)
        
        # 全結合層
        self.ff = FeedForward(d_model)
        
        # Dropout
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        # 正規化とAttention
        x_normalized = self.norm_1(x)
        output, normalized_weights = self.attn(
            x_normalized, x_normalized, x_normalized, mask)
        
        x2 = x + self.dropout_1(output)
        
        # 正規化と全結合層構築
        x_normalized2 = self.norm_2(x2)
        output = x2 + self.dropout_2(self.ff(x_normalized2))
        
        return output, normalized_weights

## ClassificationHead

In [27]:
class ClassificationHead(nn.Module):
    def __init__(self, d_model=300, output_dim=2):
        super().__init__()
        
        # 全結合層
        self.linear = nn.Linear(d_model, output_dim)
        
        # 重み初期化
        nn.init.normal_(self.linear.weight, std=0.02)
        nn.init.normal_(self.linear.bias, 0)
        
    def forward(self, x):
        x0 = x[:, 0, :]   # 各文の先頭の単語の特徴量を取り出す
        out = self.linear(x0)
        
        return out

## TransformerClassification

In [28]:
class TransformerClassification(nn.Module):
    def __init__(self, text_embedding_vectors, d_model=300, max_seq_len=256,
                           output_dim=2):
        super().__init__()
        
        # モデルの構築
        self.net1 = Embedder(text_embedding_vectors)
        self.net2 = PositionalEncoder(d_model, max_seq_len)
        self.net3_1 = TransformerBlock(d_model)
        self.net3_2 = TransformerBlock(d_model)
        self.net4 = ClassificationHead(d_model, output_dim)
        
    def forward(self, x, mask):
        x1 = self.net1(x)
        x2 = self.net2(x1)
        x3_1, normalized_weights_1 = self.net3_1(x2, mask)
        x3_2, normalized_weights_2 = self.net3_2(x3_1, mask)
        x4 = self.net4(x3_2)
        return x4, normalized_weights_1, normalized_weights_2

# モデルの構築

In [29]:
# モデルの構築
net = TransformerClassification(
    text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

In [30]:
# パラメータの初期化を定義
def weights_init(m):
    classname =  m.__class__.__name__
    if classname.find('Linear') != -1:
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)


# 訓練モード
net.train()

# パラメータ初期化
net.net3_1.apply(weights_init)
net.net3_2.apply(weights_init)

TransformerBlock(
  (norm_1): LayerNorm((300,), eps=1e-05, elementwise_affine=True)
  (norm_2): LayerNorm((300,), eps=1e-05, elementwise_affine=True)
  (attn): Attention(
    (q_linear): Linear(in_features=300, out_features=300, bias=True)
    (v_linear): Linear(in_features=300, out_features=300, bias=True)
    (k_linear): Linear(in_features=300, out_features=300, bias=True)
    (out): Linear(in_features=300, out_features=300, bias=True)
  )
  (ff): FeedForward(
    (linear_1): Linear(in_features=300, out_features=1024, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear_2): Linear(in_features=1024, out_features=300, bias=True)
  )
  (dropout_1): Dropout(p=0.1, inplace=False)
  (dropout_2): Dropout(p=0.1, inplace=False)
)

In [31]:
TEXT.vocab.vectors

tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        ...,
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0344, -0.0601, -0.0251,  ...,  0.2494,  0.3044,  0.0519]])

# 学習・推論

In [32]:
# 損失関数
criterion = nn.CrossEntropyLoss()

# 最適化手法
learning_rate = 2e-5
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

In [39]:
# 訓練と検証

def train_model(net, datalloaders_dict, criterion, optimizer, num_epochs):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print('----start----')
    net.to(device)
    
    torch.backends.cudnn.benchmark = True
    
    for epoch in range(num_epochs):
        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()
            else:
                net.eval()
            
            epoch_loss = 0.0
            epoch_corrects = 0
            
            for batch in (dataloaders_dict[phase]):
                inputs = batch.Text[0].to(device)
                labels = batch.Label.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    
                    # maskの作成
                    input_pad = 1
                    input_mask = (inputs != input_pad)
                    
                    # Transformerに入力
                    outputs, _, _ = net(inputs, input_mask)
#                     print(outputs)
                    loss = criterion(outputs, labels)
                    
                    _, preds = torch.max(outputs, 1)
                    
                    # 更新
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                    
                    # 結果の計算
                    epoch_loss += loss.item() * inputs.size(0)
                    epoch_corrects += torch.sum(preds == labels.data)
            
            # epochごとのlossと正解率
            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double() / len(dataloaders_dict[phase].dataset)
            
            print('Epoch {}/{} | {:^5} | Loss: {:.4f} Acc: {:.4f}'.format(
                                                                         epoch+1,
                                                                         num_epochs,
                                                                         phase,
                                                                         epoch_loss,
                                                                         epoch_acc))
    return net

In [40]:
# 辞書オブジェクトにまとめる
dataloaders_dict = {'train': train_dl, 'val': val_dl}

In [41]:
num_epochs = 10
net_trained = train_model(net, dataloaders_dict, criterion, optimizer, num_epochs=num_epochs)

----start----
Epoch 1/10 | train | Loss: 0.4133 Acc: 0.8156
Epoch 1/10 |  val  | Loss: 0.3838 Acc: 0.8300
Epoch 2/10 | train | Loss: 0.3843 Acc: 0.8309
Epoch 2/10 |  val  | Loss: 0.3945 Acc: 0.8226
Epoch 3/10 | train | Loss: 0.3657 Acc: 0.8402
Epoch 3/10 |  val  | Loss: 0.3650 Acc: 0.8374
Epoch 4/10 | train | Loss: 0.3530 Acc: 0.8452
Epoch 4/10 |  val  | Loss: 0.3611 Acc: 0.8428
Epoch 5/10 | train | Loss: 0.3454 Acc: 0.8520
Epoch 5/10 |  val  | Loss: 0.3798 Acc: 0.8356
Epoch 6/10 | train | Loss: 0.3358 Acc: 0.8544
Epoch 6/10 |  val  | Loss: 0.3544 Acc: 0.8506
Epoch 7/10 | train | Loss: 0.3289 Acc: 0.8612
Epoch 7/10 |  val  | Loss: 0.3514 Acc: 0.8482


KeyboardInterrupt: 

# テスト

In [50]:
# テストデータでの正答率を求める
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

net_trained.eval()
net_trained.to(device)

epoch_corrects = 0

for batch in test_dl:
    inputs = batch.Text[0].to(device)
    labels = batch.Label.to(device)
    
    with torch.set_grad_enabled(False):
        input_pad = 1
        input_mask = (inputs != input_pad)
        
        outputs, _, _ = net_trained(inputs, input_mask)
        
        _, preds = torch.max(outputs, 1)
        
        epoch_corrects += torch.sum(preds == labels.data)

In [51]:
epoch_acc = epoch_corrects.double() / len(test_dl.dataset)
print('テストデータ{}個の正解率: {:.4f}'.format(len(test_dl.dataset), epoch_acc))

テストデータ25000個の正解率: 0.8526


# Attentionの可視化

In [52]:
# HTMLを作成する関数を実装


def highlight(word, attn):
    "Attentionの値が大きいと文字の背景が濃い赤になるhtmlを出力させる関数"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)


def mk_html(index, batch, preds, normlized_weights_1, normlized_weights_2, TEXT):
    "HTMLデータを作成する"

    # indexの結果を抽出
    sentence = batch.Text[0][index]  # 文章
    label = batch.Label[index]  # ラベル
    pred = preds[index]  # 予測

    # indexのAttentionを抽出と規格化
    attens1 = normlized_weights_1[index, 0, :]  # 0番目の<cls>のAttention
    attens1 /= attens1.max()

    attens2 = normlized_weights_2[index, 0, :]  # 0番目の<cls>のAttention
    attens2 /= attens2.max()

    # ラベルと予測結果を文字に置き換え
    if label == 0:
        label_str = "Negative"
    else:
        label_str = "Positive"

    if pred == 0:
        pred_str = "Negative"
    else:
        pred_str = "Positive"

    # 表示用のHTMLを作成する
    html = '正解ラベル：{}<br>推論ラベル：{}<br><br>'.format(label_str, pred_str)

    # 1段目のAttention
    html += '[TransformerBlockの1段目のAttentionを可視化]<br>'
    for word, attn in zip(sentence, attens1):
        html += highlight(TEXT.vocab.itos[word], attn)
    html += "<br><br>"

    # 2段目のAttention
    html += '[TransformerBlockの2段目のAttentionを可視化]<br>'
    for word, attn in zip(sentence, attens2):
        html += highlight(TEXT.vocab.itos[word], attn)

    html += "<br><br>"

    return html

In [54]:
from IPython.display import HTML

batch = next(iter(test_dl))

inputs = batch.Text[0].to(device)
labels = batch.Label.to(device)

input_pad = 1
input_mask = (inputs != input_pad)

outputs, normilized_weights_1, normilized_weights_2 = net_trained(inputs, input_mask)
_, preds = torch.max(outputs, 1)

index = 3
html_output = mk_html(index, batch, preds, normilized_weights_1, normilized_weights_2, TEXT)
HTML(html_output)  # HTML形式で出力

In [55]:
index = 10
html_output = mk_html(index, batch, preds, normilized_weights_1, normilized_weights_2, TEXT)
HTML(html_output)  # HTML形式で出力