In [None]:
# Mount Google Drive at /content/drive; the corpus path used later in this
# notebook lives under that mount point. Requires the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pathlib
import random
import string
import re
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

In [None]:
text_file = "/content/drive/MyDrive/Speech Conversion/chatgaiya-v4.txt"


def load_text_pairs(path):
    """Read tab-separated sentence pairs from ``path``.

    Each usable line has the form ``<source>\\t<target>``. The target side is
    wrapped in "[start] " / " [end]" markers, which the decoder later relies on.

    Args:
        path: Location of the UTF-8 encoded, tab-separated corpus file.

    Returns:
        A list of ``(source, "[start] " + target + " [end]")`` tuples, in file
        order.

    Raises:
        ValueError: If a line contains more than two tab-separated fields.
    """
    pairs = []
    # `with` guarantees the handle is closed; the encoding is spelled out so
    # the non-ASCII corpus does not depend on the platform's default locale.
    with open(path, encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if "\t" not in line:
                continue
            fields = line.rstrip().split("\t")
            if len(fields) < 2:
                # The only tab was trailing and got stripped: no target text.
                continue
            if len(fields) > 2:
                raise ValueError(
                    f"{path}:{line_number}: expected 2 tab-separated fields, "
                    f"got {len(fields)}"
                )
            source_text, target_text = fields
            pairs.append((source_text, "[start] " + target_text + " [end]"))
    return pairs


text_pairs = load_text_pairs(text_file)


In [None]:
# Fixed seed so the train/validation/test split is identical on every run.
SPLIT_SEED = 42

# Shuffle a copy with a private RNG instead of shuffling `text_pairs` in place:
# re-running this cell then yields the same split rather than re-shuffling an
# already shuffled list.
shuffled_pairs = list(text_pairs)
random.Random(SPLIT_SEED).shuffle(shuffled_pairs)

# 5% validation, 5% test, the remainder for training.
num_val_samples = int(0.05 * len(shuffled_pairs))
num_train_samples = len(shuffled_pairs) - 2 * num_val_samples
train_pairs = shuffled_pairs[:num_train_samples]
val_pairs = shuffled_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = shuffled_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

4913 total pairs
4423 training pairs
245 validation pairs
245 test pairs


In [None]:
# Characters deleted from target sentences during standardization. Square
# brackets are excluded so the "[start]" / "[end]" markers survive.
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in ("[", "]")
)

# Vocabulary cap handed to both TextVectorization layers.
vocab_size = 15000
# Token length of vectorized source sentences (targets get one extra token).
sequence_length = 20
# Number of sentence pairs per dataset batch.
batch_size = 64


def custom_standardization(input_string):
    """Lowercase ``input_string`` and delete every character in ``strip_chars``.

    Used as the ``standardize`` callable of the target TextVectorization layer.
    """
    # Character class built from the escaped strip list.
    strip_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")


# Source side: no `standardize` argument, so the layer's default applies.
source_vectorization = TextVectorization(
    output_mode="int",
    max_tokens=vocab_size,
    output_sequence_length=sequence_length,
)
# Target side: one extra token because the sequence is later split into
# decoder inputs (all but last) and labels (all but first).
target_vectorization = TextVectorization(
    output_mode="int",
    max_tokens=vocab_size,
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)

# Vocabularies are learned from the training split only.
train_source_texts = [sample[0] for sample in train_pairs]
train_target_texts = [sample[1] for sample in train_pairs]
source_vectorization.adapt(train_source_texts)
target_vectorization.adapt(train_target_texts)

print(len(train_source_texts),len(train_target_texts))


4423 4423


In [None]:

def format_dataset(source, target):
    """Vectorize a batch of raw (source, target) strings for training.

    Returns:
        A ``(features, labels)`` tuple. ``features`` maps "encoder_inputs" to
        the vectorized source and "decoder_inputs" to the vectorized target
        without its final token; ``labels`` is the vectorized target without
        its first token.
    """
    source_ids = source_vectorization(source)
    target_ids = target_vectorization(target)
    features = {
        "encoder_inputs": source_ids,
        "decoder_inputs": target_ids[:, :-1],
    }
    labels = target_ids[:, 1:]
    return (features, labels)


def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data.Dataset`` from sentence pairs.

    Args:
        pairs: Non-empty sequence of ``(source_text, target_text)`` tuples.

    Returns:
        A dataset yielding ``(features, labels)`` batches as produced by
        ``format_dataset``, cached after vectorization and reshuffled (at batch
        granularity) on every iteration.

    Raises:
        ValueError: If ``pairs`` is empty.
    """
    if not pairs:
        raise ValueError("make_dataset() needs at least one (source, target) pair")
    source_texts, target_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(source_texts), list(target_texts))
    )
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Cache BEFORE shuffling: a cache placed after shuffle() records the first
    # shuffled order and replays it verbatim every epoch. Prefetch goes last so
    # it overlaps input preparation with training.
    return dataset.cache().shuffle(2048).prefetch(16)


# Materialize the training and validation pipelines; the test split is kept as
# raw text pairs and decoded sentence by sentence further below.
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
# Pull a single batch and report the tensor shapes that reach the model.
for batch in train_ds.take(1):
    inputs, targets = batch
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


In [None]:
class TransformerEncoder(layers.Layer):
    """One Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer feed-forward
    projection, each wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Width of the token representations; also used as the
            attention ``key_dim`` and as the output width of the projection.
        dense_dim: Hidden width of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Embedded sequence tensor.
            mask: Optional per-token mask; when given it is expanded with a
                broadcast axis and passed to attention as ``attention_mask``.

        Returns:
            Tensor produced by the second layer normalization.
        """
        # Default to "no mask" so the layer also works when called without one;
        # previously `padding_mask` was unbound in that case and the attention
        # call raised.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the layer config including the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config


class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: Number of distinct positions in the position table.
        vocab_size: Number of rows in the token table.
        embed_dim: Width of both embedding tables.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Plain hyperparameters, kept for get_config().
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        # Sub-layers: token table first, position table second.
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        token_vectors = self.token_embeddings(inputs)
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0.

        Id 0 is presumably the padding id emitted by the vectorization layers.
        """
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the layer config including the constructor arguments."""
        base_config = super().get_config()
        base_config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return base_config


class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Applies causal self-attention over the target sequence, cross-attention
    over the encoder outputs, and a two-layer feed-forward projection. Each
    sub-step is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Width of the token representations; also used as the
            attention ``key_dim`` and as the output width of the projection.
        latent_dim: Hidden width of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block.

        Args:
            inputs: Embedded target sequence.
            encoder_outputs: Encoder representation of the source sequence.
            mask: Optional per-token mask for ``inputs`` (the target side).

        Returns:
            Tensor produced by the third layer normalization.
        """
        causal_mask = self.get_causal_attention_mask(inputs)

        # The incoming mask describes the TARGET tokens, so it belongs to the
        # self-attention step, combined with the causal mask. Without a mask
        # only causality is enforced; previously this path left the mask
        # variable unbound and the cross-attention call raised.
        self_attention_mask = causal_mask
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=self_attention_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention must see the whole source sentence. The target-side
        # causal/padding mask is (batch, target_len, target_len) and must not
        # be applied here: an attention mask for this call is indexed by
        # (target position, source position), and using the target mask both
        # required equal source/target lengths and hid source token j from
        # target step i whenever j > i.
        # NOTE(review): an encoder-side padding mask is not available through
        # this signature, so source padding positions are not masked here.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular int32 mask, one copy per batch element.

        Entry ``[b, i, j]`` is 1 when ``i >= j`` (position ``i`` may attend to
        position ``j``) and 0 otherwise; the two trailing dimensions both equal
        the second dimension of ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples: (batch_size, 1, 1) repeats the mask along the batch axis.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the layer config including the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "latent_dim": self.latent_dim,
            "num_heads": self.num_heads,
        })
        return config


In [None]:
# Model hyperparameters. vocab_size, sequence_length and batch_size repeat the
# values already set in the preprocessing cell; they must stay in sync with the
# vectorization layers.
embed_dim = 256
latent_dim = 2048
num_heads = 8
vocab_size = 15000
sequence_length = 20
batch_size = 64

# Encoder: token ids -> positional embedding -> one encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder as a standalone model: target token ids plus an encoded source
# sequence -> positional embedding -> one decoder block -> dropout -> softmax
# over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder outputs into the decoder model. Note that
# `decoder_outputs` is rebound here to the output of the combined graph.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [None]:
# Number of passes over the training set used by the fit cell.
epochs = 100

# The model ends in a softmax layer and the labels are integer token ids,
# hence the sparse categorical cross-entropy loss.
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Print the layer-by-layer structure and parameter counts of the full model.
transformer.summary()

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

In [None]:
# Train for `epochs` epochs on train_ds, reporting metrics on val_ds as well.
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

In [None]:
# Index -> token table for turning predicted ids back into words.
target_vocab = target_vectorization.get_vocabulary()
target_index_lookup = dict(enumerate(target_vocab))
# Upper bound on the number of tokens generated per sentence.
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Greedily translate ``input_sentence`` one token at a time.

    Starts from "[start]", repeatedly feeds the sentence decoded so far back
    into the model, and appends the highest-probability token for the current
    step. Stops once "[end]" is produced or after
    ``max_decoded_sentence_length`` steps.

    Returns:
        The decoded string, including the leading "[start]" marker and, when
        it was generated, the trailing "[end]" marker.
    """
    encoder_tokens = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        # Drop the last column so the decoder input length matches training.
        decoder_tokens = target_vectorization([decoded_sentence])[:, :-1]
        token_probs = transformer([encoder_tokens, decoder_tokens])

        best_index = np.argmax(token_probs[0, step, :])
        next_token = target_index_lookup[best_index]
        decoded_sentence = decoded_sentence + " " + next_token

        if next_token == "[end]":
            return decoded_sentence
    return decoded_sentence


test_source_texts = [pair[0] for pair in test_pairs]
test_target_texts = [pair[1] for pair in test_pairs]


def _drop_sequence_markers(sentence):
    """Remove the "[start]" / "[end]" markers from ``sentence``."""
    return sentence.replace("[start]", "").replace("[end]", "")


predictions = []
# `references` starts with an empty placeholder entry; the following cell
# removes it with references.pop(0), after which the two lists line up.
references = [[]]

# Decode every test sentence exactly once, in order. The evaluation loop used
# to be commented out, which left `predictions` empty for the metric cells;
# it also drew indices at random (with replacement, and never the last one),
# so some test sentences were scored repeatedly and others not at all.
for source_text, target_text in zip(test_source_texts, test_target_texts):
    predictions.append(_drop_sequence_markers(decode_sequence(source_text)))
    # One single-element reference list per prediction.
    references.append([_drop_sequence_markers(target_text)])

In [None]:
# Remove the empty placeholder that `references` is initialised with. The guard
# makes the cell safe to re-run: an unconditional pop would discard a real
# reference the second time and leave the lists misaligned.
if references and references[0] == []:
    references.pop(0)
print(len(references) , len(predictions))

443 443


In [None]:
# Show translations for the first (up to) ten test sentences. Slicing instead
# of range(10) avoids an IndexError on small test sets, and the loop variables
# no longer shadow the builtin `input`.
for source_text in test_source_texts[:10]:
    decoded_text = decode_sequence(source_text).replace("[start]","")
    decoded_text = decoded_text.replace("[end]","")
    print("input:" + source_text,"\toutput:" + decoded_text)

input:আমি বিলটা মিটিয়ে দেব। 	output: আই বিলটা ইয়েনচাইত পারি 
input:আমি দেউলিয়া হয়ে গেছি। 	output: আই একট হয়ে গেয়ি্য। 
input:টম এটা দেখলো। 	output: টম এহেন দেখলো। 
input:কেউ বাড়ি ছিলো না। 	output: কেউ বাড়ি ছিলো না। 
input:তাঁরা চেঁচালেন। 	output: তাঁরা চেঁচালেন। 
input:টম এখানে কাজ করতো। 	output: টম এখানে কাজ গরতো। 
input:প্রবেশমূল্য কত? 	output: প্রবেশমূল্য হতো 
input:আপনি বোঝেন না। 	output: অনে বোঝেন না। 
input:আমি ওখানে গিয়েছি। 	output: আই ওখানে থাকা 
input:টম মনযোগ দিয়ে শুনছে। 	output: টম স্কুটার দিয়ে হোনো। 


In [None]:
# Translate one hand-written sentence. Named `sample_sentence` rather than
# `input` so the builtin input() is not shadowed for the rest of the session.
sample_sentence = "কোথায় চলে গিয়েছিলে?"
sample_translation = decode_sequence(sample_sentence).replace("[start]","")
sample_translation = sample_translation.replace("[end]","")
print("input:" + sample_sentence,"\toutput:" + sample_translation)

input:কোথায় চলে গিয়েছিলে? 	output: হডে সলে যাইবে। 


In [None]:
from itertools import chain

# Flatten the per-sentence reference lists so entry k pairs with prediction k.
one_dim_ref = list(chain.from_iterable(references))

# `with` closes (and therefore flushes) the file; it was previously left open,
# so the tail of the output could be missing on disk. UTF-8 is explicit because
# the sentences are non-ASCII.
with open("/content/result_chakma_data.txt", "w", encoding="utf-8") as f:
    for cnt, prediction in enumerate(predictions, start=1):
        f.write("ref{}:".format(cnt) + one_dim_ref[cnt - 1] + "\n")
        f.write("pred{}:".format(cnt) + prediction + "\n")

len(predictions)

443

In [None]:
!pip install evaluate bert_score

In [None]:
import evaluate

# Corpus-level BLEU of the decoded test sentences against their references.
bleu = evaluate.load('bleu')
bleu_scores = bleu.compute(references=references, predictions=predictions)

print(bleu_scores)

In [None]:
# One reference string per prediction. Distinct loop names: the original
# comprehension reused `references` as its own loop variable.
flatten_list = [ref for ref_group in references for ref in ref_group]
bertscore = evaluate.load("bertscore")
# The sentences are Bengali-script text, so score them with a multilingual
# checkpoint. "distilbert-base-uncased" is an English-only, uncased model whose
# tokenizer was not built for this script, which makes its similarity scores
# hard to interpret.
# NOTE(review): this changes the reported numbers; switch back only if results
# must stay comparable with earlier runs.
results = bertscore.compute(
    predictions=predictions,
    references=flatten_list,
    model_type="bert-base-multilingual-cased",
)

average_f1 = sum(results['f1']) / len(results['f1'])
print("Average F1-score:", average_f1)

In [None]:
# Persist the trained weights (weights only, not the architecture) to
# /content/bn_to_ctg.h5.
transformer.save_weights("/content/bn_to_ctg.h5")

In [None]:
print(len(train_source_texts),len(train_target_texts))

# Write the training split as two line-aligned text files. Both handles are
# managed by `with` so they are closed and flushed; they used to be left open.
# UTF-8 is explicit because the sentences are non-ASCII.
with open("/content/train_bangla_data.txt", "w", encoding="utf-8") as f1, \
        open("/content/train_chatgaiya_data.txt", "w", encoding="utf-8") as f2:
    for source_text, target_text in zip(train_source_texts, train_target_texts):
        f1.write(source_text + "\n")
        f2.write(target_text + "\n")

print(len(train_source_texts),len(train_target_texts))

4423 4423
4423 4423


In [None]:
# Sanity check: the flattened reference list should have one entry per
# reference group.
print(len(flatten_list),len(references))

443 443
