In [None]:
# Pin torchtext to 0.6.0: later cells import Field, BucketIterator and
# TabularDataset from torchtext.data, so the notebook depends on that API.
!pip install torchtext==0.6.0


In [None]:
# Install fugashi with its unidic-lite extra.
# NOTE(review): no visible cell imports fugashi — confirm it is still needed.
!pip install fugashi[unidic-lite]


In [None]:
# Upgrade spaCy to the newest available release (no version pin); the language
# pipelines downloaded in the following cells are loaded with spacy.load later.
!pip install -U spacy


In [None]:
# English spaCy pipeline; loaded later with spacy.load('en_core_web_sm').
!python -m spacy download en_core_web_sm


In [None]:
# Japanese spaCy pipeline.
# NOTE(review): no visible cell loads ja_core_news_sm — confirm it is needed.
!python -m spacy download ja_core_news_sm


In [None]:
# Chinese (zh_core_web_sm), multilingual (xx_ent_wiki_sm) and Russian
# (ru_core_news_sm) spaCy pipelines. The first two are loaded later with
# spacy.load.
# NOTE(review): ru_core_news_sm is never loaded in the visible cells (Russian
# text is tokenized with stanza instead) — confirm it is needed.
!python -m spacy download zh_core_web_sm
!python -m spacy download xx_ent_wiki_sm
!python -m spacy download ru_core_news_sm



In [None]:
# Dataset path: load the first two columns of the CSV, using its first row as
# the header.
import numpy as np
import pandas as pd

# NOTE(review): absolute path under /content — presumably a Colab upload
# location; confirm before running elsewhere.
path = "/content/normalization_assesment_dataset_10k.csv"
# `df` is read again from the same file (without usecols) in a later cell,
# which replaces this frame.
df = pd.read_csv(path, usecols=[0,1], header=0)

In [None]:
import torch
import spacy
import random
import math
import time
import numpy as np
import torch.nn as nn
import pandas as pd
import torch.optim as optim

from torchtext.data import Field, BucketIterator, TabularDataset

In [None]:
import pandas as pd
from torchtext.data import Field, TabularDataset

In [None]:
from nltk.tokenize import word_tokenize

In [None]:
# Creating tokenization function
def tokenize_text(text):
    """Split ``text`` into a list of tokens using NLTK's ``word_tokenize``."""
    return word_tokenize(text)


In [None]:
# Source and target fields share one configuration: tokenize_text as the
# tokenizer, <sos>/<eos> sentence markers, and original casing preserved.
_FIELD_OPTIONS = dict(
    tokenize=tokenize_text,
    init_token='<sos>',
    eos_token='<eos>',
    lower=False,
)

# Two independent Field objects so each side builds its own vocabulary.
SRC = Field(**_FIELD_OPTIONS)
TRG = Field(**_FIELD_OPTIONS)

In [None]:
# (attribute name, Field) pairs passed to TabularDataset.splits; each parsed
# example exposes the processed columns as `.src` and `.trg`.
data_fields = [('src', SRC), ('trg', TRG)]

In [None]:
import pandas as pd

# Load CSV file
input_file = '/content/normalization_assesment_dataset_10k.csv'  # Replace with the path to your input file
df = pd.read_csv(input_file)

# Derive the split boundaries from the actual row count (50% train, 25% test,
# 25% validation) so the cell also works when `input_file` points at a dataset
# that does not have exactly 10,000 rows. For 10,000 rows this yields the same
# 5,000 / 2,500 / 2,500 split as fixed indices would.
n_rows = len(df)
if n_rows < 4:
    # Fewer than 4 rows would leave at least one split empty.
    raise ValueError("The CSV file must have at least 4 rows to build train/test/valid splits")

train_end = n_rows // 2
test_end = train_end + n_rows // 4

# Split the dataframe into the three parts. Rows are taken in file order; no
# shuffling is applied.
train_df = df.iloc[:train_end]
test_df = df.iloc[train_end:test_end]
valid_df = df.iloc[test_end:]

# Save each part as a new CSV in the working directory (header row included,
# index column omitted) for TabularDataset to read back.
train_df.to_csv('train.csv', index=False)
test_df.to_csv('test.csv', index=False)
valid_df.to_csv('valid.csv', index=False)

print("CSV files split and saved successfully!")


In [None]:
 # Download NLTK's 'punkt_tab' resource.
 # NOTE(review): presumably the tokenizer data word_tokenize relies on — confirm
 # against the installed NLTK version.
 import nltk
 nltk.download('punkt_tab')

In [None]:
# Read the three CSV files written earlier from the working directory, skipping
# their header row and parsing the columns according to data_fields.
train, val, test = TabularDataset.splits(
    path='./',
    format='csv',
    fields=data_fields,
    skip_header=True,
    train='train.csv',
    validation='valid.csv',
    test='test.csv',
)

# Show the parsed attributes of the first training example
first_example = train.examples[0]
print(vars(first_example))

# Report how many examples each split holds
n_train, n_val, n_test = (len(split.examples) for split in (train, val, test))
print(f"Train dataset size: {n_train}")
print(f"Validation dataset size: {n_val}")
print(f"Test dataset size: {n_test}")

# Both vocabularies are built from the training split only; min_freq=2 filters
# out rare tokens.
for field in (SRC, TRG):
    field.build_vocab(train, min_freq=2)

# Report the resulting vocabulary sizes
src_vocab_size, trg_vocab_size = len(SRC.vocab), len(TRG.vocab)
print(f"Source vocabulary size: {src_vocab_size}")
print(f"Target vocabulary size: {trg_vocab_size}")

In [None]:
# Print the first training Example object itself (its attributes were already
# printed via vars() when the dataset was loaded).
print(train.examples[0])


In [None]:
# Display the number of examples per split as a (train, validation, test) tuple.
len(train.examples), len(val.examples), len(test.examples)


In [None]:
# NOTE(review): these calls repeat, with the same arguments, the build_vocab
# calls made in the cell that loads the dataset — this cell looks redundant.
SRC.build_vocab(train, min_freq = 2)
TRG.build_vocab(train, min_freq = 2)

In [None]:
# Print the source and then the target vocabulary size (unlabelled).
print(len(SRC.vocab))
print(len(TRG.vocab))

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
BATCH_SIZE = 20


def _src_length(example):
    """Sort key for bucketing: the number of source tokens in an example."""
    return len(example.src)


# BucketIterator groups examples of similar length only when it has a sort key.
# The datasets here are created without one, so the key is passed explicitly;
# otherwise batches are formed from arbitrary lengths and padding is wasted.

# Training batches: bucketed by source length and sorted within each batch,
# while the batch order is still shuffled every epoch (train=True is the default).
train_iter = BucketIterator(
    train,
    batch_size=BATCH_SIZE,
    sort_key=_src_length,
    sort_within_batch=True,
    device=device,
)

# Validation / test batches: train=False disables shuffling and sorts the data
# by the key, so evaluation batches are the same on every pass.
valid_iter = BucketIterator(
    val,
    batch_size=BATCH_SIZE,
    sort_key=_src_length,
    train=False,
    device=device,
)

test_iter = BucketIterator(
    test,
    batch_size=BATCH_SIZE,
    sort_key=_src_length,
    train=False,
    device=device,
)

In [None]:
class Encoder(nn.Module):
    """Encoder half of the seq2seq model.

    Embeds source token indices, applies dropout, runs them through a
    multi-layer LSTM and returns the LSTM's final hidden and cell states.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        # Kept as attributes so Seq2Seq can compare them with the decoder's.
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode ``src`` ([src len, batch size]) and return ``(hidden, cell)``."""
        # token_vectors = [src len, batch size, emb dim]
        token_vectors = self.embedding(src)
        rnn_input = self.dropout(token_vectors)
        # The per-step outputs are not needed, only the final states.
        _, final_state = self.rnn(rnn_input)
        hidden, cell = final_state
        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Decoder half of the seq2seq model.

    Consumes one target token per call: embeds it, advances a multi-layer LSTM
    by a single step from the given states, and projects the LSTM output onto
    the target vocabulary.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        # Kept as attributes so Seq2Seq can size its output buffer and check
        # compatibility with the encoder.
        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Run one decoding step.

        Returns ``(prediction, hidden, cell)`` where ``prediction`` is
        [batch size, output dim] and the states are the updated LSTM states.
        """
        # Add a length-1 sequence axis: step_input = [1, batch size]
        step_input = input.unsqueeze(0)
        # step_vectors = [1, batch size, emb dim]
        step_vectors = self.dropout(self.embedding(step_input))
        rnn_output, next_state = self.rnn(step_vectors, (hidden, cell))
        hidden, cell = next_state
        # Drop the sequence axis again before the output projection.
        prediction = self.fc_out(rnn_output.squeeze(0))
        return prediction, hidden, cell

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes the target one step at a time.

    The encoder's final states seed the decoder. At every step the decoder is
    fed either the ground-truth token (teacher forcing) or its own most likely
    prediction from the previous step.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()

        # The decoder starts from the encoder's states, so their shapes must agree.
        assert encoder.hid_dim == decoder.hid_dim, \
            "Hidden dimensions of encoder and decoder must be equal!"
        assert encoder.n_layers == decoder.n_layers, \
            "Encoder and decoder must have equal number of layers!"

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        """Return decoder scores of shape [trg len, batch size, output dim].

        Row 0 of the result is never written and stays all zeros, because
        decoding starts from the first target token rather than predicting it.
        ``teacher_forcing_ratio`` is the per-step probability of feeding the
        ground-truth token instead of the model's own prediction.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        vocab_size = self.decoder.output_dim

        # Buffer collecting the decoder scores of every step.
        outputs = torch.zeros(trg_len, batch_size, vocab_size, device=self.device)

        # The encoder's final states initialise the decoder.
        hidden, cell = self.encoder(src)

        # Decoding starts from the first target row (the <sos> tokens).
        decoder_input = trg[0, :]

        for step in range(1, trg_len):
            step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[step] = step_scores

            # One random draw per step decides between ground truth and prediction.
            use_ground_truth = random.random() < teacher_forcing_ratio
            decoder_input = trg[step] if use_ground_truth else step_scores.argmax(1)

        return outputs

In [None]:
# Vocabulary sizes fix the width of the embedding tables and of the decoder's
# output layer.
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)

# Embedding sizes for each side
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256

# LSTM geometry shared by both sides (Seq2Seq asserts that they match)
HID_DIM = 512
N_LAYERS = 2

# Dropout probabilities
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

enc = Encoder(
    input_dim=INPUT_DIM,
    emb_dim=ENC_EMB_DIM,
    hid_dim=HID_DIM,
    n_layers=N_LAYERS,
    dropout=ENC_DROPOUT,
)
dec = Decoder(
    output_dim=OUTPUT_DIM,
    emb_dim=DEC_EMB_DIM,
    hid_dim=HID_DIM,
    n_layers=N_LAYERS,
    dropout=DEC_DROPOUT,
)

# Assemble the full model and move its parameters to the selected device.
model = Seq2Seq(encoder=enc, decoder=dec, device=device).to(device)

In [None]:
def init_weights(m):
    """Re-initialise every parameter of module ``m`` uniformly in [-0.08, 0.08]."""
    for _, weight in m.named_parameters():
        nn.init.uniform_(weight.data, a=-0.08, b=0.08)

# Apply the uniform initialisation across the model. As the last expression of
# the cell, the returned value is displayed by the notebook.
model.apply(init_weights)

In [None]:
def count_parameters(model):
    """Return the total element count of the parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the trainable-parameter count, formatted with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
# Adam over all model parameters; no optimizer hyperparameters are overridden.
optimizer = optim.Adam(model.parameters())

In [None]:
# Vocabulary index of the target field's padding token.
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]

# Cross-entropy loss that skips target positions equal to the padding index.
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)

In [None]:
def train(model, iterator, optimizer, criterion, clip=1):
    """Run one training epoch and return the mean per-batch loss.

    Each batch must expose ``src`` and ``trg`` tensors. Gradients are clipped
    to a total norm of ``clip`` before every optimizer step.

    NOTE(review): defining this function rebinds the name ``train``, which the
    dataset-loading cell also uses for the training split; after this cell runs
    that dataset is no longer reachable under that name.
    """
    model.train()
    running_loss = 0

    for batch in iterator:
        source, target = batch.src, batch.trg

        optimizer.zero_grad()
        # target = [trg len, batch size]
        # scores = [trg len, batch size, output dim]
        scores = model(source, target)

        # Drop position 0 of both tensors, then flatten so the criterion sees
        # one row of scores per target token.
        vocab_size = scores.shape[-1]
        flat_scores = scores[1:].view(-1, vocab_size)
        flat_target = target[1:].view(-1)

        batch_loss = criterion(flat_scores, flat_target)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(iterator)

In [None]:
def evaluate(model, iterator, criterion):
    """Return the mean per-batch loss over ``iterator`` without updating the model.

    The model is switched to eval mode, gradients are disabled, and the model
    is called with a teacher-forcing ratio of 0.
    """
    model.eval()
    running_loss = 0

    with torch.no_grad():
        for batch in iterator:
            source, target = batch.src, batch.trg

            # Third argument 0 = teacher-forcing ratio
            scores = model(source, target, 0)

            # Drop position 0 and flatten, mirroring the training loss.
            vocab_size = scores.shape[-1]
            flat_scores = scores[1:].view(-1, vocab_size)
            flat_target = target[1:].view(-1)

            running_loss += criterion(flat_scores, flat_target).item()

    return running_loss / len(iterator)

In [None]:
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into
    ``(whole minutes, leftover whole seconds)``; fractions are truncated."""
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

In [None]:
N_EPOCHS = 15  # number of full passes over the training iterator

# Lowest validation loss observed so far; starts at infinity so the first
# epoch always produces a checkpoint.
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    # Time one training pass plus one validation pass.
    start_time = time.time()
    train_loss = train(model, train_iter, optimizer, criterion)
    valid_loss = evaluate(model, valid_iter, criterion)
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # Keep only the weights of the epoch with the best validation loss.
    improved = valid_loss < best_valid_loss
    if improved:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut1-model.pt')

    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f}')

In [None]:
# Restore the checkpoint saved by the training loop. map_location remaps the
# stored tensors onto the current device, so a checkpoint written on a GPU
# runtime can still be loaded when only the CPU is available.
model.load_state_dict(torch.load('tut1-model.pt', map_location=device))

# Report the test loss and its exponential (perplexity).
test_loss = evaluate(model, test_iter, criterion)
print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')

In [None]:
# langdetect provides the language detection used by detect_language below.
!pip install langdetect


In [None]:
# stanza provides the Russian and Arabic tokenization pipelines created below.
!pip install stanza


In [None]:
import torch
import spacy
import langdetect  # To detect language
import jieba  # For Chinese tokenization
import stanza  # For Russian, Arabic, and other languages

# Load the spaCy pipelines once at cell execution; normalize_sentence reuses
# these module-level objects instead of reloading them per call.
spacy_en = spacy.load('en_core_web_sm')  # English pipeline
spacy_zh = spacy.load('zh_core_web_sm')  # Chinese pipeline
# Multilingual pipeline (xx_ent_wiki_sm). NOTE(review): despite the `_ar` name,
# Arabic text is tokenized with stanza_ar in normalize_sentence.
spacy_ar = spacy.load('xx_ent_wiki_sm')
# stanza pipelines for Russian and Arabic, restricted to the tokenize processor
stanza_ru = stanza.Pipeline(lang='ru', processors='tokenize')
stanza_ar = stanza.Pipeline(lang='ar', processors='tokenize')

def detect_language(text):
    """Return langdetect's language code for ``text`` (e.g. 'en', 'ru', 'zh-cn').

    langdetect raises LangDetectException when the text offers nothing to
    detect from (for example an empty or digits-only string). In that case
    'unknown' is returned instead, so callers that branch on the code fall
    through to their default handling rather than crashing.
    """
    try:
        return langdetect.detect(text)
    except langdetect.lang_detect_exception.LangDetectException:
        return 'unknown'

def normalize_sentence(sentence, src_field, trg_field, model, device, max_len=50):
    """Greedy-decode the normalized form of ``sentence`` with a trained model.

    Args:
        sentence: a raw string, or an already tokenized sequence of tokens
            (such as the ``src`` attribute of a dataset example).
        src_field: source field; supplies ``init_token``, ``eos_token`` and
            ``vocab.stoi`` for the input side.
        trg_field: target field; supplies ``init_token``, ``eos_token``,
            ``vocab.stoi`` and ``vocab.itos`` for the output side.
        model: object exposing ``eval()``, ``encoder(src) -> (hidden, cell)``
            and ``decoder(token, hidden, cell) -> (scores, hidden, cell)``.
        device: device the index tensors are moved to.
        max_len: maximum number of decoding steps.

    Returns:
        The predicted target tokens as a list, without the start token and
        without the end token if one was produced.
    """
    model.eval()  # Set model to evaluation mode

    if isinstance(sentence, str):
        # Raw text: pick a tokenizer from the detected language.
        # NOTE(review): these tokenizers differ from the one the fields use
        # (tokenize_text), so raw-string tokens may not line up with the
        # training vocabulary — confirm this is intended.
        language = detect_language(sentence)

        if language == 'en':  # English
            tokens = [token.text for token in spacy_en(sentence)]
        elif language.startswith('zh'):  # Chinese
            # langdetect reports Chinese as 'zh-cn' / 'zh-tw', never plain 'zh'.
            tokens = list(jieba.cut(sentence))
        elif language == 'ru':  # Russian
            doc = stanza_ru(sentence)
            tokens = [word.text for sent in doc.sentences for word in sent.words]
        elif language == 'ar':  # Arabic
            doc = stanza_ar(sentence)
            tokens = [word.text for sent in doc.sentences for word in sent.words]
        else:  # Any other (or undetected) language: multilingual pipeline
            tokens = [token.text for token in spacy_ar(sentence)]
    else:
        # Already tokenized input is used as-is. Joining and re-tokenizing it
        # with a different tokenizer would yield tokens that do not match the
        # ones the vocabulary was built from.
        tokens = list(sentence)

    # Debugging step: print tokenized sentence
    print(f"Tokenized sentence: {tokens}")

    # Add start and end tokens, then map tokens to vocabulary indices
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    # Shape [src len, 1]: a batch holding this single sentence
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    # Looked up once instead of on every decoding step
    eos_index = trg_field.vocab.stoi[trg_field.eos_token]
    # The target sequence starts with the <sos> token
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]

    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

        # Greedy decoding: feed the previous prediction back in until <eos>
        # is produced or max_len steps have been taken.
        for _ in range(max_len):
            trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

            pred_token = output.argmax(1).item()
            trg_indexes.append(pred_token)

            if pred_token == eos_index:
                break

    # Drop the leading <sos>; drop the trailing token only when it really is
    # <eos>, so a sequence cut off by max_len keeps its last predicted token.
    predicted = trg_indexes[1:]
    if predicted and predicted[-1] == eos_index:
        predicted = predicted[:-1]

    return [trg_field.vocab.itos[i] for i in predicted]



In [None]:
from torchtext.data.metrics import bleu_score

def calculate_bleu(data, src_field, trg_field, model, device, max_len = 50):
    """Return the corpus BLEU score of the model's predictions over ``data``.

    Every element of ``data`` must expose ``src`` and ``trg`` attributes. The
    source is passed through normalize_sentence and the prediction is scored
    against the element's single reference ``trg``.
    """
    references = []
    hypotheses = []

    for example in data:
        attributes = vars(example)
        source, target = attributes['src'], attributes['trg']

        hypotheses.append(
            normalize_sentence(source, src_field, trg_field, model, device, max_len)
        )
        # Wrapped in a list: each hypothesis gets a list of references.
        references.append([target])

    return bleu_score(hypotheses, references)

In [None]:
# Score the model on the test split.
bleu_value = calculate_bleu(test, SRC, TRG, model, device)

# The score is multiplied by 100 for display.
print(f'BLEU score = {bleu_value*100:.2f}')