## 形態素解析の関数

In [12]:
import MeCab
def tokenize(text):
    """
    :param text: str, 日本語文
    :return tokenized: list of str, トークナイズされたリスト
    """
    tagger = MeCab.Tagger()
    node = tagger.parseToNode(text)
    tokenized = []
    while node:
        if node.surface != '':
            tokenized.append(node.surface)
        node = node.next
    
    return tokenized

In [13]:
tokenize('古池や蛙飛び込む水の音')

['古池', 'や', '蛙', '飛び込む', '水', 'の', '音']

## Vocabクラスの定義

In [17]:
PAD_TOKEN = '<PAD>'
UNK_TOKEN = '<UNK>'
PAD = 0
UNK = 1
MIN_COUNT = 1
word2id = {PAD_TOKEN: PAD, UNK_TOKEN:UNK}

In [27]:
class Vocab(object):
    """語彙を管理するクラス"""
    def __init__(self, word2id={}):
        self.word2id = dict(word2id)
        self.id2word = {v:k for k,v in self.word2id.items()}
    
    def build_vocab(self, corpus, min_count=1):
        """テキストから語彙を構築するメソッド
        :pram corpus: list of list of str, コーパスとなるテキスト
        :pram min_count: int
        """
        word_counter = {} # 単語の出現回数をカウント
        for sentence in corpus:
            for word in sentence:
                word_counter[word] = word_counter.get(word, 0) + 1
            
        # min_count以上出現する単語のみを語彙に追加
        # 出現回数の多い順にIDを振る
    
        for word, count in sorted(word_counter.items(), key=lambda x: -x[1]):
            if count < min_count:
                break
            _id = len(word2id)
            self.word2id.setdefault(word, _id)
            self.id2word[_id] = word
        # 語彙に含まれる単語の出現回数
        self.row_vocab = {w:word_counter[w] for w in self.word2id.keys() if w in word_counter}

In [32]:
# 単語リストをIDのリストに変換する関数
def sentence_to_ids(vocab, sentence):
    """
    :pram vocab: vocabオブジェクト
    :pram sentence: list of str
    :return ids : list of int
    """
    ids = [vocab.word2id.get(word, UNK) for word in sentence]
    return ids

## CBOWの実装をやってみる

In [1]:
from Vocab import *
import MeCab
import numpy as np
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# データローダーの定義
class DataLoader_CBOW(object):
    """CBOW用のデータローダー"""
    def __init__(self, text, batch_size=50, window=2):
        """
        :pram text: 教師データ list of lists of int
        :pram batch_size
        :pram window
        """
        self.text = text
        self.batch_size = batch_size
        self.window = window
        self.w_pointer = 0  # 単語単位のポインタ
        self.s_pointer = 0  # 文単位のポインタ
        self.n_sent = len(text)
    
    def __iter__(self):
        return self
    
    def __next__(self):
        """
        :return batch_X: (batch_size, window*2)のテンソル
        :return batch_Y: (batch_size, 1)のテンソル
        """
        
        batch_X = []
        batch_Y = []
        
        while len(batch_X) < self.batch_size:
            sent = self.text[self.s_pointer]
            target = sent[self.w_pointer]
            start = max(0, self.w_pointer - self.window)
            one_x = sent[start:self.w_pointer] + sent[self.w_pointer + 1:self.w_pointer + self.window + 1]
            one_x = pad_seq(one_x, self.window * 2)
            
            batch_X.append(one_x)
            batch_Y.append(target)
            
            self.w_pointer += 1
            if self.w_pointer >= len(sent):
                self.w_pointer = 0
                self.s_pointer += 1
            
                if self.s_pointer >= self.n_sent:
                    self.s_pointer = 0
                    raise StopIteration
                
        batch_X = torch.tensor(batch_X, dtype=torch.long, device=device)
        batch_Y = torch.tensor(batch_Y, dtype=torch.long, device=device)
        return batch_X, batch_Y

In [5]:
dataloader = DataLoader_CBOW([[1, 2, 3, 4], [5, 6, 7, 8], [9, 1, 2, 3], [4, 5, 6, 7]], batch_size=2, window=1)

In [6]:
for data in dataloader:
    print(data)

(tensor([[2, 0],
        [1, 3]]), tensor([1, 2]))
(tensor([[2, 4],
        [3, 0]]), tensor([3, 4]))
(tensor([[6, 0],
        [5, 7]]), tensor([5, 6]))
(tensor([[6, 8],
        [7, 0]]), tensor([7, 8]))
(tensor([[1, 0],
        [9, 2]]), tensor([9, 1]))
(tensor([[1, 3],
        [2, 0]]), tensor([2, 3]))
(tensor([[5, 0],
        [4, 6]]), tensor([4, 5]))


In [3]:
class CBOW(nn.Module):
    def __init__(self, vocab_size, embedding_size):
        """
        :param vocab_size: int, 語彙の総数
        :param embedding_size: int, 単語埋め込みベクトルの次元
        """
        super(CBOW, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.emb = nn.Embedding(self.vocab_size, self.embedding_size, padding_idx=0)  # Embedding層の定義
        self.linear = nn.Linear(self.embedding_size, self.vocab_size, bias=False)  # 全結合層（バイアスなし）
        
    def forward(self, batch_X, batch_Y):
        """
        :pram batch_X: Tensor(dtype=torch.long), (batch_size, window*2)
        :pram batch_Y: Tensor(dtype=torch.long), (batch_size, 1)
        :return loss: CBOWのロス
        """
        
        emb_X = self.emb(batch_X) # (batch_size, window*2, embedding_size)
        sum_X = torch.sum(emb_X, dim=1)  # (batch_size, embedding_size)
        lin_X = self.linear(sum_X)  # (batch_size, vocab_size)
        log_prob_X = F.log_softmax(lin_X, dim=-1) # (batch_size, vocab_size)
        loss = F.nll_loss(log_prob_X, batch_Y)
        return loss

In [4]:
with open('./data/kokoro.txt', 'r') as f:
    sentences = f.readlines()

In [5]:
sentences = [sent.strip() for sent in sentences]
sentences = [tokenize(sent) for sent in sentences]
vocab = Vocab(word2id)
vocab.build_vocab(sentences, min_count=3, min_length=2)

In [6]:
sentences = [sentence_to_ids(vocab, sent) for sent in sentences]

In [13]:
cbow = CBOW(vocab_size=len(vocab.word2id), embedding_size=128)
optimizer_cbow = optim.Adam(cbow.parameters())
dataloader_cbow = DataLoader_CBOW(sentences, batch_size=50)

In [8]:
def compute_loss(model, input, optimizer=None, is_train=True):
    """lossを計算するための関数
    
    is_train=Trueならモデルをtrainモードに、
    is_train=Falseならモデルをevaluationモードに設定します
    
    :param model: 学習させるモデル
    :param input: モデルへの入力
    :param optimizer: optimizer
    :param is_train: bool, モデルtrainさせるか否か
    """
    model.train(is_train)

    # lossを計算します。
    loss = model(*input)

    if is_train:
        # .backward()を実行する前にmodelのparameterのgradientを全て0にセットします
        optimizer.zero_grad()
        # parameterのgradientを計算します。
        loss.backward()
        # parameterのgradientを用いてparameterを更新します。
        optimizer.step()

    return loss.item()

In [17]:
start_at = time.time()
n_batches = 1000

for batch_id, (batch_X, batch_Y) in enumerate(dataloader_cbow):
    loss = compute_loss(cbow, (batch_X, batch_Y), optimizer=optimizer_cbow, is_train=True)
    if batch_id % 100 == 0:
        print("batch:{}, loss:{:.4f}".format(batch_id, loss))
    if batch_id >= n_batches:
        break

end_at = time.time()

print("Elapsed time: {:.2f} [sec]".format(end_at - start_at))

batch:0, loss:9.7713
batch:100, loss:3.5774
batch:200, loss:5.3056
batch:300, loss:4.4193
batch:400, loss:3.7120
batch:500, loss:4.2474
batch:600, loss:4.7219
batch:700, loss:3.9432
batch:800, loss:2.1595
batch:900, loss:3.8819
batch:1000, loss:3.5987
Elapsed time: 6.07 [sec]


In [18]:
torch.save(cbow.emb.weight.data.cpu().numpy(), "./models/cbow_embedding.pth")

# 埋め込み層のパラメータのみを保存する
torch.save(cbow.embedding.weight.data.cpu().numpy(),  "./data/cbow_embedding.pth")

# 保存したパラメータの読み込み方
e = torch.load("./data/cbow_embedding.pth")
e

In [11]:
def compute_word_similarity(embedding_path, word, n):
    """
    与えられた単語に最も似ている単語とcos類似度を返す関数

    :param embedding_path: str, 保存した埋め込み層のパラメータのパス
    :param word: str, 単語
    :param n: int
    :return out: str, 上位n個の類似単語とそのcos類似度
    """
    embedding = torch.load(embedding_path)

    # 単語ベクトルを全て単位ベクトルにする
    norm = np.linalg.norm(embedding, ord=2, axis=1, keepdims=True)
    norm = np.where(norm==0, 1, norm) # 0で割ることを避ける
    embedding /= norm
    e = embedding[vocab.word2id[word]]

    # 単語ベクトル同士のcos類似度を計算する
    cos_sim = np.dot(embedding, e.reshape(-1, 1)).reshape(-1,)
    most_sim = np.argsort(cos_sim)[::-1][1:n+1] # 自分は除く
    most_sim_words = [vocab.id2word[_id] for _id in most_sim]
    top_cos_sim = cos_sim[most_sim]
    out = ", ".join([w+"({:.4f})".format(v) for w, v in zip(most_sim_words, top_cos_sim)])
    return out

In [19]:
compute_word_similarity('./models/cbow_embedding.pth', '先生', 5)

'準備(0.2844), 二人(0.2808), 落ち付い(0.2740), 大きな(0.2513), 近づく(0.2476)'

In [24]:
vocab.id2word[2]

'です'