In [2]:
import torch
import torch.nn as nn

In [3]:
class Encoder(nn.Module):
  def __init__(self,input_dimension,embedding_dimension,hidden_dimension,layers,dropout,cell_type):
    #initialising the parameters
    super().__init__()
    self.hidden_dimension=hidden_dimension
    self.layers=layers
    self.embedding=nn.Embedding(input_dimension,embedding_dimension)
    self.cell_type=cell_type
    #selcting the cell type with number of layers in the encoder
    if self.cell_type=='LSTM':
      nn.rnn=nn.LSTM(embedding_dimension,hidden_dimension,layers,dropout=dropout)
    elif self.cell_type=='RNN':
      nn.rnn=nn.RNN(embedding_dimension,hidden_dimension,layers,dropout=dropout)
    else:
      nn.rnn=nn.GRU(embedding_dimension,hidden_dimension,layers,dropout=dropout)

      #adding the dropout
    self.dropout=nn.Dropout(dropout)

    def forward(self,source):
      embedded=self.dropout(self.embedding(source))
      outputs,(hidden,cell)=self.rnn(embedded)
      return hidden,cell

In [4]:
class Decoder(nn.Module):
  def __init__(self,output_dimension,embedding_dimension,hidden_dimension,layers,dropout,cell_type):
    super().__init__()
    self.output_dimension=output_dimension
    self.hidden_dimension=hidden_dimension
    self.layer=layers
    self.embedding=nn.Embedding(output_dimension,embedding_dimension)
    self.cell_type=cell_type
    if self.cell_type=='LSTM':
      nn.rnn=nn.LSTM(embedding_dimension,hidden_dimension,layers,dropout=dropout)
    elif self.cell_type=='RNN':
      nn.rnn=nn.RNN(embedding_dimension,hidden_dimension,layers,dropout=dropout)
    else:
      nn.rnn=nn.GRU(embedding_dimension,hidden_dimension,layers,dropout=dropout)
    
    self.fully_connected=nn.Linear(hidden_dimension,output_dimension)
    self.dropout=nn.Dropout(dropout)

    def forward(self,input,hidden_cell):
      input=input.unsqueeze(0)
      embedded=self.dropout(self.embedding(input))
      output,(hidden,cell)=self.rnn(embedded,(hidden,cell))
      prediction=self.fully_connected(output.squeeze(0))
      return predicton,hidden,cell

In [5]:
class seq_to_seq(nn.Module):
  def __init__(self,encoder,decoder,device):
    super().__init__()
    self.encoder=encoder
    self.decoder=decoder
    self.device=device

  def forward(self,source,target,teacher_forcing_ratio=0.5):
    batch_size=target.shape[1]
    target_length=target.shape[0]
    target_vocab_size=self.decoder.output_dimension
    output=torch.zeros(target_length,batch_size,target_vocab_size)
    hidden_cell=self.encoder(source)
    input = target[0,:]
    for t in range(1, target_length):
        output, hidden, cell = self.decoder(input, hidden, cell)
        outputs[t] = output
        teacher_force = random.random() < teacher_forcing_ratio
        top1 = output.argmax(1)
        input = trg[t] if teacher_force else top1   
    return outputs

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
train_data="/content/drive/MyDrive/Colab Notebooks/lexicons/hi.translit.sampled.train.tsv"

In [8]:
input_texts = []
target_texts = []
input_characters = set()
target_characters = set()
with open(train_data, "r", encoding="utf-8") as f:
    lines = f.read().split("\n")
for line in lines[: min(44204, len(lines) - 1)]:
    target_text, input_text,_ = line.split("\t")
    # We use "tab" as the "start sequence" character
    # for the targets, and "\n" as "end sequence" character.
    target_text = "\t" + target_text + "\n"
    input_texts.append(input_text)
    target_texts.append(target_text)
    for char in input_text:
        if char not in input_characters:
            input_characters.add(char)
    for char in target_text:
        if char not in target_characters:
            target_characters.add(char)

In [9]:
input_characters = sorted(list(input_characters))
target_characters = sorted(list(target_characters))
num_encoder_tokens = len(input_characters)
num_decoder_tokens = len(target_characters)
max_encoder_seq_length = max([len(txt) for txt in input_texts])
max_decoder_seq_length = max([len(txt) for txt in target_texts])

In [10]:
import numpy as np

In [11]:
input_token_index = dict([(char, i) for i, char in enumerate(input_characters)])
target_token_index = dict([(char, i) for i, char in enumerate(target_characters)])
encoder_input_data = np.zeros(
    (len(input_texts), max_encoder_seq_length, num_encoder_tokens+1), dtype="float32"
)
decoder_input_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens+1), dtype="float32"
)
decoder_target_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens+1), dtype="float32"
)

In [12]:
input_token_index[" "]=26
target_token_index[" "]=65

In [13]:
for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.0
    encoder_input_data[i, t + 1 :, input_token_index[" "]] = 1.0
    for t, char in enumerate(target_text):
        # decoder_target_data is ahead of decoder_input_data by one timestep
        decoder_input_data[i, t, target_token_index[char]] = 1.0
        if t > 0:
            # decoder_target_data will be ahead by one timestep
            # and will not include the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.0
    decoder_input_data[i, t + 1 :, target_token_index[" "]] = 1.0
    decoder_target_data[i, t:, target_token_index[" "]] = 1.0

In [14]:
def weight_intialisation(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)

In [15]:
INPUT_DIM = len(input_token_index)
OUTPUT_DIM = len(target_token_index)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
cell_type='LSTM'

enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT,cell_type)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT,cell_type)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = seq_to_seq(enc, dec, device).to(device)

In [16]:
model.apply(weight_intialisation)

seq_to_seq(
  (encoder): Encoder(
    (embedding): Embedding(27, 256)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(66, 256)
    (fully_connected): Linear(in_features=512, out_features=66, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [17]:
import torch.optim as optim
optimizer = optim.Adam(model.parameters())

In [19]:
criterion = nn.CrossEntropyLoss()

In [20]:
def train(model, iterator, optimizer, criterion, clip):
    
    model.train()
    
    epoch_loss = 0
    
    for i, batch in enumerate(iterator):
        
        src = batch.src
        trg = batch.trg
        
        optimizer.zero_grad()
        
        output = model(src, trg)
        
        #trg = [trg len, batch size]
        #output = [trg len, batch size, output dim]
        
        output_dim = output.shape[-1]
        
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        
        #trg = [(trg len - 1) * batch size]
        #output = [(trg len - 1) * batch size, output dim]
        
        loss = criterion(output, trg)
        
        loss.backward()
        
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        
        optimizer.step()
        
        epoch_loss += loss.item()
        
    return epoch_loss / len(iterator)

In [21]:
def evaluate(model, iterator, criterion):
    
    model.eval()
    
    epoch_loss = 0
    
    with torch.no_grad():
    
        for i, batch in enumerate(iterator):

            src = batch.src
            trg = batch.trg

            output = model(src, trg, 0) #turn off teacher forcing

            #trg = [trg len, batch size]
            #output = [trg len, batch size, output dim]

            output_dim = output.shape[-1]
            
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)

            #trg = [(trg len - 1) * batch size]
            #output = [(trg len - 1) * batch size, output dim]

            loss = criterion(output, trg)
            
            epoch_loss += loss.item()
        
    return epoch_loss / len(iterator)