https://github.com/bentrevett/pytorch-seq2seq/blob/master/3%20-%20Neural%20Machine%20Translation%20by%20Jointly%20Learning%20to%20Align%20and%20Translate.ipynb

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torchtext.legacy.datasets import Multi30k
from torchtext.legacy.data import Field, BucketIterator

import spacy
import numpy as np

import random
import math
import time

In [2]:
# Seed every random number generator this notebook relies on so that runs
# are repeatable.
seed = 1234

for _seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_rng(seed)

# Restrict cuDNN to deterministic algorithms.
torch.backends.cudnn.deterministic = True

In [3]:
# Load the small German and English spaCy pipelines; their tokenizers are
# used by tokenize_de / tokenize_en.
spacy_de = spacy.load("de_core_news_sm")
spacy_en = spacy.load("en_core_web_sm")

In [4]:
def tokenize_de(text):
    """Split German ``text`` into a list of token strings using ``spacy_de``."""
    tokens = spacy_de.tokenizer(text)
    return [token.text for token in tokens]

def tokenize_en(text):
    """Split English ``text`` into a list of token strings using ``spacy_en``."""
    tokens = spacy_en.tokenizer(text)
    return [token.text for token in tokens]

In [5]:
# Source (German) and target (English) fields: text is lower-cased, tokenized
# with the spaCy helpers, and every sentence is wrapped in explicit
# start-of-sequence / end-of-sequence markers.
# The two markers must be distinct, non-empty strings: with '' for both, the
# start and end symbols are the same token and cannot be told apart.
SRC = Field(tokenize=tokenize_de, init_token='<sos>', eos_token='<eos>', lower=True)
TRG = Field(tokenize=tokenize_en, init_token='<sos>', eos_token='<eos>', lower=True)

In [6]:
# Load the Multi30k train / validation / test splits; the '.de' side is
# processed by SRC and the '.en' side by TRG.
train_data, valid_data, test_data = Multi30k.splits(
    exts=('.de', '.en'),
    fields=(SRC, TRG),
)

In [7]:
# Build both vocabularies from the training split only, passing min_freq=2.
for _field in (SRC, TRG):
    _field.build_vocab(train_data, min_freq=2)

In [8]:
# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [9]:
BATCH_SIZE = 128

# One BucketIterator per split, all sharing the same batch size and device.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device,
)

In [10]:
class Encoder(nn.Module):
    """Bidirectional GRU encoder.

    Embeds the source token indices, runs them through a bidirectional GRU and
    projects the final forward/backward hidden states to the decoder's hidden
    size.

    Args:
        input_dim: number of entries in the embedding table.
        embdim: embedding size; also the GRU input size.
        enc_hid_dim: GRU hidden size per direction.
        dec_hid_dim: size of the projected hidden state returned by ``forward``.
        dropout: dropout probability applied to the embeddings.
    """

    def __init__(self, input_dim, embdim, enc_hid_dim, dec_hid_dim, dropout):
        super().__init__()

        # Use the declared `embdim` parameter; there is no `emb_dim` in this scope.
        self.embedding = nn.Embedding(input_dim, embdim)
        # bidirectional=True
        self.rnn = nn.GRU(embdim, enc_hid_dim, bidirectional=True)
        # The outputs of the two RNN directions are concatenated and then
        # passed to this fully connected layer.
        self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode a batch of source sequences.

        Args:
            src: token indices, [src len, batch size].

        Returns:
            A tuple ``(outputs, hidden)`` where ``outputs`` is
            [src len, batch size, enc_hid_dim * 2] and ``hidden`` is
            [batch size, dec_hid_dim].
        """
        # src = [src len, batch size]
        embedded = self.dropout(self.embedding(src))
        # embedded = [src len, batch size, emb dim]
        outputs, hidden = self.rnn(embedded)
        # outputs = [src len, batch size, hid dim * num_directions]
        # hidden = [n_layers * num_directions (2), batch size, hid dim]
        # hidden is stacked as [forward1, backward1, forward2, backward2, ...]
        # hidden[-2, :, :] -> last forward state, hidden[-1, :, :] -> last backward state
        final_states = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        hidden = torch.tanh(self.fc(final_states))

        return outputs, hidden

In [11]:
class Attention(nn.Module):
    """Additive attention over the encoder outputs.

    Scores every source position against the current decoder hidden state and
    returns the scores normalized with a softmax over the source positions.

    Args:
        enc_hid_dim: encoder hidden size per direction (encoder outputs carry
            ``enc_hid_dim * 2`` features).
        dec_hid_dim: decoder hidden size.
    """

    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()

        attn_in_features = (enc_hid_dim * 2) + dec_hid_dim
        self.attn = nn.Linear(attn_in_features, dec_hid_dim)
        self.v = nn.Linear(dec_hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights of shape [batch size, src len].

        Args:
            hidden: decoder hidden state, [batch size, dec_hid_dim].
            encoder_outputs: [src len, batch size, enc_hid_dim * 2].
        """
        src_len = encoder_outputs.shape[0]

        # Copy the decoder state once per source position:
        # [batch size, dec_hid_dim] -> [batch size, src len, dec_hid_dim]
        tiled_hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)

        # Put the batch dimension first so it lines up with tiled_hidden.
        batch_first_outputs = encoder_outputs.permute(1, 0, 2)

        combined = torch.cat((tiled_hidden, batch_first_outputs), dim=2)
        energy = torch.tanh(self.attn(combined))

        # One scalar score per source position: [batch size, src len]
        scores = self.v(energy).squeeze(2)

        return F.softmax(scores, dim=1)

In [12]:
class Decoder(nn.Module):
    """Single-step GRU decoder that attends over the encoder outputs.

    Args:
        output_dim: number of entries in the embedding table and size of the
            prediction vector.
        emb_dim: embedding size.
        enc_hid_dim: encoder hidden size per direction (encoder outputs carry
            ``enc_hid_dim * 2`` features).
        dec_hid_dim: decoder GRU hidden size.
        dropout: dropout probability applied to the embeddings.
        attention: callable taking ``(hidden, encoder_outputs)`` and returning
            weights of shape [batch size, src len].
    """

    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
        super().__init__()

        self.output_dim = output_dim
        self.attention = attention

        context_dim = enc_hid_dim * 2
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(context_dim + emb_dim, dec_hid_dim)
        self.fc_out = nn.Linear(context_dim + dec_hid_dim + emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, encoder_outputs):
        """Decode one time step.

        Args:
            input: token indices for the current step, [batch size].
            hidden: previous decoder hidden state, [batch size, dec_hid_dim].
            encoder_outputs: [src len, batch size, enc_hid_dim * 2].

        Returns:
            A tuple ``(prediction, hidden)`` with ``prediction`` of shape
            [batch size, output_dim] and the new hidden state of shape
            [batch size, dec_hid_dim].
        """
        # Add a sequence dimension of length 1: [1, batch size]
        step_tokens = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(step_tokens))

        # Attention weights as a row vector per batch item: [batch size, 1, src len]
        attn_weights = self.attention(hidden, encoder_outputs).unsqueeze(1)

        # Weighted sum of the encoder outputs: [batch size, 1, enc_hid_dim * 2]
        batch_first_outputs = encoder_outputs.permute(1, 0, 2)
        context = torch.bmm(attn_weights, batch_first_outputs)
        # Back to sequence-first layout: [1, batch size, enc_hid_dim * 2]
        context = context.permute(1, 0, 2)

        gru_in = torch.cat((embedded, context), dim=2)
        output, hidden = self.rnn(gru_in, hidden.unsqueeze(0))

        # Sanity check: for this one-step GRU call the output and the returned
        # hidden state are expected to be equal.
        assert (output == hidden).all()

        features = torch.cat(
            (output.squeeze(0), context.squeeze(0), embedded.squeeze(0)),
            dim=1,
        )
        prediction = self.fc_out(features)

        return prediction, hidden.squeeze(0)