# Char-level model

### Constant 

In [1]:
# For PC assign 0 for colab assign 1
PC_OR_COLAB = 0

# Resolve the base path depending on your running environment
colab_base = '/content/drive/MyDrive/ashraful/paper-1/'
pc_base = './'

if PC_OR_COLAB == 1:
    base = colab_base
else:
    base = pc_base


In [2]:
ALL = 0
WORD_LEVEL = 1
ENCODER_DECODER = 2

test_dataset_path = base + 'dataset/test-dataset.txt'
dataset_paths = [test_dataset_path]

input_tokenizer_dir = base + 'dataset/input-tokenizer_char.pickle'
target_tokenizer_dir = base + 'dataset/target-tokenizer_char.pickle'

# char-level-model paths
model_weights_path = base + 'saved-weights/char-level-model/weights'
input_tokenizer_dir_char = base + 'dataset/input-tokenizer_char.pickle'
target_tokenizer_dir_char = base + 'dataset/target-tokenizer_char.pickle'

# word-level-model paths
saved_model_word = base + 'saved-weights/word-level-model/word-level.h5'
input_tokenizer_dir_word = base + 'dataset/input-tokenizer_word.pickle'
target_tokenizer_dir_word = base + 'dataset/target-tokenizer_word.pickle'


In [3]:
# If you are using colab then this is meaningful
if PC_OR_COLAB == 1:
    from google.colab import drive
    drive.mount('/content/drive')
    !pip install tensorflow-addons


In [4]:
import tensorflow as tf
import tensorflow_addons as tfa

from sklearn.model_selection import train_test_split

import unicodedata
import re
import numpy as np
import os
import io
import time
import pickle
import numpy as np
import urllib3
import shutil
import zipfile
import itertools
from threading import Thread
import random


### Create Dataset

In [5]:
# Splitting dataset into train test
lines = list()

for path in dataset_paths:
    lines.extend(io.open(path, encoding='UTF-8').read().strip().split('\n'))

# lines = list(lines)
lines.sort()
print(len(lines))


10000


In [6]:
word_pairs = [[w for w in l.split(',')] for l in lines]

print(word_pairs[0][0])
print(word_pairs[0][1])

inp_lang, targ_lang = zip(*word_pairs)
inp_lang_train, inp_lang_val, targ_lang_train, targ_lang_val = \
    train_test_split(inp_lang, targ_lang, test_size=0.2, random_state=4651)


abagprobon
আবেগপ্রবণ


### Char-Level Model

In [7]:
# Retrieving tokenizer for char-level
with open(input_tokenizer_dir_char, mode='rb') as data_file:
    inp_lang_tokenizer_char = pickle.load(data_file)
with open(target_tokenizer_dir_char, mode='rb') as data_file:
    tar_lang_tokenizer_char = pickle.load(data_file)


In [8]:
BATCH_SIZE = 64

vocab_inp_size = len(inp_lang_tokenizer_char.word_index) + 1
vocab_tar_size = len(tar_lang_tokenizer_char.word_index) + 1
max_length_input = 20
max_length_output = 20

# print(inp_lang_tokenizer_char.word_index)
# print(tar_lang_tokenizer_char.word_index)

embedding_dims = 32
rnn_units = 256
dense_units = 256
Dtype = tf.float32

Tx = 20
Ty = 20

# print(vocab_inp_size, vocab_tar_size)


In [10]:

class MyModel(tf.keras.Model):
    def __init__(self, input_vocab_size, output_vocab_size, embedding_dims, rnn_units):
        super().__init__()
        # Encoder
        self.input_vocab_size = input_vocab_size
        self.encoder_embedding = tf.keras.layers.Embedding(input_dim=input_vocab_size,
                                                           output_dim=embedding_dims)
        self.encoder_rnnlayer1 = tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(rnn_units, return_sequences=True))
        self.encoder_rnnlayer2 = tf.keras.layers.LSTM(rnn_units,
                                                      return_sequences=True,
                                                      return_state=True)
        self.encoder_norm = tf.keras.layers.BatchNormalization()

        # Decoder
        self.output_vocab_size = output_vocab_size
        self.decoder_embedding = tf.keras.layers.Embedding(input_dim=output_vocab_size,
                                                           output_dim=embedding_dims)
        self.dense_layer = tf.keras.layers.Dense(output_vocab_size)
        self.decoder_rnncell = tf.keras.layers.LSTMCell(rnn_units)
        # Sampler
        self.sampler = tfa.seq2seq.sampler.TrainingSampler()
        self.attention_mechanism = self.build_attention_mechanism(
            dense_units, None, BATCH_SIZE*[Tx])
        self.rnn_cell = self.build_rnn_cell(BATCH_SIZE)
        self.decoder = tfa.seq2seq.BasicDecoder(self.rnn_cell,
                                                sampler=self.sampler,
                                                output_layer=self.dense_layer)

        self.attention_mechanism.memory_initialized
        self.decoder_embedding_matrix = None

    def initialize_initial_state(self):
        self.initial_state = [
            tf.zeros((BATCH_SIZE, rnn_units)), tf.zeros((BATCH_SIZE, rnn_units))]

    def build_attention_mechanism(self, units, memory, memory_sequence_length):
        return tfa.seq2seq.LuongAttention(units,
                                          memory=memory,
                                          memory_sequence_length=memory_sequence_length)
        # return tfa.seq2seq.BahdanauAttention(units, memory = memory, memory_sequence_length=memory_sequence_length)

    # wrap decoder rnn cell
    def build_rnn_cell(self, batch_size):
        rnn_cell = tfa.seq2seq.AttentionWrapper(self.decoder_rnncell, self.attention_mechanism,
                                                attention_layer_size=dense_units)
        return rnn_cell

    def build_decoder_initial_state(self, batch_size, encoder_state, Dtype):
        decoder_initial_state = self.rnn_cell.get_initial_state(batch_size=batch_size,
                                                                dtype=Dtype)
        decoder_initial_state = decoder_initial_state.clone(
            cell_state=encoder_state)
        return decoder_initial_state

    def call(self, inputs, training=False):
        encoder_input, decoder_input = inputs

        x = self.encoder_embedding(encoder_input)
        x = self.encoder_rnnlayer1(x)
        x = self.encoder_norm(x, training=training)
        a, a_tx, c_tx = self.encoder_rnnlayer2(x)

        decoder_emb_inp = self.decoder_embedding(decoder_input)
        self.attention_mechanism.setup_memory(a)
        decoder_initial_state = self.build_decoder_initial_state(BATCH_SIZE,
                                                                 encoder_state=[
                                                                     a_tx, c_tx],
                                                                 Dtype=tf.float32)

        outputs, _, _ = self.decoder(decoder_emb_inp,
                                     initial_state=decoder_initial_state,
                                     sequence_length=BATCH_SIZE*[Ty-1])

        return outputs

    def evaluate(self, inputs, beam_width=3):
        if self.decoder_embedding_matrix is None:
            self.decoder_embedding_matrix = tf.train.load_variable(
                model_weights_path, 'decoder_embedding/embeddings/.ATTRIBUTES/VARIABLE_VALUE')
            print(self.decoder_embedding_matrix.shape)

        inference_batch_size = inputs.shape[0]
        # print(inputs.shape)

        x = self.encoder_embedding(inputs)
        # x = tf.one_hot(inputs, depth=self.input_vocab_size)
        x = self.encoder_rnnlayer1(x)
        x = self.encoder_norm(x, training=False)
        enc_out, enc_h, enc_c = self.encoder_rnnlayer2(x)

        dec_h = enc_h
        # dec_c = enc_c

        start_tokens = tf.fill([inference_batch_size],
                               tar_lang_tokenizer_char.word_index['<'])
        end_token = tar_lang_tokenizer_char.word_index['>']

        enc_out = tfa.seq2seq.tile_batch(enc_out, multiplier=beam_width)
        self.attention_mechanism.setup_memory(enc_out)

        hidden_state = tfa.seq2seq.tile_batch(
            [enc_h, enc_c], multiplier=beam_width)
        decoder_initial_state = self.rnn_cell.get_initial_state(batch_size=beam_width * inference_batch_size,
                                                                dtype=tf.float32)
        decoder_initial_state = decoder_initial_state.clone(
            cell_state=hidden_state)

        decoder_instance = tfa.seq2seq.BeamSearchDecoder(self.rnn_cell,
                                                         beam_width=beam_width,
                                                         output_layer=self.dense_layer)
        decoder_instance.maximum_iterations = tf.round(tf.reduce_max(Tx) * 2)

        outputs, final_state, sequence_lengths = decoder_instance(self.decoder_embedding_matrix,
                                                                  start_tokens=start_tokens,
                                                                  end_token=end_token,
                                                                  initial_state=decoder_initial_state)

        final_outputs = tf.transpose(outputs.predicted_ids, perm=(0, 2, 1))
        beam_scores = tf.transpose(
            outputs.beam_search_decoder_output.scores, perm=(0, 2, 1))

        return final_outputs.numpy(), beam_scores.numpy()


model = MyModel(vocab_inp_size, vocab_tar_size, embedding_dims, rnn_units)
model.load_weights(filepath=model_weights_path)


<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f282017abb0>

In [11]:
def get_bangla(array):
    bangla_list = list(
        map(lambda x: tar_lang_tokenizer_char.index_word[x] if x != 0 else '', array))
    bangla_list.append('>')
    return "".join(bangla_list[0:bangla_list.index('>')])


In [12]:
def preprocess_char(word):
    word = [[char for char in ('<' + word.rstrip().lstrip() + '>')]]
    word = inp_lang_tokenizer_char.texts_to_sequences(word)
    inputs = tf.keras.preprocessing.sequence.pad_sequences(word,
                                                           padding='post',
                                                           maxlen=20,
                                                           truncating='post')
    # print(inputs)
    return tf.convert_to_tensor(inputs)


In [14]:
def predict_for_char(english_word, result, k=10):
    # start = time.time()
    outputs, score = model.evaluate(preprocess_char(english_word), k)
    outputs = [list(map(get_bangla, output)) for output in outputs]
    result[0] = outputs[0]

    # print(f'Time taken: {(time.time() - start)*1000:.2f} ms\n')


In [70]:
predictions = [None]
predict_for_char("desher", predictions)
print(predictions[0])


['দেশের', 'দ্যাশের', 'দাসের', 'দাশের', 'দশের', 'ডিসের', 'ডিশের', 'দেশার', 'দেশীর', 'দ্বেষের']


### Word-level

In [16]:
# Loading word-level model
word_model = tf.keras.models.load_model(saved_model_word)
word_model.summary()


Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (None, 12, 32)            928       
                                                                 
 bidirectional_1 (Bidirectio  (None, 256)              164864    
 nal)                                                            
                                                                 
 dense_4 (Dense)             (None, 128)               32896     
                                                                 
 dense_5 (Dense)             (None, 64)                8256      
                                                                 
 dense_6 (Dense)             (None, 32)                2080      
                                                                 
 dense_7 (Dense)             (None, 50001)             1650033   
                                                      

In [20]:
# Retrieving tokenizer for word-level
with open(input_tokenizer_dir_word, mode='rb') as data_file:
    inp_lang_tokenizer_word = pickle.load(data_file)
with open(target_tokenizer_dir_word, mode='rb') as data_file:
    tar_lang_tokenizer_word = pickle.load(data_file)


In [21]:
def preprocess_word(word):
    word = [[char for char in ('<' + word.rstrip().lstrip() + '>')]]
    word = inp_lang_tokenizer_word.texts_to_sequences(word)
    inputs = tf.keras.preprocessing.sequence.pad_sequences(word,
                                                           padding='post',
                                                           maxlen=12,
                                                           truncating='post')

    return tf.convert_to_tensor(inputs)


In [22]:
def predict_for_word(input, result, k=10):
    if len(input) > 10:
        result[0] = []
        return

    predictions = tf.math.top_k(tf.reshape(
        word_model.predict(preprocess_word(input)), [-1]), k)
    values = predictions.values.numpy()
    indices = predictions.indices.numpy()

    for i in range(1, k):
        # print(values[i-1]/values[i])
        if values[i - 1] / values[i] > 5 or values[i] <= 0.20:
            result[0] = (list(
                map(lambda x: tar_lang_tokenizer_word.index_word[x], indices[0:i])), values[0:i])
            return

    result[0] = (
        list(map(lambda x: tar_lang_tokenizer_word.index_word[x], indices)), values)


In [23]:
predictions = [None]
predict_for_word("kumir", predictions)
print(predictions[0])


(['কুমীর'], array([0.8008342], dtype=float32))


## Shabdik-Model

In [24]:
# Fixed mapping for single character word
single_word_dic = {
    'a': ['আ', 'া', 'অ', 'এ', 'ে', 'আঃ', 'া' + 'ঁ', 'আ' + 'ঁ'],
    'b': ['ব', 'ভ', '্' + 'ব', 'ব' + 'ি'],
    'c': ['চ', 'ছ', 'ক', 'ঁ', '্' + 'চ', '্' + 'ছ', 'স' + 'ি'],
    'd': ['ড', 'দ', 'ধ', 'দঃ', 'ডঃ', 'ঢ', '্' + 'ধ', 'ড' + 'ি', 'দ' + 'ি'],
    'e': ['এ', 'ে', 'আ', 'া', 'ই', 'ি', 'ঈ', 'ী', '্' + 'য' + 'া'],
    'f': ['ফ', 'প', '্' + 'ফ', '্' + 'প', 'এ' + 'ফ'],
    'g': ['গ', 'ঘ', 'য', 'জ', 'ং', 'ঞ', '্', 'জ' + 'ি'],
    'h': ['হ', '্', 'এ' + 'ই' + 'চ'],
    'i': ['ই', 'ি', 'ঈ', 'ী', 'ৈ', 'আ' + 'ই'],
    'j': ['জ', 'য', 'ঝ', 'জ্ব', '্' + 'জ', '্' + 'য', '্' + 'ঝ', 'জ' + 'ে'],
    'k': ['ক', 'খ', 'ক্ষ', '্' + 'ক', '্' + 'খ', 'ক' + 'ে'],
    'l': ['ল', '্' + 'ল', 'এ' + 'ল'],
    'm': ['ম', '্' + 'ম', 'এ' + 'ম'],
    'n': ['ন', 'ণ', 'ঙ', 'ঞ', 'ং', '্' + 'ন', '্' + 'ণ', '্' + 'ঙ', 'এ' + 'ন'],
    'o': ['ও', 'ঐ', 'ো', 'য়', 'ঃ', 'ওঁ', 'ঔ', 'ৌ', 'ৈ'],
    'p': ['প', 'ফ', '্' + 'প', '্' + 'ফ', 'প' + 'ি'],
    'q': ['ক', '্' + 'ক', 'ক' + 'ি' + 'উ'],
    'r': ['র', 'ঢ়', 'ড়', 'ঋ', 'ৃ', '্' + 'র', 'র' + '্', 'আ' + 'র'],
    's': ['স', 'শ', 'ষ', 'সঃ', '্' + 'স', '্' + 'শ', '্' + 'ষ', 'এ' + 'স'],
    't': ['ট', 'ঠ', 'ত', 'থ', 'তঁ', 'ৎ', '্' + 'ট', '্' + 'ঠ', 'ট' + 'ি'],
    'u': ['উ', 'ু', 'ঊ', 'ূ', 'উঃ', 'উঁ', 'ই' + 'উ'],
    'v': ['ভ', 'ব', 'ভঁ', '্' + 'ভ', '্' + 'ব', 'ভ' + 'ি'],
    'w': ['ও', 'ৌ', 'ওঃ', 'ওঁ', 'ড' + 'া' + 'ব' + 'ল' + 'ু'],
    'x': ['ক্স', 'এক্স'],
    'y': ['য়', 'ইয়', 'ই', 'ি', 'ঈ', 'ী', 'ে', 'ও' + 'য়' + 'া' + 'ই'],
    'z': ['য', 'জ', 'ঝ', '্' + 'য', '্' + 'জ', '্' + 'ঝ', 'জ' + 'এ' + 'ড', 'জ' + 'ি']
}

for key in single_word_dic:
    for i in range(len(single_word_dic[key]), 9):
        single_word_dic[key].append('')


In [25]:
def predict(english_word, option=ALL, k=10):
    if type(english_word) != type(''):
        return []

    english_word = english_word.lower()

    if len(english_word) == 0:
        return []

    # print(english_word)

    if len(english_word) == 1:
        return single_word_dic[english_word][0:k]

    word_result = [None]
    char_result = [None]

    if option == WORD_LEVEL:
        if len(english_word) > 10:
            return []
        predict_for_word(english_word, word_result, k)
        # print(word_result)
    elif option == ENCODER_DECODER or (option == ALL and len(english_word) > 10):
        predict_for_char(english_word, char_result, k)
    else:
        t1 = Thread(target=predict_for_char, args=(
            english_word, char_result, k, ))
        t1.start()
        t2 = Thread(target=predict_for_word, args=(
            english_word, word_result, k, ))
        t2.start()
        t2.join()
        t1.join()

    char_values = np.array([i for i in range(k+1, 0, -1)])
    char_values = char_values / np.sum(char_values)
    results = {}

    # print(word_result[0])
    # print(char_result[0])

    word_result = word_result[0]
    char_result = char_result[0]

    # print(char_result)
    # print(word_result)

    if word_result is not None and len(word_result) > 0:
        for i in range(len(word_result[1])):
            # if word_result[0][i] in results:
            results[word_result[0][i]] = word_result[1][i]

    if char_result is not None:
        for i in range(len(char_result)):
            if char_result[i] in results:
                results[char_result[i]] += char_values[i]
            else:
                results[char_result[i]] = char_values[i]

    return [key for (key, value) in sorted(results.items(), key=lambda x: x[1], reverse=True)[:k]]


In [26]:
def get_result(true, prediction):
    results = [0, 0, 0, 0]
    if prediction is None or len(prediction) == 0:
        return results

    if true == prediction[0]:
        results[0] += 1
    if true in prediction[0:3]:
        results[1] += 1
    if true in prediction[0:5]:
        results[2] += 1
    if true in prediction:
        results[3] += 1
    return results


In [79]:
start = time.time()
predictions = [predict(inp_lang[i], k=10) for i in range(len(inp_lang))]
combined_results = [get_result(targ_lang[i], predictions[i])
                    for i in range(len(inp_lang))]
combined_results = np.array(combined_results)
print("combined", np.sum(combined_results,axis=0) / len(combined_results))
print(f'Time taken for {i+1} examples: {(time.time() - start):.2f} s\n')


combined [0.8913 0.9408 0.9526 0.9657]
Time taken for 9 examples: 0.03 s



In [63]:
start = time.time()
print(predict("pani", k=10))
print(f'Time taken: {(time.time() - start):.2f} s\n')


['পনি', 'পানি', 'পানী', 'পিনি', 'প্যানি', 'পেনি', 'পাণী', 'পণি', 'পানই', 'পেনী']
Time taken: 0.30 s

