In [1]:
import os
import sys
import numpy as np
import torch
from datasets import load_dataset, load_from_disk
from transformers import AutoTokenizer, DataCollatorForLanguageModeling
import epitran

In [21]:
normal_tokenizer = AutoTokenizer.from_pretrained("psktoure/BERT_WordPiece_wikitext")
phonetic_tokenizer = AutoTokenizer.from_pretrained("psktoure/BERT_WordPiece_phonetic_wikitext")

In [3]:
print("Len normal tokenizer vocab: ", len(normal_tokenizer.get_vocab()))
print("Len phonetic tokenizer vocab: ", len(phonetic_tokenizer.get_vocab()))

Len normal tokenizer vocab:  30522
Len phonetic tokenizer vocab:  30522


In [4]:
epi = epitran.Epitran("eng-Latn")

In [6]:
text = "Hello, my name is Paulevec. I am a student at the University of Toronto."
tokenized_text = normal_tokenizer(text, return_offsets_mapping=True)
print(tokenized_text)
ids_to_tokens = normal_tokenizer.convert_ids_to_tokens(tokenized_text["input_ids"])
print(ids_to_tokens)
print(tokenized_text.word_ids())
print(tokenized_text.offset_mapping)

{'input_ids': [1, 12325, 8, 434, 935, 68, 10489, 1244, 25, 9, 31, 79, 23, 3467, 57, 51, 1184, 65, 5758, 9, 2], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'offset_mapping': [(0, 0), (0, 5), (5, 6), (7, 9), (10, 14), (15, 17), (18, 21), (21, 25), (25, 26), (26, 27), (28, 29), (30, 32), (33, 34), (35, 42), (43, 45), (46, 49), (50, 60), (61, 63), (64, 71), (71, 72), (0, 0)]}
['[CLS]', 'hello', ',', 'my', 'name', 'is', 'pau', 'leve', 'c', '.', 'i', 'am', 'a', 'student', 'at', 'the', 'university', 'of', 'toronto', '.', '[SEP]']
[None, 0, 1, 2, 3, 4, 5, 5, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, None]
[(0, 0), (0, 5), (5, 6), (7, 9), (10, 14), (15, 17), (18, 21), (21, 25), (25, 26), (26, 27), (28, 29), (30, 32), (33, 34), (35, 42), (43, 45), (46, 49), (50, 60), (61, 63), (64, 71), (71, 72), (0, 0)]


In [13]:
nums = [1, 4, 5, 4]
idx = nums.index(4)
print(idx)

1


In [None]:
def xsampa_tokens(word, phonetic_tokenizer):
    phonetic_word = "".joint(epi.xsampa_list(word))
    tokenized_word = phonetic_tokenizer(phonetic_word, add_special_tokens=False)
    ids = tokenized_word["input_ids"]
    return ids


In [14]:
from transformers import DataCollatorForLanguageModeling
import random

class CustomDataCollatorForMLM(DataCollatorForLanguageModeling):
    def __init__(self, tokenizer, phonetic_tokenizer, word_to_phonetic, mlm_probability=0.15):
        super().__init__(tokenizer=tokenizer, mlm_probability=mlm_probability)
        self.phonetic_tokenizer = phonetic_tokenizer
        self.word_to_phonetic = word_to_phonetic

    def __call__(self, examples):
        # Tokenize normal and phonetic text
        normal_texts = [e['normal_text'] for e in examples]
        phonetic_texts = [e['phonetic_text'] for e in examples]
        
        # Tokenize both
        normal_encodings = self.tokenizer(normal_texts, return_tensors="pt", padding=True, truncation=True)
        phonetic_encodings = self.phonetic_tokenizer(phonetic_texts, return_tensors="pt", padding=True, truncation=True)
        
        # Generate MLM masks for normal text
        input_ids = normal_encodings.input_ids
        labels = input_ids.clone()  # Original labels for computing loss
        probability_matrix = torch.full(labels.shape, self.mlm_probability)
        special_tokens_mask = self.tokenizer.get_special_tokens_mask(input_ids.tolist(), already_has_special_tokens=True)
        probability_matrix.masked_fill_(torch.tensor(special_tokens_mask, dtype=torch.bool), value=0.0)
        
        masked_indices = torch.bernoulli(probability_matrix).bool()
        input_ids[masked_indices] = self.tokenizer.mask_token_id  # Replace with mask token

        # Handle corresponding phonetic tokens
        phonetic_labels = phonetic_encodings.input_ids.clone()
        for idx, (normal_sentence, phonetic_sentence) in enumerate(zip(normal_texts, phonetic_texts)):
            # Get masked words
            for token_idx in masked_indices[idx].nonzero():
                word_id = normal_encodings.word_ids(batch_index=idx)[token_idx.item()]
                if word_id is not None:  # Ignore special tokens
                    word = self.tokenizer.decode(normal_encodings.input_ids[idx][word_id])
                    phonetic_tokens = self.word_to_phonetic.get(word, [])
                    # Find and mask in phonetic text
                    for p_token in phonetic_tokens:
                        p_index = phonetic_encodings.input_ids[idx].tolist().index(p_token)
                        phonetic_encodings.input_ids[idx][p_index] = self.phonetic_tokenizer.mask_token_id

        # Return modified normal and phonetic encodings
        return {
            'input_ids': input_ids,
            'labels': labels,
            'phonetic_input_ids': phonetic_encodings.input_ids,
            'phonetic_labels': phonetic_labels
        }


In [None]:
text = "Hello, my name is Paul. I am a student at the University of Toronto."
encoded = normal_tokenizer(text, return_tensors="pt")
labels = encoded.input_ids.clone()
probability_matrix = torch.full(labels.shape, 0.15)
print(probability_matrix)
special_tokens_mask = normal_tokenizer.get_special_tokens_mask(encoded.input_ids.tolist(), already_has_special_tokens=True)
print(special_tokens_mask)
probability_matrix.masked_fill_(torch.tensor(special_tokens_mask, dtype=torch.bool), value=0.0)
print(probability_matrix)
masked_indices = torch.bernoulli(probability_matrix).bool()
print(masked_indices)
normal_tokenizer.pad()

tensor([[0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500,
         0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500,
         0.1500]])
[0]
tensor([[0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500,
         0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500, 0.1500,
         0.1500]])
tensor([[False, False, False, False, False, False, False,  True, False, False,
         False, False, False,  True, False,  True, False, False, False]])


In [34]:
print(encoded.input_ids)

tensor([[    1, 12325,     8,   434,   935,    68,  2034,     9,    31,    79,
            23,  3467,    57,    51,  1184,    65,  5758,     9,     2]])


In [37]:
print(normal_tokenizer.encode(normal_tokenizer.cls_token, add_special_tokens=False))
print(normal_tokenizer.encode(phonetic_tokenizer.cls_token, add_special_tokens=False))

[1]
[1]


In [95]:
from transformers import (
    PreTrainedTokenizerBase,
    BertForMaskedLM,
    Trainer,
    TrainingArguments,
    BertConfig,
    AutoTokenizer,
)
import numpy as np
from datasets import load_from_disk
from typing import List, Dict, Tuple
import torch
import os
import re
import random
from collections import defaultdict


class CustomDataCollatorForLanguageModeling:
    def __init__(
        self,
        normal_tokenizer: PreTrainedTokenizerBase,
        phonetic_tokenizer: PreTrainedTokenizerBase,
        max_length: int = 128,
        mask_probability: float = 0.15,
    ):
        self.normal_tokenizer = normal_tokenizer
        self.phonetic_tokenizer = phonetic_tokenizer
        self.max_length = max_length
        self.mask_probability = mask_probability
        self.normal_cache = defaultdict(int)
        self.phonetic_cache = defaultdict(int)

    def _create_aligned_masks(
        self,
        normal_text: str,
        phonetic_text: str,
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Create masks following standard BERT masking strategy:
        - Select 15% of tokens for potential masking
        - Of those tokens:
            - 80% are replaced with [MASK]
            - 10% are replaced with random token
            - 10% are left unchanged
        Maintains alignment between normal and phonetic texts

        Args:
            normal_text: Original text
            phonetic_text: Phonetic transcription of the text

        Returns:
            Tuple containing:
            - normal_mask: Masking tensor for normal text
            - phonetic_mask: Masking tensor for phonetic text
            - normal_encoding: Token IDs for normal text
            - phonetic_encoding: Token IDs for phonetic text
        """
        # Split texts into words
        normal_words = re.findall(r"\w+", normal_text, re.UNICODE)
        phonetic_words = phonetic_text.split()

        # Get token lengths for each word
        normal_token_lengths = [self._get_step_size(w, 0) for w in normal_words]
        phonetic_token_lengths = [self._get_step_size(w, 1) for w in phonetic_words]

        # Create cumulative sums for position mapping
        normal_cumsum = np.cumsum([0] + normal_token_lengths[:-1])
        phonetic_cumsum = np.cumsum([0] + phonetic_token_lengths[:-1])

        # Tokenize both texts
        normal_encoding = self.normal_tokenizer(
            normal_text,
            truncation=True,
            add_special_tokens=False,
            max_length=self.max_length // 2,
            return_tensors="pt",
        )["input_ids"]

        phonetic_encoding = self.phonetic_tokenizer(
            phonetic_text,
            truncation=True,
            add_special_tokens=False,
            max_length=self.max_length // 2,
            return_tensors="pt",
        )["input_ids"]

        # Initialize mask tensors (1 for MASK, 2 for random, 3 for unchanged)
        normal_mask = torch.zeros(normal_encoding.size(1), dtype=torch.long)
        phonetic_mask = torch.zeros(phonetic_encoding.size(1), dtype=torch.long)

        # Calculate number of words to mask (15% of the shorter sequence)
        num_words = min(len(normal_words), len(phonetic_words))
        num_to_mask = max(1, int(num_words * self.mask_probability))

        # Randomly select word positions to mask
        mask_indices = random.sample(range(num_words), num_to_mask)

        # Pre-calculate mask types for efficiency
        # 1: MASK, 2: random, 3: unchanged
        mask_types = np.random.choice(
            [1, 2, 3], size=len(mask_indices), p=[0.8, 0.1, 0.1]  
        )

        # Apply masks
        for word_idx, mask_type in zip(mask_indices, mask_types):
            # Mask normal text
            normal_start = normal_cumsum[word_idx]
            normal_end = normal_start + normal_token_lengths[word_idx]
            normal_mask[normal_start:normal_end] = mask_type

            # Mask phonetic text
            phonetic_start = phonetic_cumsum[word_idx]
            phonetic_end = phonetic_start + phonetic_token_lengths[word_idx]
            phonetic_mask[phonetic_start:phonetic_end] = mask_type

        return normal_mask, phonetic_mask, normal_encoding, phonetic_encoding

    def _get_step_size(self, word: str, type: int) -> int:
        """return the number of tokens in a word"""
        cache = self.normal_cache if type == 0 else self.phonetic_cache
        tokenizer = self.normal_tokenizer if type == 0 else self.phonetic_tokenizer
        if word in cache:
            return cache[word]
        tokens = tokenizer(word, add_special_tokens=False)['input_ids']
        cache[word] = len(tokens)
        return cache[word]
        

    def __call__(self, examples: List[Dict[str, str]]) -> Dict[str, torch.Tensor]:
        # Tokenize and process examples
        batch_input_ids, batch_attention_masks, batch_token_type_ids, batch_labels = [], [], [], []

        for example in examples:
            normal_text = example["original_text"]
            phonetic_text = example["text"]

            # Create masks
            normal_mask, phonetic_mask, normal_encoding, phonetic_encoding = (
                self._create_aligned_masks(normal_text, phonetic_text)
            )

            # Combine normal and phonetic text
            final_input_ids = torch.cat(
                [
                    torch.tensor([self.normal_tokenizer.cls_token_id]),  # [CLS]
                    normal_encoding[0],
                    torch.tensor([self.normal_tokenizer.sep_token_id]),  # [SEP]
                    phonetic_encoding[0],
                    torch.tensor([self.normal_tokenizer.sep_token_id]),  # Final [SEP]
                ],
            )

            # Create attention mask
            attention_mask = torch.ones(len(final_input_ids))

            # Create token type IDs
            # +1 for [SEP]
            normal_type_ids = torch.zeros(normal_encoding.size(1))
            phonetic_type_ids = torch.ones(phonetic_encoding.size(1))
            token_type_ids = torch.cat(
                [
                    torch.tensor([0]),
                    normal_type_ids,
                    torch.tensor([0]),
                    phonetic_type_ids,
                    torch.tensor([1]),
                ]
            )

            # Create labels
            labels = final_input_ids.clone()

            # Apply masks
            combined_mask = torch.cat(
                [
                    torch.tensor([0]),  # For [CLS]
                    normal_mask,
                    torch.tensor([0]),  # For [SEP]
                    phonetic_mask,
                    torch.tensor([0]),  # For final [SEP]
                ]
            )

            # Get vocabulary size for random token selection
            vocab_size = len(self.normal_tokenizer.vocab)

            # Apply different masking strategies
            for i in range(len(final_input_ids)):
                if combined_mask[i] == 1:  # 80% - Replace with [MASK]
                    final_input_ids[i] = self.normal_tokenizer.mask_token_id
                elif combined_mask[i] == 2:  # 10% - Replace with random token
                    final_input_ids[i] = random.randint(0, vocab_size - 1)

            # Set labels
            labels = torch.where(combined_mask > 0, labels, -100)

            # Pad if necessary
            if len(final_input_ids) < self.max_length:
                padding_length = self.max_length - len(final_input_ids)
                attention_mask = torch.cat([attention_mask, torch.zeros(padding_length)])
                token_type_ids = torch.cat([token_type_ids, torch.zeros(padding_length)])
                labels = torch.cat([labels, torch.tensor([-100] * padding_length)])
                final_input_ids = torch.cat(
                    [
                        final_input_ids,
                        torch.tensor([self.normal_tokenizer.pad_token_id] * padding_length),
                    ]
                )

            # Add to batch
            batch_input_ids.append(final_input_ids)
            batch_attention_masks.append(attention_mask)
            batch_token_type_ids.append(token_type_ids)
            batch_labels.append(labels)

        # Stack tensors
        return {
            "input_ids": torch.stack(batch_input_ids).long(),
            "attention_mask": torch.stack(batch_attention_masks).long(),
            "token_type_ids": torch.stack(batch_token_type_ids).long(),
            "labels": torch.stack(batch_labels).long(),
        }

In [96]:
dataloader = CustomDataCollatorForLanguageModeling(normal_tokenizer, phonetic_tokenizer)

In [97]:
dataset = load_from_disk("/home/toure215/BERT_phonetic/DATASETS/phonetic_wikitext")
dataset["train"][:5]

{'text': ['v{lkIr\\i@ kr\\An@k@lz ajIi',
  'sEndZ now v{lkIr\\i@   Vnr\\IkOr\\dId kr\\An@k@lz dZ{p@niz    lIt  v{lkIr\\i@ Vv D@ b{t@lfild   kAm@nli r\\@fr\\=d t@ {z v{lkIr\\i@ kr\\An@k@lz ajIi awtsajd dZ@p{n  Iz @ t{ktIk@l r\\owl plejIN vIdiow gejm dIvEl@pt baj sig@ {nd midi@  vIZ@n fOr\\ D@ plejstejS@n pOr\\t@b@l  r\\ilist In dZ{njuEr\\i  In dZ@p{n  It Iz D@ Tr\\=d gejm In D@ v{lkIr\\i@ sIr\\iz  EmplojIN D@ sejm fjuZ@n Vv t{ktIk@l {nd r\\il tajm gAmplej {z Its pr\\Ed@sEsr\\=z  D@ stOr\\i r\\Vnz pEr\\@lEl t@ D@ fr\\=st gejm {nd fAlowz D@  nejml@s   @ pin@l',
  'mIl@tEr\\i jun@t sr\\=vIN D@ nejS@n Vv g{li@ dUr\\IN D@ sEk@nd jUr\\owp{n wOr\\ hu pr\\=fOr\\m sikr\\@t bl{k Apr\\=ejS@nz {nd Ar\\ pIt@d @gEnst D@ ImpIr\\i@l jun@t  k@l@m@ti r\\ejv@n  ',
  'D@ gejm bIg{n dIvEl@pm@nt In   k{r\\iIN owvr\\= @ lAr\\dZ pOr\\S@n Vv D@ wr\\=k dVn An v{lkIr\\i@ kr\\An@k@lz Ii  wajl It r\\Itejnd D@ st{ndr\\=d fitSr\\=z Vv D@ sIr\\iz  It Olsow Vndr\\=wEnt mVlt@p@l @dZVstm@nts  sVtS {z mejkIN D@ gejm mOr\\

In [98]:
import re

text = "Hello, my name is Paulevec. I am a student at the University of Toronto."
def split_words_and_punctuation(text):
    return re.findall(r'\w+', text, re.UNICODE)

text_list = split_words_and_punctuation(text)
text = " ".join(text_list)
print(text)

phonetic_text = " ".join("".join(epi.xsampa_list(word)) for word in text_list)
print(phonetic_text)

Hello my name is Paulevec I am a student at the University of Toronto
h@low maj nejm Iz pOlIvIk aj {m @ stud@nt {t D@ jun@vr\=s@ti Vv tr\=Antow


In [99]:
inputs = [
    {"original_text": text, "text": phonetic_text},
]
normal_ids = normal_tokenizer(text, add_special_tokens=False, padding=False, truncation=True, max_length=50)["input_ids"]
phonetic_ids = phonetic_tokenizer(phonetic_text, add_special_tokens=False, padding=False, truncation=True, max_length=50)["input_ids"]

normal_tokens = normal_tokenizer.convert_ids_to_tokens(normal_ids)
phonetic_tokens = phonetic_tokenizer.convert_ids_to_tokens(phonetic_ids)

In [100]:
print(normal_tokens)
print(phonetic_tokens)
print(normal_ids)
print(phonetic_ids)
print(normal_tokenizer.decode(3))

['hello', 'my', 'name', 'is', 'paul', '##ev', '##ec', 'i', 'am', 'a', 'student', 'at', 'the', 'university', 'of', 'toronto']
['h', '@', 'low', 'maj', 'nejm', 'iz', 'poli', '##vik', 'aj', '{', 'm', '@', 'stud', '@', 'nt', '{', 't', 'd', '@', 'jun', '@', 'vr', '\\=', 's', '@', 'ti', 'vv', 'tr', '\\=', 'antow']
[20585, 950, 1140, 161, 2138, 2635, 116, 31, 298, 23, 3772, 159, 88, 1301, 100, 6221]
[14, 6, 265, 379, 416, 97, 12371, 11357, 96, 30, 19, 6, 652, 6, 87, 30, 25, 10, 6, 254, 6, 233, 56, 24, 6, 101, 64, 82, 56, 3290]
[PAD]


In [101]:
dataloader(inputs)

{'input_ids': tensor([[    1, 20585,   950,  1140,   161,  2138,  2635,   116,    31, 21618,
             23,  3772,   159,    88,     4,   100,  6221,     2,    14,     6,
            265,   379,   416,    97, 12371, 11357,    96,  4743, 28520,     6,
            652,     6,    87,    30,    25,    10,     6,     4,     4,     4,
              4,     4,     4,     4,    64,    82,    56,  3290,     2,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,     3,     3,     3,     3,     3,     3,     3,     3,
              3,     3,    

In [105]:
tokenizer_ipa = AutoTokenizer.from_pretrained("/home/toure215/BERT_phonetic/tokenizers/tokenizer_phonetic_BPE_IPA")

In [106]:
text = "Hello, my name is Paulevec. I am a student at the University of Toronto."
text = epi.transliterate(text)
tokenized_text = tokenizer_ipa(text)
tokens = tokenizer_ipa.convert_ids_to_tokens(tokenized_text["input_ids"])
print(tokens)

['[CLS]', 'həlow', ',', 'maj', 'nejm', 'ɪz', 'pɔl', 'ɪvɪk', '.', 'aj', 'æm', 'ə', 'studənt', 'æt', 'ðə', 'junəvɹsəti', 'ʌv', 'tɹɑntow', '.', '[SEP]']
