In [16]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping



In [18]:
train_url='/content/drive/MyDrive/Colab Notebooks/train.csv'
test_url='/content/drive/MyDrive/Colab Notebooks/test.csv'


train_df = pd.read_csv(train_url)
test_df = pd.read_csv(test_url)

Data Preprocessing

In [19]:
def replace_with_homophones(word):
    adjacent_keys = {
            "অ": "আও",
            "আ": "অও",
            "ই": "ঈউই",
            "ঈ": "ইঈ",
            "উ": "ঊউই",
            "ঊ": "উঊ",
            "ঋ": "ঋ",
            "এ": "ঐএই",
            "ঐ": "এঐই",
            "ও": "ঔঅও",
            "ঔ": "ওঔ",
            "ক": "খগ",
            "খ": "কগ",
            "গ": "ঘগ্",
            "ঘ": "গগ্",
            "ঙ": "ঙং",
            "চ": "ছজ",
            "ছ": "চজ",
            "জ": "ঝয",
            "ঝ": "জয",
            "ঞ": "ঞম",
            "ট": "ঠড",
            "ঠ": "টডথ",
            "ড": "ঢদধ",
            "ঢ": "ডদধ",
            "ণ": "ণনম",
            "ত": "থদত",
            "থ": "তদদ্",
            "দ": "ধড",
            "ধ": "দড",
            "ন": "ণম",
            "প": "ফব",
            "ফ": "প",
            "ব": "ভব্",
            "ভ": "ব",
            "ম": "মন",
            "য": "জঝ",
            "র": "লর্যড়ঢ়য়",
            "ল": "রল",
            "শ": "সষ",
            "ষ": "শস",
            "স": "শষ",
            "হ": "হ্",
            "ড়": "ঢ়য়র",
            "ঢ়": "ড়য়র",
            "য়": "ড়ঢ়্",
            "ৎ": "ৎ্তট",
            "ং": "ঙ্",
            "ঃ": "ঃ্",
            "ঁ": "ঁ্",
        }
    diacritic_mapping = {

        "া": "িীুূৃেৈোৌ",
        "ি": "ীাুূ",
        "ী": "িাুূ",
        "ু": "ূিীা",
        "ূ": "ুিীা",
        "ৃ": "েৈা",
        "ে": "ৈৃো",
        "ৈ": "েৃো",
        "ো": "ৌেৈা",
        "ৌ": "োেৈা",
    }

    idx = random.randint(0, len(word) - 1)
    char = word[idx]

    if char in adjacent_keys:
        word = word[:idx] + random.choice(adjacent_keys[char]) + word[idx + 1:]
        return word
    elif char in diacritic_mapping:
        new_diacritic = random.choice(diacritic_mapping[char])
        word = word[:idx] + new_diacritic + word[idx + 1:]
    return word

In [20]:
def swap_adjacent_chars(word):
    if len(word) < 2:
        return word
    idx = random.randint(0, len(word) - 2)
    return word[:idx] + word[idx + 1] + word[idx] + word[idx + 2:]

def remove_char(word):
    if len(word) < 2:
        return word
    idx = random.randint(0, len(word) - 1)
    return word[:idx] + word[idx + 1:]

def insert_char(word):
    idx = random.randint(0, len(word))
    char = random.choice(string.ascii_lowercase)
    return word[:idx] + char + word[idx:]

def combine_words(words):
    idx = random.randint(0, len(words) - 2)
    words[idx] = words[idx] + words[idx + 1]
    del words[idx + 1]
    return words

def transpose_char(word):
    if len(word) < 2:
        return word
    idx = random.randint(0, len(word) - 2)
    word = word[:idx] + word[idx + 1] + word[idx] + word[idx + 2:]
    return word

def repeat_char(word):
    if len(word) < 1:
        return word
    idx = random.randint(0, len(word) - 1)
    word = word[:idx] + word[idx] + word[idx] + word[idx + 1:]
    return word

def remove_diacritic(word):
    diacritics = "ািীুূৃেৈোৌ"
    new_word = ''
    for char in word:
        if char in diacritics and random.random() < 0.5:
            continue
        new_word += char
    return new_word if new_word else word

def replace_wrong_diacritic(word):
    wrong_diacritic = {
        "া": "ে",
        "ি": "ী",
        "ী": "ি",
        "ু": "ূ",
        "ূ": "ু",
    }
    new_word = ''
    for char in word:
        if char in wrong_diacritic and random.random() < 0.5:
            new_word += wrong_diacritic[char]
        else:
            new_word += char
    return new_word

In [21]:
def modify_word_based_on_error_type(word, error_type):
    if error_type == "swap":
        return swap_adjacent_chars(word)
    elif error_type == "remove":
        return remove_char(word)
    elif error_type == "insert":
        return insert_char(word)
    elif error_type == "adjacent":
        return replace_with_homophones(word)
    elif error_type == "combine":
        return word
    elif error_type == "transpose":
        return transpose_char(word)
    elif error_type == "repeat":
        return repeat_char(word)
    elif error_type == "remove_diacritic":
        return remove_diacritic(word)
    elif error_type == "replace_wrong_diacritic":
        return replace_wrong_diacritic(word)

In [22]:
def introduce_errors(query, error_rate):
    words = query.split()
    if len(words) == 0:
        return query
    num_errors = random.randint(0, 1) #int((len(words) - 1) * (error_rate - 0.5)))
    for _ in range(num_errors):
        if random.random() < error_rate:
            idx = random.randint(0, len(words) - 1)
            error_types = ["swap", "remove", "insert", "adjacent", "combine",
                           "transpose", "repeat", "remove_diacritic", "replace_wrong_diacritic"]
            error_type = random.choice(error_types)
            if error_type == "combine" and len(words) > 1:
                words = combine_words(words)
            else:
                words[idx] = modify_word_based_on_error_type(words[idx], error_type)
    return " ".join(words)

In [23]:
def generate_dataset(input_file, target_file, error_rate):
    pairs = []
    for i, t in zip(input_file, target_file):
        if not i.strip() and t.strip():
            continue
        erroneous_query = introduce_errors(i, error_rate)
        pairs.append((erroneous_query, t))
    return pairs

def extract(data,dataset_type, column):
    target_data = data[column] #target,input
    target_data = target_data.dropna()
    def replace_multiple_digits(text):
        return re.sub(r"(\d)\1+", r"\1", text)
    cleaned_target_data = target_data.apply(replace_multiple_digits)
    cleaned_target_list = cleaned_target_data.tolist()
    json_data = json.dumps(cleaned_target_list, ensure_ascii=False)
    with open(f"{dataset_type}_{column}.json", "w", encoding="utf-8") as f:
        f.write(json_data)

def get_json(path):
    with open(path, "r", encoding="utf-8") as f:
        json_data = json.load(f)
        return json_data

def create_final_dataset(path, dataset_type, error_rate): #path of the original csv data
    data = pd.read_csv(path)
    print(data.shape)
    extract(data, dataset_type, 'Input')
    extract(data, dataset_type, 'Target')
    input_file = get_json(f"{output_dir}{dataset_type}_Input.json")
    target_file = get_json(f"{output_dir}{dataset_type}_Target.json")
    dataset=generate_dataset(input_file, target_file, error_rate)
    with open(f"{dataset_type}_data.json", "w", encoding="utf-8") as f:
        json.dump(dataset, f, ensure_ascii=False)
    return dataset

In [None]:
# Convert all entries to strings
train_df['Input'] = train_df['Input'].astype(str)
train_df['Target'] = train_df['Target'].astype(str)
test_df['Input'] = test_df['Input'].astype(str)
test_df['Target'] = test_df['Target'].astype(str)

# Concatenate all texts for tokenization
all_texts = list(train_df['Input']) + list(train_df['Target']) + list(test_df['Input']) + list(test_df['Target'])

# Initialize the tokenizer with a smaller vocabulary size
max_vocab_size = 500 # Reduced vocabulary size
tokenizer = Tokenizer(num_words=max_vocab_size)
tokenizer.fit_on_texts(all_texts)

# Convert texts to sequences
def texts_to_sequences(texts):
    return tokenizer.texts_to_sequences(texts)

# Tokenize the inputs and targets
X_train = texts_to_sequences(train_df['Input'])
y_train = texts_to_sequences(train_df['Target'])
X_test = texts_to_sequences(test_df['Input'])
y_test = texts_to_sequences(test_df['Target'])

# Get maximum sequence lengths
max_input_length = max(len(seq) for seq in X_train)
max_target_length = max(len(seq) for seq in y_train)

# Pad sequences
X_train = pad_sequences(X_train, maxlen=max_input_length, padding='post')
y_train = pad_sequences(y_train, maxlen=max_target_length, padding='post')
X_test = pad_sequences(X_test, maxlen=max_input_length, padding='post')
y_test = pad_sequences(y_test, maxlen=max_target_length, padding='post')

# Define parameters
vocab_size = min(len(tokenizer.word_index) + 1, max_vocab_size)  # Ensure vocab size does not exceed the limit
embedding_dim = 64  # Reduced embedding dimension
units = 128  # Reduced units in LSTM

# Define the encoder
encoder_inputs = Input(shape=(None,), name='encoder_inputs')
encoder_embedding = Embedding(vocab_size, embedding_dim)(encoder_inputs)
encoder_lstm = LSTM(units, return_sequences=True, return_state=True)
encoder_outputs, encoder_state_h, encoder_state_c = encoder_lstm(encoder_embedding)

# Define the decoder
decoder_inputs = Input(shape=(None,), name='decoder_inputs')
decoder_embedding = Embedding(vocab_size, embedding_dim)(decoder_inputs)
decoder_lstm = LSTM(units, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=[encoder_state_h, encoder_state_c])
decoder_dense = Dense(vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

Build The Model


In [None]:
from transformers import MBartForConditionalGeneration, MBart50TokenizerFast

model = MBartForConditionalGeneration.from_pretrained("facebook/mbart-large-50")
tokenizer = MBart50TokenizerFast.from_pretrained("facebook/mbart-large-50", src_lang="en_XX", tgt_lang="ro_RO")

Input_text = "তোমার মনে সুত আছে তাই তুমি এখন সুখী"
Target_text =  "তোমার মনে সুখ আছে তাই তুমি এখন সুখী।"

# Use Input_text instead of src_text
model_inputs = tokenizer(Input_text, return_tensors="pt")
with tokenizer.as_target_tokenizer():
    # Use Target_text instead of tgt_text
    labels = tokenizer(Target_text, return_tensors="pt").input_ids

model(**model_inputs, labels=labels) # forward pass

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.42k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.44G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/261 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/531 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/649 [00:00<?, ?B/s]



Seq2SeqLMOutput(loss=tensor(9.6226, grad_fn=<NllLossBackward0>), logits=tensor([[[58.6929, -1.4571, 36.5999,  ...,  5.7734, -0.9779, 14.9104],
         [58.6327, -1.4536, 36.3728,  ...,  5.7715, -0.9605, 14.8065],
         [-8.3387, -0.4806, 11.4270,  ...,  1.8540, -1.5911,  4.4546],
         ...,
         [-0.2996, -0.3411, 15.7467,  ...,  0.9878,  0.4053,  3.4122],
         [ 0.1940, -0.4484, 23.3401,  ...,  1.1744, -1.4489,  8.9990],
         [ 4.2647, -0.3939, 20.8198,  ...,  2.2280, -0.5391,  7.1224]]],
       grad_fn=<AddBackward0>), past_key_values=None, decoder_hidden_states=None, decoder_attentions=None, cross_attentions=None, encoder_last_hidden_state=tensor([[[ 0.0110, -0.0078, -0.0030,  ..., -0.0410, -0.0082,  0.0149],
         [-0.8845,  0.6545, -0.1895,  ..., -0.3617,  0.0206,  0.2807],
         [-0.6110, -0.2568, -0.1453,  ..., -0.8456, -0.3946,  0.1803],
         ...,
         [-0.8859, -0.0531,  0.6574,  ...,  0.2838,  0.1188, -0.0015],
         [-0.1539,  1.0433,  0.4

In [None]:
# Build the model
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])


In [None]:
import tensorflow as tf

# Use CPU instead of GPU
with tf.device('/CPU:0'):
    model.fit(
        [X_train, y_train[:, :-1]],
        np.expand_dims(y_train[:, 1:], -1),
        epochs=5,
        batch_size=32,
        validation_data=([X_test, y_test[:, :-1]], np.expand_dims(y_test[:, 1:], -1))
    )



Epoch 1/5
[1m42385/42385[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2507s[0m 59ms/step - accuracy: 0.8483 - loss: 0.9026 - val_accuracy: 0.8526 - val_loss: 0.7749
Epoch 2/5
[1m15088/42385[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m29:02[0m 64ms/step - accuracy: 0.9077 - loss: 0.5604

In [None]:
# Save model
model.save('model.h5')

# Load model (if needed)
from tensorflow.keras.models import load_model
model = load_model('model.h5')



In [None]:
import pickle
from google.colab import files

# Save the tokenizer to a pickle file
with open('tokenizer.pickle', 'wb') as handle:
    pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Download the tokenizer pickle file
files.download('tokenizer.pickle')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
!pip install joblib
import joblib



In [None]:
model=joblib.dump(model, 'model.pkl')

In [None]:
from google.colab import files
files.download('model.pkl')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
from nltk.translate.bleu_score import sentence_bleu

def calculate_bleu(reference, candidate):
  """
  Calculate BLEU score.

  Args:
      reference: The reference sentence.
      candidate: The candidate sentence.

  Returns:
      The BLEU score as a float.
  """
  reference = [reference.split()]
  candidate = candidate.split()
  return sentence_bleu(reference, candidate)

# Example usage
reference = "তোমার মনে সুত আছে তাই তুমি এখন সুখী"
candidate = "তোমার মনে সুখ আছে তাই তুমি এখন সুখী"

bleu_score = calculate_bleu(reference, candidate)

print(f"BLEU Score: {bleu_score:.3f}")

BLEU Score: 0.595


In [None]:
!pip install jiwer
import jiwer

def calculate_wer(ground_truth, hypothesis):

  return jiwer.wer(ground_truth, hypothesis)

def calculate_cer(ground_truth, hypothesis):
  """
  Calculate Character Error Rate (CER).

  Args:
      ground_truth: The correct sentence.
      hypothesis: The predicted sentence.

  Returns:
      The CER as a float.
  """
  return jiwer.cer(ground_truth, hypothesis)

# Example usage
ground_truth = "তোমার মনে সুত আছে তাই তুমি এখন সুখী"
hypothesis = "তোমার মনে সুখ আছে তাই তুমি এখন সুখী"

wer = calculate_wer(ground_truth, hypothesis)
cer = calculate_cer(ground_truth, hypothesis)

print(f"WER: {wer:.3f}")
print(f"CER: {cer:.3f}")

WER: 0.125
CER: 0.029
