In [1]:
'''
rnn project
sequence to sequence translation task 
using custom tokenization and vectorization
'''

'\nrnn project\nsequence to sequence translation task \nusing custom tokenization and vectorization\n'

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [3]:
# !wget -P "../data" http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
# !unzip ../data/spa-eng.zip -d ../data/

In [4]:
'''
.read() method:
appropriate when you want to read the entire contents of a file into a string.

If the file is very large and you only need to process it line by line or in chunks, you might prefer to use 
.readline() or iterate over the file object directly. This approach is more memory-efficient.

as the data is not so big .read() is used in this project
'''

'\n.read() method:\nappropriate when you want to read the entire contents of a file into a string.\n\nIf the file is very large and you only need to process it line by line or in chunks, you might prefer to use \n.readline() or iterate over the file object directly. This approach is more memory-efficient.\n\nas the data is not so big .read() is used in this project\n'

In [5]:
with open ('../data/spa-eng/spa.txt', 'r') as f:
        lines = f.read().split('\n')[:-1] # '\n' : split the data line by line

print(lines[0])
print(len(lines[0]))
print(len(lines))

Go.	Ve.
7
118964


In [6]:
data = []
source_data = []
target_data = []

for line in lines:
    source, target = line.split('\t') # '\t' : split the data by the space
    source_data.append(source)
    target_data.append(target)
    data.append((source, target))

print(source_data[0])
print(target_data[0])
print(len(data))

Go.
Ve.
118964


In [7]:
import random
random.shuffle(data)

num_val_samples = int(len(data)*0.15)
num_train_samples = len(data) - 2*num_val_samples

train_pairs = data[:num_train_samples]
val_pairs = data[num_train_samples: num_train_samples + num_val_samples]
test_pairs = data[num_train_samples + num_val_samples: ]

In [8]:
import string
from collections import Counter
from tqdm import tqdm

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

In [9]:
class TextVectorizer():
    def __init__(self, sequence_length, vocab_size, target = False):

        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.target = target
        self.vocab_counter = Counter()
        self.stoi = {'[pad]':0, '[start]':1, '[end]':2, '[unkown]':3}
        self.itos = {0:'[pad]', 1:'[start]', 2:'[end]', 3:'[unkown]'}

    def standardize(self, text):
        text = text.lower()
        return "".join(char for char in text
                        if char not in strip_chars)

    def tokenize(self, text):
        text = self.standardize(text)
        return text.split()
    
    def adapt(self, dataset):
        
        for text in tqdm(dataset):
            tokens = self.tokenize(text)
            for token in tokens:
                self.vocab_counter[token] += 1

        for token, _ in self.vocab_counter.most_common(self.vocab_size):
            index = len(self.stoi)
            self.stoi[token] = index
            self.itos[index] = token

    def encode(self, text):
        text = self.standardize(text)
        tokens = self.tokenize(text)

        if self.target:
            result = ([self.stoi['[start]']]+ [self.stoi.get(token, 3) for token in tokens]
                    + [self.stoi['[end]']])
        else:
            result = [self.stoi.get(token, 3) for token in tokens]
        
        if len(result) <= self.sequence_length:
            pad_size = self.sequence_length - len(result)
            result += [self.stoi.get('[pad]')] * (pad_size)
        else:
            #truncate!
            result = result[:self.sequence_length]    
        return result
        
    def decode(self, int_sequence):
        
        return " ".join(self.itos.get(i , '[unknown]') for i in int_sequence)

In [10]:
sequence_length = 20
vocab_size = 15000

source_vectorizer = TextVectorizer(sequence_length, vocab_size)
target_vectorizer = TextVectorizer(sequence_length +1, vocab_size, target=True)

In [11]:
source_vectorizer.adapt(source_data)
target_vectorizer.adapt(target_data)

100%|██████████| 118964/118964 [00:00<00:00, 289830.25it/s]
100%|██████████| 118964/118964 [00:00<00:00, 268507.75it/s]


In [12]:
encoded_ = source_vectorizer.encode('If you want to sound')
source_vectorizer.decode(encoded_)

'if you want to sound [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad]'

In [13]:
eng, spa = data[700]
print(eng)
print(spa)

What would I do without you?
¿Qué haría yo sin vosotros?


In [14]:
print(source_vectorizer.decode(source_vectorizer.encode(eng)))
print(target_vectorizer.decode(target_vectorizer.encode(spa)))

what would i do without you [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad]
[start] qué haría yo sin vosotros [end] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad] [pad]


In [15]:
print(len(source_vectorizer.decode(source_vectorizer.encode(eng))))
print(len(target_vectorizer.decode(source_vectorizer.encode(eng))))

111
106


In [16]:
class EngSpaDataset(Dataset):
    def __init__(self, data, source_vectorizer, target_vectorizer):
        super().__init__()

        self.data = data
        self.source_vectorizer = source_vectorizer
        self.target_vectorizer = target_vectorizer

    def __len__(self):
        return len(self.data)
    

    def __getitem__(self, index):
        eng, spa = self.data[index]
        eng = self.source_vectorizer.encode(eng)
        spa = self.target_vectorizer.encode(spa)
        return ({
            'english': torch.tensor(eng).long(),
            'spanish': torch.tensor(spa[:-1]).long()
            }, torch.tensor(spa[1:]).long())

In [17]:
train_dataset = EngSpaDataset(train_pairs, source_vectorizer, target_vectorizer)
val_dataset = EngSpaDataset(val_pairs, source_vectorizer, target_vectorizer)
test_dataset = EngSpaDataset(test_pairs, source_vectorizer, target_vectorizer)

In [18]:
train_dataset[0][0]['spanish'].size()

torch.Size([20])

In [19]:
'''
why the collate_fn ->

- permute() method : to change the order of dimensions of a tensor.
- handle variable length (however, it is handled by TextVectorizer already)
- much more organized data

(also possible direct indexing without zero-initialized tensors)
zero-initialized tensors: 
- prepare for padding
- control data storage
'''

'\nwhy the collate_fn ->\n\n- permute() method : to change the order of dimensions of a tensor.\n- handle variable length (however, it is handled by TextVectorizer already)\n- much more organized data\n\n(also possible direct indexing without zero-initialized tensors)\nzero-initialized tensors: \n- prepare for padding\n- control data storage\n'

In [20]:
def permute_batch_seq_collate(data: torch.Tensor): # data-> is a batch of the data
  batch_size = len(data)  
  source_input = torch.zeros(batch_size, data[0][0]["english"].size(0))
  target_input = torch.zeros(batch_size, data[0][0]["spanish"].size(0))
  target_output = torch.zeros(batch_size, data[0][1].size(0))
  for idx, (inputs, output) in enumerate(data):
    source_input[idx] = inputs["english"]
    target_input[idx] = inputs["spanish"]
    target_output[idx] = output

  return (source_input.permute(1, 0).long(), target_input.permute(1, 0).long(),
          target_output.permute(1, 0).long())

In [21]:
batch_size=64

train_ds = DataLoader(train_dataset, batch_size, shuffle=True, collate_fn= permute_batch_seq_collate)
val_ds = DataLoader(val_dataset, batch_size, collate_fn= permute_batch_seq_collate)
test_ds = DataLoader(test_dataset, batch_size, collate_fn= permute_batch_seq_collate)


In [22]:
source_, target_input_ , target_output_ = next(iter(train_ds))
print(source_.size())
print(target_input_.size())
print(target_output_.size())

torch.Size([20, 64])
torch.Size([20, 64])
torch.Size([20, 64])


In [24]:
x = torch.randint(0, vocab_size, size = (20, 64)) # torch.Size([20, 64]) integers between 0 to vocab size

torch.Size([20, 64])

In [51]:
class Encoder(nn.Module):
    def __init__(self, source_dim: int, embedding_dim: int, hidden_dim: int, 
                 padding_index:int =0, num_rnn_layers: int= 1, dropout= 0.2):
        super().__init__()

        self.dropout = nn.Dropout(dropout)
        self.embedding_layer = nn.Embedding(source_dim, embedding_dim, 
                                            padding_idx=padding_index)
        self.lstm_layer = nn.LSTM(embedding_dim, hidden_dim, 
                                  num_layers= num_rnn_layers)

    def forward(self, x):
        x = self.embedding_layer(x)
        x = self.dropout(x)
        output, (cell_state, hidden_state) = self.lstm_layer(x)
        return hidden_state, cell_state
    
        # output size: [seq_len, batch_size, hidden_dim] which we don't need in this model
        # cell_state , hidden_state: [1, batch_size, hidden_dim] 
        # cell_ state and hidde_state passed to Decoder(input cell and input hidden)

In [52]:
x = torch.randint(0, len(source_vectorizer.stoi), size = (20, 64))
encoder_ = Encoder(len(source_vectorizer.stoi), 256, 512)
print(encoder_(x)[0].size())
print(encoder_(x)[1].size())

torch.Size([1, 64, 512])
torch.Size([1, 64, 512])


In [28]:
class Decoder(nn.Module):
    def __init__(self, target_dim:int, embedding_dim: int, hidden_dim:int, 
                 padding_index: int= 0, num_rnn_layers:int =1,  dropout = 0.2):
        super().__init__()

        self.embedding_layer = nn.Embedding(target_dim, embedding_dim, 
                                            padding_index=padding_index)
        self.lstm_layer = nn.LSTM(embedding_dim, hidden_dim, 
                                  num_layers=n_layers)
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, cell_state, hidden_state):
        x = self.embedding_layer(x)
        x = self.dropout(x)
        output , (cell, hidden) = self.lstm_layer(x, cell_state, hidden_state)
        x = self.linear(output)
        return 

13636