In [177]:
import json
import re

import evaluate
import nlpaug.augmenter.char as nac
import numpy as np
import pandas as pd
import requests
import sentencepiece as spm
from bs4 import BeautifulSoup
from datasets import Dataset, concatenate_datasets
from deep_translator import GoogleTranslator
from transformers import (
    MarianConfig,
    MarianMTModel,
    MarianTokenizer,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
)

Scraping the data from different websites, dictionaries and Romani courses

In [181]:
class Scraper:
  """Gathers Romanian-Romani phrase pairs into two line-aligned text files.

  Line i of the Romanian file and line i of the Romani file form one pair, so
  every method appends the same number of lines to both files.
  """

  def __init__(self, romani_data_output_path = 'romani.txt', romanian_data_output_path = 'romanian.txt'):
    """Store the output paths; the defaults match the files read by the later cells."""
    self.romani_data_output_path = romani_data_output_path
    self.romanian_data_output_path = romanian_data_output_path

  def _append_pairs(self, pairs):
    # Appends (romanian, romani) pairs to the two output files, one phrase per line.
    # Both files are opened as UTF-8 (the phrases contain diacritics) and are
    # closed by the `with` block even if a write fails.
    with open(self.romanian_data_output_path, 'a', encoding='utf-8') as file_writer_romanian, \
         open(self.romani_data_output_path, 'a', encoding='utf-8') as file_writer_romani:
      for romanian_phrase, romani_phrase in pairs:
        file_writer_romanian.write(romanian_phrase + '\n')
        file_writer_romani.write(romani_phrase + '\n')

  def glosbe_romani_scraper(self, url, timeout = 30):
    """Scrape Romanian / Carpathian Romani example pairs from a Glosbe page.

    Note: Some samples were added manually due to some problems with scraping.

    Args:
      url: Address of the page to download.
      timeout: Seconds to wait for the server before giving up.

    Raises:
      requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(url, timeout = timeout)
    response.raise_for_status()  # do not parse an error page as if it were data
    soup = BeautifulSoup(response.text, 'html.parser')
    romanian_phrases = soup.find_all('div', attrs = {'class':'w-1/2 dir-aware-pr-1'})
    carpathian_romani_phrases = soup.find_all('div', attrs = {'class':'w-1/2 dir-aware-pl-1'})

    pairs = []
    for romanian_phrase, romani_phrase in zip(romanian_phrases, carpathian_romani_phrases):
      # Collapse internal whitespace: an embedded newline would shift every
      # following line and break the alignment between the two files.
      ro_phrase_text = ' '.join(romanian_phrase.get_text().split())
      roma_phrase_text = ' '.join(romani_phrase.get_text().split())

      if ro_phrase_text and roma_phrase_text:
        pairs.append((ro_phrase_text, roma_phrase_text))

    self._append_pairs(pairs)

  def get_phrases_from_dictionary_course(self, path_dict):
    """Append pairs read from a `romani:romanian` text file (one pair per line).

    Used for the Romanian - Kalderash Romani dictionary and the Romani course
    samples. Lines without a ':' separator, and lines where either side is
    empty after stripping, are skipped instead of raising or writing an
    empty sample.

    Args:
      path_dict: Path of the UTF-8 text file to read.
    """
    pairs = []
    with open(path_dict, 'r', encoding='utf-8') as dict_file:
      for sample in dict_file:
        phrases = sample.split(':')
        if len(phrases) < 2:
          continue  # blank line or line without a separator

        romani_phrase, romanian_phrase = phrases[0].strip(), phrases[1].strip()
        if romani_phrase and romanian_phrase:
          pairs.append((romanian_phrase, romani_phrase))

    self._append_pairs(pairs)


The next step was to create a tokenizer. We decided to use a MarianTokenizer from Helsinki-NLP, as it has support for Romance languages.

In [None]:
# Creating a tokenizer and a vocabulary
# Here you can use your custom tokenizer if you don't want to use the already existent one
def create_tokenizer(corpus_path, tokenizer_model_prefix, vocab_size):
  """Train a SentencePiece model on a text corpus.

  Args:
    corpus_path: Path of the training text, passed as `input`.
    tokenizer_model_prefix: Value passed as `model_prefix`.
    vocab_size: Requested vocabulary size; converted with `int()`.
  """
  # Fixed ids for the special pieces: pad, unk, bos, eos.
  special_piece_ids = {'pad_id': 0, 'unk_id': 1, 'bos_id': 2, 'eos_id': 3}
  requested_size = int(vocab_size)

  spm.SentencePieceTrainer.train(
      input = corpus_path,
      model_prefix = tokenizer_model_prefix,
      vocab_size = requested_size,
      **special_piece_ids
  )

def create_vocabulary(tok_prefix_source, tok_prefix_target):
  """Load two SentencePiece models and return their piece -> id mappings.

  Each model is enumerated over its own piece count, so the two vocabularies
  may have different sizes without raising or losing pieces.

  Args:
    tok_prefix_source: Prefix of the source model; `<prefix>.model` is loaded.
    tok_prefix_target: Prefix of the target model; `<prefix>.model` is loaded.

  Returns:
    A `(vocab_source, vocab_target)` tuple of dicts mapping piece to id.
  """
  def _load_vocab(model_prefix):
    # Reads one model file and maps every piece to its SentencePiece id.
    processor = spm.SentencePieceProcessor()
    processor.load(model_prefix + '.model')
    return {
        processor.id_to_piece(piece_id): piece_id
        for piece_id in range(processor.get_piece_size())
    }

  return _load_vocab(tok_prefix_source), _load_vocab(tok_prefix_target)

def combine_vocabularies(vocab_source, vocab_target, output_file = 'vocab.json'):
  """Merge two vocabularies into one token -> id mapping and save it as JSON.

  Target tokens come first and are numbered in iteration order starting at 0.
  Source tokens that are not already present are appended with the next free
  ids, so ids stay contiguous and duplicates keep their target id.

  Args:
    vocab_source: Mapping whose keys are the source tokens.
    vocab_target: Mapping whose keys are the target tokens.
    output_file: Path of the JSON file to write.

  Returns:
    The combined token -> id dict that was written to `output_file`.
  """
  combined_vocab = {token: idx for idx, token in enumerate(vocab_target)}

  for token in vocab_source:
    if token not in combined_vocab:
      # len() is always the next unused id because ids are contiguous from 0
      combined_vocab[token] = len(combined_vocab)

  with open(output_file, 'w', encoding='utf-8') as vocab_file:
    json.dump(combined_vocab, vocab_file, indent=4)

  return combined_vocab

vocab_size = '' # insert the vocab size here: use a greater size if you have more data

# Fail fast with an actionable message: an empty placeholder would otherwise
# surface as an opaque `int('')` error inside create_tokenizer.
if not str(vocab_size).strip():
  raise ValueError("Set `vocab_size` to a positive integer before running this cell.")

# One SentencePiece model per language: Romani is the target, Romanian the source.
create_tokenizer('romani.txt', 'target', vocab_size)
create_tokenizer('romanian.txt', 'source', vocab_size)

# Merge both vocabularies into the vocab.json consumed by MarianTokenizer below.
vocab_source, vocab_target = create_vocabulary('source', 'target')
combine_vocabularies(vocab_source, vocab_target)

tokenizer = MarianTokenizer(
    source_spm="source.model",
    target_spm="target.model",
    vocab="vocab.json"
)

# Persist the tokenizer so the later cells can reload it by directory name.
tokenizer.save_pretrained('romanian_romani_tokenizer')

Obtaining the train and test sets and tokenizing them

In [None]:
# Run-wide switches read by later cells.
USE_PRETRAINED = False  # True: load a pretrained checkpoint; False: build the model from a fresh MarianConfig
USE_AUGMENTATION = True  # True: tokenize the augmented training set; False: tokenize the plain training split

# Loading the data
def load_parallel_data(romanian_path, romani_path):
    """Read two line-aligned UTF-8 files into a `Dataset` of translation pairs.

    Args:
        romanian_path: Path of the file holding one Romanian sentence per line.
        romani_path: Path of the file holding one Romani sentence per line.

    Returns:
        The result of `Dataset.from_dict` on a single "translation" column whose
        entries are `{"ro": ..., "roma": ...}` dicts with stripped text.

    Raises:
        AssertionError: If the two files do not have the same number of lines.
    """
    with open(romanian_path, 'r', encoding='utf-8') as ro_handle, \
         open(romani_path, 'r', encoding='utf-8') as roma_handle:
        ro_sentences = ro_handle.readlines()
        roma_sentences = roma_handle.readlines()

    assert len(ro_sentences) == len(roma_sentences), "Mismatched number of lines!"

    pairs = []
    for ro_sentence, roma_sentence in zip(ro_sentences, roma_sentences):
        pairs.append({"ro": ro_sentence.strip(), "roma": roma_sentence.strip()})

    return Dataset.from_dict({"translation": pairs})


# Tokenizing the whole dataset
def preprocess_function(examples):
    """Tokenize a batch of translation pairs for seq2seq training.

    Uses the module-level `tokenizer`. Inputs and targets are truncated and
    padded to 128 tokens. Padding positions in the labels are replaced by
    -100 so they are ignored by the loss instead of being learned as targets.

    Args:
        examples: Batch with a "translation" list of `{"ro", "roma"}` dicts.

    Returns:
        The tokenizer output, with "labels" holding the masked target ids.
    """
    inputs = [example["ro"] for example in examples["translation"]]
    targets = [example["roma"] for example in examples["translation"]]

    model_inputs = tokenizer(inputs, text_target = targets, max_length=128, truncation=True, padding="max_length")

    # padding="max_length" fills the labels with the pad id; mask those
    # positions with -100, the value compute_metrics already maps back to
    # the pad id before decoding.
    pad_token_id = tokenizer.pad_token_id
    model_inputs["labels"] = [
        [token_id if token_id != pad_token_id else -100 for token_id in label_ids]
        for label_ids in model_inputs["labels"]
    ]

    return model_inputs

# Reload the tokenizer saved by the tokenizer cell and build the parallel dataset.
tokenizer = MarianTokenizer.from_pretrained('romanian_romani_tokenizer')
huggingface_dataset = load_parallel_data('romanian.txt', 'romani.txt')
huggingface_dataset_shuffled = huggingface_dataset.shuffle(seed = 42)
# train_test_split shuffles again on its own; seed it as well so the 70/30
# split is identical on every Restart & Run All.
train_test_split = huggingface_dataset_shuffled.train_test_split(test_size=0.3, seed=42)
train_dataset = train_test_split["train"]
test_dataset = train_test_split["test"]

This part deals with data augmentation. The first augmentation technique was back translation: the Romanian sentences were translated to English and the English equivalent was then translated back into Romanian, while the Romani samples were kept the same.
The second augmentation technique was sentence insertion (beginning and end). More precisely, we inserted a fixed phrase at the beginning of each Romanian and Romani sentence (the code below only inserts at the beginning). The third augmentation technique was to insert random characters into one of the words of each Romanian and Romani sample, generating typos.

In [None]:
class DataAugmenter:
  """Creates small augmented datasets from a subset of the training set.

  Every method works on the same subset: the training set shuffled with
  `seed`, truncated to `augment_fraction` of its length. Each method returns
  a new dataset with a single "translation" column of `{"ro", "roma"}` dicts.
  """

  def __init__(self, train_set : 'Dataset', augment_fraction = 0.05, seed = 42):
    """Store the training set and the sampling settings.

    Args:
      train_set: Dataset whose rows hold a "translation" dict with "ro" and "roma".
      augment_fraction: Share of the training set that each method augments.
      seed: Seed used when shuffling before the subset is selected.
    """
    self.romani_data_path = 'romani.txt'
    self.romanian_data_path = 'romanian.txt'
    self.train_set = train_set
    self.augment_fraction = augment_fraction
    self.seed = seed

  def _sample_pairs(self):
    # Yields (romanian, romani) text pairs from the seeded subset of the training set.
    subset_size = int(self.augment_fraction * len(self.train_set))
    subset = self.train_set.shuffle(seed = self.seed).select(range(subset_size))

    for sample in subset:
      yield sample['translation']['ro'], sample['translation']['roma']

  @staticmethod
  def _to_dataset(pairs):
    # Wraps (romanian, romani) pairs in the "translation" schema of the training set.
    return Dataset.from_list([
        {"translation": {"ro": ro_text.strip(), "roma": roma_text.strip()}}
        for ro_text, roma_text in pairs
    ])

  @staticmethod
  def _augmented_text(augmenter, text):
    # `augment` may return a list of strings or a plain string depending on the
    # nlpaug version; joining a plain string would put a space between every
    # character, so only join when a list comes back.
    augmented = augmenter.augment(text)
    if isinstance(augmented, str):
      return augmented
    return " ".join(augmented)

  def back_translation(self):
    """Translate the Romanian side to English and back; keep the Romani side.

    Samples for which the translator returns nothing are skipped.
    """
    # The translators are independent of the sample, so build them once.
    to_english = GoogleTranslator(source='ro', target='en')
    to_romanian = GoogleTranslator(source='en', target='ro')

    pairs = []
    for ro_sample, roma_sample in self._sample_pairs():
      english_equivalent = to_english.translate(ro_sample)
      if not english_equivalent:
        continue  # nothing to translate back

      back_translated = to_romanian.translate(english_equivalent)
      if not back_translated:
        continue  # avoid calling .strip() on an empty/None result

      pairs.append((back_translated, roma_sample))

    return self._to_dataset(pairs)

  def random_char_insertion(self):
    """Insert random characters into both sides of each pair, simulating a typo.

    The augmenter is configured with action = 'insert', aug_word_max = 1 and
    aug_char_max = 2.
    """
    aug = nac.RandomCharAug(action = 'insert', aug_word_max = 1, aug_char_max = 2)

    pairs = [
        (self._augmented_text(aug, ro_sample), self._augmented_text(aug, roma_sample))
        for ro_sample, roma_sample in self._sample_pairs()
    ]
    return self._to_dataset(pairs)

  def sentence_insertion(self):
    """Prepend a fixed phrase to the Romanian and the Romani side of each pair."""
    begining_token_romani = 'Po del chavo - '
    begining_token_romanian = 'Începutul propoziției - '

    pairs = [
        (begining_token_romanian + ro_sample, begining_token_romani + roma_sample)
        for ro_sample, roma_sample in self._sample_pairs()
    ]
    return self._to_dataset(pairs)


if USE_AUGMENTATION:
  # Build the augmented sets only when requested: back translation calls an
  # online translation service, which is wasted work when the flag is off.
  augmenter = DataAugmenter(train_dataset)
  back_translated_ds = augmenter.back_translation()
  random_char_insertion_ds = augmenter.random_char_insertion()
  sentence_insertion_augmented_ds = augmenter.sentence_insertion()

  augmented_training_dataset = concatenate_datasets([train_dataset, back_translated_ds, random_char_insertion_ds, sentence_insertion_augmented_ds])
  augmented_training_dataset = augmented_training_dataset.shuffle(seed = 42)

  tokenized_train_dataset = augmented_training_dataset.map(preprocess_function, batched=True)

else:
  tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True)

# The test split is never augmented, so it is tokenized the same way in both modes.
tokenized_test_dataset = test_dataset.map(preprocess_function, batched=True)

Preparing the model

In [None]:
PRETRAINED_CHECKPOINT = 'Helsinki-NLP/opus-mt-roa-en'

if USE_PRETRAINED:
  model_to_train = MarianMTModel.from_pretrained(PRETRAINED_CHECKPOINT)
  # from_pretrained expects a checkpoint name or path, not a model instance.
  tokenizer = MarianTokenizer.from_pretrained(PRETRAINED_CHECKPOINT)
  # NOTE(review): the datasets above were tokenized with the custom tokenizer
  # before this reassignment; re-run the tokenization with this tokenizer when
  # USE_PRETRAINED is enabled.

else:
  # Size the model and set its special-token ids from the custom tokenizer:
  # the MarianConfig defaults describe the original Marian vocabulary and do
  # not match the ids used by the tokenizer trained in this notebook.
  trained_model_config = MarianConfig(
      vocab_size = len(tokenizer),
      pad_token_id = tokenizer.pad_token_id,
      eos_token_id = tokenizer.eos_token_id,
      decoder_start_token_id = tokenizer.pad_token_id,  # Marian starts decoding from the pad token
      forced_eos_token_id = tokenizer.eos_token_id,
  )
  model_to_train = MarianMTModel(config = trained_model_config)

Training

In [None]:
# Metric object used by compute_metrics below; its "score" value is reported as "bleu".
metric = evaluate.load('sacrebleu')

def postprocess_text(preds, labels):
    """Strip whitespace and shape decoded text for the metric.

    Args:
        preds: Iterable of decoded prediction strings.
        labels: Iterable of decoded reference strings.

    Returns:
        A `(preds, labels)` tuple: a list of stripped predictions and a list
        in which every stripped reference is wrapped in its own one-item list.
    """
    cleaned_preds = []
    for prediction in preds:
        cleaned_preds.append(prediction.strip())

    wrapped_labels = []
    for reference in labels:
        wrapped_labels.append([reference.strip()])

    return cleaned_preds, wrapped_labels


def compute_metrics(eval_preds):
    """Decode predictions and labels and score them with the module-level `metric`.

    Args:
        eval_preds: `(predictions, labels)` pair of token-id arrays; when the
            predictions are a tuple, only its first element is used.

    Returns:
        `{"bleu": score}` with the score rounded to 4 decimals.
    """
    preds, labels = eval_preds
    if isinstance(preds, tuple):
        preds = preds[0]

    # -100 marks ignored positions and is not a valid token id, so it has to be
    # swapped for the pad id before decoding. This is done for the predictions
    # as well as the labels, since padded generations can contain -100 too.
    pad_token_id = tokenizer.pad_token_id
    preds = np.where(preds != -100, preds, pad_token_id)
    labels = np.where(labels != -100, labels, pad_token_id)

    decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    decoded_preds, decoded_labels = postprocess_text(decoded_preds, decoded_labels)

    result = metric.compute(predictions=decoded_preds, references=decoded_labels)
    return {"bleu": round(result["score"], 4)}

# Training configuration for the seq2seq trainer.
# NOTE(review): some transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
training_args = Seq2SeqTrainingArguments(
    output_dir="./results",             # Output directory
    evaluation_strategy="epoch",       # Evaluate every epoch
    learning_rate=5e-5,                  # Learning rate
    per_device_train_batch_size=16,      # Batch size for training
    per_device_eval_batch_size=16,       # Batch size for evaluation
    num_train_epochs=10,                  # Number of epochs
    save_steps=500,                      # Save checkpoint every 500 steps
    save_total_limit=2,                  # Keep only the last 2 checkpoints
    predict_with_generate=True,          # Use generate for evaluation
    logging_dir="./logs",              # Log directory
    logging_steps=10,                    # Log every 10 steps
)

# Wire the model, the tokenized splits and the BLEU callback into the trainer.
# NOTE(review): the `tokenizer=` argument may be deprecated in newer
# transformers releases — confirm against the installed version.
trainer = Seq2SeqTrainer(
    model=model_to_train,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_test_dataset,
    tokenizer=tokenizer,
    compute_metrics = compute_metrics,
)

# Train the model
trainer.train()