In [None]:
import os
import re
import sys
import cv2
import tqdm
import time
import spacy 
import math
import random
import scipy.io
import itertools
import numpy as np
from math import ceil
import pandas as pd
from itertools import chain
import matplotlib.pyplot as plt
from skimage.io import imread
from scipy.ndimage.filters import gaussian_filter
from sklearn.model_selection import train_test_split

In [None]:
from torch.nn.utils.rnn import pad_sequence 
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

<img src="https://i.stack.imgur.com/eAKQu.png">

In [None]:
class PositionalEmbedding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    Even feature columns hold ``sin(pos * w_k)`` and odd columns hold
    ``cos(pos * w_k)`` with ``w_k = 10000 ** (-2k / d_model)``.

    Args:
        d_model: Size of the embedding (last) dimension. May be odd.
        dropout: Dropout probability applied after the encoding is added.
        max_len: Longest sequence length covered by the precomputed table.
        batch_first: If True (default) the input is laid out as
            ``(batch, seq, d_model)``, which is the layout the decoder in this
            notebook is configured for (``batch_first=True``). If False the
            input is ``(seq, batch, d_model)``.

    Raises:
        ValueError: from ``forward`` when the sequence is longer than ``max_len``.
    """

    def __init__(self, d_model, dropout=0, max_len=1000, batch_first=True):
        super(PositionalEmbedding, self).__init__()
        self.batch_first = batch_first
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine columns,
        # so the frequency vector has to be trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Stored as (max_len, 1, d_model): the same buffer shape as before, so
        # previously saved state_dicts still load.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + positional_encoding)`` with the same shape as ``x``."""
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                "sequence length {} exceeds max_len {}".format(seq_len, self.pe.size(0))
            )
        encoding = self.pe[:seq_len]  # (seq, 1, d_model)
        if self.batch_first:
            # Index by token position (dim 1), not by batch index (dim 0).
            encoding = encoding.transpose(0, 1)  # (1, seq, d_model)
        x = x + encoding
        return self.dropout(x)

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention for batch-first inputs.

    Query/key/value are projected to ``num_heads`` heads, attention is computed
    independently per head with scores scaled by ``sqrt(dim_key)``, the heads
    are concatenated, layer-normalised, projected back to ``emd_size`` and
    passed through dropout.

    Args:
        emd_size: Feature size of the inputs and of the output.
        num_heads: Number of attention heads.
        dim_key: Per-head size of the query/key projections.
        dim_value: Per-head size of the value projection.
    """

    def __init__(self, emd_size = 256, num_heads = 8, dim_key = 64, dim_value = 64):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.dim_key = dim_key
        self.dim_value = dim_value
        self.query_linear = nn.Linear(in_features=emd_size, out_features=num_heads*dim_key)
        self.key_linear = nn.Linear(in_features=emd_size, out_features=num_heads*dim_key)
        self.value_linear = nn.Linear(in_features=emd_size, out_features=num_heads*dim_value)
        self.resize_linear = nn.Linear(in_features=num_heads*dim_value, out_features=emd_size)
        # Normalise over the key axis, which is the last axis of the score tensor.
        self.softmax = nn.Softmax(dim=-1)
        # The norm is applied to the concatenated value heads, so it must be
        # sized by dim_value (sizing it by dim_key breaks dim_key != dim_value).
        self.norm = nn.LayerNorm(num_heads*dim_value)
        self.drop = nn.Dropout(p=0.1)

    def _split_heads(self, projected, dim_head):
        """Reshape (batch, length, heads*dim_head) -> (batch, heads, length, dim_head)."""
        batch, length, _ = projected.shape
        return projected.reshape(batch, length, self.num_heads, dim_head).transpose(1, 2)

    def forward(self, query, key, value):
        """Attend from ``query`` over ``key``/``value``.

        Args:
            query: Tensor of shape (batch, query_len, emd_size).
            key: Tensor of shape (batch, key_len, emd_size).
            value: Tensor of shape (batch, key_len, emd_size).

        Returns:
            Tensor of shape (batch, query_len, emd_size).
        """
        heads_q = self._split_heads(self.query_linear(query), self.dim_key)
        heads_k = self._split_heads(self.key_linear(key), self.dim_key)
        heads_v = self._split_heads(self.value_linear(value), self.dim_value)

        # Scale by the per-head key width, not by the sequence length.
        scores = torch.matmul(heads_q, heads_k.transpose(-2, -1)) / math.sqrt(self.dim_key)
        weights = self.softmax(scores)
        context = torch.matmul(weights, heads_v)  # (batch, heads, query_len, dim_value)

        batch, _, query_len, _ = context.shape
        merged = context.transpose(1, 2).reshape(batch, query_len, self.num_heads * self.dim_value)
        mha_out = self.drop(self.resize_linear(self.norm(merged)))
        return mha_out

In [None]:
class Pos_FFN(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Linear -> LayerNorm -> Dropout.

    Args:
        emd_size: Feature size of the input and output.
        dim_inner: Width of the hidden layer.
    """

    def __init__(self, emd_size = 256, dim_inner = 1024):
        super(Pos_FFN, self).__init__()
        self.encode_linear = nn.Linear(in_features=emd_size, out_features=dim_inner)
        self.decode_linear = nn.Linear(in_features=dim_inner, out_features=emd_size)
        self.norm = nn.LayerNorm(emd_size)
        self.drop = nn.Dropout(p=0.1)

    def forward(self, in_features):
        """Apply the feed-forward block to the last dimension of ``in_features``.

        The result has the same shape as the input.
        """
        # Without a non-linearity the two linear layers collapse into a single
        # affine map, so the wider hidden layer would add no capacity.
        hidden = torch.relu(self.encode_linear(in_features))
        projected = self.decode_linear(hidden)
        out_features = self.drop(self.norm(projected))
        return out_features

In [None]:
class Transformer_Embedding(nn.Module):
    """Look up token embeddings and add positional information.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_size: Width of each embedding vector.

    Note:
        ``mha`` and ``pos_ffn`` are constructed here but are not used by
        ``forward``. They remain registered so the module's parameter set
        (and therefore its state_dict keys) is unchanged.
    """

    def __init__(self, vocab_size, embed_size):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.pos_embed = PositionalEmbedding(d_model=embed_size, dropout=0)
        self.mha = MultiHeadAttention()
        self.pos_ffn = Pos_FFN()

    def forward(self, sentence):
        """Map a tensor of token ids to position-aware embedding vectors."""
        token_vectors = self.embed(sentence)
        return self.pos_embed(token_vectors)

In [None]:
class Transformer_Encoder(nn.Module):
    """One encoder layer: self-attention and a feed-forward block, each wrapped
    in a residual connection followed by layer normalisation.

    Args:
        emd_size: Feature size of the layer's input and output. It is forwarded
            to the attention and feed-forward sub-modules so that sizes other
            than the 256 default stay consistent with the layer norms.
    """

    def __init__(self, emd_size = 256):
        super(Transformer_Encoder, self).__init__()
        self.mha = MultiHeadAttention(emd_size=emd_size)
        self.norm0 = nn.LayerNorm(emd_size)
        self.norm1 = nn.LayerNorm(emd_size)
        self.pos_ffn = Pos_FFN(emd_size=emd_size)

    def forward(self, embeddings):
        """Run the layer on ``embeddings`` and return a tensor of the same shape."""
        attention_out = self.mha(embeddings, embeddings, embeddings)
        # Residual around self-attention, mirroring the residual already used
        # around the feed-forward block below.
        attention_out = self.norm0(embeddings + attention_out)
        ffn_out = self.pos_ffn(attention_out)
        return self.norm1(ffn_out + attention_out)

In [None]:
class Transformer_Decoder(nn.Module):
    """Thin wrapper around ``nn.TransformerDecoder`` configured as batch-first.

    Args:
        embed_size: Model width (``d_model``).
        num_heads: Number of attention heads per layer.
        num_decoder: Number of stacked decoder layers.
        dim_inner: Hidden width of each layer's feed-forward network.
    """

    def __init__(self, embed_size = 256, num_heads = 8, num_decoder = 3, dim_inner = 1024):
        super().__init__()
        layer_template = nn.TransformerDecoderLayer(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=dim_inner,
            batch_first=True,
        )
        # The template layer stays registered as an attribute, as before.
        self.decoder_layer = layer_template
        self.transformer_decoder = nn.TransformerDecoder(layer_template, num_layers=num_decoder)

    def forward(self, output_embedded, encoder_out, tgt_mask):
        """Decode ``output_embedded`` against ``encoder_out`` under ``tgt_mask``."""
        return self.transformer_decoder(
            tgt=output_embedded,
            memory=encoder_out,
            tgt_mask=tgt_mask,
        )

    def decoder_only(self, ys, memory, tgt_mask):
        """Run the decoder stack on ``ys`` against a precomputed ``memory``."""
        return self.transformer_decoder(tgt=ys, memory=memory, tgt_mask=tgt_mask)

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder sequence model built from three encoder layers and an
    ``nn.TransformerDecoder``-based decoder.

    Args:
        in_vocab_size: Size of the source vocabulary.
        out_vocab_size: Size of the target vocabulary.
        embed_size: Model width. It is forwarded to the embeddings, the encoder
            layers, the decoder and the output projection.
    """

    def __init__(self, in_vocab_size, out_vocab_size, embed_size):
        super(Transformer, self).__init__()
        self.input_embedding = Transformer_Embedding(in_vocab_size, embed_size)
        self.output_embedding = Transformer_Embedding(out_vocab_size, embed_size)
        # Attribute names encoder0..2 are kept so saved state_dicts still load.
        self.encoder0 = Transformer_Encoder(emd_size=embed_size)
        self.encoder1 = Transformer_Encoder(emd_size=embed_size)
        self.encoder2 = Transformer_Encoder(emd_size=embed_size)

        self.decoder = Transformer_Decoder(embed_size=embed_size)

        self.linear_output_mapping = nn.Linear(in_features=embed_size, out_features=out_vocab_size)

    def _run_encoders(self, in_embedded):
        """Pass already-embedded source tokens through the encoder stack."""
        encoded = in_embedded
        for layer in (self.encoder0, self.encoder1, self.encoder2):
            encoded = layer(encoded)
        return encoded

    def forward(self, in_sentences, out_sentences, tgt_mask):
        """Compute target-vocabulary logits for a source/target pair.

        Args:
            in_sentences: Source token ids.
            out_sentences: Target token ids fed to the decoder.
            tgt_mask: Attention mask passed to the decoder as ``tgt_mask``.

        Returns:
            The projected decoder output with dimensions 1 and 2 swapped,
            i.e. the vocabulary axis ends up at dim 1.
        """
        in_embedded = self.input_embedding(in_sentences)
        out_embedded = self.output_embedding(out_sentences)

        memory = self._run_encoders(in_embedded)
        decoder_out = self.decoder(out_embedded, memory, tgt_mask)

        sentence_out = self.linear_output_mapping(decoder_out)
        sentence_out = torch.transpose(sentence_out, 1, 2)

        return sentence_out

    def encoder_out(self, in_sentences):
        """Embed ``in_sentences`` and return the output of the encoder stack."""
        return self._run_encoders(self.input_embedding(in_sentences))

    def decoder_translate(self, ys, memory, tgt_mask):
        """Decode target ids ``ys`` against a precomputed encoder ``memory``.

        Unlike ``forward``, the result is returned without swapping dims 1 and 2,
        so the vocabulary axis is last.
        """
        ys = self.output_embedding(ys)
        decoder_out = self.decoder.decoder_only(ys, memory, tgt_mask)
        sentence_out = self.linear_output_mapping(decoder_out)
        return sentence_out

Testing

In [None]:
def generate_square_subsequent_mask(sz):
    """Build an ``sz`` x ``sz`` float32 causal mask on the CPU.

    Entries strictly above the diagonal are ``-inf`` (future positions);
    the diagonal and everything below it are ``0.0``.
    """
    blocked = torch.full((sz, sz), float('-inf'), dtype=torch.float32, device='cpu')
    # triu with diagonal=1 keeps only the strictly-upper triangle and zeroes the rest.
    return torch.triu(blocked, diagonal=1)

def translate(model, src, start_symbol=1, end_symbol=2, max_extra_tokens=7):
    """Greedily decode a target sequence for a single source sentence.

    Args:
        model: Module exposing ``encoder_out(src)`` and
            ``decoder_translate(ys, memory, tgt_mask)``.
        src: Source token ids for one sentence, expected as ``(1, src_len)``
            because the decoder input ``ys`` is built with a batch size of 1.
        start_symbol: Token id used to seed the output (default 1).
        end_symbol: Token id that stops decoding once produced (default 2).
        max_extra_tokens: The output holds at most ``src_len + max_extra_tokens``
            tokens, including the start symbol (default 7).

    Returns:
        Long tensor of shape ``(1, out_len)`` that starts with ``start_symbol``
        and includes ``end_symbol`` if it was generated.
    """
    model.eval()

    # Run on the model's own device rather than a notebook-global `device`,
    # so the function does not depend on hidden kernel state.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else src.device

    src = src.to(run_device)
    # The source length is the last axis; shape[0] is the batch size (1) and
    # would cap every translation at the same short length.
    num_tokens = src.shape[-1]
    max_len = num_tokens + max_extra_tokens

    with torch.no_grad():  # inference only: no autograd graph needed
        memory = model.encoder_out(src).to(run_device)
        ys = torch.full((1, 1), start_symbol, dtype=torch.long, device=run_device)

        for _ in range(max_len - 1):
            cur_len = ys.size(1)
            # Boolean causal mask: True marks future positions that may not be
            # attended to (equivalent to the -inf entries of the float mask).
            tgt_mask = torch.triu(
                torch.ones(cur_len, cur_len, dtype=torch.bool, device=run_device),
                diagonal=1,
            )

            sentence_out = model.decoder_translate(ys, memory, tgt_mask)
            next_word = int(torch.argmax(sentence_out, dim=2)[0, -1])
            next_token = torch.full((1, 1), next_word, dtype=ys.dtype, device=run_device)
            ys = torch.cat([ys, next_token], dim=1)
            if next_word == end_symbol:
                break

    return ys