<a href="https://colab.research.google.com/github/KurodaHiroaki/hangman/blob/main/ATT_NMT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 学習
import torch # PyTorchという深層学習のライブラリをインポート
import torch.nn as nn # ニューラルネットワークのモジュールをインポート
import torch.optim as optim # 最適化アルゴリズムのモジュールをインポート
import torch.nn.functional as F # 活性化関数などの関数をインポート
import numpy as np # NumPyという数値計算のライブラリをインポート

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # GPU

# Data setting
id, eid2w, ew2id = 1, {}, {} # 英語の単語に対応するIDとその逆の辞書を初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/train.en.vocab.4k", "r", encoding="utf-8") as f: # 英単語の語彙辞書
    for w in f:
        w = w.strip() # 単語の前後の空白を除去
        eid2w[id] = w # IDから単語の辞書に登録
        ew2id[w] = id # 単語からIDへの辞書に登録
        id += 1 # IDを1増やす
ev = id # 英語の語彙数を保存

edata = [] # 英語の文データを保存するリストを初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/train.en","r",encoding="utf-8") as f: # 英語の文データを読み込む
    for sen in f:
        wl = [ew2id["<s>"]] # 文頭記号<s> に対応するIDをリストに追加
        for w in sen.strip().split(): # 文を単語に分割してキープ
            if w in ew2id: # 単語が辞書にあれば
                wl.append(ew2id[w]) # 単語に対応するIDをリストに追加
            else: # 単語が辞書になければ
                wl.append(ew2id["<unk>"]) # 未知語記号<unk>に対応するIDをリストに追加
        wl.append(ew2id["</s>"]) # 文末記号</s>に対応するIDをリストに追加
        edata.append(wl) # リストを文データとして保存

id, jid2w, jw2id = 1, {}, {} # 日本語の単語に対応するIDとその逆の辞書を初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/train.ja.vocab.4k", "r", encoding="utf-8") as f: # 日本語の単語リストを読み込む
    id = 1
    for w in f:
        w = w.strip() # 単語の前後の空白を除去
        jid2w[id] = w # IDから単語への辞書に登録
        jw2id[w] = id # 単語からIDへの辞書に登録
        id += 1 # IDを1増やす
jv = id # 日本語の単語数を保存

jdata = [] # 日本語の文データを保存するリストを初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/train.ja", "r", encoding="utf-8") as f: # 日本語の文データを読み込む
    for sen in f:
        wl = [jw2id["<s>"]] # 文頭記号<s>二対応するIDをリストに追加
        for w in sen.strip().split(): # 文を単語に分割してキープ
            if w in jw2id: # 単語が辞書にあれば
                wl.append(jw2id[w]) # 単語に対応するIDをリストに追加
            else: # 単語が辞書になければ
                wl.append(jw2id["<unk>"]) # 未知語記号<unk>に対応するIDをリストに追加
        wl.append(jw2id["</s>"]) # 文末記号</s>に対応するIDをリストに追加
        jdata.append(wl) # リストを文データとして保存

# Define model
class MyAttNMT(nn.Module): # Attention付きニューラル機械翻訳モデルのクラスを定義
    def __init__(self, jv, ev, k): # 初期化関数を定義
        super(MyAttNMT, self).__init__() # 親クラスの初期化関数を呼び出す
        self.jemb = nn.Embedding(jv, k) # 日本語の単語IDをk次元のベクトルに変換する層を定義
        self.eemb = nn.Embedding(ev, k) # 英語の単語IDをk次元のベクトルに変換する層を定義
        self.lstm1 = nn.LSTM(k, k, num_layers=2,
                             batch_first=True) # 日本語の文ベクトルを生成するLSTM層を定義
        self.lstm2 = nn.LSTM(k, k, num_layers=2,
                             batch_first=True) # 英語の文ベクトルを生成するLSTM層を定義(隠れ層のサイズと層数はk)
        self.Wc = nn.Linear(2*k,k) # Attentionで得られたコンテキストベクトルと出力ベクトルを結合して、k次元に変換する層を定義
        self.W = nn.Linear(k, ev) # 出力ベクトルから英語の単語IDへ変換する層を定義
    def forward(self, jline, eline): # 順伝播関数を定義(入力は日本語と英語の文のIDのリスト)
        x = self.jemb(jline) # 日本語の単語IDからベクトル変換
        ox, (hnx, cnx) = self.lstm1(x) # LSTMで日本語の文ベクトルと隠れ状態ベクトルを得る
        y = self.eemb(eline) # 英語の単語IDからベクトル変換
        oy, (hny,cny) = self.lstm2(y,(hnx,cnx)) # LSTMで英語の文ベクトルと隠れ状態ベクトルを入力
        ox1 = ox.permute(0,2,1) # 日本語の文ベクトルの次元を入れ替える
        sim = torch.bmm(oy,ox1) # 英語と日本語の文ベクトルの類似度を計算
        bs, yws, xws = sim.shape # # 類似度の形状を取得(バッチサイズ、英語の単語数、日本語の単語数)
        sim2 = sim.reshape(bs*yws,xws) # 類似度を2次元に変換
        alpha = F.softmax(sim2,dim=1).reshape(bs, yws, xws) # 類似度にソフトマックス関数を適用して確率分布に変換し、元の形状に戻す
        ct = torch.bmm(alpha,ox) # 確率分布と日本語の文ベクトルの積でコンテキストベクトルを得る
        oy1 = torch.cat([ct,oy],dim=2) # コンテキストベクトルと英語の文ベクトルを結合する
        oy2 = self.Wc(oy1) # 結合したベクトルをk次元に戻す
        return self.W(oy2) # k次元のベクトルから英語の単語IDへ変換する

# model generate, optimizer and criterion setting
demb = 200 # 埋め込み層や隠れ層の次元数を200に設定
net = MyAttNMT(jv, ev, demb).to(device) # モデルを生成し、GPUに転送
optimizer = optim.SGD(net.parameters(),lr=0.01) # 最適化アルゴリズムとして確率的勾配降下法を選択
criterion = nn.CrossEntropyLoss() # 損失関数として交差エントロピー誤差を選択

# Learn
for epoch in range(20): #エポック数を20に設定し、ループ
    loss1k = 0.0 # 損失関数を初期化
    for i in range(len(jdata)): # 日本語の文データの数だけループ
        jinput = torch.LongTensor([jdata[i][1:]]).to(device) # 日本語の文IDからテンソルに変換し、GPUに転送(文頭記号は除く)
        einput = torch.LongTensor([edata[i][:-1]]).to(device) # 英語の文IDからテンソルに変換し、GPUに転送(文末記号は除く)
        out = net(jinput, einput) #モデルに入力して出力を得る
        gans = torch.LongTensor([edata[i][1:]]).to(device) # 正解データとして英語の文IDからテンソルに変換し、GPUに転送(文頭記号は除く)
        loss = criterion(out[0],gans[0]) # 出力と正解データから損失関数を計算
        loss1k += loss.item() # 損失関数を累積
        if (i % 100 == 0): # 100回ごとに
            print(epoch, i, loss1k) # エポック数、回数、損失数を表示
            loss1k = 0.0 # 損失数初期化
        optimizer.zero_grad() # 勾配を初期化
        loss.backward() # 誤差逆伝播法で勾配を計算
        optimizer.step() # 最適化アルゴリズムでパラメーターを更新
    outfile = "attnmt-" + str(epoch) + ".model" # モデルのファイル名を設定
    torch.save(net.state_dict(),outfile) # モデルのパラメーターを保存

In [None]:
# テスト
import torch # Pythorchという深層学習のライブラリをインポート
import torch.nn as nn # ニューラルネットワークのモジュールをインポート
import torch.optim as optim # 最適化のモジュールをインポート
import torch.nn.functional as F # 活性化関数などの関数をインポート
import numpy as np # NumPyという数値計算のライブラリをインポート

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # GPU設定

# データの設定

id, eid2w, ew2id = 1, {}, {} # 英語の単語に対応するID、IDから単語に変換する辞書、単語からIDに変換する辞書を初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/train.en.vocab.4k","r",encoding="utf-8") as f: # 英語の語彙ファイルを開く
    for w in f: # ファイルの各行に対して
        w = w.strip() # 単語を取得（改行文字を除く）
        eid2w[id] = w # IDから単語に変換する辞書に登録
        ew2id[w] = id # 単語からIDに変換する辞書に登録
        id += 1 # IDをインクリメント
ev = id # 英語の語彙数を保存

edata = [] # 英語のテストデータ（単語IDのリスト）を格納するリストを初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/test.en","r",encoding="utf-8") as f: # 英語のテキストデータファイルを開く
    for sen in f: # ファイルの各行(文)にたいして
        wl = [ew2id["<s>"]] # 単語のIDリストを初期化(文頭記号<s>のIDを追加)
        for w in sen.strip().split(): # 文を単語に分割してループ
            if w in ew2id: # 単語が辞書にある場合
                wl.append(ew2id[w]) # 単語に対応するIDを追加
            else: # 単語が辞書にない場合(未知語)
                wl.append(ew2id["<unk>"]) # 未知語記号<unk>に対応するIDをリストに追加
        wl.append(ew2id["</s>"]) # 文末記号</s>に対応するIDをリストに追加
        edata.append(wl) # 単語IDのリストをテストデータに追加

id, jid2w, jw2id = 1, {}, {} # 日本語の単語に対応するID、IDから単語に変換する辞書、単語からIDに変換する辞書を初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/train.ja.vocab.4k","r",encoding="utf-8") as f: # 日本語の語彙ファイルを開く
    id = 1 # IDを初期化
    for w in f: # ファイルの各行にたいして
        w = w.strip() # 単語を取得(改行文字を除く)
        jid2w[id] = w # IDから単語に変換する辞書に登録
        jw2id[w] = id # 単語からIDに変換する辞書に登録
        id += 1 # IDをインクリメント
jv = id # 日本語の語彙数を保存

jdata = [] # 日本語のテキストデータ(単語のIDのリスト)を格納するリストを初期化
with open("/content/drive/MyDrive/small_parallel_enja-master/test.ja","r",encoding="utf-8") as f: # 日本語のテストデータファイルを開く
    for sen in f: # ファイルの各行(文)にたいして
        wl = [jw2id["<s>"]] # 単語のIDのリストを初期化(文頭記号<s>のIDを追加)
        for w in sen.strip().split(): # 文を単語に分割してループ
            if w in jw2id:
                wl.append(jw2id[w]) # 単語に対応するIDをリストに追加
            else: # 単語が辞書にない場合(未知語)
                wl.append(jw2id["<unk>"]) # 未知語記号<unk>に対応するIDをリストに追加
        wl.append(jw2id["</s>"]) # 文末記号</s>に対応するIDをリストに追加
        jdata.append(wl) # 単語IDのリストをテストデータに追加

# モデルの定義

class MyAttNMT(nn.Module): # MyAttNMTというクラスを定義(nn.Moduleを継承)
    def __init__(self, jv, ev, k): # 初期化関数を定義(引数は日本語の語彙数、英語の語彙数、埋め込み次元数)
        super(MyAttNMT, self).__init__() # 親クラスの初期化関数を呼び出す
        self.jemb = nn.Embedding(jv, k) # 日本語の単語IDから埋め込みベクトルに変換する層を定義
        self.eemb = nn.Embedding(ev, k) # 英語の単語IDから埋め込みベクトルに変換する層を定義
        self.lstm1 = nn.LSTM(k, k, num_layers=2,
                             batch_first=True) # 日本語の埋め込みベクトルから隠れ状態ベクトルに変換するLSTM層(2層)を定義
        self.lstm2 = nn.LSTM(k, k, num_layers=2,
                             batch_first=True) # 英語の埋め込みベクトルから隠れ状態ベクトルに変換するLSTM層(2層)を定義
        self.Wc = nn.Linear(2*k, k) # コンテキストベクトルと隠れ状態ベクトルから出力ベクトルに変換する全結合層を定義
        self.W = nn.Linear(k, ev) # 出力ベクトルから英語の単語IDに変換する全結合層を定義
    def forward(self, jline, eline): # 順伝播関数を定義(引数は日本語の単語IDリスト、英語の単語IDリスト)
        x = self.jemb(jline) # 日本語の単語IDのリストを埋め込みベクトルに変換する
        ox, (hnx, cnx) = self.lstm1(x) # 埋め込みベクトルから隠れ状態ベクトルに変換(oxは各時刻の隠れ状態ベクトルのリスト、hnxとcnxは最終時刻の隠れ状態ベクトルとセル状態ベクトル)
        y = self.eemb(eline) # 英語の単語IDのリストを埋め込みベクトルに変換
        oy, (hny, cny) = self.lstm2(y, (hnx, cnx)) # 埋め込みベクトルから隠れ状態ベクトルに変換(oyは各時刻の隠れ状態ベクトルのリスト、hnyとcnyは最終時刻の隠れ状態ベクトルとセル状態ベクトル)
        ox1 = ox.permute(0,2,1) # oxの次元を入れ替える(バッチサイズ、埋め込み次元数、時刻数)
        sim = torch.bmm(oy,ox1) # oyとox1のバッチごとの行列積を計算(simは各時刻の英語の隠れ状態ベクトルと日本語の隠れ状態ベクトルの類似度を表す行列のリスト)
        bs, yws, xws = sim.shape # simの形状を取得(バッチサイズ、英語の単語数、日本語の単語数)
        sim2 = sim.reshape(bs*yws,xws) # simを2次元に変換(バッチサイズ＊英語の単語数、日本語の単語数)
        alpha = F.softmax(sim2,dim=1).reshape(bs, yws, xws) # sim2にソフトマックス関数を適用して正規化し、3次元に戻す(alphaは各時刻の英語の単語に対する日本語の単語の重みを表す行列のリスト)
        ct = torch.bmm(alpha,ox) # alphaとoxのバッチごとの行列積を計算(ctは各時刻の英語の単語に対するコンテキストベクトルのリスト)
        oy1 = torch.cat([ct,oy], dim=2) # ctとoyを連結(oy1は各時刻の英語の単語に対するコンテキストベクトルと隠れ状態ベクトルを結合したリスト)
        oy2 = self.Wc(oy1) # oy1に全結合層を適用して出力ベクトルに変換(oy2は各時刻の英語の単語に対する出力ベクトルのリスト)
        return self.W(oy2) # oy2に全結合層を適用して英語の単語IDをに変換(戻り値は各時刻の英語の単語IDに対する確率分布を表す)

# モデル生成、オプティマイザーとロス関数の設定

demb = 200 # 埋め込み次元数を200に設定
net = MyAttNMT(jv, ev, demb).to(device) # MyAttNMTのインスタンスを生成し、デバイスに転送

net.load_state_dict(torch.load("/content/drive/MyDrive/model/attnmt-19.model")) # 指定されたファイルからモデルのパラメーターを読み込む

# 翻訳

esid = ew2id["<s>"] # 英語の文頭記号<s>に対応するIDを取得
eeid = ew2id["</s>"] # 英語の文末記号</s>に対応するIDを取得
net.eval() # モデルを評価モードに設定
with torch.no_grad(): # 勾配計算を無効化
    with open("result.txt", "w", encoding="utf-8") as f: # ファイルを作成
        for i in range(len(jdata)): # 日本語のテストデータの各文に対してループ
            jinput = torch.LongTensor([jdata[i][1:]]).to(device) # 日本語の単語IDのリストをテンソルに変換し、デバイスに転送(文頭記号<s>は除く)
            x = net.jemb(jinput) # 日本語の単語IDのリストを埋め込みベクトルに変換
            ox, (hnx, cnx) = net.lstm1(x) # 埋め込みベクトルから隠れ状態ベクトルに変換(oxは各時刻の隠れ状態ベクトルのリスト、hnxとcnxは最終時刻の隠れ状態ベクトルとセル状態ベクトル)
            wid = esid # widに英語の文頭記号<s>に対応するIDを代入
            sl = 0 # 翻訳した英単語をカウントする変数を初期化
            while True: # 無限ループ
                wids = torch.LongTensor([[wid]]).to(device) # widをテンソルに変換し、デバイスに転送
                y = net.eemb(wids) # widを埋め込みベクトルに変換
                oy, (hnx, cnx) = net.lstm2(y,(hnx, cnx)) # 埋め込みベクトルから隠れ状態ベクトルに変換(oyは各時刻の隠れ状態ベクトルのリスト、hnyとcnyは最終時刻の隠れ状態ベクトルとセル状態ベクトル)
                ox1 = ox.permute(0,2,1) # oxの次元を入れ替える(バッチサイズ、埋め込み次元数、時刻数)
                sim = torch.bmm(oy,ox1) # oyとox1のバッチごとの行列積を計算(simは各時刻の隠れ状態ベクトルと日本語の隠れ状態ベクトルの類似度を表行列)
                bs, yws, xws = sim.shape # simの形状を取得(バッチサイズ、英語の単語数、日本語の単語数)
                sim2 = sim.reshape(bs*yws,xws) # simを2次元に変換(バッチサイズ＊英語の単語数、日本語の単語数)
                alpha = F.softmax(sim2,dim=1).reshape(bs, yws, xws) # sim2にソフトマックス関数を適用し正規化し3次元に戻す(alphaは各時刻の英語の単語に対する日本語の単語の重みを表す行列)
                ct = torch.bmm(alpha, ox) # alphaとoxのバッチごとの行列積を計算(ctは各時刻の英語の単語に対するコンテキストベクトルのリスト)
                oy1 = torch.cat([ct,oy],dim=2) # ctとoyを連結(oy1は各時刻の英語の単語に対するコンテキストベクトルと隠れ状態ベクトルを結合したリスト)
                oy2 = net.Wc(oy1) # oy1に全結合層を適用して出力ベクトルに変換(oy2は各時刻の英語の単語に対する出力ベクトルのリスト)
                oy3 = net.W(oy2) # oy2に全結合層を適用して英語の単語IDに変換(oy3は各時刻の英語の単語IDに対する確率分布を表す行列)
                wid = torch.argmax(oy3[0]).item() # oy3の最大値のインデックスを取得(widは予測された英語の単語ID)
                if (wid == eeid): # widが文末記号</s>に対応するIDならば
                    break # ループを抜ける
                f.write(eid2w[wid] + " ") # widに対応する英語の単語をファイルに書き込む
                sl += 1 # 翻訳した単語数をインクリメント
                if (sl == 30): # 翻訳した単語数が30に達したら
                    break # ループを抜ける
            f.write("\n") # 改行をファイルに書き込む


In [None]:
from nltk.translate.bleu_score import corpus_bleu


gold = []
with open('/content/drive/MyDrive/small_parallel_enja-master/test.en','r') as f:
    for sen in f:
        w = sen.strip().split()
        gold.append([ w ])

myans = []
with open("/content/result.txt",'r') as f:
    for sen in f:
        w = sen.strip().split()
        myans.append(w)


score = corpus_bleu(gold, myans)
print(100*score)


20.643023523751694
