# Arabic Diacritization using a RoBERTa Transformer implementation

## First we need a tokenizer

In [None]:
#!pip3 install tokenizers
from tokenizers import ByteLevelBPETokenizer
# Create a byte-level BPE tokenizer with no vocabulary or merges supplied;
# the tokens it should know are added explicitly in the following cells.
tokenizer = ByteLevelBPETokenizer()

### Getting all the arabic combinations

In [None]:
# Bare Arabic letters.
# NOTE(review): 'ئ' is listed twice -- possibly a different letter was intended; confirm.
arabic_letters = ['أ','ة','إ','ؤ','آ','ا','ب','ت', 'ث','ج','ح','خ','د','ذ','ر','ز','س','ش','ص','ض','ط','ظ','ع','غ','ف','ق','ك','ل','م','ن','ه','و','ي','ئ','ئ','ء']
# Diacritic marks (single marks first, then the combined forms).
arabic_diac = ["َ","ً","ِ","ٍ","ُ","ٌ","ْ","َّ","ِّ","ُّ"]

# Every letter followed by every diacritic, grouped by letter.
arabic_combinations = [
    base_letter + mark
    for base_letter in arabic_letters
    for mark in arabic_diac
]

### Adding the tokens 
(Note: the tokenizer does not persist these added tokens when it is saved; this was a tokenizer bug at the time of writing this code.)

In [None]:
# Register the letter+diacritic pairs first, then the bare letters.
for token_group in (arabic_combinations, arabic_letters):
    tokenizer.add_tokens(token_group)

# Special tokens, registered in this order.
special_tokens = ["<s>", "<pad>", "</s>", "<unk>", "<mask>"]
tokenizer.add_special_tokens(special_tokens)



In [None]:
# Write the tokenizer's model files to disk; this directory is loaded again
# below via RobertaTokenizerFast.from_pretrained.
tokenizer.save_model('./Downloads/AraBertA/')

### Testing the tokenizer

In [None]:
# Sanity check: show the token ids produced for a diacritized sample word.
tokenizer.encode("الَسَلَام").ids

Looks like it's working great.

## Now for the Roberta Model

### Making the tokenizer a RobertaTokenizer

In [None]:
# !pip install transformers
from transformers import RobertaTokenizerFast

# Reload the files saved above as a RoBERTa-compatible fast tokenizer.
roberta_tokenizer = RobertaTokenizerFast.from_pretrained('./Downloads/AraBertA/')

# Re-add the tokens on the loaded tokenizer (the note above says the saved
# files do not contain them). Diacritics go first, then the space, then letters.
# NOTE(review): the custom data collator further down excludes tokens by an id
# threshold (`inputs > 15`), so the order of these calls presumably matters --
# confirm against roberta_tokenizer.get_vocab() before changing it.
roberta_tokenizer.add_tokens(arabic_diac)
roberta_tokenizer.add_tokens(" ")
roberta_tokenizer.add_tokens(arabic_letters)

In [None]:
# Scratch cell for looking up the `from_pretrained` signature. Calling it
# without the required path argument raises, which breaks "Run All", so show
# the documentation instead.
help(roberta_tokenizer.from_pretrained)

### Creating a new instance from the Roberta Model

In [None]:
from transformers import RobertaConfig,RobertaForMaskedLM

# The embedding table must cover every token the tokenizer knows, including
# the tokens added after loading.
vocabulary = roberta_tokenizer.get_vocab()

config = RobertaConfig(
    vocab_size=len(vocabulary),
    max_position_embeddings=514,
    num_attention_heads=12,
    num_hidden_layers=6,
    type_vocab_size=1,
)

# Fresh, randomly initialised masked-LM model built from the config above.
model = RobertaForMaskedLM(config=config)

In [None]:
# Display the configuration the model was built with.
model.config

In [None]:
# Sanity check: show how a partially diacritized phrase is split into tokens.
roberta_tokenizer.tokenize('السَلام عَليكُم')

Looks like it's working as well.

## Preparing the dataset

### Importing all the files

In [None]:
from pathlib import Path

# Collect the .txt files of each split as plain string paths.
paths_train = list(map(str, Path("./Downloads/tashkeela_processing/tashkeela_train/").glob("*.txt")))
paths_eval = list(map(str, Path("./Downloads/tashkeela_processing/tashkeela_val/").glob("*.txt")))
paths_test = list(map(str, Path("./Downloads/tashkeela_processing/tashkeela_test/").glob("*.txt")))

### The Datasets library is a time-saving library from Hugging Face, and it does its job pretty well in keeping everything organized

In [None]:
# !pip install datasets
from datasets import load_dataset,DatasetDict

# Load each group of files as a line-per-example text dataset. Every call asks
# for split='train' because the files are passed without a split name.
train_dataset, eval_dataset, test_dataset = (
    load_dataset('text', data_files=file_list, split='train')
    for file_list in (paths_train, paths_eval, paths_test)
)

In [None]:
# Scratch cell for looking up the `load_dataset` signature. Calling it with no
# arguments raises, which breaks "Run All", so show the documentation instead.
help(load_dataset)

In [None]:
# Bundle the three splits into one DatasetDict under the keys used below.
splits = {
    'train': train_dataset,
    'test': test_dataset,
    'eval': eval_dataset,
}
ds = DatasetDict(splits)

In [None]:
# Display a summary of the training split.
ds['train']

## In this next part we will try to augment the data by creating several variants of each sentence, each with randomly chosen diacritics stripped from it

In [None]:
import random
from random import randint

def create_diacritization_variants(example: str) -> list:
    """
    Create between 2 and 5 progressively stripped variants of ``example``.

    The sentence is split on single spaces. On every pass, one diacritic is
    picked at random for each word and all of its occurrences are removed
    from that word. Removals accumulate from pass to pass, so each variant is
    stripped at least as much as the one before it. The original sentence is
    not added to the result explicitly.

    Args:
        example: The sentence to derive variants from.

    Returns:
        A list of 2 to 5 strings, ordered from least to most stripped.
    """
    arabic_diac = ["َ","ً","ِ","ٍ","ُ","ٌ","ْ","َّ","ِّ","ُّ"]
    # Current state of the sentence; rebuilt on every pass so that removals
    # from earlier passes carry over into later variants.
    words = example.split(" ")
    variants = []
    for _ in range(randint(2, 5)):
        # One independently chosen diacritic is removed from each word.
        words = [word.replace(random.choice(arabic_diac), '') for word in words]
        variants.append(' '.join(words))
    return variants

### Augmenting the dataset
Careful: this cell takes a very long time to run.

In [None]:
from tqdm.auto import tqdm
from datasets import Dataset


def _collect_variants(split):
    """
    Return the diacritization variants of every sentence in ``split`` as one flat list.

    ``split`` must expose ``['text']`` and ``num_rows`` the way the entries of ``ds`` do.
    A progress bar sized to the split is shown while iterating.
    """
    variants = []
    for sentence in tqdm(split['text'], total=split.num_rows):
        # extend() grows the list in place; rebuilding it with `a = a + b` on
        # every sentence copies the whole list each time.
        variants.extend(create_diacritization_variants(sentence))
    return variants


# Each split is augmented from its OWN sentences.
augmented_train_data = _collect_variants(ds['train'])
augmented_train_dict = {'text': augmented_train_data}
augmented_train_dataset = Dataset.from_dict(augmented_train_dict).shuffle()

###################################################################################

augmented_eval_data = _collect_variants(ds['eval'])
augmented_eval_dict = {'text': augmented_eval_data}
augmented_eval_dataset = Dataset.from_dict(augmented_eval_dict).shuffle()

###################################################################################

augmented_test_data = _collect_variants(ds['test'])
augmented_test_dict = {'text': augmented_test_data}
augmented_test_dataset = Dataset.from_dict(augmented_test_dict).shuffle()

In [None]:
# Bundle the augmented splits. DatasetDict must be called with the mapping;
# `DatasetDict{...}` without parentheses is a syntax error.
aug_ds = DatasetDict({
    'train': augmented_train_dataset,
    'eval': augmented_eval_dataset,
    'test': augmented_test_dataset,
})

In [None]:
def encode(examples):
    """Tokenize the "text" field of a batch, padding or truncating every entry to 256 tokens."""
    texts = examples["text"]
    return roberta_tokenizer(texts, padding="max_length", truncation=True, max_length=256)


# Tokenize every split in batches.
# NOTE(review): this encodes the original `ds` splits, not the augmented
# `aug_ds` built above -- confirm that is intended.
encoded_train_dataset, encoded_eval_dataset, encoded_test_dataset = (
    ds[split_name].map(encode, batched=True)
    for split_name in ('train', 'eval', 'test')
)

### Now we need to explicitly tell the data collator not to mask any letters, only the diacritics

In [None]:
import random
import warnings
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union

import torch
from torch.nn.utils.rnn import pad_sequence
from transformers.file_utils import PaddingStrategy
from transformers.modeling_utils import PreTrainedModel
from transformers.tokenization_utils_base import BatchEncoding, PreTrainedTokenizerBase

@dataclass
class DataCollatorForLanguageModeling:
    """
    Data collator used for masked language modeling in which only low-id tokens may be masked.

    Inputs are dynamically padded to the maximum length of a batch if they are not all of the same length. In
    addition to special tokens, every token whose id is greater than :obj:`max_maskable_token_id` is excluded from
    masking, so the loss is only ever computed on tokens with an id at or below that threshold.

    Args:
        tokenizer (:class:`~transformers.PreTrainedTokenizer` or :class:`~transformers.PreTrainedTokenizerFast`):
            The tokenizer used for encoding the data.
        mlm (:obj:`bool`, `optional`, defaults to :obj:`True`):
            Whether or not to use masked language modeling. If set to :obj:`False`, the labels are the same as the
            inputs with the padding tokens ignored (by setting them to -100). Otherwise, the labels are -100 for
            non-masked tokens and the value to predict for the masked token.
        mlm_probability (:obj:`float`, `optional`, defaults to 0.15):
            The probability with which to (randomly) mask tokens in the input, when :obj:`mlm` is set to :obj:`True`.
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
        max_maskable_token_id (:obj:`int`, `optional`, defaults to 15):
            Largest token id that is eligible for masking. NOTE(review): presumably the diacritic tokens occupy the
            ids up to this value in the tokenizer's vocabulary -- confirm against the vocabulary in use.

    .. note::

        For best performance, this data collator should be used with a dataset having items that are dictionaries or
        BatchEncoding, with the :obj:`"special_tokens_mask"` key, as returned by a
        :class:`~transformers.PreTrainedTokenizer` or a :class:`~transformers.PreTrainedTokenizerFast` with the
        argument :obj:`return_special_tokens_mask=True`.
    """

    tokenizer: PreTrainedTokenizerBase
    mlm: bool = True
    mlm_probability: float = 0.15
    pad_to_multiple_of: Optional[int] = None
    # Tokens with an id above this value are never selected for masking.
    max_maskable_token_id: int = 15

    def __post_init__(self):
        if self.mlm and self.tokenizer.mask_token is None:
            raise ValueError(
                "This tokenizer does not have a mask token which is necessary for masked language modeling. "
                "You should pass `mlm=False` to train on causal language modeling instead."
            )

    def __call__(
        self, examples: List[Union[List[int], torch.Tensor, Dict[str, torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """
        Collate ``examples`` into a padded batch and attach ``labels``.

        With ``mlm`` enabled, ``input_ids`` are masked via :meth:`mask_tokens`; otherwise the labels are a copy of
        ``input_ids`` with padding positions set to -100.
        """
        # Handle dict or lists with proper padding and conversion to tensor.
        if isinstance(examples[0], (dict, BatchEncoding)):
            batch = self.tokenizer.pad(examples, return_tensors="pt", pad_to_multiple_of=self.pad_to_multiple_of)
        else:
            # The collate helper is private to transformers and is not imported at module level here.
            # It is named `_torch_collate_batch` in newer releases and `_collate_batch` in older ones.
            try:
                from transformers.data.data_collator import _torch_collate_batch as _collate_batch
            except ImportError:
                from transformers.data.data_collator import _collate_batch
            batch = {"input_ids": _collate_batch(examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)}

        # If special token mask has been preprocessed, pop it from the dict.
        special_tokens_mask = batch.pop("special_tokens_mask", None)
        if self.mlm:
            batch["input_ids"], batch["labels"] = self.mask_tokens(
                batch["input_ids"], special_tokens_mask=special_tokens_mask
            )
        else:
            labels = batch["input_ids"].clone()
            if self.tokenizer.pad_token_id is not None:
                labels[labels == self.tokenizer.pad_token_id] = -100
            batch["labels"] = labels
        return batch

    def mask_tokens(
        self, inputs: torch.Tensor, special_tokens_mask: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.

        Only positions that are neither special tokens nor above ``max_maskable_token_id`` can be selected.
        ``inputs`` is modified in place and also returned, together with the labels (-100 on unselected positions).
        """
        labels = inputs.clone()
        # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
        probability_matrix = torch.full(labels.shape, self.mlm_probability)
        if special_tokens_mask is None:
            special_tokens_mask = [
                self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
            ]
            special_tokens_mask = torch.tensor(special_tokens_mask, dtype=torch.bool)
        else:
            special_tokens_mask = special_tokens_mask.bool()

        # Apply the id threshold whether the special-token mask was computed here or supplied by the caller;
        # otherwise a precomputed mask would let high-id tokens be masked.
        excluded = special_tokens_mask | (inputs > self.max_maskable_token_id)

        probability_matrix.masked_fill_(excluded, value=0.0)
        masked_indices = torch.bernoulli(probability_matrix).bool()
        labels[~masked_indices] = -100  # We only compute loss on masked tokens

        # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
        indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
        inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)

        # 10% of the time, we replace masked input tokens with random word
        # (0.5 of the remaining 20% of selected positions)
        indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
        random_words = torch.randint(len(self.tokenizer), labels.shape, dtype=torch.long)
        inputs[indices_random] = random_words[indices_random]

        # The rest of the time (10% of the time) we keep the masked input tokens unchanged
        return inputs, labels


In [None]:
# Masking collator: a quarter of the eligible tokens are selected per batch.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=roberta_tokenizer,
    mlm=True,
    mlm_probability=0.25,
)

## Defining the Trainer

In [None]:
from transformers import Trainer, TrainingArguments

training_args = TrainingArguments(
    output_dir = "./AraBertA/model",
    overwrite_output_dir=True,
    num_train_epochs=1,
    per_device_train_batch_size= 8,
    per_device_eval_batch_size= 8,
    logging_steps=10,
    # NOTE(review): a checkpoint every 5 steps is very frequent (only the two
    # most recent are kept) -- confirm this value is intended.
    save_steps=5,
    save_total_limit=2,
)
# Fit on the training split and evaluate on the held-out eval split. Passing
# the eval split as training data and the test split for evaluation would
# leave the training data unused and expose the test set during training.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=encoded_train_dataset,
    eval_dataset=encoded_eval_dataset,
)

In [None]:
# Scratch cell for looking up the `Trainer` signature. Constructing a Trainer
# with no model fails, which breaks "Run All", so show the documentation instead.
help(Trainer)

### Now it's finally time to train

In [None]:
%%time

from torch import cuda
# Release cached, unused GPU memory before training starts.
cuda.empty_cache()
# Run training with the trainer configured above.
trainer.train()


# Now to test the model

In [None]:
# Load the test sentences without diacritics as a line-per-example dataset.
# Note that this file lives under ./AraBertA/data/, not under the tashkeela
# directories used for the splits above.
test_no_diac_dataset = load_dataset('text',data_files=['./AraBertA/data/old_data/test_no_diac.txt'], split = 'train')

In [None]:
# Peek at the third example of the (diacritized) test split.
ds['test'][2]

In [None]:
# Build one copy of the first undiacritized test sentence per character
# position, with the character at that position replaced by '<mask>'.
# The positions come from the same sentence that is being masked, so the
# index can never run past its end.
plain_sentence = test_no_diac_dataset['text'][0]
masked_sentence_list = [
    plain_sentence[:position] + '<mask>' + plain_sentence[position + 1:]
    for position in range(len(plain_sentence))
]

In [None]:
import torch

In [None]:
import transformers

In [None]:
# Build a fill-mask pipeline. The model is loaded from a saved checkpoint
# directory, not taken from the in-memory `model` trained above; the
# tokenizer is the one configured earlier in this notebook.
fill_mask = transformers.pipeline(
    "fill-mask",
    model="./AraBertA/model_4/checkpoint-19900/",
    tokenizer=roberta_tokenizer
)

In [None]:
# Try the pipeline on a single sentence containing one <mask> position.
fill_mask('وَقُلْتُمْ: «لاَ بَلْ عَلَى خَيْل نَهْر<mask>». لِذلِكَ تَهْبُونَ.')

In [None]:
# Run the fill-mask pipeline once per masked sentence, keeping results in order.
output_of_pipeline = [fill_mask(masked_sentence) for masked_sentence in masked_sentence_list]

In [None]:
# Take the first candidate returned for every masked position and glue the
# predicted token strings together into one string.
predicted_text = ''.join(
    candidates[0]['token_str'] for candidates in output_of_pipeline
)

In [None]:
# Display the reconstructed prediction.
predicted_text

In [None]:
# Display the first test sentence for comparison with the prediction.
ds['test'][0]['text']

In [None]:
# Token ids of the first encoded test sentence; this is the reference.
reference_ids = encoded_test_dataset[0]['input_ids']

# First candidate token string for every masked position.
predicted_text = [candidates[0]['token_str'] for candidates in output_of_pipeline]

encoded_predicted_text = roberta_tokenizer.encode(''.join(predicted_text))

# Count position-wise matches over the first 256 positions. zip() stops at the
# shorter sequence, so a prediction that encodes to fewer than 256 ids does not
# raise IndexError; positions it does not reach simply count as mismatches.
summation = sum(
    predicted_id == reference_id
    for predicted_id, reference_id in zip(encoded_predicted_text, reference_ids[:256])
)

In [None]:
# Share of the 256 compared token positions that matched (256 is the
# max_length used when encoding the datasets), shown as a percentage.
accuracy = summation/256
accuracy*100