<a href="https://colab.research.google.com/github/cmari038/Language-Translator/blob/main/RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch torchvision torchaudio
!pip install spacy
#!pip install collections

In [None]:
!python -m spacy download en_core_web_sm
!python -m spacy download es_core_news_sm

In [None]:
import torch
import torchtext; torchtext.disable_torchtext_deprecation_warning()
import pandas as pd
import numpy as np
import spacy
import torch.nn as nn
import torch.nn.functional as functional
from torchtext.data.utils import get_tokenizer
from torch.utils.data import random_split
from torchtext.vocab import vocab
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter, OrderedDict

In [None]:
# Run on the GPU when PyTorch reports one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("Cuda activated")

# Sentence pairs are read straight from the project's GitHub repository.
# Later cells read the 'english' and 'spanish' columns of this frame.
dataset = 'https://raw.githubusercontent.com/cmari038/Language-Translator/main/data.csv'
data = pd.read_csv(filepath_or_buffer=dataset)

# One spaCy-backed tokenizer per language; the model names must match the
# packages downloaded with `python -m spacy download` earlier in the notebook.
english_tokenizer, spanish_tokenizer = (
    get_tokenizer('spacy', language=spacy_model)
    for spacy_model in ('en_core_web_sm', 'es_core_news_sm')
)

#print(data)

# Split the sentence pairs into three disjoint partitions.
# `holdout` is every row that was NOT sampled for training; validation is drawn
# from it and the test set is whatever is left. Dropping only `validate.index`
# from the full frame would leave all training rows inside `test`, leaking
# training data into evaluation.
SPLIT_SEED = 42  # fixed so Restart & Run All reproduces the same partitions
train = data.sample(frac=0.7, random_state=SPLIT_SEED)
holdout = data.drop(train.index)
# NOTE(review): frac=0.1 is applied to the 30% hold-out, so validation is about
# 3% of all rows and test about 27% - confirm this is the intended ratio.
validate = holdout.sample(frac=0.1, random_state=SPLIT_SEED)
test = holdout.drop(validate.index)

# Token frequencies are counted over the training sentences only.
counter1 = Counter(
    token
    for sentence in train['english']
    for token in english_tokenizer(sentence)
)
counter2 = Counter(
    token
    for sentence in train['spanish']
    for token in spanish_tokenizer(sentence)
)

# most_common() yields (token, count) pairs from most to least frequent;
# the OrderedDict keeps that order when it is handed to vocab().
en_dict = OrderedDict(counter1.most_common())
es_dict = OrderedDict(counter2.most_common())

vocab1 = vocab(en_dict, specials=['<unk>', '<pad>', '<sos>', '<eos>'])
vocab2 = vocab(es_dict, specials=['<unk>', '<pad>', '<sos>', '<eos>'])

# Any token that is not in a vocabulary is looked up as '<unk>'.
for vocabulary in (vocab1, vocab2):
  vocabulary.set_default_index(vocabulary['<unk>'])

Cuda activated


In [None]:
def getTokens(df, en_tokenizer, es_tokenizer, lang):
  """Tokenize one sentence and wrap it in start/end markers.

  Args:
    df: the sentence to tokenize (despite the name, this is the value passed
      to the tokenizer, not a DataFrame).
    en_tokenizer: callable used when ``lang == "english"``.
    es_tokenizer: callable used for every other ``lang`` value.
    lang: ``"english"`` selects the English tokenizer; anything else selects
      the Spanish one.

  Returns:
    A list of the form ``['<sos>', *tokens, '<eos>']``.
  """
  tokenizer = en_tokenizer if lang == "english" else es_tokenizer
  # Both languages must use the exact '<sos>' special token: a plain 'sos'
  # is not one of the vocabulary specials and would be looked up as an
  # ordinary (or unknown) word.
  return ['<sos>'] + list(tokenizer(df)) + ['<eos>']


#token_dict = {"en_tokenizer": english_tokenizer, "es_tokenizer": spanish_tokenizer}
#train = train.apply(map(lambda col: col.map(getTokens)))
# Tokenize every training sentence and store the token lists alongside the
# raw text in two new columns of the training frame.
en_tokens = [
    getTokens(sentence, english_tokenizer, spanish_tokenizer, "english")
    for sentence in train['english']
]
es_tokens = [
    getTokens(sentence, english_tokenizer, spanish_tokenizer, "spanish")
    for sentence in train["spanish"]
]

train['en_tokens'] = en_tokens
train['es_tokens'] = es_tokens

#print(train['en_tokens'])

In [None]:
def getIndices(df, en_vocab, es_vocab, lang):
  """Map a sequence of tokens to their vocabulary indices.

  Args:
    df: iterable of tokens (despite the name, not a DataFrame).
    en_vocab: mapping used when ``lang == "english"``.
    es_vocab: mapping used for every other ``lang`` value.
    lang: ``"english"`` selects ``en_vocab``; anything else selects ``es_vocab``.

  Returns:
    A list with one index per token, in the same order as the input.
  """
  lookup = en_vocab if lang == "english" else es_vocab
  return [lookup[word] for word in df]

# Convert the token lists built above into vocabulary indices, one list per
# training sentence, and keep them as extra columns on the training frame.
en_indices = [
    getIndices(token_list, vocab1, vocab2, "english")
    for token_list in train["en_tokens"]
]
es_indices = [
    getIndices(token_list, vocab1, vocab2, "spanish")
    for token_list in train["es_tokens"]
]

train['es_indices'] = es_indices
train['en_indices'] = en_indices

#en_tensor = []
#es_tensor = []

"""
print(train['en_indices'])

for series in train['en_indices']:
  en_tensor.append(torch.tensor(series, dtype=torch.long))

for series in train['es_indices']:
  es_tensor.append(torch.tensor(series, dtype=torch.long))

train['en_indices'] = en_tensor
train['es_indices'] = es_tensor
"""

"\nprint(train['en_indices'])\n\nfor series in train['en_indices']:\n  en_tensor.append(torch.tensor(series, dtype=torch.long))\n\nfor series in train['es_indices']:\n  es_tensor.append(torch.tensor(series, dtype=torch.long))\n\ntrain['en_indices'] = en_tensor\ntrain['es_indices'] = es_tensor\n"

In [None]:
#print(train['es_indices'])

In [None]:
"""
class TensorSet(Dataset):
  def __init__(self, data, en_tokenizer, es_tokenizer, en_vocab, es_vocab):
    self.data = data
    self.en_tokenizer = en_tokenizer
    self.es_tokenizer = es_tokenizer
    self.en_vocab = en_vocab
    self.es_vocab = es_vocab

  def __len__(self):
    return len(self.data)

  def __getitem__(self, index):
    english = self.data.iloc[index]['english']
    spanish = self.data.iloc[index]['spanish']
    en_indices = []
    es_indices = []

    en_tokens = self.en_tokenizer(english)
    es_tokens = self.es_tokenizer(spanish)

    for token in en_tokens:
      en_indices.append(self.en_vocab[token])

    for token in es_tokens:
      es_indices.append(self.es_vocab[token])

    en_tensor = torch.tensor([self.en_vocab['<sos>']] + en_indices + [self.en_vocab['<eos>']], dtype=torch.long)
    es_tensor = torch.tensor([self.en_vocab['<sos>']] + es_indices + [self.en_vocab['<eos>']], dtype=torch.long)

    return en_tensor, es_tensor
  """

class TensorSet(Dataset):
  """Dataset view over a frame that holds pre-computed index lists.

  The frame must provide the columns 'en_indices' and 'es_indices'; each
  item is the pair of those two entries converted to ``torch.long`` tensors.
  """

  def __init__(self, df):
    self.df = df

  def __len__(self):
    # One sample per row of the wrapped frame.
    return len(self.df)

  def __getitem__(self, index):
    # Positional lookup, so it works whatever the frame's index labels are.
    english_ids = self.df['en_indices'].iloc[index]
    spanish_ids = self.df['es_indices'].iloc[index]
    return (
        torch.tensor(english_ids, dtype=torch.long),
        torch.tensor(spanish_ids, dtype=torch.long),
    )

def collate_fn(batch):
  """Pad a list of (english, spanish) tensor pairs into two batch tensors.

  Sequences in a batch have different lengths, so each side is padded with
  the '<pad>' index of its own vocabulary (module-level ``vocab1`` for
  English, ``vocab2`` for Spanish) until all sequences share one length.

  Returns:
    ``(en_batch, es_batch)`` as produced by ``pad_sequence``.
  """
  en_samples = [en_sample for en_sample, _ in batch]
  es_samples = [es_sample for _, es_sample in batch]
  return (
      pad_sequence(en_samples, padding_value=vocab1['<pad>']),
      pad_sequence(es_samples, padding_value=vocab2['<pad>']),
  )

#tensorSet = TensorSet(train, english_tokenizer, spanish_tokenizer, vocab1, vocab2)
# Wrap the indexed training frame and serve it in shuffled, padded mini-batches.
batch_size = 128
tensorSet = TensorSet(train)
dataLoad = DataLoader(
    tensorSet,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)



In [None]:
class RNN_Encoder(nn.Module):
    """Embeds a source sequence and encodes it with a multi-layer LSTM.

    Args:
        input: number of rows in the embedding table (source vocabulary size).
        embedding_dimension: width of each token embedding.
        hidden_dimension: width of the LSTM hidden and cell states.
        layers: number of stacked LSTM layers.
        dropout_p: dropout probability used on the embeddings and passed to
            the LSTM's ``dropout`` argument.

    The recurrent module is stored under the attribute name ``gru`` although
    it is an ``nn.LSTM``.
    """

    def __init__(self, input, embedding_dimension, hidden_dimension, layers, dropout_p=0.5):
        super().__init__()
        self.hidden_dimension = hidden_dimension
        self.layers = layers
        self.embed = nn.Embedding(input, embedding_dimension)
        self.gru = nn.LSTM(embedding_dimension, hidden_dimension, layers, dropout=dropout_p)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, english):
        """Return the LSTM's final ``(hidden, cell)`` state for ``english``."""
        token_vectors = self.embed(english)
        # The per-step outputs are discarded; only the final state is returned.
        _, final_state = self.gru(self.dropout(token_vectors))
        hidden, cell = final_state
        return hidden, cell

class RNN_Decoder(nn.Module):
    """Single-step LSTM decoder that scores the next target token.

    Args:
        output: target vocabulary size; used for the embedding table and for
            the width of the final projection. Also stored as ``self.output``.
        embedding_dimension: width of each token embedding (the LSTM input size).
        hidden_dimension: width of the LSTM hidden and cell states.
        layers: number of stacked LSTM layers.
        dropout_p: dropout probability used on the embeddings and passed to
            the LSTM's ``dropout`` argument.

    The recurrent module is stored under the attribute name ``gru`` although
    it is an ``nn.LSTM``.
    """

    def __init__(self, output, embedding_dimension, hidden_dimension, layers, dropout_p=0.5):
        super(RNN_Decoder, self).__init__()
        self.output = output
        self.hidden_dimension = hidden_dimension
        self.layers = layers
        # The embedding width must equal the LSTM's input size. It was built
        # with hidden_dimension, which only worked when the two sizes happened
        # to be equal and failed with a size mismatch otherwise.
        self.embed = nn.Embedding(output, embedding_dimension)
        self.gru = nn.LSTM(embedding_dimension, hidden_dimension, layers, dropout=dropout_p)
        self.fc_out = nn.Linear(hidden_dimension, output)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, input, hidden, cell):
        """Run one decoding step.

        Args:
            input: 1-D tensor of token indices, one per batch element.
            hidden: LSTM hidden state carried over from the previous step.
            cell: LSTM cell state carried over from the previous step.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` holds one row
            of vocabulary scores per batch element.
        """
        # Add a leading length-1 sequence axis so the LSTM sees a one-step sequence.
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embed(input))
        output, (hidden, cell) = self.gru(embedded, (hidden, cell))
        # Drop the sequence axis again before projecting onto the vocabulary.
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden, cell

class Sequence(nn.Module):
    """Encoder-decoder wrapper that decodes one target position at a time.

    Args:
        encoder: module whose call returns an initial ``(hidden, cell)`` state.
        decoder: module exposing ``output`` (target vocabulary size) and
            called as ``decoder(input, hidden, cell)``.
        device: device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super(Sequence, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, english, spanish, teacher_forcing_ratio):
        """Score every target position of ``spanish`` given ``english``.

        Args:
            english: source batch passed unchanged to the encoder.
            spanish: target batch indexed as ``[position, batch]``; row 0
                supplies the first decoder input.
            teacher_forcing_ratio: probability, checked independently at each
                step, of feeding the ground-truth token instead of the
                decoder's own arg-max prediction.

        Returns:
            Tensor of shape ``(target_length, batch, vocab_size)``. Row 0 is
            never written and stays zero.
        """
        batch_size = spanish.shape[1]
        es_length = spanish.shape[0]
        es_vocab_size = self.decoder.output
        outputs = torch.zeros(es_length, batch_size, es_vocab_size, device=self.device)
        hidden, cell = self.encoder(english)
        input = spanish[0, :]

        for i in range(1, es_length):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[i] = output
            teacher_force = np.random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            # Advance the decoder input. This used to be stored in an unused
            # variable, so the decoder was fed row 0 of the target at every step.
            input = spanish[i] if teacher_force else top1

        return outputs

# Build the encoder over the English vocabulary and the decoder over the
# Spanish one, then move the combined model to the selected device.
encoder = RNN_Encoder(
    input=len(vocab1),
    embedding_dimension=256,
    hidden_dimension=100,
    layers=2,
)
decoder = RNN_Decoder(
    output=len(vocab2),
    embedding_dimension=256,
    hidden_dimension=100,
    layers=2,
)
RNN_model = Sequence(encoder, decoder, device)
RNN_model = RNN_model.to(device)

In [None]:
# Padding positions in the Spanish targets are excluded from the loss.
spanish_pad_index = vocab2['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=spanish_pad_index)
# Adam with its default settings over every parameter of the model.
optimizer = torch.optim.Adam(params=RNN_model.parameters())

def train(RNN_model, dataLoad, criterion, optimizer, device, epochs, teacher_forcing_ratio, clip):
  """Train ``RNN_model`` and report the mean batch loss of every epoch.

  NOTE: this definition rebinds the module-level name ``train``, which earlier
  cells use for the training DataFrame. Re-running those cells after this one
  requires re-creating the frame first.

  Args:
    RNN_model: module called as ``RNN_model(english, spanish, teacher_forcing_ratio)``.
    dataLoad: iterable yielding ``(english, spanish)`` tensor batches.
    criterion: loss called as ``criterion(flattened_scores, flattened_targets)``.
    optimizer: optimizer over the model's parameters.
    device: device each batch is moved to before the forward pass.
    epochs: number of passes over ``dataLoad``.
    teacher_forcing_ratio: forwarded unchanged to the model.
    clip: maximum gradient norm passed to ``clip_grad_norm_``.

  Returns:
    A list with one float per epoch: the mean of the per-batch losses, or
    ``nan`` when ``dataLoad`` yielded no batches. The running loss used to be
    accumulated and then discarded.
  """
  RNN_model.train()
  epoch_losses = []
  for epoch in range(epochs):
    epoch_loss = 0.0
    batch_count = 0
    for english, spanish in dataLoad: # iterate in batches through dataloader
      english = english.to(device)
      spanish = spanish.to(device)
      optimizer.zero_grad()

      output = RNN_model(english, spanish, teacher_forcing_ratio)
      # Position 0 is skipped on both sides and the remaining positions are
      # flattened to (positions * batch, vocab) scores vs. (positions * batch,)
      # targets. reshape also accepts non-contiguous model output.
      output = output[1:].reshape(-1, output.shape[-1])
      target = spanish[1:].reshape(-1)

      loss = criterion(output, target)
      loss.backward()
      torch.nn.utils.clip_grad_norm_(RNN_model.parameters(), clip)
      optimizer.step()
      epoch_loss += loss.item()
      batch_count += 1

    mean_loss = epoch_loss / batch_count if batch_count else float('nan')
    epoch_losses.append(mean_loss)
    print(f"epoch {epoch + 1}/{epochs} - mean loss: {mean_loss:.4f}")

  return epoch_losses

# Five passes over the training batches, teacher forcing on roughly half of
# the decoding steps, gradients clipped to a norm of 1.0.
train(
    RNN_model,
    dataLoad,
    criterion,
    optimizer,
    device,
    epochs=5,
    teacher_forcing_ratio=0.5,
    clip=1.0,
)

KeyboardInterrupt: 

In [None]:
def translation(model, input, vocab1, vocab2, device, max_length=50):
  """Greedily decode a Spanish token sequence for one English sentence.

  The sentence is tokenized with the module-level ``english_tokenizer`` via
  ``getTokens`` and indexed with ``getIndices``, then encoded once. The
  decoder is stepped from '<sos>', always feeding back its own arg-max
  token, until it emits '<eos>' or ``max_length`` steps have run.

  Args:
    model: object exposing ``encoder`` and ``decoder``; it is switched to
      eval mode and left in that mode.
    input: the English sentence to translate.
    vocab1: English vocabulary used to index the source tokens.
    vocab2: Spanish vocabulary; must support item lookup for '<sos>' and
      '<eos>' and provide ``lookup_tokens``.
    device: device for the tensors created here.
    max_length: upper bound on decoding steps (default 50, the value that
      used to be hard-coded).

  Returns:
    The decoded tokens as strings. The list starts with '<sos>' and does not
    include '<eos>'.
  """
  model.eval()

  with torch.no_grad():
    tokens = getTokens(input, english_tokenizer, spanish_tokenizer, "english")
    indices = getIndices(tokens, vocab1, vocab2, "english")
    # Trailing axis of size 1: a batch that contains only this sentence.
    tensor = torch.tensor(indices, dtype=torch.long).unsqueeze(-1).to(device)
    hidden, cell = model.encoder(tensor)

    eos_index = vocab2['<eos>']
    spanish = [vocab2['<sos>']]

    for _ in range(max_length):
      # The most recently produced token is the next decoder input.
      tensorInput = torch.tensor([spanish[-1]], dtype=torch.long).to(device)
      decoder_output, hidden, cell = model.decoder(tensorInput, hidden, cell)
      top1 = decoder_output.argmax(-1).item()
      if top1 == eos_index:
        break
      spanish.append(top1)

  return vocab2.lookup_tokens(spanish)


#train(RNN_model, dataLoad, criterion, optimizer, device, 10, 0.5)
# Quick smoke test: translate one short English sentence with the current model.
sentence = "Hello there"
translated_sentence = translation(
    model=RNN_model,
    input=sentence,
    vocab1=vocab1,
    vocab2=vocab2,
    device=device,
)
print(translated_sentence)

['<sos>', 'Tom']
