In [None]:
"""
【 self attention 】簡単に予測理由を可視化できる文書分類モデルを実装する
https://qiita.com/itok_msi/items/ad95425b6773985ef959#%E3%82%B3%E3%83%BC%E3%83%89
https://github.com/nn116003/self-attention-classification/blob/master/imdb_attn.py
"""

In [2]:
from torchtext import data
from torchtext import datasets
from torchtext.vocab import GloVe

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch
from torch.autograd import Variable

import unicodedata
import string

import dill

from itertools import chain

In [2]:
from gensim.models import word2vec, KeyedVectors

# Pretrained Japanese Wikipedia entity vectors (Inui-Okazaki Lab),
# read from a binary word2vec-format file.
model = KeyedVectors.load_word2vec_format(
    '../entity_vector/entity_vector.model.bin',
    binary=True,
)

In [8]:
# Raw embedding matrix of the loaded KeyedVectors (one row per vocabulary entry).
weights = model.vectors

print(type(weights))  # numpy.ndarray
print(weights.shape)  # (1015474, 200) in the recorded run of this notebook

vocab_size, embedding_dim = weights.shape

# Build the embedding layer directly from the pretrained matrix rather than
# allocating and randomly initialising a throw-away (vocab_size, embedding_dim)
# table that is replaced immediately. freeze=False keeps the weights trainable,
# which matches wrapping them in an nn.Parameter.
embed = nn.Embedding.from_pretrained(torch.from_numpy(weights), freeze=False)


<class 'numpy.ndarray'>


(1015474, 200)

In [None]:
# Bidirectional LSTM encoder
class EncoderRNN(nn.Module):
    """Embed token ids and encode them with a single-layer bidirectional LSTM.

    The forward- and backward-direction outputs are summed, so every time step
    is represented by a vector of size ``h_dim`` (not ``2 * h_dim``).

    Args:
        emb_dim: Size of the word embeddings.
        h_dim: Hidden size of each LSTM direction.
        v_size: Number of rows in the embedding table.
        gpu: Retained for backward compatibility and stored on ``self.gpu``.
            The initial hidden state is allocated on the same device as the
            embedding weights, so the flag no longer has to match the hardware.
        v_vec: Optional pretrained embedding matrix copied into the embedding
            layer.
        batch_first: Whether the LSTM input/output layout has the batch
            dimension first.
    """

    def __init__(self, emb_dim, h_dim, v_size, gpu=True, v_vec=None, batch_first=True):
        super(EncoderRNN, self).__init__()
        self.gpu = gpu
        self.h_dim = h_dim
        # Remembered so that packing/unpacking uses the same layout as the LSTM.
        self.batch_first = batch_first
        self.embed = nn.Embedding(v_size, emb_dim)
        if v_vec is not None:
            self.embed.weight.data.copy_(v_vec)
        self.lstm = nn.LSTM(emb_dim, h_dim, batch_first=batch_first,
                            bidirectional=True)

    def init_hidden(self, b_size):
        """Return zero-initialised ``(h0, c0)`` for a batch of ``b_size`` sequences.

        Both tensors have shape ``(2, b_size, h_dim)`` (one layer, two
        directions) and live on the device of the embedding weights, so the
        module works on CPU-only machines and after ``.to(device)``.
        """
        ref = self.embed.weight
        shape = (1 * 2, b_size, self.h_dim)
        h0 = torch.zeros(shape, dtype=ref.dtype, device=ref.device)
        c0 = torch.zeros(shape, dtype=ref.dtype, device=ref.device)
        return (h0, c0)

    def forward(self, sentence, lengths=None):
        """Encode a batch of token-id sequences.

        Args:
            sentence: Integer tensor of token ids.
            lengths: Optional true sequence lengths (tensor or sequence of
                ints). When given, the batch is packed so padded positions are
                skipped by the LSTM; they come back as zeros in the output.
                Lengths do not need to be sorted.

        Returns:
            Tensor whose last dimension is ``h_dim``: the sum of the forward
            and backward LSTM outputs for every time step.
        """
        batch_dim = 0 if self.batch_first else 1
        self.hidden = self.init_hidden(sentence.size(batch_dim))
        emb = self.embed(sentence)

        if lengths is None:
            out, _ = self.lstm(emb, self.hidden)
        else:
            lengths = torch.as_tensor(lengths).reshape(-1).tolist()
            packed_emb = nn.utils.rnn.pack_padded_sequence(
                emb, lengths, batch_first=self.batch_first, enforce_sorted=False)
            packed_out, _ = self.lstm(packed_emb, self.hidden)
            out, _ = nn.utils.rnn.pad_packed_sequence(
                packed_out, batch_first=self.batch_first)

        # Merge the two directions by summation: (.., 2 * h_dim) -> (.., h_dim).
        out = out[:, :, :self.h_dim] + out[:, :, self.h_dim:]

        return out

In [None]:
# Attention layer
# Takes the LSTM hidden states as input and outputs an attention weight for each token.
class Attn(nn.Module):
    """Score every time step with a small MLP and normalise over the sequence.

    Args:
        h_dim: Size of each encoder output vector.
        attn_dim: Width of the hidden layer of the scoring MLP
            (defaults to 24, the previously hard-coded value).
    """

    def __init__(self, h_dim, attn_dim=24):
        super(Attn, self).__init__()
        self.h_dim = h_dim
        self.main = nn.Sequential(
            nn.Linear(h_dim, attn_dim),
            nn.ReLU(True),
            nn.Linear(attn_dim, 1)
        )

    def forward(self, encoder_outputs):
        """Return attention weights of shape (b, s, 1) that sum to 1 over ``s``.

        Args:
            encoder_outputs: Tensor of shape (b, s, h_dim).
        """
        b_size = encoder_outputs.size(0)
        # reshape (not view) so non-contiguous inputs, e.g. transposed encoder
        # outputs, are accepted as well.
        attn_ene = self.main(encoder_outputs.reshape(-1, self.h_dim))  # (b, s, h) -> (b * s, 1)
        return F.softmax(attn_ene.view(b_size, -1), dim=1).unsqueeze(2)  # (b * s, 1) -> (b, s, 1)

In [None]:
# Decoder / classifier
class AttnClassifier(nn.Module):
    """Attention-pooled classifier on top of the encoder outputs.

    Args:
        h_dim: Size of each encoder output vector.
        c_num: Number of target classes.
    """

    def __init__(self, h_dim, c_num):
        super(AttnClassifier, self).__init__()
        self.attn = Attn(h_dim)
        self.main = nn.Linear(h_dim, c_num)

    def forward(self, encoder_outputs):
        """Classify a batch of encoded sequences.

        Args:
            encoder_outputs: Tensor of shape (b, s, h_dim).

        Returns:
            Tuple ``(log_probs, attns)``: class log-probabilities of shape
            (b, c_num) and the attention weights of shape (b, s, 1).
        """
        attns = self.attn(encoder_outputs)  # (b, s, 1)
        feats = (encoder_outputs * attns).sum(dim=1)  # (b, s, h) -> (b, h)
        # Normalise explicitly over the class dimension; omitting `dim` relies
        # on a deprecated implicit choice and emits a warning.
        return F.log_softmax(self.main(feats), dim=1), attns

In [None]:
# Training
def train_model(epoch, train_iter, optimizer, log_interval=10):
    """Run one training epoch and periodically print running accuracy and loss.

    The models are read from the module-level names ``encoder`` and
    ``classifier``. Each batch must expose ``batch.text`` as a
    ``(token_ids, lengths)`` pair and ``batch.label``; labels are shifted by
    -1 before being used as class indices.

    Args:
        epoch: Epoch number, used only in the log line.
        train_iter: Sized iterable of batches. If it has a ``batch_size``
            attribute it is used for the progress total; otherwise the size of
            the current batch is used.
        optimizer: Optimizer stepping the parameters of both models.
        log_interval: Print a log line every ``log_interval`` batches.
    """
    encoder.train()
    classifier.train()
    nominal_batch_size = getattr(train_iter, 'batch_size', None)
    correct = 0
    seen = 0
    for idx, batch in enumerate(train_iter):
        (x, _), y = batch.text, batch.label - 1
        optimizer.zero_grad()
        encoder_outputs = encoder(x)
        output, _ = classifier(encoder_outputs)
        loss = F.nll_loss(output, y)
        loss.backward()
        optimizer.step()

        pred = output.detach().max(1, keepdim=True)[1]
        # .item() keeps the counters plain Python ints instead of tensors.
        correct += pred.eq(y.view_as(pred)).sum().item()
        seen += y.numel()
        if idx % log_interval == 0:
            total = len(train_iter) * (nominal_batch_size or len(x))
            # Divide by the examples actually accumulated since the last log
            # line, so the first line and short final batches are reported
            # correctly.
            print('train epoch: {} [{}/{}], acc:{}, loss:{}'.format(
                epoch, idx*len(x), total,
                correct/float(seen),
                loss.item()))
            correct = 0
            seen = 0

In [None]:
# テスト
def test_model(epoch, test_iter):
    encoder.eval()
    classifier.eval()
    correct = 0
    for idx, batch in enumerate(test_iter):
        (x, x_l), y = batch.text, batch.label - 1
        encoder_outputs = encoder(x)
        output, attn = classifier(encoder_outputs)
        pred = output.data.max(1, keepdim=True)[1]
        correct += pred.eq(y.data.view_as(pred)).cpu().sum()
        
    print('test epoch:{}, acc:{}'.format(epoch, correct/float(len(test))))