This notebook serves as a prototype for creating the Yoruba tokenizer in use for a machine translation task.

Note: The labeled datasets used in this notebook are extremely small for a machine translation task.

In [1]:
'''
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm
!pip install datasets evaluate torchtext --upgrade
!pip install tokenizers
'''

Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m32.3 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
Collecting de-core-news-sm==3.7.0
  Downloading https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-3.7.0/de_core_news_sm-3.7.0-py3-none-any.whl (14.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.6/14.6 MB[0m [31m26.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: de-core-ne

In [27]:
# import required libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

import torch
import torch.nn as nn
import torch.optim as optim

import spacy
import datasets
from datasets import Dataset, DatasetDict, load_dataset
import torchtext
from torchtext import vocab
import tqdm
import evaluate

import random
# Seed every random number generator used in this notebook with the same
# value so that repeated runs draw the same random numbers.
seed = 1234
for seed_everything in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
  seed_everything(seed)

# Ask cuDNN to use deterministic algorithms.
torch.backends.cudnn.deterministic = True

### Creating a custom tokenizer.
Using the HuggingFace custom tokenizer api

In [60]:
# Text file used to train the tokenizer, and the JSON file the trained
# tokenizer is written to and read back from.
data_path = '/content/drive/MyDrive/Colab_Notebooks/RNN/eng_yor_Seq2Seq/data_for_custom_tokenizer/biblica.txt'
tokenizer_file = '/content/drive/MyDrive/Colab_Notebooks/RNN/eng_yor_Seq2Seq/data_for_custom_tokenizer/yoruba_tokenizer.json'

# Untrained BPE tokenizer with '[UNK]' as its unknown token and a
# whitespace pre-tokenizer.
tokenizer = Tokenizer(BPE(unk_token='[UNK]'))
tokenizer.pre_tokenizer = Whitespace()

# Train on the text file; '[UNK]' is registered as a special token.
trainer = BpeTrainer(special_tokens=['[UNK]'])
tokenizer.train([data_path], trainer)

# Round-trip through disk so the rest of the notebook uses the saved artifact.
tokenizer.save(tokenizer_file)
tokenizer = Tokenizer.from_file(tokenizer_file)

# Simple usage: show the sub-word pieces produced for a sample sentence.
tokenizer.encode('Láàárọ̀ ọjọ́ kejì, bègbè míì.').tokens

['L', 'áà', 'á', 'rọ̀', 'ọjọ́', 'kejì', ',', 'bè', 'gbè', 'mí', 'ì', '.']

### Load dataset
Note: using a very small labeled English to Yoruba dataset.

1. Load the dataset
2. Split into train, validation and test dataset.
3. Save the datasets to file
4. Load them using the HuggingFace load_dataset function

In [61]:
# Paths
# Paths
data_path = '/content/drive/MyDrive/Colab_Notebooks/RNN/eng_yor_Seq2Seq/data/'
english_path = data_path + 'small_vocab_en.txt'
yoruba_path = data_path + 'small_vocab_yor.txt'


def read_sentences(path):
  """Return the non-blank lines of a UTF-8 text file, one sentence per line.

  The files are plain text rather than CSV, so they are read line by line
  instead of going through ``pd.read_csv`` with a made-up separator (the
  previous ``sep='/n'`` was a literal slash-n, not a newline, and would have
  split any sentence containing those two characters).
  """
  with open(path, encoding='utf-8') as handle:
    return [line.rstrip('\n') for line in handle if line.strip()]


# Load data
english = pd.DataFrame({'en': read_sentences(english_path)})
yoruba = pd.DataFrame({'yr': read_sentences(yoruba_path)})

# The two files are aligned by line number, so a length mismatch means the
# sentence pairs would be silently shifted against each other.
if len(english) != len(yoruba):
  raise ValueError(
      f'Parallel files are misaligned: {len(english)} English sentences '
      f'vs {len(yoruba)} Yoruba sentences'
  )

# Concatenate and split data: half for training, the other half divided
# evenly between test and validation.
df = pd.concat([english, yoruba], axis=1)
train, test = train_test_split(df, test_size=0.5, random_state=42)
test, validation = train_test_split(test, test_size=0.5, random_state=42)

# Save DataFrames to CSV
train_path = data_path + 'train.csv'
test_path = data_path + 'test.csv'
validation_path = data_path + 'validation.csv'

train.to_csv(train_path, index=False)
test.to_csv(test_path, index=False)
validation.to_csv(validation_path, index=False)

# Load DataFrames into DatasetDict
data_files = {"train": train_path, "test": test_path, "validation": validation_path}
dataset = load_dataset('csv', data_files=data_files)

# Check the shapes
dataset

Generating train split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['en', 'yr'],
        num_rows: 10
    })
    test: Dataset({
        features: ['en', 'yr'],
        num_rows: 5
    })
    validation: Dataset({
        features: ['en', 'yr'],
        num_rows: 6
    })
})

In [31]:
# dataset = datasets.load_dataset("bentrevett/multi30k")

In [62]:
# Pull the three splits out of the DatasetDict under shorter names.
train_data = dataset["train"]
valid_data = dataset["validation"]
test_data = dataset["test"]

# Peek at the first training pair.
train_data[0]

{'en': 'I  don’t like going out to parties with friends but I like watching TV..',
 'yr': 'Mo kórìíra láti máa lo sí àpèjẹ pèlú àwọn ọ̀rẹ́ mi ṣùgbọ́n mo fẹ́ràn láti máa wo tẹlifíṣọ̀n'}

## Tokenizer
The class below replaces a spaCy pipeline's tokenizer with my custom Yoruba tokenizer, which is stored in JSON format.

In [63]:
import spacy
from spacy.tokens import Doc
from transformers import PreTrainedTokenizerFast

class CustomTokenizer:
    """spaCy-compatible tokenizer that delegates to a Hugging Face tokenizer.

    Calling an instance on a string returns a spaCy ``Doc`` whose words are the
    Hugging Face tokens; the matching token ids are stored in
    ``doc.user_data['input_ids']``.
    """

    def __init__(self, vocab, hf_tokenizer):
        """Keep the spaCy vocab used to build Docs and the tokenizer to delegate to."""
        self.hf_tokenizer = hf_tokenizer
        self.vocab = vocab

    def __call__(self, text):
        """Tokenize ``text`` and wrap the resulting tokens in a ``Doc``."""
        encoding = self.hf_tokenizer(text)
        token_ids = encoding['input_ids']
        pieces = self.hf_tokenizer.convert_ids_to_tokens(token_ids)

        # Every token is flagged as being followed by a space.
        doc = Doc(self.vocab, words=pieces, spaces=[True for _ in pieces])

        # Keep the ids next to the tokens so callers can read them back.
        doc.user_data['input_ids'] = token_ids
        return doc

# Load your custom Hugging Face tokenizer
hf_tokenizer = PreTrainedTokenizerFast(tokenizer_file="/content/drive/MyDrive/Colab_Notebooks/RNN/eng_yor_Seq2Seq/data_for_custom_tokenizer/yoruba_tokenizer.json")

# Load the SpaCy pipelines
en_nlp = spacy.load("en_core_web_sm")
# Yoruba only needs a tokenizer, so start from an empty "yo" pipeline rather
# than the German model: calling yr_nlp(text) then no longer runs German
# tagging/parsing components over Yoruba sub-word tokens, and the German
# model does not have to be downloaded.
yr_nlp = spacy.blank("yo")

# Replace the tokenizer in the yr_nlp pipeline
yr_nlp.tokenizer = CustomTokenizer(yr_nlp.vocab, hf_tokenizer)

# Test the pipeline with your custom tokenizer
text = "Láàárọ̀ ọjọ́ kejì, bègbè míì. Ìdí nìyẹn tí ìwọ kò tíì fi ṣàìsàn."
doc = yr_nlp(text)

# Print the tokens and their IDs
for token, token_id in zip(doc, doc.user_data['input_ids']):
    print(f"Token: {token.text}, ID: {token_id}")

Token: L, ID: 30
Token: áà, ID: 129
Token: á, ID: 73
Token: rọ̀, ID: 224
Token: ọjọ́, ID: 265
Token: kejì, ID: 532
Token: ,, ID: 4
Token: bè, ID: 1730
Token: gbè, ID: 416
Token: mí, ID: 198
Token: ì, ID: 76
Token: ., ID: 6
Token: Ìdí, ID: 2213
Token: nìyẹn, ID: 15733
Token: tí, ID: 107
Token: ìwọ, ID: 196
Token: kò, ID: 147
Token: tí, ID: 107
Token: ì, ID: 76
Token: fi, ID: 134
Token: ṣàìsàn, ID: 4551
Token: ., ID: 6


In [64]:
def tokenize_example(example, en_nlp, yr_nlp, max_length, sos_token, eos_token):
  """Tokenize the 'en' and 'yr' fields of one example.

  Each text is run through the matching pipeline's tokenizer, lower-cased,
  cut to at most ``max_length`` tokens and then wrapped in ``sos_token`` /
  ``eos_token``.

  Returns:
    dict with the keys 'en_tokens' and 'yr_tokens', each a list of strings.
  """
  def wrap(nlp, text):
    lowered = [token.text.lower() for token in nlp.tokenizer(text)]
    # Truncate first so the start/end markers are never cut off.
    return [sos_token, *lowered[:max_length], eos_token]

  return {
      'en_tokens': wrap(en_nlp, example['en']),
      'yr_tokens': wrap(yr_nlp, example['yr']),
  }

In [65]:
# Tokenization settings shared by all three splits.
max_length = 1_000
sos_token = '[SOS]'
eos_token = '[EOS]'

fn_kwargs = dict(
    en_nlp=en_nlp,
    yr_nlp=yr_nlp,
    max_length=max_length,
    sos_token=sos_token,
    eos_token=eos_token,
)

# Add the 'en_tokens' / 'yr_tokens' columns to train, validation and test.
train_data, valid_data, test_data = (
    split.map(tokenize_example, fn_kwargs=fn_kwargs)
    for split in (train_data, valid_data, test_data)
)

Map:   0%|          | 0/10 [00:00<?, ? examples/s]

Map:   0%|          | 0/6 [00:00<?, ? examples/s]

Map:   0%|          | 0/5 [00:00<?, ? examples/s]

## Vocabulary

In [66]:
from torchtext.vocab import vocab
from collections import Counter

# Tokens seen fewer than min_freq times in the training split are left out
# of the vocabulary.
min_freq = 2
unk_token = "[UNK]"
pad_token = "[PAD]"

special_tokens = [unk_token, pad_token, sos_token, eos_token]

# Token frequencies are counted over the training split only.
en_counts = Counter(token for tokens in train_data['en_tokens'] for token in tokens)
yr_counts = Counter(token for tokens in train_data['yr_tokens'] for token in tokens)

en_vocab = vocab(en_counts, min_freq=min_freq, specials=special_tokens)
yr_vocab = vocab(yr_counts, min_freq=min_freq, specials=special_tokens)

In [67]:
# The unknown and padding tokens must sit at the same index in both
# vocabularies, because a single index for each is used from here on.
for shared_token in (unk_token, pad_token):
  assert en_vocab[shared_token] == yr_vocab[shared_token]

unk_index, pad_index = en_vocab[unk_token], en_vocab[pad_token]

In [68]:
# Make out-of-vocabulary lookups fall back to the unknown-token index.
for lang_vocab in (en_vocab, yr_vocab):
  lang_vocab.set_default_index(unk_index)

In [69]:
# map tokens to indices
def numericalize_example(example, en_vocab, yr_vocab):
  """Look up the vocabulary index of every token in one example.

  Reads ``example['en_tokens']`` and ``example['yr_tokens']`` and returns a
  dict with the corresponding index lists under 'en_ids' and 'yr_ids'.
  """
  return {
      'en_ids': en_vocab.lookup_indices(example['en_tokens']),
      'yr_ids': yr_vocab.lookup_indices(example['yr_tokens']),
  }

In [70]:
fn_kwargs = dict(en_vocab=en_vocab, yr_vocab=yr_vocab)

# Add the 'en_ids' / 'yr_ids' columns to train, validation and test.
train_data, valid_data, test_data = (
    split.map(numericalize_example, fn_kwargs=fn_kwargs)
    for split in (train_data, valid_data, test_data)
)

Map:   0%|          | 0/10 [00:00<?, ? examples/s]

Map:   0%|          | 0/6 [00:00<?, ? examples/s]

Map:   0%|          | 0/5 [00:00<?, ? examples/s]

In [71]:
# with format
# Format the id columns as torch data; the remaining columns are still
# returned because output_all_columns=True.
data_type = 'torch'
format_columns = ['en_ids', 'yr_ids']

train_data, valid_data, test_data = (
    split.with_format(
        type=data_type, columns=format_columns, output_all_columns=True
    )
    for split in (train_data, valid_data, test_data)
)

In [72]:
# sequence padding
def get_collate_fn(pad_index):
  """Build a collate function that pads a batch with ``pad_index``.

  The returned function takes a list of examples (each holding 'en_ids' and
  'yr_ids' tensors) and returns a dict with the same two keys, where every
  value is the padded stack produced by ``pad_sequence``.
  """
  def collate_fn(batch):
    def pad_column(key):
      sequences = [example[key] for example in batch]
      return nn.utils.rnn.pad_sequence(sequences, padding_value=pad_index)

    return {
        'en_ids': pad_column('en_ids'),
        'yr_ids': pad_column('yr_ids'),
    }
  return collate_fn

In [73]:
# dataloader
def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
  """Wrap ``dataset`` in a DataLoader whose batches are padded with ``pad_index``."""
  return torch.utils.data.DataLoader(
      dataset=dataset,
      batch_size=batch_size,
      collate_fn=get_collate_fn(pad_index),
      shuffle=shuffle,
  )

In [74]:
batch_size = 128

# Only the training loader shuffles; validation and test keep their order.
train_data_loader, valid_data_loader, test_data_loader = (
    get_data_loader(split, batch_size, pad_index, shuffle=reshuffle)
    for split, reshuffle in (
        (train_data, True),
        (valid_data, False),
        (test_data, False),
    )
)

In [75]:
# Shape of the padded Yoruba ids in the first training batch.
first_batch = next(iter(train_data_loader))
first_batch['yr_ids'].shape

torch.Size([43, 10])

# Building the model

In [76]:
# Encoder
class Encoder(nn.Module):
  """Embeds source token ids and runs them through a multi-layer LSTM.

  Only the LSTM's final hidden and cell states are returned; the per-step
  outputs are discarded.
  """

  def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
    """
    Args:
      input_dim: number of rows in the embedding table.
      embedding_dim: size of each embedding vector.
      hidden_dim: LSTM hidden size.
      n_layers: number of stacked LSTM layers.
      dropout: dropout probability, used both on the embeddings and inside
        the LSTM.
    """
    super().__init__()
    # Exposed as attributes so Seq2Seq can compare them with the decoder's.
    self.hidden_dim, self.n_layers = hidden_dim, n_layers
    self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=embedding_dim)
    self.rnn = nn.LSTM(
        input_size=embedding_dim,
        hidden_size=hidden_dim,
        num_layers=n_layers,
        dropout=dropout,
    )
    self.dropout = nn.Dropout(p=dropout)

  def forward(self, src):
    """Return the final ``(hidden, cell)`` LSTM states for ``src``."""
    token_vectors = self.dropout(self.embedding(src))
    _, (final_hidden, final_cell) = self.rnn(token_vectors)
    return final_hidden, final_cell

# Decoder
class Decoder(nn.Module):
  """Single-step LSTM decoder with a linear projection to the output vocabulary."""

  def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
    """
    Args:
      output_dim: size of the embedding table and of the prediction vector.
      embedding_dim: size of each embedding vector.
      hidden_dim: LSTM hidden size.
      n_layers: number of stacked LSTM layers.
      dropout: dropout probability, used both on the embeddings and inside
        the LSTM.
    """
    super().__init__()
    # Exposed as attributes so Seq2Seq can size its output and check compatibility.
    self.output_dim, self.hidden_dim, self.n_layers = output_dim, hidden_dim, n_layers
    self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=embedding_dim)
    self.rnn = nn.LSTM(
        input_size=embedding_dim,
        hidden_size=hidden_dim,
        num_layers=n_layers,
        dropout=dropout,
    )
    self.fc_out = nn.Linear(in_features=hidden_dim, out_features=output_dim)
    self.dropout = nn.Dropout(p=dropout)

  def forward(self, input, hidden, cell):
    """Decode one step.

    ``input`` receives a leading dimension of size 1 before it is embedded,
    and that dimension is squeezed away again before the projection.

    Returns:
      ``(prediction, hidden, cell)`` where ``prediction`` is the output of
      ``fc_out`` and ``hidden`` / ``cell`` are the updated LSTM states.
    """
    step_embedded = self.dropout(self.embedding(input.unsqueeze(0)))
    step_output, (hidden, cell) = self.rnn(step_embedded, (hidden, cell))
    return self.fc_out(step_output.squeeze(0)), hidden, cell

# Seq2Seq
class Seq2Seq(nn.Module):
  """Encoder-decoder wrapper that decodes the target one step at a time."""

  def __init__(self, encoder, decoder, device):
    """Store the two halves and check that their LSTM shapes are compatible.

    Raises:
      AssertionError: if the encoder and decoder differ in hidden size or in
        number of layers.
    """
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.device = device
    # The encoder's final states are fed straight into the decoder's LSTM.
    assert encoder.hidden_dim == decoder.hidden_dim, \
        'Hidden dimensions of encoder and decoder must be equal'
    assert encoder.n_layers == decoder.n_layers, \
        'Encoder and Decoder must have equal number of layers'

  def forward(self, src, trg, teacher_forcing_ratio):
    """Run the encoder on ``src`` and decode ``trg.shape[0] - 1`` steps.

    At each step the next decoder input is the ground-truth token with
    probability ``teacher_forcing_ratio`` and the decoder's own argmax
    otherwise.

    Returns:
      Tensor of shape ``(trg.shape[0], trg.shape[1], decoder.output_dim)``.
      Row 0 is never written and stays zero.
    """
    trg_length, batch_size = trg.shape[0], trg.shape[1]
    predictions = torch.zeros(
        trg_length, batch_size, self.decoder.output_dim
    ).to(self.device)
    hidden, cell = self.encoder(src)

    # The first decoder input is the first row of the target.
    decoder_input = trg[0, :]
    for step in range(1, trg_length):
      step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
      predictions[step] = step_scores
      use_ground_truth = random.random() < teacher_forcing_ratio
      decoder_input = trg[step] if use_ground_truth else step_scores.argmax(1)
    return predictions

In [77]:
# input_dim, embedding_dim, hidden_dim, n_layers, dropout
# Vocabulary sizes: the encoder is sized by the Yoruba vocabulary and the
# decoder by the English one.
input_dim = len(yr_vocab)
output_dim = len(en_vocab)

# Hyperparameters
encoder_embedding_dim = 256
decoder_embedding_dim = 256
hidden_dim = 512
n_layers = 2
encoder_dropout = 0.5
decoder_dropout = 0.5

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(
    input_dim=input_dim,
    embedding_dim=encoder_embedding_dim,
    hidden_dim=hidden_dim,
    n_layers=n_layers,
    dropout=encoder_dropout,
)

decoder = Decoder(
    output_dim=output_dim,
    embedding_dim=decoder_embedding_dim,
    hidden_dim=hidden_dim,
    n_layers=n_layers,
    dropout=decoder_dropout,
)

model = Seq2Seq(encoder, decoder, device).to(device)

# weight initialization
def init_weights(m):
  """Overwrite every parameter of ``m`` with values drawn from U(-0.08, 0.08).

  All parameters reachable from ``m`` are re-drawn, including those of nested
  submodules.
  """
  for param in m.parameters():
    nn.init.uniform_(param.data, -0.08, 0.08)

# Run init_weights on the model and on each of its submodules; the returned
# model is the cell's displayed output.
model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(41, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(16, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=16, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [78]:
def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        # Parameters with requires_grad=False are not counted.
        if param.requires_grad:
            total += param.numel()
    return total

print(f"The model has {count_parameters(model):,} trainable parameters")

The model has 7,379,216 trainable parameters


In [79]:
# Adam with its default settings over all model parameters
optimizer = optim.Adam(model.parameters())
# Target positions equal to pad_index are ignored by the loss
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

In [80]:
def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device):
  """Train ``model`` for one pass over ``data_loader``.

  Each batch supplies 'yr_ids' as the source and 'en_ids' as the target.
  Gradients are clipped to a total norm of ``clip`` before every optimizer
  step.

  Returns:
    The mean of the per-batch losses.
  """
  model.train()
  total_loss = 0
  for batch in data_loader:
    src, trg = batch['yr_ids'].to(device), batch['en_ids'].to(device)

    optimizer.zero_grad()
    logits = model(src, trg, teacher_forcing_ratio)

    # Skip position 0 of both tensors, then flatten the remaining positions
    # so the criterion sees one row of scores per target token.
    flat_logits = logits[1:].view(-1, logits.shape[-1])
    flat_targets = trg[1:].view(-1)

    loss = criterion(flat_logits, flat_targets)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
    optimizer.step()

    total_loss += loss.item()
  return total_loss / len(data_loader)

In [81]:
def evaluate_fn(model, data_loader, criterion, device):
  """Compute the mean per-batch loss of ``model`` over ``data_loader``.

  The model is put in eval mode, gradients are disabled and teacher forcing
  is turned off (ratio 0). Each batch supplies 'yr_ids' as the source and
  'en_ids' as the target.
  """
  model.eval()
  total_loss = 0
  with torch.no_grad():
    for batch in data_loader:
      src, trg = batch['yr_ids'].to(device), batch['en_ids'].to(device)

      logits = model(src, trg, 0)

      # Skip position 0 of both tensors, then flatten the remaining
      # positions so the criterion sees one row of scores per target token.
      flat_logits = logits[1:].view(-1, logits.shape[-1])
      flat_targets = trg[1:].view(-1)

      total_loss += criterion(flat_logits, flat_targets).item()
  return total_loss / len(data_loader)

In [82]:
# Training settings
n_epochs = 10
clip = 2.0                    # gradient-norm clipping threshold passed to train_fn
teacher_forcing_ratio = 0.5   # teacher-forcing probability used during training

best_valid_loss = float('inf')

for epoch in tqdm.tqdm(range(n_epochs)):
  train_loss = train_fn(
      model, train_data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device,
  )
  valid_loss = evaluate_fn(
      model, valid_data_loader, criterion, device
  )

  # Checkpoint only when validation improves, so the file always holds the
  # best weights seen so far.
  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'tut1-model.pt')
  print(f'\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss): 7.3f}')
  # This line reports validation metrics; it was previously mislabeled "Train".
  print(f'\tValid Loss: {valid_loss:7.3f} | Valid PPL: {np.exp(valid_loss): 7.3f}')

 10%|█         | 1/10 [00:01<00:11,  1.23s/it]

	Train Loss:   2.777 | Train PPL:  16.070
	Train Loss:   2.429 | Train PPL:  11.345


 20%|██        | 2/10 [00:02<00:09,  1.25s/it]

	Train Loss:   2.444 | Train PPL:  11.518
	Train Loss:   1.929 | Train PPL:   6.885


 30%|███       | 3/10 [00:03<00:08,  1.24s/it]

	Train Loss:   1.960 | Train PPL:   7.101
	Train Loss:   1.781 | Train PPL:   5.939


 40%|████      | 4/10 [00:05<00:08,  1.36s/it]

	Train Loss:   1.881 | Train PPL:   6.560
	Train Loss:   1.537 | Train PPL:   4.650


 50%|█████     | 5/10 [00:06<00:07,  1.45s/it]

	Train Loss:   1.642 | Train PPL:   5.166
	Train Loss:   1.434 | Train PPL:   4.195


 60%|██████    | 6/10 [00:08<00:05,  1.47s/it]

	Train Loss:   1.573 | Train PPL:   4.820
	Train Loss:   1.435 | Train PPL:   4.200


 70%|███████   | 7/10 [00:10<00:04,  1.52s/it]

	Train Loss:   1.588 | Train PPL:   4.894
	Train Loss:   1.385 | Train PPL:   3.994


 80%|████████  | 8/10 [00:11<00:02,  1.45s/it]

	Train Loss:   1.554 | Train PPL:   4.732
	Train Loss:   1.326 | Train PPL:   3.766


 90%|█████████ | 9/10 [00:12<00:01,  1.43s/it]

	Train Loss:   1.500 | Train PPL:   4.482
	Train Loss:   1.310 | Train PPL:   3.707


100%|██████████| 10/10 [00:14<00:00,  1.42s/it]

	Train Loss:   1.469 | Train PPL:   4.345
	Train Loss:   1.332 | Train PPL:   3.788





In [83]:
# evaluating the model
# Load the checkpoint from the same relative path the training loop saved it
# to, and map its tensors onto the current device so a checkpoint written on
# a GPU can still be loaded on a CPU-only runtime.
model.load_state_dict(torch.load('tut1-model.pt', map_location=device))

test_loss = evaluate_fn(model, test_data_loader, criterion, device)

print(f'Test Loss: {test_loss: .3f} | Test PPL: {np.exp(test_loss): 7.3f} |')

Test Loss:  1.577 | Test PPL:   4.841 |


In [84]:
def translate_sentence(
    sentence, model, en_nlp, yr_nlp, en_vocab, yr_vocab, sos_token, eos_token, device, max_output_length=25):
  """Greedily translate one Yoruba sentence into a list of English tokens.

  Args:
    sentence: a string (tokenized with ``yr_nlp.tokenizer``) or an iterable
      of token strings. Tokens are lower-cased in both cases.
    model: object exposing ``eval()``, ``encoder`` and ``decoder``.
    en_nlp: unused; kept so existing callers keep working.
    yr_nlp: pipeline whose ``tokenizer`` splits a string sentence.
    en_vocab: target vocabulary (decoder side).
    yr_vocab: source vocabulary (encoder side).
    sos_token, eos_token: start / end markers added around the source and
      used to start and stop decoding.
    device: device the input tensors are moved to.
    max_output_length: maximum number of decoding steps.

  Returns:
    The decoded tokens, starting with ``sos_token`` and ending with
    ``eos_token`` if it was produced within ``max_output_length`` steps.
  """
  model.eval()
  with torch.no_grad():
    if isinstance(sentence, str):
      tokens = [token.text.lower() for token in yr_nlp.tokenizer(sentence)]
    else:
      tokens = [token.lower() for token in sentence]

    tokens = [sos_token] + tokens + [eos_token]
    ids = yr_vocab.lookup_indices(tokens)
    # unsqueeze(-1) adds a trailing dimension of size 1 for the single sentence.
    tensor = torch.LongTensor(ids).unsqueeze(-1).to(device)
    hidden, cell = model.encoder(tensor)

    # The decoder works in the English vocabulary, so its first input must be
    # the start marker's index in en_vocab. Using yr_vocab here only worked
    # while both vocabularies happened to give the marker the same index.
    inputs = en_vocab.lookup_indices([sos_token])
    eos_index = en_vocab[eos_token]

    for _ in range(max_output_length):
      inputs_tensor = torch.LongTensor([inputs[-1]]).to(device)
      output, hidden, cell = model.decoder(inputs_tensor, hidden, cell)
      predicted_id = output.argmax(-1).item()
      inputs.append(predicted_id)
      if predicted_id == eos_index:
        break
    tokens = en_vocab.lookup_tokens(inputs)
  return tokens

In [85]:
# Use the first test pair: the Yoruba side is the input and the English side
# is the expected translation.
first_example = test_data[0]
sentence, expected_sentence = first_example['yr'], first_example['en']

sentence, expected_sentence

('Gẹ́gẹ́ bí ìṣe rẹ̀, ó ti to tábìlì fún èèyàn méjì.',
 'Out of habit, she has set the table for two.')

In [None]:
# Translate the selected Yoruba sentence with the trained model.
translation = translate_sentence(
    sentence=sentence,
    model=model,
    en_nlp=en_nlp,
    yr_nlp=yr_nlp,
    en_vocab=en_vocab,
    yr_vocab=yr_vocab,
    sos_token=sos_token,
    eos_token=eos_token,
    device=device,
)
translation

reference: [notebook](https://github.com/bentrevett/pytorch-seq2seq/blob/main/1%20-%20Sequence%20to%20Sequence%20Learning%20with%20Neural%20Networks.ipynb)


Future work
1. Increase the tokenizer dataset
2. Increase the labeled dataset