In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import matplotlib.pyplot as plt

# Alias for torch's CPU 32-bit float tensor type. It is not referenced in the
# cells shown here; presumably later cells use it -- TODO confirm or remove.
dtype = torch.FloatTensor

In [2]:
# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output
# P: Padding symbol that fills the blank positions when a sequence is shorter than the number of time steps

In [3]:
# Single toy translation example. make_batch reads the entries by position:
# [0] is encoded with src_vocab, [1] and [2] with tgt_vocab.
sentences = [
    'ich mochte ein bier P',  # [0] encoder input, padded with 'P'
    'S i want a beer',        # [1] decoder input, prefixed with 'S'
    'i want a beer E',        # [2] decoder target, suffixed with 'E'
]

In [4]:
# Transformer parameters: source-side vocabulary (token -> index).
# The padding token 'P' must map to index 0, because get_attn_pad_mask
# treats index 0 as padding.
src_vocab = {
    token: index
    for index, token in enumerate(('P', 'ich', 'mochte', 'ein', 'bier'))
}
src_vocab_size = len(src_vocab)

In [5]:
# Target-side vocabulary (token -> index). Index 0 is the padding token 'P';
# 'S' and 'E' are the start and end symbols used in the decoder sequences.
tgt_vocab = {'P' : 0, 'i' : 1, 'want' : 2, 'a' : 3, 'beer' : 4, 'S' : 5, 'E' : 6}
# Inverse lookup (index -> token). It is built from the indices actually stored
# in tgt_vocab rather than from enumeration order, so it stays correct even if
# the vocabulary entries are reordered or the indices are not contiguous.
number_dict = {index: token for token, index in tgt_vocab.items()}
tgt_vocab_size = len(tgt_vocab)


In [6]:
# Number of tokens in the source and target sequences of the toy example.
src_len, tgt_len = 5, 5

# Model hyperparameters.
d_model = 512   # embedding size
d_ff = 2048     # feed-forward dimension
d_k = 64        # per-head dimension of K (= Q)
d_v = 64        # per-head dimension of V
n_layers = 6    # number of encoder / decoder layers
n_heads = 8     # number of heads in multi-head attention


In [7]:
def make_batch(sentences):
    """Convert the three whitespace-tokenised sentences into index tensors.

    Args:
        sentences: sequence whose first three items are, in order, the encoder
            input (looked up in ``src_vocab``), the decoder input and the
            decoder target (both looked up in ``tgt_vocab``).

    Returns:
        Tuple ``(input_batch, output_batch, target_batch)`` of LongTensors,
        each with a leading batch dimension of size 1.

    Raises:
        KeyError: if a token is missing from the corresponding vocabulary.
    """
    # torch.autograd.Variable has been a no-op wrapper since PyTorch 0.4, so
    # the tensors are returned directly.
    input_batch = torch.LongTensor([[src_vocab[token] for token in sentences[0].split()]])
    output_batch = torch.LongTensor([[tgt_vocab[token] for token in sentences[1].split()]])
    target_batch = torch.LongTensor([[tgt_vocab[token] for token in sentences[2].split()]])
    return input_batch, output_batch, target_batch

In [8]:
def get_sinusoid_encoding_table(n_position, d_model):
    """Build the fixed sinusoidal positional-encoding table.

    Entry ``(pos, j)`` starts as ``pos / 10000 ** (2 * (j // 2) / d_model)``;
    even columns are then replaced by their sine and odd columns by their
    cosine.

    Args:
        n_position: number of positions (rows) to generate.
        d_model: number of columns per position.

    Returns:
        torch.FloatTensor of shape ``[n_position, d_model]``.
    """
    # One divisor per column; each even/odd column pair shares the same one.
    divisors = [np.power(10000, 2 * (col // 2) / d_model) for col in range(d_model)]
    angles = np.array([[pos / divisor for divisor in divisors]
                       for pos in range(n_position)])

    angles[:, 0::2] = np.sin(angles[:, 0::2])  # dim 2i
    angles[:, 1::2] = np.cos(angles[:, 1::2])  # dim 2i+1
    return torch.FloatTensor(angles)

In [9]:
def get_attn_pad_mask(seq_q, seq_k):
    """Build the attention mask that hides padding positions of the keys.

    Args:
        seq_q: 2-D index tensor ``[batch_size, len_q]``; only its length is used.
        seq_k: 2-D index tensor ``[batch_size, len_k]``; index 0 is the PAD token.

    Returns:
        Tensor of shape ``[batch_size, len_q, len_k]`` that is true wherever
        the key position holds the PAD token (true means "mask this position").
    """
    _, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # Compare on the tensor itself; going through `.data` is discouraged and
    # unnecessary for an equality test.
    pad_attn_mask = seq_k.eq(0).unsqueeze(1)  # batch_size x 1 x len_k
    return pad_attn_mask.expand(batch_size, len_q, len_k)  # batch_size x len_q x len_k

In [10]:
def get_attn_subsequent_mask(seq):
    """Build the mask that hides later positions from each position.

    Args:
        seq: tensor whose first two dimensions are ``[batch_size, seq_len]``.

    Returns:
        uint8 tensor of shape ``[batch_size, seq_len, seq_len]`` holding 1
        strictly above the diagonal of every matrix and 0 elsewhere.
    """
    n_batch, n_steps = seq.size(0), seq.size(1)
    # np.triu with k=1 zeroes the diagonal and everything below it.
    upper = np.triu(np.ones([n_batch, n_steps, n_steps]), k=1)
    return torch.from_numpy(upper).byte()

In [11]:
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V with masking."""

    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        """Compute the attention-weighted values.

        Args:
            Q: queries, ``[batch_size, n_heads, len_q, d_k]``.
            K: keys, ``[batch_size, n_heads, len_k, d_k]``.
            V: values, ``[batch_size, n_heads, len_k, d_v]``.
            attn_mask: ``[batch_size, n_heads, len_q, len_k]``; non-zero / true
                entries are excluded from attention.

        Returns:
            Tuple ``(context, attn)``: ``context`` is
            ``[batch_size, n_heads, len_q, d_v]`` and ``attn`` is
            ``[batch_size, n_heads, len_q, len_k]``.
        """
        # K.transpose(-1, -2) : [batch_size, n_heads, d_k, len_k]
        # scores : [batch_size, n_heads, len_q, len_k]
        # The scaling factor is read from Q's last dimension (the per-head
        # query/key size) instead of relying on a module-level constant.
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(Q.size(-1))
        # Masked positions get a large negative score so softmax sends them to ~0.
        # The mask is cast to bool because newer PyTorch releases do not accept
        # uint8 masks here.
        scores = scores.masked_fill(attn_mask.bool(), -1e9)
        attn = torch.softmax(scores, dim=-1)  # softmax over each row (key axis)
        context = torch.matmul(attn, V)
        return context, attn
    

In [12]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention followed by an output projection, residual add and LayerNorm."""

    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads)
        self.W_K = nn.Linear(d_model, d_k * n_heads)
        self.W_V = nn.Linear(d_model, d_v * n_heads)
        # The output projection and the layer norm must be registered here.
        # Creating them inside forward() would re-initialise their weights on
        # every call and hide them from the optimizer and state_dict.
        self.linear = nn.Linear(n_heads * d_v, d_model)
        self.layer_norm = nn.LayerNorm(d_model)
        self.attention = ScaledDotProductAttention()

    def forward(self, Q, K, V, attn_mask):
        """Apply multi-head attention.

        Args:
            Q: ``[batch_size, len_q, d_model]``.
            K: ``[batch_size, len_k, d_model]``.
            V: ``[batch_size, len_k, d_model]``.
            attn_mask: ``[batch_size, len_q, len_k]``; non-zero / true entries
                are excluded from attention.

        Returns:
            Tuple ``(output, attn)``: ``output`` is
            ``[batch_size, len_q, d_model]`` and ``attn`` is
            ``[batch_size, n_heads, len_q, len_k]``.
        """
        residual = Q
        batch_size = Q.size(0)

        # Project, then split the last dimension into heads:
        # [batch_size, len, n_heads * d] -> [batch_size, n_heads, len, d]
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # [batch_size, n_heads, len_q, d_k]
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # [batch_size, n_heads, len_k, d_k]
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)  # [batch_size, n_heads, len_k, d_v]

        # Every head uses the same mask: [batch_size, n_heads, len_q, len_k]
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

        # context : [batch_size, n_heads, len_q, d_v]
        context, attn = self.attention(q_s, k_s, v_s, attn_mask)
        # Merge the heads back: [batch_size, len_q, n_heads * d_v]
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)

        output = self.linear(context)  # [batch_size, len_q, d_model]
        return self.layer_norm(output + residual), attn

In [13]:
class PoswiseFeedForwardNet(nn.Module):
    """Position-wise feed-forward network with residual connection and LayerNorm.

    The two linear maps are implemented as 1x1 convolutions over the sequence
    axis (d_model -> d_ff -> d_model).
    """

    def __init__(self):
        # The class name passed to super() must be this class; the previous
        # spelling (PositionWiseFeedForwardNet) raised a NameError.
        super(PoswiseFeedForwardNet, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        # Registered once so its parameters are trained and persist across
        # calls instead of being re-created on every forward pass.
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, inputs):
        """Transform ``inputs`` of shape ``[batch_size, len_q, d_model]``.

        Returns:
            Tensor of the same shape as ``inputs``.
        """
        residual = inputs  # [batch_size, len_q, d_model]
        # Conv1d expects channels first, hence the transpose.
        output = torch.relu(self.conv1(inputs.transpose(1, 2)))  # [batch_size, d_ff, len_q]
        output = self.conv2(output).transpose(1, 2)  # [batch_size, len_q, d_model]
        return self.layer_norm(output + residual)


In [14]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention followed by the position-wise feed-forward net."""

    def __init__(self):
        super(EncoderLayer, self).__init__()
        # The class is spelled MultiHeadAttention; `MultiheadAttention` raised
        # a NameError.
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        """Run the layer.

        Args:
            enc_inputs: ``[batch_size, src_len, d_model]``.
            enc_self_attn_mask: ``[batch_size, src_len, src_len]``.

        Returns:
            Tuple ``(enc_outputs, attn)``: ``enc_outputs`` has the shape of
            ``enc_inputs``; ``attn`` holds the self-attention weights.
        """
        # Self-attention: the same tensor serves as Q, K and V.
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)
        # Apply the feed-forward sub-layer, as DecoderLayer does; it was
        # previously constructed but never used.
        enc_outputs = self.pos_ffn(enc_outputs)
        return enc_outputs, attn


In [None]:
class DecoderLayer(nn.Module):
    """One decoder layer: masked self-attention, encoder-decoder attention, feed-forward net."""

    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention()
        self.dec_enc_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        """Run the layer.

        Args:
            dec_inputs: ``[batch_size, tgt_len, d_model]``.
            enc_outputs: ``[batch_size, src_len, d_model]``.
            dec_self_attn_mask: ``[batch_size, tgt_len, tgt_len]``.
            dec_enc_attn_mask: ``[batch_size, tgt_len, src_len]``.

        Returns:
            Tuple ``(dec_outputs, dec_self_attn, dec_enc_attn)``: the layer
            output plus the attention weights of both attention sub-layers.
        """
        dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)
        # Queries come from the decoder; keys and values from the encoder output.
        dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)
        dec_outputs = self.pos_ffn(dec_outputs)
        return dec_outputs, dec_self_attn, dec_enc_attn
        

In [None]:
def get_sinusoid_encoding_tanle(n_position, d_model):
    def cal_angle(position, hid_idx):
        return position / np.power