<a href="https://colab.research.google.com/github/cedro3/Transformer/blob/master/transformer_ja_run.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Transformerによる日本語データのネガポジ判定と根拠の可視化

- 使用するデータセットはchABSA-datasetです。これは、日本の上場企業の有価証券報告書から文章を取り出し、それがポジテイブな表現なのか、ネガティブな表現なのかをまとめたものです。
- 学習後、テスト文章で推論を行い、その文章のどの単語が判断根拠になっているか(Attention)を可視化します。


# Githubのコピー

In [None]:
!git clone https://github.com/cedro3/Transformer.git

In [None]:
cd Transformer

# Mecab＋辞書、mojimojiのインストール

In [None]:
# 形態素分析ライブラリーMeCab と 辞書(mecab-ipadic-NEologd)のインストール 
!apt-get -q -y install sudo file mecab libmecab-dev mecab-ipadic-utf8 git curl python-mecab > /dev/null
!git clone --depth 1 https://github.com/neologd/mecab-ipadic-neologd.git > /dev/null 
!echo yes | mecab-ipadic-neologd/bin/install-mecab-ipadic-neologd -n > /dev/null 2>&1
!pip install mecab-python3 > /dev/null
!ln -s /etc/mecabrc /usr/local/etc/mecabrc

# 日本語半角・全角変換ライブラリーmojimojiのインストール
!pip install mojimoji

# 日本語fastTextモデルのダウンロード

In [None]:
# Google drive からfastText日本語モデルの圧縮ファイル(vector_neologd.zip)をダウンロードする
import requests

def download_file_from_google_drive(id, destination):
    
       # ダウンロード先URL指定
       URL = "https://drive.google.com/uc?id=0ByFQ96A4DgSPUm9wVWRLdm5qbmc&export=download" 

       session = requests.Session()

       response = session.get(URL, params = { 'id' : id }, stream = True)
       token = get_confirm_token(response)

       if token:
           params = { 'id' : id, 'confirm' : token }
           response = session.get(URL, params = params, stream = True)

       save_response_content(response, destination)    

def get_confirm_token(response):
       for key, value in response.cookies.items():
           if key.startswith('download_warning'):
               return value

       return None

def save_response_content(response, destination):
       CHUNK_SIZE = 32768

       with open(destination, "wb") as f:
           for chunk in response.iter_content(CHUNK_SIZE):
               if chunk: # filter out keep-alive new chunks
                   f.write(chunk)

if __name__ == "__main__":
       file_id = 'TAKE ID FROM SHAREABLE LINK' 
       destination = './data/vector_neologd.zip'  # 保存先パスの指定
       download_file_from_google_drive(file_id, destination)

In [None]:
# フォルダ「data」内の「/vector_neologd.zip」を解凍する
import zipfile
zipf = zipfile.ZipFile('./data/vector_neologd.zip')  # 解凍するファイルの指定
zipf.extractall('./data/')  # 解凍先フォルダの指定
zipf.close()  # ZIPファイルをクローズ   

# フォルダ「data」内にfastText日本語モデル「model.vec」ができます。

# コード本体

In [None]:
# パッケージのimport
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext


In [None]:
# 乱数のシードを設定
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)

# DatasetとDataLoaderを作成

In [None]:
from dataloader_ja import get_chABSA_DataLoaders_and_TEXT
# 読み込み
train_dl, val_dl, TEXT = get_chABSA_DataLoaders_and_TEXT(max_length=256, batch_size=8)

# 辞書オブジェクトにまとめる
dataloaders_dict = {"train": train_dl, "val": val_dl}


# ネットワークモデルの作成

In [None]:
from transformer import TransformerClassification

# モデル構築
net = TransformerClassification(
    text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

# ネットワークの初期化を定義
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        # Liner層の初期化
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)

# 訓練モードに設定
net.train()

# TransformerBlockモジュールを初期化実行
net.net3_1.apply(weights_init)
net.net3_2.apply(weights_init)
print('ネットワーク設定完了')

# 損失関数と最適化手法を定義

In [None]:
# 損失関数の設定
criterion = nn.CrossEntropyLoss()
# nn.LogSoftmax()を計算してからnn.NLLLoss(negative log likelihood loss)を計算

# 最適化手法の設定
learning_rate = 2e-5
optimizer = optim.Adam(net.parameters(), lr=learning_rate)


# 学習・検証を実施

In [None]:
# モデルを学習させる関数を作成


def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):

    # GPUが使えるかを確認
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("使用デバイス：", device)
    print('-----start-------')
    # ネットワークをGPUへ
    net.to(device)

    # ネットワークがある程度固定であれば、高速化させる
    torch.backends.cudnn.benchmark = True

    # epochのループ
    for epoch in range(num_epochs):
        # epochごとの訓練と検証のループ
        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()  # モデルを訓練モードに
            else:
                net.eval()   # モデルを検証モードに

            epoch_loss = 0.0  # epochの損失和
            epoch_corrects = 0  # epochの正解数

            # データローダーからミニバッチを取り出すループ
            for batch in (dataloaders_dict[phase]):
                # batchはTextとLableの辞書オブジェクト

                # GPUが使えるならGPUにデータを送る
                inputs = batch.Text[0].to(device)  # 文章
                labels = batch.Label.to(device)  # ラベル

                # optimizerを初期化
                optimizer.zero_grad()

                # 順伝搬（forward）計算
                with torch.set_grad_enabled(phase == 'train'):

                    # mask作成
                    input_pad = 1  # 単語のIDにおいて、'<pad>': 1 なので
                    input_mask = (inputs != input_pad)

                    # Transformerに入力
                    outputs, _, _ = net(inputs, input_mask)
                    loss = criterion(outputs, labels)  # 損失を計算

                    _, preds = torch.max(outputs, 1)  # ラベルを予測（dim=1 列方向のＭａｘを取得、predsは最大のindex）

                    # 訓練時はバックプロパゲーション
                    if phase == 'train':
                        loss.backward()   #損失の計算
                        optimizer.step()  # 勾配の更新

                    # 結果の計算
                    epoch_loss += loss.item() * inputs.size(0)  # lossの合計を更新
                    # 正解数の合計を更新
                    epoch_corrects += torch.sum(preds == labels.data)

            # epochごとのlossと正解率
            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double(
            ) / len(dataloaders_dict[phase].dataset)

            print('Epoch {}/{} | {:^5} |  Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
                                                                           phase, epoch_loss, epoch_acc))

    return net


In [None]:
# 学習・検証を実行
num_epochs = 14
net_trained = train_model(net, dataloaders_dict,
                          criterion, optimizer, num_epochs=num_epochs)


In [None]:
torch.save(net_trained.state_dict(), './data/14_steps_fastText_weight.pth')

In [None]:
param = torch.load('./data/14_steps_fastText_weight.pth')
net_trained.load_state_dict(param)

In [None]:
net_trained

# Attentionの可視化で判定根拠を探る



In [None]:
# HTMLを作成する関数を実装


def highlight(word, attn):
    "Attentionの値が大きいと文字の背景が濃い赤になるhtmlを出力させる関数"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)


def mk_html(index, batch, preds, normlized_weights_1, normlized_weights_2, TEXT):
    "HTMLデータを作成する"

    # indexの結果を抽出
    sentence = batch.Text[0][index]  # 文章
    print("sentenceの形状：", sentence.shape)
    label = batch.Label[index]  # ラベル
    print("labelの形状:", label)
    pred = preds[index]  # 予測
    print("pored:",pred.shape)
    print("sentence:", sentence)
    print(label)

    # indexのAttentionを抽出と規格化
    attens1 = normlized_weights_1[index, 0, :]  # 0番目の<cls>のAttention
    attens1 /= attens1.max()

    attens2 = normlized_weights_2[index, 0, :]  # 0番目の<cls>のAttention
    attens2 /= attens2.max()

    # ラベルと予測結果を文字に置き換え
    if label == 0:
        label_str = "Negative"
    else:
        label_str = "Positive"

    if pred == 0:
        pred_str = "Negative"
    else:
        pred_str = "Positive"

    # 表示用のHTMLを作成する
    html = '正解ラベル：{}<br>推論ラベル：{}<br><br>'.format(label_str, pred_str)

    # 1段目のAttention
    html += '[TransformerBlockの1段目のAttentionを可視化]<br>'
    for word, attn in zip(sentence, attens1):
        html += highlight(TEXT.vocab.itos[word], attn)
    html += "<br><br>"

    # 2段目のAttention
    html += '[TransformerBlockの2段目のAttentionを可視化]<br>'
    for word, attn in zip(sentence, attens2):
        html += highlight(TEXT.vocab.itos[word], attn)

    html += "<br><br>"

    return html


In [None]:
from IPython.display import HTML

# Transformerで処理
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net_trained.eval()   # モデルを検証モードに
net_trained.to(device)

# ミニバッチの用意
batch = next(iter(val_dl))

# GPUが使えるならGPUにデータを送る
inputs = batch.Text[0].to(device)  # 文章
labels = batch.Label.to(device)  # ラベル

print("inputs.shape=",inputs.shape)
# mask作成
input_pad = 1  # 単語のIDにおいて、'<pad>': 1 なので
input_mask = (inputs != input_pad)
print("input_mask.shape=",input_mask.shape)
#print(inputs)
print(input_mask[0])
# Transformerに入力
outputs, normlized_weights_1, normlized_weights_2 = net_trained(
    inputs, input_mask)
_, preds = torch.max(outputs, 1)  # ラベルを予測


index = 7 # 出力させたいデータ
html_output = mk_html(index, batch, preds, normlized_weights_1,
                      normlized_weights_2, TEXT)  # HTML作成
HTML(html_output)  # HTML形式で出力


# 推論用の1文章をインプットしてラベルとAttentionを可視化する。

In [None]:
def preprocessing_text(text):
    
    # 半角・全角の統一
    text = mojimoji.han_to_zen(text) 
    # 改行、半角スペース、全角スペースを削除
    text = re.sub('\r', '', text)
    text = re.sub('\n', '', text)
    text = re.sub('　', '', text)
    text = re.sub(' ', '', text)
    text = re.sub('（','', text)
    text = re.sub('）','', text)
    # 数字文字の一律「0」化
    text = re.sub(r'[0-9 ０-９]+', '0', text)  # 数字

    # カンマ、ピリオド以外の記号をスペースに置換
    for p in string.punctuation:
        if (p == ".") or (p == ","):
            continue
        else:
            text = text.replace(p, " ")

    return text

# 分かち書き
def tokenizer_mecab(text):
    m_t = MeCab.Tagger('-Owakati -d /usr/local/lib/mecab/dic/mecab-ipadic-neologd')
    text = m_t.parse(text)  # これでスペースで単語が区切られる
    ret = text.strip().split()  # スペース部分で区切ったリストに変換
    return ret

# 前処理と分かち書きをまとめた関数を定義
def tokenizer_with_preprocessing(text):
    text = preprocessing_text(text)  # 前処理の正規化
    ret = tokenizer_mecab(text)  # Janomeの単語分割

    return ret

def create_tensor(text, max_length):
    #入力文章をTorch Teonsor型にのINDEXデータに変換
    token_ids = torch.ones((max_length)).to(torch.int64)
    ids_list = list(map(lambda x: TEXT.vocab.stoi[x] , text))
    print(ids_list)
    for i, index in enumerate(ids_list):
        token_ids[i] = index
    return token_ids


# HTMLを作成する関数を実装


def highlight(word, attn):
    "Attentionの値が大きいと文字の背景が濃い赤になるhtmlを出力させる関数"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)


def mk_html(input, preds, normlized_weights_1, normlized_weights_2, TEXT):
    "HTMLデータを作成する"

    # indexの結果を抽出
    index = 0
    sentence = input.squeeze_(0) # 文章  #  torch.Size([1, 256])  > torch.Size([256]) 
    pred = preds[0]  # 予測


    # indexのAttentionを抽出と規格化
    attens1 = normlized_weights_1[index, 0, :]  # 0番目の<cls>のAttention
    attens1 /= attens1.max()

    attens2 = normlized_weights_2[index, 0, :]  # 0番目の<cls>のAttention
    attens2 /= attens2.max()

    if pred == 0:
        pred_str = "Negative"
    else:
        pred_str = "Positive"

    # 表示用のHTMLを作成する
    html = '推論ラベル：{}<br><br>'.format(pred_str)
  
    # 1段目のAttention
    html += '[TransformerBlockの1段目のAttentionを可視化]<br>'
    for word, attn in zip(sentence, attens1):
        html += highlight(TEXT.vocab.itos[word], attn)
    html += "<br><br>"

    # 2段目のAttention
    html += '[TransformerBlockの2段目のAttentionを可視化]<br>'
    for word, attn in zip(sentence, attens2):
        html += highlight(TEXT.vocab.itos[word], attn)

    html += "<br><br>"

    return html

In [None]:
from IPython.display import HTML, display
from dataloader_ja import *   ####

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net_trained.eval()   # モデルを検証モードに
net_trained.to(device)

#インプットデータ
text = "課金売上に関しては、ユーザー数の増加により順調に推移した為、医科セグメントとしては、初の黒字化を達成する事が出来ました"
#textの先頭と末尾に<cls>、<eos>を追加する。
text = tokenizer_with_preprocessing(text)
text.insert(0, '<cls>')
text.append('<eos>')
#   '<cls>': 2, '<eos>': 3,
text = create_tensor(text, 256)
text = text.unsqueeze_(0)   #  torch.Size([256])  > torch.Size([1, 256])

# GPUが使えるならGPUにデータを送る
input = text.to(device)
print("input_shape=",input.shape)
# mask作成
input_pad = 1  # 単語のIDにおいて、'<pad>': 1 なので
input_mask = (input != input_pad)
#print(input)
#print(input_mask)

outputs, normlized_weights_1, normlized_weights_2 = net_trained(input, input_mask)
_, preds = torch.max(outputs, 1)  # ラベルを予測

html_output = mk_html(input, preds, normlized_weights_1, normlized_weights_2, TEXT)  # HTML作成


In [None]:
display(HTML(html_output))

以上