<a href="https://colab.research.google.com/github/gyasifred/msc-thesis/blob/main/build_subword_tokenizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q -U "tensorflow-text==2.8.*"

In [2]:
import tensorflow_text as text
import tensorflow as tf
import os
import pathlib
import numpy as np
import time
# import BERT tool for building tokenizer
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab

In [3]:
# This code was adapted from https://github.com/GhanaNLP/kasa/blob/master/Kasa/Preprocessing.py
# Preprocessing helper class for the Twi-French parallel data (a plain class, not a subclass)
# import required library
import re
import unicodedata


class Preprocessing:
    """Load and normalize the parallel Twi-French sentence files."""

    def __init__(self):
        """Create a preprocessor; no configuration is stored yet."""
        # Placeholder: default parameters may be added here later.
        pass

    @staticmethod
    def _read_stripped_lines(filepath):
        """Return every line of the UTF-8 text file `filepath`, stripped of
        leading/trailing whitespace (blank lines become empty strings)."""
        with open(filepath, encoding='utf-8') as file:
            return [line.strip() for line in file]

    def read_parallel_dataset(self, filepath_twi, filepath_french):
        """Read the parallel Twi - French dataset.

        Args:
            filepath_twi: path to the Twi sentences, one per line (UTF-8).
            filepath_french: path to the French sentences, one per line (UTF-8).

        Returns:
            A tuple ``(twi_data, french_data)`` of lists of stripped lines.
            The two lists are returned as read; their lengths are not compared.
        """
        # The French file is opened first, as before, so a missing French file
        # is still the first error reported.
        french_data = self._read_stripped_lines(filepath_french)
        twi_data = self._read_stripped_lines(filepath_twi)
        return twi_data, french_data

    def removeStringAccent(self, s):
        """Return `s` NFD-decomposed with every combining mark (category 'Mn')
        removed, e.g. 'é' becomes 'e'."""
        return ''.join(
            c for c in unicodedata.normalize('NFD', s)
            if unicodedata.category(c) != 'Mn'
        )

    def normalize_twi(self, s):
        """Normalize a Twi sentence.

        Removes accents, inserts a space before each '!', '.' or '?', replaces
        every run of characters other than ASCII letters, '.', '!', '?', 'Ɔ',
        'ɔ', 'ɛ', 'Ɛ' and '’' with a single space, then collapses whitespace
        runs. Case is left unchanged and outer spaces are not trimmed.
        """
        s = self.removeStringAccent(s)
        s = re.sub(r'([!.?])', r' \1', s)
        s = re.sub(r'[^a-zA-Z.ƆɔɛƐ!?’]+', r' ', s)
        s = re.sub(r'\s+', r' ', s)
        return s

    def normalize_fr(self, s):
        """Normalize a French sentence.

        Removes accents, inserts a space before each '!', '.' or '?', replaces
        every run of characters other than ASCII letters, '.', '!' and '?' with
        a single space, then collapses whitespace runs. Case is left unchanged
        and outer spaces are not trimmed.
        """
        s = self.removeStringAccent(s)
        s = re.sub(r'([!.?])', r' \1', s)
        s = re.sub(r'[^a-zA-Z.!?]+', r' ', s)
        s = re.sub(r'\s+', r' ', s)
        return s


In [4]:
# The Preprocessing class is defined in the cell above; the library form of
# the import is kept here for reference:
#from tft.preprocessing import Preprocessing # note form of library import

# One preprocessor instance serves both languages.
TwiFrPreprocessor = Preprocessing()

# Load the raw, line-aligned sentence files.
raw_data_twi, raw_data_fr = TwiFrPreprocessor.read_parallel_dataset(
    filepath_twi='/content/verified_twi.txt',
    filepath_french='/content/verified_french.txt',
)

# Apply the language-specific normalizer to every sentence.
raw_data_fr = list(map(TwiFrPreprocessor.normalize_fr, raw_data_fr))
raw_data_twi = list(map(TwiFrPreprocessor.normalize_twi, raw_data_twi))

In [5]:
# Write the preprocessed sentences to disk, one sentence per line.
# The encoding is set explicitly: the data was read as UTF-8 and the Twi text
# contains non-ASCII letters (e.g. ɔ, ɛ), so relying on the locale default
# could fail or produce files in another encoding.
with open('training_twi.txt', 'w', encoding='utf-8') as f:
    for line in raw_data_twi:
        f.write(f"{line}\n")

with open('training_fr.txt', 'w', encoding='utf-8') as f:
    for line in raw_data_fr:
        f.write(f"{line}\n")

In [6]:
# Build one line-oriented TF dataset per language from the training files.
# The paths are relative, matching where the previous cell writes the files,
# so this also works when the working directory is not /content.
lines_dataset_fr = tf.data.TextLineDataset('training_fr.txt')
lines_dataset_tw = tf.data.TextLineDataset('training_twi.txt')

In [7]:
# Pair the two line datasets element-wise into (twi, french) tuples.
# The vocabulary-building cells below read the per-language datasets directly
# and do not use `combined`.
combined = tf.data.Dataset.zip(
    (lines_dataset_tw, lines_dataset_fr)
)

In [8]:
# Tokenizer options used while learning the vocabulary. Per the original note,
# lower_case=True is kept because it also enables NFD normalization in the
# tokenizer, which is needed.
# NOTE(review): the original note says the input files are already lower-cased,
# but the normalize_* helpers in this notebook do not lowercase - confirm.
bert_tokenizer_params = {"lower_case": True}

# Tokens that must be present in every learned vocabulary.
reserved_tokens = ["[PAD]", "[UNK]", "[START]", "[END]"]

# Keyword arguments for bert_vocab_from_dataset; vocab_size is the main knob
# that could be tuned.
bert_vocab_args = {
    # The target vocabulary size
    "vocab_size": 10000,
    # Reserved tokens that must be included in the vocabulary
    "reserved_tokens": reserved_tokens,
    # Arguments for `text.BertTokenizer`
    "bert_tokenizer_params": bert_tokenizer_params,
    # Arguments for `wordpiece_vocab.wordpiece_tokenizer_learner_lib.learn`
    "learn_params": {},
}

In [9]:
# build French vocab file (takes several mins)
# this is the bert_vocab module building its vocab file from the raw French sentences
%%time
fr_vocab = bert_vocab.bert_vocab_from_dataset(
    lines_dataset_fr,
    **bert_vocab_args
    )

CPU times: user 39 s, sys: 1.24 s, total: 40.3 s
Wall time: 43.2 s


In [10]:
# Sanity-check the French sub-word vocabulary by printing a few windows of it:
# the head, a slice from the middle, and the tail. The tail will look strange;
# this is expected with bert_vocab.
for window in (slice(None, 10), slice(1000, 1010), slice(-10, None)):
    print(fr_vocab[window])
# Total number of learned tokens.
print(len(fr_vocab))

['[PAD]', '[UNK]', '[START]', '[END]', '!', '.', '?', 'a', 'b', 'c']
['amour', 'arrivera', 'avaient', 'batiment', 'blanc', 'canadien', 'completement', 'court', 'danser', 'debut']
['vouloir', 'voulu', 'vus', '##!', '##.', '##?', '##j', '##q', '##v', '##w']
2093


In [11]:
# Helper for writing a vocabulary to disk; the resulting file is what the
# tokenizer is built from.
def write_vocab_file(filepath, vocab):
    """Write `vocab` to `filepath`, one token per line.

    Args:
        filepath: destination path; an existing file is overwritten.
        vocab: iterable of token strings.

    The file is always written as UTF-8 because vocabularies can contain
    non-ASCII tokens (the Twi vocabulary has e.g. 'ɔ', 'ɛ' and '’'), which a
    non-UTF-8 locale default could fail to encode.
    """
    with open(filepath, 'w', encoding='utf-8') as f:
        for token in vocab:
            print(token, file=f)

In [12]:
# Save the French vocabulary next to the notebook as fr_vocab.txt.
write_vocab_file(filepath='fr_vocab.txt', vocab=fr_vocab)

In [13]:
# build Twi vocab file 
%%time
twi_vocab = bert_vocab.bert_vocab_from_dataset(
lines_dataset_tw,
**bert_vocab_args
)

CPU times: user 27.1 s, sys: 1.3 s, total: 28.4 s
Wall time: 21.7 s


In [14]:
# Sanity-check the Twi sub-word vocabulary by printing a few windows of it:
# the head, two slices from the middle, and the tail. The tail will look
# strange; this is expected with bert_vocab.
for window in (slice(None, 10), slice(100, 110), slice(1000, 1010), slice(-10, None)):
    print(twi_vocab[window])
# Total number of learned tokens.
print(len(twi_vocab))

['[PAD]', '[UNK]', '[START]', '[END]', '!', '.', '?', 'a', 'b', 'c']
['di', 'paa', 'sen', 'ɔyɛ', 'misusuw', 'bɛn', 'pɛ', '##m', 'kwan', 'ankasa']
['gyaade', 'kotoku', 'mabrɛ', 'mansusuw', 'menom', 'nnera', 'nokwasɛm', 'nɔma', 'pefee', 'tia']
['##.', '##?', '##b', '##j', '##p', '##q', '##v', '##x', '##z', '##’']
1993


In [15]:
# Save the Twi vocabulary as twi_vocab.txt; together with fr_vocab.txt this is
# the input from which the tokenizers are built.
write_vocab_file(filepath='twi_vocab.txt', vocab=twi_vocab)

In [20]:
# Build full language-agnostic tokenizer class; this is standard Google code
# import BERT tool for building tokenizer
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab
import tensorflow as tf
import tensorflow_text as text
import pathlib
import numpy as np
import re


class CustomTokenizer(tf.Module):
    """Exportable sub-word tokenizer wrapping `text.BertTokenizer`.

    Concrete functions for the public methods are traced in `__init__` so that
    they are available when the module is saved with `tf.saved_model.save`.
    """

    def __init__(self, reserved_tokens, vocab_path):
        """Build the tokenizer from a vocabulary file.

        Args:
            reserved_tokens: list of reserved token strings; must contain
                "[START]" and "[END]" for `add_start_end` to be meaningful.
            vocab_path: path to a text file with one token per line.
        """
        self.tokenizer = text.BertTokenizer(vocab_path, lower_case=True)
        self._reserved_tokens = reserved_tokens
        self._vocab_path = tf.saved_model.Asset(vocab_path)
        # Read the vocab explicitly as UTF-8: it can contain non-ASCII tokens,
        # and the locale default encoding is not guaranteed to be UTF-8.
        vocab = pathlib.Path(vocab_path).read_text(encoding="utf-8").splitlines()
        self.vocab = tf.Variable(vocab)
        # Create the signatures for export:
        # Include a tokenize signature for a batch of strings.
        self.tokenize.get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string))
        # Include `detokenize` and `lookup` signatures for:
        # * `Tensors` with shapes [tokens] and [batch, tokens]
        # * `RaggedTensors` with shape [batch, tokens]
        self.detokenize.get_concrete_function(
            tf.TensorSpec(shape=[None, None], dtype=tf.int64))
        self.detokenize.get_concrete_function(
            tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))
        self.lookup.get_concrete_function(
            tf.TensorSpec(shape=[None, None], dtype=tf.int64))
        self.lookup.get_concrete_function(
            tf.RaggedTensorSpec(shape=[None, None], dtype=tf.int64))
        # These `get_*` methods take no arguments
        self.get_vocab_size.get_concrete_function()
        self.get_vocab_path.get_concrete_function()
        self.get_reserved_tokens.get_concrete_function()

    @tf.function
    def add_start_end(self,ragged):
        """Prepend the [START] id and append the [END] id to every row.

        The ids are the positions of "[START]" / "[END]" in the reserved-token
        list. NOTE(review): this assumes the reserved tokens are the first
        entries of the vocab file, in the same order - confirm for new vocabs.
        """
        START = tf.argmax(tf.constant(self._reserved_tokens) == "[START]")
        END = tf.argmax(tf.constant(self._reserved_tokens) == "[END]")
        # One START/END column entry per row of the batch.
        count = ragged.bounding_shape()[0]
        starts = tf.fill([count, 1], START)
        ends = tf.fill([count, 1], END)
        return tf.concat([starts, ragged, ends], axis=1)

    # Function to remove reserved tokens after detokenization
    @tf.function
    def cleanup_text(self, reserved_tokens,token_txt):
        """Drop reserved tokens (except "[UNK]") from `token_txt` and join the
        remaining tokens of each row into one space-separated string."""
        # Drop the reserved tokens, except for "[UNK]".
        bad_tokens = [re.escape(tok)
                      for tok in reserved_tokens if tok != "[UNK]"]
        bad_token_re = "|".join(bad_tokens)
        bad_cells = tf.strings.regex_full_match(token_txt, bad_token_re)
        result = tf.ragged.boolean_mask(token_txt, ~bad_cells)
        # Join them into strings.
        result = tf.strings.reduce_join(result, separator=' ', axis=-1)
        return result

    @tf.function
    def tokenize(self, strings):
        """Tokenize a batch of strings into rows of token ids, each wrapped in
        the [START] / [END] ids."""
        enc = self.tokenizer.tokenize(strings)
        # Merge the `word` and `word-piece` axes.
        enc = enc.merge_dims(-2, -1)
        enc = self.add_start_end(enc)
        return enc

    @tf.function
    def detokenize(self, tokenized):
        """Convert token ids back to text, with reserved tokens removed by
        `cleanup_text`."""
        words = self.tokenizer.detokenize(tokenized)
        return self.cleanup_text(self._reserved_tokens, words)

    @tf.function
    def lookup(self, token_ids):
        """Return the vocabulary entries at the given token ids."""
        return tf.gather(self.vocab, token_ids)

    @tf.function
    def get_vocab_size(self):
        """Return the number of entries in the vocabulary."""
        return tf.shape(self.vocab)[0]

    @tf.function
    def get_vocab_path(self):
        """Return the vocabulary file asset."""
        return self._vocab_path

    @tf.function
    def get_reserved_tokens(self):
        """Return the reserved tokens as a string tensor."""
        return tf.constant(self._reserved_tokens)


In [22]:
# Container module holding one CustomTokenizer per language as an attribute
# (`tokenizers.fr`, `tokenizers.twi`), each built from its own vocab file.
tokenizers = tf.Module()
for lang_code, vocab_file in (('fr', 'fr_vocab.txt'), ('twi', 'twi_vocab.txt')):
    setattr(tokenizers, lang_code, CustomTokenizer(reserved_tokens, vocab_file))

In [23]:
# Export both tokenizers as a single SavedModel.
# NOTE(review): the target is under /content/drive/MyDrive, but no cell in this
# notebook mounts Google Drive - confirm it is mounted before running.
model_name = '/content/drive/MyDrive/translate_fr_twi_converter'
tf.saved_model.save(obj=tokenizers, export_dir=model_name)

In [24]:
# Verify tokenizer model can be reloaded
# `tokenizers` is rebound to the object restored from disk, so the cells below
# exercise the saved model rather than the in-memory CustomTokenizer instances.
tokenizers = tf.saved_model.load(model_name)
# Displayed as the cell result: the French vocabulary size.
tokenizers.fr.get_vocab_size().numpy()

2093

In [25]:
# Verify tokenizer works on test sentence
# tokenize() returns token ids; lookup() maps them back to vocabulary strings
# so the result can be read directly.
tokens = tokenizers.fr.tokenize(['je suis étudiant'])
text_tokens = tokenizers.fr.lookup(tokens)
# Displayed as the cell result.
text_tokens

<tf.RaggedTensor [[b'[START]', b'je', b'suis', b'etudiant', b'[END]']]>

In [26]:
# Remove token markers and join the tokens back into a sentence.
# The result is the tokenizer-normalized text, not the exact input: in the
# recorded run 'étudiant' comes back as 'etudiant'.
round_trip = tokenizers.fr.detokenize(tokens)
# detokenize returns a batch of byte strings; decode the first for printing.
print(round_trip.numpy()[0].decode('utf-8'))

je suis etudiant
