<a href="https://colab.research.google.com/github/AzeemWaqarRao/Pytorch_Implementations/blob/main/seq2seq_implementation_pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import re
import random

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

import numpy as np
from torch.utils.data import TensorDataset, DataLoader, RandomSampler

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# helper functions

def unicodeToAscii(s):
    """Drop combining marks so accented letters fall back to their base letter.

    The string is decomposed with NFD normalization, after which every
    character whose Unicode category is ``Mn`` (nonspacing mark) is removed.
    """
    decomposed = unicodedata.normalize('NFD', s)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept)

# Lowercase, trim, and remove non-letter characters
def normalizeString(s):
    """Return *s* lowercased, trimmed, accent-stripped and reduced to letters, ``!`` and ``?``.

    A space is first inserted before every ``.``, ``!`` or ``?``. Afterwards each
    run of characters other than ASCII letters, ``!`` and ``?`` (this includes
    ``.`` and digits) collapses into a single space, and the result is trimmed.
    """
    cleaned = s.lower().strip()
    cleaned = unicodeToAscii(cleaned)
    # Detach sentence punctuation from the word in front of it.
    cleaned = re.sub(r"([.!?])", r" \1", cleaned)
    # Everything that is not a letter, '!' or '?' becomes one separator.
    cleaned = re.sub(r"[^a-zA-Z!?]+", r" ", cleaned)
    return cleaned.strip()


# helpers for filtering sentence pairs and reading them from file
def filter_pairs(pairs, MAXLENGTH):
    """Return the pairs whose first two sentences each have at most MAXLENGTH tokens.

    Tokens are counted by splitting on a single space. The input order is
    preserved and the original pair objects are reused in the result.
    """
    def short_enough(sentence):
        return len(sentence.split(' ')) <= MAXLENGTH

    return [pair for pair in pairs if short_enough(pair[0]) and short_enough(pair[1])]


def read_data(path,lang1,lang2,MAXLENGTH):
    """Load tab-separated sentence pairs and build a vocabulary for each side.

    Args:
        path: text file with one pair per line, the two sentences separated by a tab.
        lang1: name given to the input-side ``Lang``.
        lang2: name given to the output-side ``Lang``.
        MAXLENGTH: maximum number of space-separated tokens allowed per sentence
            in the returned pairs.

    Returns:
        ``(input_lang, output_lang, pairs)`` where ``pairs`` holds only the
        normalized pairs that pass ``filter_pairs``.

    Raises:
        IndexError: if a line does not contain a tab (no second sentence).
    """
    # Decode explicitly as UTF-8: relying on the platform default encoding
    # breaks (or garbles accented text) on systems whose locale is not UTF-8.
    with open(path, 'r', encoding='utf-8') as f:
        raw_lines = f.read().strip().split('\n')

    # normalizeString already lowercases, so no separate .lower() is needed.
    pairs = [[normalizeString(sent) for sent in line.split('\t')] for line in raw_lines]

    input_lang = Lang(lang1)
    output_lang = Lang(lang2)

    # The vocabularies are built from every pair, including those that the
    # length filter below removes (kept as in the original implementation).
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])

    pairs = filter_pairs(pairs, MAXLENGTH)

    return input_lang, output_lang, pairs


def padd_seq(seq,length,place=0):
    """Right-pad *seq* in place with zeros up to *length* and return that same list.

    Sequences that are already *length* items or longer are returned unchanged
    (nothing is truncated). The ``place`` argument is accepted but not used.
    """
    missing = length - len(seq)
    if missing > 0:
        seq.extend([0] * missing)
    return seq

def sent_to_index(lines,input_lang,output_lang,MAXLENGTH):
    """Convert sentence pairs into two lists of zero-padded index sequences.

    Each sentence is mapped through its language's ``sent_to_index`` and then
    padded with ``padd_seq`` to ``MAXLENGTH + 1`` entries.

    Returns:
        ``(inputs, targets)``: index lists for ``pair[0]`` and ``pair[1]`` of
        every pair, in the order of ``lines``.
    """
    # One extra slot for the EOS index that Lang.sent_to_index appends.
    width = MAXLENGTH + 1
    inputs = [padd_seq(input_lang.sent_to_index(pair[0]), width) for pair in lines]
    targets = [padd_seq(output_lang.sent_to_index(pair[1]), width) for pair in lines]
    return inputs, targets


# gives us a data loader with inputs and targets

def get_dataloader(inputs,outputs,batch_size,device):
    """Wrap index sequences in a ``DataLoader`` that yields ``(input, target)`` batches.

    Args:
        inputs: nested list of input indices, one equal-length row per sentence.
        outputs: nested list of target indices, aligned row-by-row with ``inputs``.
        batch_size: number of pairs per batch.
        device: device on which both tensors are created.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` of two ``torch.long`` tensors,
        iterated in dataset order.
    """
    input_tensor = torch.tensor(inputs, dtype=torch.long, device=device)
    # Build the targets from the `outputs` argument; the previous body read an
    # unrelated global named `targets` and ignored this parameter.
    target_tensor = torch.tensor(outputs, dtype=torch.long, device=device)
    dataset = TensorDataset(input_tensor, target_tensor)
    return DataLoader(dataset, batch_size=batch_size)

In [None]:
# Vocabulary indices reserved for the "SOS" and "EOS" entries of every Lang
# (presumably start-of-sentence / end-of-sentence markers).
# NOTE: padd_seq pads with 0, so padded positions share the SOS index.
SOS_token = 0
EOS_token = 1

class Lang:
    """Vocabulary of one language: word/index lookup tables plus word counts.

    Indices 0 and 1 are pre-assigned to the ``"SOS"`` and ``"EOS"`` entries;
    every new word receives the next free index.
    """

    def __init__(self, name):
        self.name = name
        self.word2index = {'SOS': 0, 'EOS': 1}
        self.word2count = {}
        # Reverse table derived from the forward one: {0: "SOS", 1: "EOS"}.
        self.index2word = {index: word for word, index in self.word2index.items()}
        self.n_words = len(self.word2index)  # SOS and EOS are already counted

    def addSentence(self, sentence):
        """Register every space-separated token of *sentence*."""
        for token in sentence.split(' '):
            self.addWord(token)

    def addWord(self, word):
        """Count one occurrence of *word*, indexing it on first sight."""
        if word in self.word2index:
            self.word2count[word] += 1
            return

        new_index = self.n_words
        self.word2index[word] = new_index
        self.word2count[word] = 1
        self.index2word[new_index] = word
        self.n_words = new_index + 1

    def sent_to_index(self, sent):
        """Return the indices of the tokens in *sent* followed by ``EOS_token``.

        Raises:
            KeyError: if a token is not in the vocabulary.
        """
        return [self.word2index[token] for token in sent.split(' ')] + [EOS_token]

In [None]:
# path = '/content/eng-fra.txt'
# input_lang, output_lang, lines = read_data(path,'English', 'French',10)
# print(f"number of unique {input_lang.name} words : {input_lang.n_words}")
# print(f"Number of unique {output_lang.name} words : {output_lang.n_words}")
# print(f"Dimension of Sentence Pairs {len(lines), len(lines[0])}")

In [None]:
# lines

In [None]:
# Encoder

class Encoder(nn.Module):
    """GRU encoder that embeds token indices and feeds them to the GRU one step at a time.

    Args:
        num_words: size of the embedding table (input vocabulary size).
        vec_size: embedding width, also used as the GRU hidden size.
        MAXLENGTH: maximum sentence length; at most ``MAXLENGTH + 1`` time steps
            of an input are consumed.
        dp: dropout probability applied to the embeddings.
    """

    def __init__(self, num_words, vec_size, MAXLENGTH, dp=0.2):
        super(Encoder, self).__init__()
        self.num_words = num_words
        self.vec_size = vec_size
        self.maxlength = MAXLENGTH

        self.embedding = nn.Embedding(self.num_words, self.vec_size)
        self.gru = nn.GRU(self.vec_size, self.vec_size, batch_first=True)
        self.dropout = nn.Dropout(dp)

    def forward(self, x):
        """Encode a batch of index sequences.

        Args:
            x: index tensor of shape ``(batch, seq_len)``.

        Returns:
            ``(outputs, hidden)``: ``outputs`` is ``(batch, steps, vec_size)``
            with ``steps = min(seq_len, maxlength + 1)``; ``hidden`` is the
            final GRU state of shape ``(1, batch, vec_size)``.
        """
        # Create the initial state next to the input instead of on a
        # module-level global device, so the module works wherever it is moved.
        hidden = torch.zeros(1, x.shape[0], self.vec_size,
                             dtype=torch.float, device=x.device)

        # Never index past the columns that are actually present: inputs
        # shorter than maxlength + 1 used to raise IndexError. Longer inputs
        # are still cut off at maxlength + 1 steps, as before.
        steps = min(x.shape[1], self.maxlength + 1)

        outputs = []
        for i in range(steps):
            step_input = x[:, i].unsqueeze(1)  # (batch, 1)
            output, hidden = self.forward_step(step_input, hidden)
            outputs.append(output)

        outputs = torch.cat(outputs, dim=1)
        return outputs, hidden

    def forward_step(self, x, hidden):
        """Run the GRU on ``x`` (token indices) starting from ``hidden``.

        Returns:
            ``(output, hidden)`` exactly as produced by the batch-first GRU:
            ``output`` is ``(batch, seq_length, vec_size)`` and ``hidden`` is
            ``(1, batch, vec_size)``.
        """
        # Embedding adds a trailing vec_size dimension to the index tensor.
        embeddings = self.dropout(self.embedding(x))
        output, hidden = self.gru(embeddings, hidden)
        return output, hidden

In [None]:
class Decoder(nn.Module):
    """Greedy GRU decoder without attention.

    Args:
        num_words: output vocabulary size (embedding rows and logits width).
        vec_size: embedding width, also used as the GRU hidden size.
        MAXLENGTH: maximum sentence length; ``MAXLENGTH + 1`` tokens are generated.
    """

    def __init__(self, num_words, vec_size, MAXLENGTH):
        super(Decoder, self).__init__()

        self.num_words = num_words
        self.vec_size = vec_size
        self.max_length = MAXLENGTH

        self.embedding = nn.Embedding(self.num_words, self.vec_size)
        self.gru = nn.GRU(self.vec_size, self.vec_size, batch_first=True)
        self.linear = nn.Linear(self.vec_size, self.num_words)

    def forward(self, encoder_output, encoder_hidden):
        """Generate ``max_length + 1`` steps, feeding back the arg-max token each step.

        Args:
            encoder_output: encoder outputs; only the batch size (dim 0) and
                device are read here.
            encoder_hidden: initial GRU state, ``(1, batch, vec_size)``.

        Returns:
            ``(outputs, hidden)``: log-probabilities of shape
            ``(batch, max_length + 1, num_words)`` and the final GRU state.
        """
        batch_size = encoder_output.size(0)
        # Start every sequence from index 0 (the SOS entry), created on the
        # same device as the encoder output rather than a global device.
        decoder_input = torch.zeros(batch_size, 1, dtype=torch.long,
                                    device=encoder_output.device)
        hidden = encoder_hidden

        outputs = []
        for _ in range(self.max_length + 1):
            output, hidden = self.forward_step(decoder_input, hidden)
            outputs.append(output)

            # Greedy decoding: the most likely token becomes the next input.
            _, topi = output.topk(1)
            decoder_input = topi.squeeze(-1).detach()

        outputs = torch.cat(outputs, dim=1)
        outputs = F.log_softmax(outputs, dim=-1)

        return outputs, hidden

    def forward_step(self, input, hidden_state):
        """Embed ``input``, apply ReLU, run the GRU and project to vocabulary logits."""
        embedded = F.relu(self.embedding(input))
        output, hidden_state = self.gru(embedded, hidden_state)
        output = self.linear(output)
        return output, hidden_state





In [None]:
class Attention(nn.Module):
    """Additive attention: ``score = v^T tanh(W_q q + W_k k)``.

    Args:
        hidden_size: feature size of both the query and the keys.
    """

    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.linear_q = nn.Linear(hidden_size, hidden_size)
        self.linear_k = nn.Linear(hidden_size, hidden_size)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        """Attend over ``keys`` with ``query``.

        Args:
            query: ``(batch, 1, hidden_size)``.
            keys: ``(batch, seq_len, hidden_size)``.

        Returns:
            ``(context, weights)``: ``context`` is ``(batch, 1, hidden_size)``,
            the weighted sum of the keys; ``weights`` is ``(batch, 1, seq_len)``
            and sums to 1 over the last dimension.
        """
        # The query broadcasts over the seq_len dimension of the keys.
        scores = self.linear(torch.tanh(self.linear_q(query) + self.linear_k(keys)))
        scores = scores.permute(0, 2, 1)  # (batch, seq_len, 1) -> (batch, 1, seq_len)

        # Normalize with softmax, not log_softmax: the weights must be
        # non-negative and sum to 1 so the context is a convex combination of
        # the keys. Log-probabilities are all <= 0 and would scale and flip
        # the keys instead of averaging them.
        weights = F.softmax(scores, dim=-1)

        context = torch.bmm(weights, keys)

        return context, weights



class AttentionDecoder(nn.Module):
    """GRU decoder that attends over the encoder outputs at every step.

    Args:
        output_size: output vocabulary size (embedding rows and logits width).
        hidden_size: embedding width and GRU hidden size.
        MAXLENGTH: maximum sentence length; ``MAXLENGTH + 1`` steps are decoded.
        dp: dropout probability applied to the input embeddings.
    """

    def __init__(self, output_size, hidden_size, MAXLENGTH, dp=0.1):
        super(AttentionDecoder, self).__init__()
        self.attention = Attention(hidden_size)
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.dropout = nn.Dropout(dp)
        # The GRU input is the token embedding concatenated with the context.
        self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)
        self.maxlength = MAXLENGTH

    def forward(self, encoder_output, encoder_hidden, target=None):
        """Decode ``maxlength + 1`` steps.

        Args:
            encoder_output: ``(batch, src_len, hidden_size)`` attention keys.
            encoder_hidden: initial GRU state, ``(1, batch, hidden_size)``.
            target: optional ``(batch, >= maxlength + 1)`` index tensor. When
                given, the target token of step ``i`` is fed as the input of
                step ``i + 1`` (teacher forcing); otherwise the arg-max
                prediction is fed back.

        Returns:
            ``(outputs, hidden, attentions)``: log-probabilities of shape
            ``(batch, maxlength + 1, output_size)``, the final GRU state, and
            the per-step attention weights concatenated along dim 1.
        """
        batch_size = encoder_output.shape[0]
        # Start from index 0 (the SOS entry), created on the encoder output's
        # device rather than on a module-level global device.
        inp = torch.zeros(batch_size, 1, dtype=torch.long,
                          device=encoder_output.device)
        hidden = encoder_hidden
        outputs = []
        attentions = []
        for i in range(self.maxlength + 1):
            output, hidden, weights = self.forward_step(inp, hidden, encoder_output)
            outputs.append(output)
            attentions.append(weights)

            if target is not None:
                # Teacher forcing: the ground-truth token is the next input.
                inp = target[:, i].unsqueeze(1)
            else:
                # Greedy decoding: the most likely token is the next input.
                _, topi = output.topk(1)
                inp = topi.squeeze(-1).detach()

        outputs = torch.cat(outputs, dim=1)
        outputs = F.log_softmax(outputs, dim=-1)
        attentions = torch.cat(attentions, dim=1)

        return outputs, hidden, attentions

    def forward_step(self, inp, hidden, encoder_output):
        """Run one decoding step and return ``(logits, hidden, attention_weights)``."""
        embedded = self.dropout(self.embedding(inp))

        # The hidden state (1, batch, H) is permuted to (batch, 1, H) to act
        # as the attention query.
        context, weights = self.attention(hidden.permute(1, 0, 2), encoder_output)

        gru_input = torch.cat([embedded, context], dim=2)

        output, hidden = self.gru(gru_input, hidden)
        output = self.linear(output)
        return output, hidden, weights


In [None]:
# Hyperparameters and paths for the training run.
hidden_size = 128
lr = 0.001
epochs = 30  # NOTE(review): not read by the training loop in this file, which iterates range(1)
batch_size = 32
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = 'cpu'
path = '/content/eng-fra.txt'
MAXLENGTH = 20  # sentences with more than this many tokens are filtered out by read_data



# Load the corpus and build both vocabularies.
input_lang, output_lang, lines = read_data(path,'English', 'French',MAXLENGTH)

# Encoder and attention decoder share hidden_size; each gets its own Adam optimizer.
encoder = Encoder(input_lang.n_words,hidden_size, MAXLENGTH).to(device)
decoder = AttentionDecoder(output_lang.n_words, hidden_size, MAXLENGTH).to(device)

encoder_optimizer = optim.Adam(encoder.parameters(), lr)
decoder_optimizer = optim.Adam(decoder.parameters(), lr)


# Turn the sentence pairs into padded index sequences and batch them.
inputs, targets = sent_to_index(lines,input_lang, output_lang,MAXLENGTH)
data_loader = get_dataloader(inputs,targets,batch_size, device)
# _,_,data_loader = get_dataloader(batch_size)

num_batches = len(data_loader)

# The decoder returns log-probabilities (log_softmax), which is what NLLLoss expects.
# NOTE(review): no ignore_index is set, so zero-padded target positions count toward the loss.
loss_fn = nn.NLLLoss()

In [None]:
# Training loop. NOTE(review): runs a single epoch (range(1)); the `epochs`
# setting defined above is not used here.
for epoch in range(1):
  print(f"Epoch {epoch+1} starting")
  total_loss = 0
  for batch in data_loader:
    # NOTE: `input` shadows the Python builtin of the same name from here on.
    input, target = batch

    output, hidden = encoder(input)
    # Passing `target` makes the decoder feed the ground-truth tokens back in.
    output, _, _ = decoder(output, hidden, target)

    # Flatten (batch, steps, vocab) and (batch, steps) so every position is one sample.
    loss = loss_fn(
            output.view(-1, output.size(-1)),
            target.view(-1)
        )
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

    total_loss += loss.item()

  # Mean of the per-batch losses over the epoch.
  print(f"Loss : {total_loss/num_batches}")
  print(f"Epoch {epoch+1} ended")




Epoch 1 starting
Loss : 2.2477341580064327
Epoch 1 ended


In [None]:
def evaluateRandomly(encoder, decoder, n=10):
    """Print ``n`` randomly drawn sentence pairs together with the model's output.

    Pairs are sampled from the module-level ``lines``; the module-level
    ``input_lang`` and ``output_lang`` are used for the conversion. For each
    sample the source (``>``), the reference (``=``) and the prediction (``<``)
    are printed, followed by an empty line.
    """
    for _ in range(n):
        sample = random.choice(lines)
        source, reference = sample[0], sample[1]
        print('>', source)
        print('=', reference)
        predicted = evaluate(encoder, decoder, source, input_lang, output_lang)
        print('<', ' '.join(predicted))
        print('')


In [None]:
def evaluate(encoder, decoder, sentence, input_lang, output_lang):
    """Translate one normalized sentence with greedy decoding.

    Args:
        encoder: encoder module exposing a ``maxlength`` attribute.
        decoder: decoder returning ``(log_probs, hidden, attentions)``.
        sentence: space-separated, already normalized input sentence.
        input_lang: vocabulary used to index ``sentence``.
        output_lang: vocabulary used to map predicted indices back to words.

    Returns:
        List of predicted words; ``'<EOS>'`` is appended and decoding stops as
        soon as the EOS index is predicted.

    Raises:
        KeyError: if ``sentence`` contains a word missing from ``input_lang``.
    """
    with torch.no_grad():
        token_ids = input_lang.sent_to_index(sentence)
        # Pad to the number of columns the encoder iterates over
        # (maxlength + 1). The previous hard-coded 11 only matched
        # MAXLENGTH = 10 and raised IndexError for any other setting.
        token_ids = padd_seq(token_ids, encoder.maxlength + 1)

        input_tensor = torch.tensor(token_ids, dtype=torch.long, device=device).view(1, -1)

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, decoder_hidden, _ = decoder(encoder_outputs, encoder_hidden)

        # Most likely token at each step; flatten (1, steps, 1) to (steps,).
        _, topi = decoder_outputs.topk(1)
        decoded_ids = topi.reshape(-1)

        decoded_words = []
        for idx in decoded_ids:
            token = idx.item()
            if token == EOS_token:
                decoded_words.append('<EOS>')
                break
            decoded_words.append(output_lang.index2word[token])
    return decoded_words

In [None]:
# Switch both modules to evaluation mode (turns dropout off) before sampling translations.
encoder.eval()
decoder.eval()
evaluateRandomly(encoder, decoder)

In [None]:
# Scratch check of a zero tensor's shape; the sizes presumably mirror
# (1, batch_size, hidden_size) from the setup cell.
torch.zeros(1,32, 128, dtype=torch.long).shape

torch.Size([1, 32, 128])