<a href="https://colab.research.google.com/github/LidiiaMelnyk95/FSU_Jena_scripts/blob/main/t5_grammar.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# Mount Google Drive at /content/drive (Colab-only helper); the training
# arguments further down write checkpoints beneath this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
from datasets import load_dataset
from tqdm import tqdm
import argparse
import glob
import os
import json
import time
import logging
import random
import re
from itertools import chain
from string import punctuation

import nltk
nltk.download('punkt')
from nltk.tokenize import sent_tokenize

import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

from transformers import (
    AdamW,
    T5ForConditionalGeneration,
    T5Tokenizer,
    get_linear_schedule_with_warmup
)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
def set_seed(seed):
    """Seed Python's `random`, NumPy and PyTorch generators with `seed`."""
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)


# Seed everything once, right after the helper is defined.
set_seed(42)


In [None]:

df = pd.read_csv('/content/data_augmented_df.csv', sep = ';')

In [None]:
# Rename SPELLING -> input and Comment -> output, then keep only those two columns.
column_names = {"SPELLING": 'input', 'Comment': 'output'}
df = df.rename(columns=column_names)[['input', 'output']]

In [None]:
import torch
torch.cuda.is_available()

True

In [None]:
from transformers import (
    T5ForConditionalGeneration, T5Tokenizer,
    Seq2SeqTrainingArguments, Seq2SeqTrainer, DataCollatorForSeq2Seq
  )

from torch.utils.data import Dataset, DataLoader


In [None]:
model_name = 't5-base'
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)


In [None]:

def calc_token_len(example):
    """Return how many token ids the notebook-level `tokenizer` produces for `example`."""
    encoding = tokenizer(example)
    token_ids = encoding.input_ids
    return len(token_ids)


In [None]:

from sklearn.model_selection import train_test_split
# Pin the split seed explicitly so the held-out rows stay the same even when
# this cell is re-run without re-seeding NumPy first, and take copies so that
# adding columns to the splits later does not raise SettingWithCopyWarning.
train_df, test_df = train_test_split(df, test_size=0.05, shuffle=True, random_state=42)
train_df, test_df = train_df.copy(), test_df.copy()


In [None]:

test_df['input_token_len'] = test_df['input'].apply(calc_token_len)

Token indices sequence length is longer than the specified maximum sequence length for this model (585 > 512). Running this sequence through the model will result in indexing errors


In [None]:

# Import the Hugging Face Dataset under an alias so it does not overwrite
# torch.utils.data.Dataset, which this notebook imports under the same name.
from datasets import Dataset as HFDataset
train_dataset = HFDataset.from_pandas(train_df)
test_dataset = HFDataset.from_pandas(test_df)

In [None]:
from torch.utils.data import Dataset, DataLoader
class GrammarDataset(Dataset):
    """Map-style dataset that tokenizes (input, output) text pairs on access.

    Args:
        dataset: Indexable collection whose items provide ``'input'`` and
            ``'output'`` text entries.
        tokenizer: Callable tokenizer; its result must be subscriptable by
            ``'input_ids'`` and ``'attention_mask'``.
        print_text: When True, ``__getitem__`` prints the length of every
            returned field.
        max_len: Maximum number of tokens kept per sequence; longer texts are
            truncated explicitly.
    """

    def __init__(self, dataset, tokenizer, print_text=False, max_len=64):
        self.dataset = dataset
        # Padding is left to the batch collator unless this flag is switched on.
        self.pad_to_max_length = False
        self.tokenizer = tokenizer
        self.print_text = print_text
        self.max_len = max_len

    def __len__(self):
        return len(self.dataset)

    def _encode(self, text):
        """Tokenize one text with this dataset's own tokenizer and length limit."""
        # `padding=` replaces the deprecated `pad_to_max_length=` keyword.
        padding = "max_length" if self.pad_to_max_length else False
        return self.tokenizer(text,
                              padding=padding,
                              truncation=True,
                              max_length=self.max_len,
                              return_attention_mask=True)

    def tokenize_data(self, example):
        """Return ``input_ids``/``attention_mask`` for the input text and ``labels`` for the output text."""
        tokenized_inputs = self._encode(example['input'])
        tokenized_targets = self._encode(example['output'])

        return {
            "input_ids": tokenized_inputs['input_ids'],
            "attention_mask": tokenized_inputs['attention_mask'],
            "labels": tokenized_targets['input_ids'],
        }

    def __getitem__(self, index):
        inputs = self.tokenize_data(self.dataset[index])

        if self.print_text:
            for key, values in inputs.items():
                print(key, len(values))

        return inputs

In [None]:

dataset = GrammarDataset(test_dataset, tokenizer, True)
print(dataset[15])

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


input_ids 64
attention_mask 64
labels 64
{'input_ids': [3, 7422, 632, 4209, 8775, 632, 308, 8491, 680, 3, 15, 7, 78, 548, 13832, 745, 6, 211, 3, 362, 15638, 17955, 5335, 218, 3494, 1662, 20899, 425, 36, 18992, 35, 6368, 6, 1352, 615, 211, 236, 5964, 31499, 5, 196, 51, 3, 30240, 2010, 3, 362, 96, 19629, 2626, 4039, 157, 13513, 121, 19642, 561, 177, 10122, 29, 7937, 501, 2149, 649, 1], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'labels': [3, 7422, 632, 4209, 8775, 632, 308, 8491, 680, 3, 15, 7, 78, 548, 13832, 745, 6, 211, 3, 362, 15638, 17955, 5335, 218, 3494, 1662, 20899, 425, 36, 18992, 35, 6368, 6, 1352, 615, 211, 236, 5964, 31499, 5, 196, 51, 3, 30240, 2010, 3, 362, 96, 19629, 2626, 4039, 157, 13513, 121, 19642, 561, 177, 10122, 29, 7937, 501, 2149, 649, 1]}


In [None]:

!pip install rouge_score


In [None]:

# Load the ROUGE metric object that compute_metrics() calls.
# NOTE(review): `load_metric` was deprecated in later `datasets` releases in
# favour of the separate `evaluate` package — confirm the installed version
# still provides it.
from datasets import load_metric
rouge_metric = load_metric("rouge")

In [None]:
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model, padding='longest', return_tensors='pt')


In [None]:
# defining training related arguments
# NOTE(review): `args` is a plain dict; Seq2SeqTrainer is normally given a
# Seq2SeqTrainingArguments instance — confirm how this dict is consumed.
batch_size = 5
args = {
    # Checkpoints are written under the mounted Google Drive folder.
    "output_dir": "/content/drive/MyDrive/c4_200m/weights",
    "evaluation_strategy": "steps",
    # The same batch size is used for training and evaluation.
    "per_device_train_batch_size": batch_size,
    "per_device_eval_batch_size": batch_size,
    "learning_rate": 2e-5,
    "num_train_epochs": 1,
    "weight_decay": 0.01,
    "save_total_limit": 2,
    "predict_with_generate": True,
    "gradient_accumulation_steps": 6,
    # Evaluation and checkpointing share the same step interval.
    "eval_steps": 5,
    "save_steps": 5,
    "load_best_model_at_end": True,
    # NOTE(review): absolute path at the filesystem root — confirm this is intended.
    "logging_dir": "/logs"
}


In [None]:
import nltk
nltk.download('punkt')
import numpy as np

def compute_metrics(eval_pred):
    """Compute ROUGE F-measures (as percentages) and the mean generated length.

    Args:
        eval_pred: ``(predictions, labels)`` pair of token-id arrays.
            ``predictions`` may also be a tuple whose first element holds the
            generated ids.

    Returns:
        dict: one entry per ROUGE key (mid F-measure * 100) plus ``"gen_len"``,
        the mean count of non-pad tokens per prediction; values are rounded to
        4 decimals.
    """
    predictions, labels = eval_pred
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # -100 is an ignore/padding marker rather than a vocabulary id, so it has
    # to be swapped for the pad token before either array can be decoded.
    predictions = np.where(predictions != -100, predictions, tokenizer.pad_token_id)
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)

    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Rouge expects a newline after each sentence
    decoded_preds = ["\n".join(nltk.sent_tokenize(pred.strip())) for pred in decoded_preds]
    decoded_labels = ["\n".join(nltk.sent_tokenize(label.strip())) for label in decoded_labels]

    result = rouge_metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
    # Keep only the mid F-measure of each ROUGE variant, scaled to percent.
    result = {key: value.mid.fmeasure * 100 for key, value in result.items()}

    # Mean generated length, counted on the cleaned ids so that padding
    # markers are not mistaken for real tokens.
    prediction_lens = [np.count_nonzero(pred != tokenizer.pad_token_id) for pred in predictions]
    result["gen_len"] = np.mean(prediction_lens)
    return {k: round(v, 4) for k, v in result.items()}

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
# Seq2SeqTrainer reads its settings as attributes of a training-arguments
# object, so a plain dict of settings has to be converted first.
training_args = (
    args if isinstance(args, Seq2SeqTrainingArguments)
    else Seq2SeqTrainingArguments(**args)
)

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=GrammarDataset(train_dataset, tokenizer),
    eval_dataset=GrammarDataset(test_dataset, tokenizer),
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)


In [None]:
import os
os.environ["WANDB_DISABLED"] = "true"
trainer.train()

In [None]:
# `test_dataset` holds raw text columns; wrap it in GrammarDataset (as is done
# for training and evaluation) so the trainer receives tokenized features.
trainer.predict(test_dataset=GrammarDataset(test_dataset, tokenizer))

In [None]:
import torch
from torchviz import make_dot
from transformers import T5ForConditionalGeneration

model_name = 'deep-learning-analytics/GrammarCorrector'
torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name).to(torch_device)

def correct_grammar(input_text, num_return_sequences):
    """Generate corrected versions of `input_text` with the notebook-level model.

    Args:
        input_text: Text to encode with the notebook-level `tokenizer`.
        num_return_sequences: Number of generated sequences to return.

    Returns:
        list: Decoded generations with special tokens removed.
    """
    generation_kwargs = dict(
        max_length=512,
        num_beams=4,
        num_return_sequences=num_return_sequences,
        temperature=1.5,
    )
    # Encode on the same device the model lives on.
    encoded = tokenizer.encode(input_text, return_tensors="pt").to(torch_device)
    candidates = model.generate(encoded, **generation_kwargs)
    return tokenizer.batch_decode(candidates, skip_special_tokens=True)

# Example usage
# The test split keeps the row labels of the shuffled source frame, so the
# example is picked by position; a label lookup for 70 only works if that
# particular row happened to land in the split.
input_text = test_df['input'].iloc[70]
tgt_text = correct_grammar(input_text, num_return_sequences = 1)



In [None]:
import torch
from torchsummary import summary
from transformers import T5ForConditionalGeneration

# Reload the pretrained grammar-correction checkpoint and its tokenizer,
# placing the model on the GPU when one is available.
model_name = 'deep-learning-analytics/GrammarCorrector'
torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name).to(torch_device)

# Define an example input shape
# NOTE(review): `input_shape` is assigned but never used; the call below passes 512 instead.
input_shape = (1, 512)  # Replace with your desired input shape

# Use torchsummary to visualize the model architecture
# NOTE(review): torchsummary presumably builds random float inputs, whereas T5
# consumes integer token ids — confirm that this call actually runs.
summary(model, 512)


In [None]:
test_df['preds'] = test_df['input'].apply(lambda x: correct_grammar(x, num_return_sequences=1))

In [None]:
test_df.head()

In [None]:
import torch
import torchsummary
from transformers import T5Tokenizer, T5ForConditionalGeneration

# Reload the pretrained checkpoint and tokenizer once more for this summary.
model_name = 'deep-learning-analytics/GrammarCorrector'
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

# Move the model to CPU
# NOTE(review): `torch_device` is not updated here, and correct_grammar() sends
# its inputs to `torch_device` — confirm that function is not called after this
# cell on a GPU runtime.
model.to('cpu')

# Get the input shape of the model
# NOTE(review): `[70]` is a label-based lookup on the split's index — confirm
# that row label 70 is present in test_df.
input_text = test_df['input'][70]
input_ids = tokenizer.encode(input_text, return_tensors="pt")
# Drop the leading (batch) dimension, leaving only the sequence length.
input_shape = tuple(input_ids.shape[1:])

# Use torchsummary to visualize the model architecture
# NOTE(review): torchsummary presumably feeds float tensors, not integer token
# ids — confirm this call runs against a T5 model.
torchsummary.summary(model, input_size=input_shape, device='cpu')


In [None]:
!pip install torchviz

In [None]:
import torch
from torchviz import make_dot
from transformers import T5ForConditionalGeneration

model_name = 'deep-learning-analytics/GrammarCorrector'
model = T5ForConditionalGeneration.from_pretrained(model_name)

# Use one test sentence as the example input; it is selected by position
# because the split keeps the row labels of the shuffled source frame.
input_text = test_df['input'].iloc[70]
input_ids = tokenizer.encode(input_text, return_tensors="pt")

# Forward pass through the model. T5 is an encoder-decoder, so the decoder
# needs inputs too; passing `labels` lets the model derive them itself.
outputs = model(input_ids=input_ids, labels=input_ids)

# Create a graph of the model. make_dot traces back from a tensor, so it is
# given the logits rather than the whole output object.
graph = make_dot(outputs.logits, params=dict(model.named_parameters()))

# Save the graph as a PDF
graph.render("model_graph")


In [None]:
test_df.to_csv('compared_df.csv')

In [None]:
import nltk

nltk.download('punkt')
nltk.download('wordnet')

In [None]:
from nltk.util import ngrams
from nltk import word_tokenize
from nltk.corpus import wordnet as wn

def precision(candidate, reference, n):
    """
    Share of the candidate's distinct n-grams that also occur in the reference.

    Both texts are word-tokenized and their n-grams are compared as sets, so
    repeated n-grams count once.

    Args:
    - candidate (str): The generated text
    - reference (str): The reference text
    - n (int): The n-gram order

    Returns:
    - float: The precision score

    Raises:
    - ZeroDivisionError: if the candidate yields no n-grams of order n
    """
    cand_grams = set(ngrams(word_tokenize(candidate), n))
    ref_grams = set(ngrams(word_tokenize(reference), n))
    overlap = cand_grams & ref_grams
    return len(overlap) / len(cand_grams)

In [None]:
def brevity_penalty(candidate, reference):
    """
    Penalty factor applied to candidates that are not longer than the reference.

    Lengths are measured in word tokens.

    Args:
    - candidate (str): The generated text
    - reference (str): The reference text

    Returns:
    - float: 1 when the candidate is longer than the reference, otherwise
      exp(1 - reference_length / candidate_length)

    Raises:
    - ZeroDivisionError: if the candidate has no tokens
    """
    cand_len = len(word_tokenize(candidate))
    ref_len = len(word_tokenize(reference))
    if cand_len > ref_len:
        return 1
    return np.exp(1 - ref_len / cand_len)

In [None]:
def gleu(candidate, reference, max_order=4):
    """
    Score a generated text against a reference text.

    The score is the brevity penalty multiplied by the geometric mean of the
    n-gram precisions for orders 1..max_order.

    Args:
    - candidate (str): The generated text
    - reference (str): The reference text
    - max_order (int): The maximum n-gram order to consider (default: 4)

    Returns:
    - float: The GLEU score
    """
    orders = range(1, max_order + 1)
    per_order = [precision(candidate, reference, order) for order in orders]
    penalty = brevity_penalty(candidate, reference)
    # exp(mean(log(p))) is the geometric mean of the precisions.
    log_mean = np.mean(np.log(per_order))
    return penalty * np.exp(log_mean)

In [None]:
test_df['preds'] = test_df['preds'].apply(lambda x: x[0])


In [None]:
import ast


def _unwrap_saved_pred(value):
    """Turn a saved one-element list repr such as "['text']" back into 'text'."""
    if isinstance(value, str) and value.startswith('[') and value.endswith(']'):
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, SyntaxError):
            # Not a list literal after all: keep the text as it is.
            return value
        if isinstance(parsed, list) and parsed:
            return parsed[0]
    return value


test_df = pd.read_csv('/content/compared_df.csv')
# The CSV was written while `preds` still held one-element lists, so the
# column is read back as the text of a list; recover the plain strings so the
# metrics below score the prediction itself rather than its list repr.
test_df['preds'] = test_df['preds'].apply(_unwrap_saved_pred)

In [None]:
test_df.isna().sum()

In [None]:
# Score every prediction against its reference.
# NOTE(review): with max_order=20, precision() divides by zero for any
# candidate shorter than 20 tokens, so such rows end up as NaN — confirm that
# this order is intended.
for i, row in test_df.iterrows():
    try:
        score = gleu(row['preds'], row['output'], max_order=20)
    except ZeroDivisionError:
        # Record the miss explicitly so the column always exists and the
        # skipped rows are visible as NaN.
        score = float('nan')
    test_df.at[i, 'gleu'] = score

In [None]:
gleu_score = test_df.gleu.mean()

In [None]:
gleu_score

In [None]:
# NOTE(review): the path is an empty string, so this read presumably fails as
# written, and `df_test` is not referenced anywhere else in the visible
# notebook — fill in the intended path or drop the cell.
df_test = pd.read_csv('')

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

# Install the happytransformer library
!pip install happytransformer

In [None]:
from happytransformer import HappyTextToText, TTSettings
happy_tt = HappyTextToText("T5", "google/flan-t5-large")

In [None]:
args = TTSettings(num_beams=5, min_length=1)

In [None]:
test_df['preds_2'] = test_df['input'].apply(lambda x:  happy_tt.generate_text("grammar: {}".format(x), args=args).text)

In [None]:
test_df.to_csv('t5_large_corrected.csv')

In [None]:
# Score the second model's predictions against the references.
# NOTE(review): with max_order=20, precision() divides by zero for any
# candidate shorter than 20 tokens, so such rows end up as NaN — confirm that
# this order is intended.
for i, row in test_df.iterrows():
    try:
        score = gleu(row['preds_2'], row['output'], max_order=20)
    except ZeroDivisionError:
        # Record the miss explicitly so the column always exists and the
        # skipped rows are visible as NaN.
        score = float('nan')
    test_df.at[i, 'gleu_2'] = score

In [None]:
test_df['gleu_2'].mean()

In [None]:
from nltk.translate.bleu_score import sentence_bleu


In [None]:
# sentence_bleu(references, hypothesis) expects token lists: the gold text is
# the (single) reference and each model output is the hypothesis. Passing raw
# strings would make it compare characters instead of words.
for i, row in test_df.iterrows():
    reference_tokens = [word_tokenize(row['output'])]
    # Score each prediction column on its own so a failure in one does not
    # prevent the other from being recorded.
    for bleu_column, pred_column in (('bleu', 'preds'), ('bleu_2', 'preds_2')):
        try:
            score = sentence_bleu(reference_tokens, word_tokenize(row[pred_column]))
        except ZeroDivisionError:
            score = float('nan')
        test_df.at[i, bleu_column] = score

In [None]:
test_df.bleu.mean()

In [None]:
test_df.bleu_2.mean()

In [None]:
from sklearn.metrics import precision_score


In [None]:
# Share of rows whose second-model prediction equals the reference exactly.
# It is stored under its own name: binding it to `precision` would overwrite
# the n-gram precision() function that gleu() calls.
matching_sentences = int((test_df['output'] == test_df['preds_2']).sum())
exact_match_rate = matching_sentences / len(test_df)

In [None]:
!pip install Levenshtein
from Levenshtein import distance

In [None]:
# Character-level edit distance between each prediction and its reference.
# Both arguments are passed as plain strings; wrapping the prediction in a
# list would compare a one-element list against the reference's characters.
# The distance computation involves no division, so no ZeroDivisionError
# handling is needed here.
for i, row in test_df.iterrows():
    test_df.at[i, 'distance'] = distance(row['preds'], row['output'])
    test_df.at[i, 'distance_2'] = distance(row['preds_2'], row['output'])

In [None]:
test_df.distance.mean()

In [None]:
test_df.distance_2.mean()

Here we tried to apply a T5 model that was trained on a huge dataset and that corrects text by generating the grammatically correct "translation" of the input.
It seems that the model performs even worse than LanguageTool itself.
Therefore, we will try to implement the following approach instead:
https://towardsdatascience.com/deep-text-corrector-using-monotonic-attention-with-dataset-creation-1e1a3f5a1b9e