In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import torch.autograd as autograd
import os
import numpy as np
import math

# Embedding Layer with Position Encoding

In [None]:
class Embedding(nn.Module):
    """Token-embedding lookup table.

    Thin wrapper around ``nn.Embedding`` that also remembers the
    vocabulary size and embedding width it was built with.

    Args:
        vocab_size: Number of rows in the lookup table.
        d_model: Width of each embedding vector.
    """

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.vocab_size, self.d_model = vocab_size, d_model
        # Learnable table: one d_model-wide row per vocabulary entry.
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Look up and return the embedding vectors for the ids in ``x``."""
        return self.embedding(x)

In [4]:
class PositionEncoder(nn.Module):
    """Add fixed sinusoidal position encodings to scaled embeddings.

    The precomputed table follows "Attention Is All You Need"::

        PE(pos, 2k)     = sin(pos / 10000 ** (2k / d_model))
        PE(pos, 2k + 1) = cos(pos / 10000 ** (2k / d_model))

    so each sin/cos pair shares one frequency.

    Args:
        d_model: Width of the embeddings the encoding is added to
            (odd widths are supported).
        max_seq_len: Number of positions to precompute.
        dropout: Dropout probability applied to the summed output.
    """

    def __init__(self, d_model, max_seq_len=200, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

        positions = torch.arange(max_seq_len, dtype=torch.float32).unsqueeze(1)
        # `even_dims` already holds the value "2k" from the formula, so it is
        # used as the exponent numerator directly (no extra factor of two).
        even_dims = torch.arange(0, d_model, 2, dtype=torch.float32)
        inv_freq = torch.exp(even_dims * (-math.log(10000.0) / d_model))
        angles = positions * inv_freq  # (max_seq_len, ceil(d_model / 2))

        pe = torch.zeros(max_seq_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Buffer: follows the module across devices and is saved in the
        # state dict, but is never updated by the optimizer.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Scale ``x`` by sqrt(d_model), add the encodings, apply dropout.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension is ``d_model``.

        Raises:
            RuntimeError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise RuntimeError(
                f"sequence length {seq_len} exceeds max_seq_len {self.pe.size(1)}"
            )
        # Scaling keeps the embeddings from being swamped by the encodings,
        # whose entries lie in [-1, 1].
        x = x * math.sqrt(self.d_model)
        return self.dropout(x + self.pe[:, :seq_len])

## Encoder

In [5]:
def Attention(query, key, value, mask=None, dropout=None):
    """Scaled dot-product attention.

    Args:
        query, key, value: Tensors whose last dimension is the feature size
            and whose second-to-last dimension is the sequence axis.
        mask: Optional tensor; positions where it equals 0 are excluded from
            the softmax. If it has fewer dimensions than the score tensor, an
            axis is inserted at position 1 so it broadcasts over heads.
        dropout: Optional callable (e.g. ``nn.Dropout``) applied to the
            attention weights.

    Returns:
        Tuple ``(weighted_values, attention_weights)``.
    """
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        # MultiHeadedAttention already adds the head axis before calling this
        # function; unsqueezing again would make the mask one rank higher than
        # `scores` and broadcast the fill across the batch dimension. Only add
        # the axis when it is actually missing.
        if mask.dim() < scores.dim():
            mask = mask.unsqueeze(1)
        # A large negative score becomes ~0 probability after the softmax.
        scores = scores.masked_fill(mask == 0, -1e9)

    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn

In [6]:
class MultiHeadedAttention(nn.Module):
    """Multi-head attention built on top of ``Attention``.

    Args:
        h: Number of attention heads; must divide ``d_model``.
        d_model: Model width. Each head works on ``d_model // h`` features.
        dropout: Dropout probability applied to the attention weights.

    Attributes:
        attn: Attention weights from the most recent ``forward`` call
            (``None`` before the first call).
    """

    def __init__(self, h, d_model, dropout=0.1):
        super().__init__()
        assert d_model % h == 0

        self.h = h
        self.d_model = d_model
        self.d_k = d_model // h

        # Four projections are allocated, but forward() only consumes the
        # first three (query, key, value); the final projection is `output`.
        projections = []
        for _ in range(4):
            projections.append(nn.Linear(d_model, d_model))
        self.linears = nn.ModuleList(projections)
        self.attn = None
        self.dropout = nn.Dropout(dropout)
        self.output = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        """Reshape (batch, seq, d_model) into (batch, h, seq, d_k)."""
        return projected.view(batch_size, -1, self.h, self.d_k).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge heads and project again.

        Args:
            query, key, value: Tensors with batch as dimension 0 and
                ``d_model`` as the last dimension.
            mask: Optional mask; an axis is inserted at position 1 so the
                same mask is shared by every head.
        """
        head_mask = None if mask is None else mask.unsqueeze(1)
        batch_size = query.size(0)

        query = self._split_heads(self.linears[0](query), batch_size)
        key = self._split_heads(self.linears[1](key), batch_size)
        value = self._split_heads(self.linears[2](value), batch_size)

        context, self.attn = Attention(
            query, key, value, mask=head_mask, dropout=self.dropout
        )

        # Undo the head split: (batch, h, seq, d_k) -> (batch, seq, d_model).
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch_size, -1, self.d_model)
        return self.output(merged)

## Residual Connection and Normalization Layer

In [7]:
class Norm(nn.Module):
    """Layer normalization over the last dimension with learnable scale/shift.

    Computes ``alpha * (x - mean) / (std + eps) + bias`` where ``mean`` and
    ``std`` are taken over the last dimension of ``x``.

    Args:
        d_model: Size of the last dimension of the inputs.
        eps: Small constant added to the standard deviation so the division
            stays finite when a row is constant.
    """

    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.size = d_model
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(d_model))
        self.bias = nn.Parameter(torch.zeros(d_model))

    def forward(self, x):
        """Return ``x`` standardized along its last dimension, then rescaled."""
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        # Divide by the standard deviation itself. Taking a square root here
        # would leave the output with a spread of sqrt(std) instead of 1.
        normalized = (x - mean) / (std + self.eps)
        return self.alpha * normalized + self.bias

In [None]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output width.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Expand to ``d_ff``, apply ReLU and dropout, project back to ``d_model``."""
        hidden = self.dropout(F.relu(self.linear1(x)))
        return self.linear2(hidden)

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer normalizes its input first and adds its dropout-regularized
    output back onto the un-normalized input (residual connection).

    Args:
        d_model: Model width.
        heads: Number of attention heads.
        dropout: Dropout probability used inside and after each sub-layer.
    """

    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadedAttention(heads, d_model, dropout=dropout)
        self.norm1 = Norm(d_model)
        self.norm2 = Norm(d_model)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run both sub-layers on ``x``; ``mask`` is passed to self-attention."""
        # Sub-layer 1: self-attention over the normalized input.
        normed = self.norm1(x)
        attended = self.self_attn(normed, normed, normed, mask=mask)
        x = x + self.dropout1(attended)

        # Sub-layer 2: position-wise feed-forward.
        transformed = self.ff(self.norm2(x))
        return x + self.dropout2(transformed)

## Decoder

## Masked Multi-Head Attention

In [None]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, attention over ``memory``, feed-forward.

    Each sub-layer normalizes its input first and adds its dropout-regularized
    output back onto the un-normalized input (residual connection).

    Args:
        d_model: Model width.
        heads: Number of attention heads.
        dropout: Dropout probability used inside and after each sub-layer.
    """

    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        # self_attn1 attends over the decoder input itself; self_attn2 uses
        # `memory` as keys and values (see forward()).
        self.self_attn1 = MultiHeadedAttention(heads, d_model, dropout=dropout)
        self.self_attn2 = MultiHeadedAttention(heads, d_model, dropout=dropout)
        self.norm1 = Norm(d_model)
        self.norm2 = Norm(d_model)
        self.norm3 = Norm(d_model)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, memory, src_mask=None, tgt_mask=None):
        """Run the three sub-layers.

        Args:
            x: Decoder-side input.
            memory: Tensor used as keys and values by the second attention.
            src_mask: Mask applied when attending over ``memory``.
            tgt_mask: Mask applied to the self-attention over ``x``.
        """
        # Sub-layer 1: self-attention restricted by tgt_mask.
        normed = self.norm1(x)
        self_context = self.self_attn1(normed, normed, normed, mask=tgt_mask)
        x = x + self.dropout1(self_context)

        # Sub-layer 2: queries from the decoder, keys/values from memory.
        normed = self.norm2(x)
        memory_context = self.self_attn2(normed, memory, memory, mask=src_mask)
        x = x + self.dropout2(memory_context)

        # Sub-layer 3: position-wise feed-forward.
        transformed = self.ff(self.norm3(x))
        return x + self.dropout3(transformed)

## Setting Encoder

In [None]:
import copy

def clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    stack = nn.ModuleList()
    for _ in range(N):
        # deepcopy gives every copy its own parameters (no weight sharing).
        stack.append(copy.deepcopy(module))
    return stack

class Encoder(nn.Module):
    """Encoder stack: embedding, position encoding, ``N`` layers, final norm.

    Args:
        vocab_size: Size of the source vocabulary.
        d_model: Model width.
        N: Number of stacked ``EncoderLayer`` copies.
        heads: Number of attention heads per layer.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, vocab_size, d_model, N, heads, dropout=0.1):
        super().__init__()
        self.N = N
        self.embedding = Embedding(vocab_size, d_model)
        self.position = PositionEncoder(d_model, dropout=dropout)
        # One prototype layer is deep-copied N times, so the layers start
        # from identical weights but do not share them.
        prototype = EncoderLayer(d_model, heads, dropout)
        self.layers = clones(prototype, N)
        self.norm = Norm(d_model)

    def forward(self, x, mask=None):
        """Encode token ids ``x``; ``mask`` is forwarded to every layer."""
        hidden = self.position(self.embedding(x))
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, mask=mask)
        return self.norm(hidden)

## Setting Decoder

In [None]:
class Decoder(nn.Module):
    """Decoder stack: embedding, position encoding, ``N`` layers, final norm.

    Args:
        vocab_size: Size of the target vocabulary.
        d_model: Model width.
        N: Number of stacked ``DecoderLayer`` copies.
        heads: Number of attention heads per layer.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, vocab_size, d_model, N, heads, dropout=0.1):
        super().__init__()
        self.N = N
        self.embedding = Embedding(vocab_size, d_model)
        self.position = PositionEncoder(d_model, dropout=dropout)
        # One prototype layer is deep-copied N times, so the layers start
        # from identical weights but do not share them.
        prototype = DecoderLayer(d_model, heads, dropout)
        self.layers = clones(prototype, N)
        self.norm = Norm(d_model)

    def forward(self, x, memory, src_mask=None, tgt_mask=None):
        """Decode token ids ``x`` against ``memory``.

        ``memory``, ``src_mask`` and ``tgt_mask`` are forwarded unchanged to
        every layer.
        """
        hidden = self.position(self.embedding(x))
        for decoder_layer in self.layers:
            hidden = decoder_layer(
                hidden, memory, src_mask=src_mask, tgt_mask=tgt_mask
            )
        return self.norm(hidden)

## Setting Transformer

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model with a linear projection onto the target vocabulary.

    Args:
        src_vocab_size: Size of the source vocabulary (encoder embedding).
        tgt_vocab_size: Size of the target vocabulary (decoder embedding and
            output projection).
        d_model: Model width.
        N: Number of layers in both the encoder and the decoder.
        heads: Number of attention heads per layer.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, N, heads, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(src_vocab_size, d_model, N, heads, dropout=dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, N, heads, dropout=dropout)
        # Maps decoder states to one score per target-vocabulary entry
        # (no softmax is applied here).
        self.out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """Encode ``src``, decode ``tgt`` against it, return the projected scores."""
        encoded = self.encoder(src, mask=src_mask)
        decoded = self.decoder(
            tgt, encoded, src_mask=src_mask, tgt_mask=tgt_mask
        )
        return self.out(decoded)

In [None]:
from torchtext import data, datasets

class 