In [None]:
# !pip install evaluate

In [None]:
from transformers import AutoTokenizer, GPT2Tokenizer, T5Tokenizer
from transformers import DataCollatorForSeq2Seq, DataCollatorWithPadding, DefaultDataCollator
from transformers import TrainingArguments, Trainer
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer
from transformers import GPT2Model, AutoModel, T5ForConditionalGeneration

import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd

In [None]:
# Pick the device string once: use CUDA when PyTorch can see a GPU, else the CPU.
if torch.cuda.is_available():
    torch_device = 'cuda'
else:
    torch_device = 'cpu'

In [None]:
### load train and val data
# Each split is read from its own CSV file in the data folder.
train = pd.read_csv('./wi+locness_data/train.csv')
validate = pd.read_csv('./wi+locness_data/validate.csv')
test = pd.read_csv('./wi+locness_data/test.csv')

# Train and validation rows stacked row-wise into a single frame.
complete_train = pd.concat((train, validate), axis=0)


In [None]:
### model
model_name = 't5-small'

# Tokenizer and model are both loaded from the same pretrained checkpoint name.
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)
model = model.to(torch_device)

# Length limits stored on the model config: at most 300, no minimum.
model.config.max_length, model.config.min_length = 300, None

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [None]:
# build dataloader
class GrammarDataset(torch.utils.data.Dataset):
    """Map-style dataset that tokenizes (incorrect, correct) sentence pairs on access.

    Args:
        dataset: Row container exposing ``len()`` and ``.iloc[index]``; each row
            must provide the keys ``'incorrect'`` and ``'correct'``.
        tokenizer: Callable tokenizer used for both the input and the target text.
            It must return a mapping with ``'input_ids'`` and ``'attention_mask'``.
        print_text: When True, print the length of every returned field each time
            an item is fetched (debugging aid).
    """

    def __init__(self, dataset, tokenizer, print_text=False):
        self.dataset = dataset
        # Padding is off per item; batches are padded later by the data collator.
        self.pad_to_max_length = False
        self.tokenizer = tokenizer
        self.print_text = print_text
        # Inputs and targets are both truncated to this many tokens.
        self.max_len = 300

    def __len__(self):
        """Return the number of rows in the wrapped dataset."""
        return len(self.dataset)

    def _encode(self, text):
        """Tokenize one string with this dataset's padding/truncation settings."""
        # `padding=` replaces the deprecated `pad_to_max_length=` keyword.
        padding = 'max_length' if self.pad_to_max_length else False
        # Use the tokenizer given to the constructor, not a notebook-level global.
        return self.tokenizer(text,
                              padding=padding,
                              max_length=self.max_len,
                              return_attention_mask=True,
                              truncation=True)

    def tokenize_data(self, example):
        """Tokenize one row into model inputs.

        Args:
            example: Row with an ``'incorrect'`` (source) and ``'correct'`` (target) entry.

        Returns:
            dict with ``input_ids`` and ``attention_mask`` of the source sentence and
            ``labels`` holding the token ids of the target sentence.
        """
        input_, target_ = example['incorrect'], example['correct']

        tokenized_inputs = self._encode(input_)
        tokenized_targets = self._encode(target_)

        return {
            "input_ids": tokenized_inputs['input_ids'],
            "attention_mask": tokenized_inputs['attention_mask'],
            "labels": tokenized_targets['input_ids'],
        }

    def __getitem__(self, index):
        """Return the tokenized example at positional ``index``."""
        inputs = self.tokenize_data(self.dataset.iloc[index])

        if self.print_text:
            for k in inputs.keys():
                print(k, len(inputs[k]))
        return inputs

# Wrap every split in a GrammarDataset; all of them share the same tokenizer.
ds_train, ds_val, ds_complete_train = (
    GrammarDataset(frame, tokenizer)
    for frame in (train, validate, complete_train)
)

# data collator for easy batching
data_collator = DataCollatorForSeq2Seq(
    tokenizer,
    model=model,
    padding='longest',
    return_tensors='pt',
)


In [None]:
!pip install transformers[torch]
!pip install accelerate==0.27.2 -U



In [None]:
import accelerate


In [None]:
### Training
batch_size = 16
save_dir = "t5_small_complete_config_2e-4_lr_6epochs"

# With gradient accumulation, one optimizer step covers
# batch_size * gradient_accumulation_steps = 16 * 6 = 96 examples per device.
args = Seq2SeqTrainingArguments(
    output_dir=save_dir,
    evaluation_strategy='no',
    # evaluation_strategy="epoch",
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    learning_rate=2e-4,
    num_train_epochs=6,
    weight_decay=0.01,
    save_total_limit=1,
    # Mixed precision is only requested when training on a GPU; the notebook
    # falls back to 'cpu' in `torch_device`, where fp16 training is not available.
    fp16=(torch_device == 'cuda'),
    gradient_accumulation_steps=6,
    # eval_steps = 1000,
    # max_grad_norm = 0.5,
    # lr_scheduler_type = 'constant'
    # save_steps = 250,
    # load_best_model_at_end=True,
    # logging_dir="/logs",
    # report_to="wandb"
)

# The trainer is fitted on train + validation combined, with no evaluation set.
trainer = Seq2SeqTrainer(
    model=model,
    args=args,
    train_dataset=ds_complete_train,
    # train_dataset = ds_train,
    # eval_dataset = ds_val,
    tokenizer=tokenizer,
    data_collator=data_collator,
    # compute_metrics=compute_metrics
)

In [None]:
# Run fine-tuning, then write the final model to disk.
# No path is passed to save_model(); presumably it uses the trainer's
# output_dir (`save_dir`), which is where the evaluation cell reloads from.
trainer.train()
trainer.save_model()

Step,Training Loss
500,0.3821
1000,0.3105
1500,0.2827
2000,0.2702


Non-default generation parameters: {'max_length': 300, 'min_length': None, 'do_sample': True, 'num_beams': 4, 'temperature': 1.5}
Non-default generation parameters: {'max_length': 300, 'min_length': None, 'do_sample': True, 'num_beams': 4, 'temperature': 1.5}
Non-default generation parameters: {'max_length': 300, 'min_length': None, 'do_sample': True, 'num_beams': 4, 'temperature': 1.5}
Non-default generation parameters: {'max_length': 300, 'min_length': None, 'do_sample': True, 'num_beams': 4, 'temperature': 1.5}
Non-default generation parameters: {'max_length': 300, 'min_length': None, 'do_sample': True, 'num_beams': 4, 'temperature': 1.5}


In [None]:
# gpu_info = !nvidia-smi
# gpu_info = '\n'.join(gpu_info)
# if gpu_info.find('failed') >= 0:
#   print('Not connected to a GPU')
# else:
#   print(gpu_info)

Evaluation with GLEU metric

In [None]:
### load fine-tuned model
# Rebind the notebook-wide `model` and `tokenizer` to the checkpoint in `save_dir`.
model = T5ForConditionalGeneration.from_pretrained(save_dir)
model = model.to(torch_device)
tokenizer = T5Tokenizer.from_pretrained(save_dir)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [None]:
def correct_grammar(sample, num_return_sequences = 1, pd_input = True, beam = True):
    """Generate a corrected sentence for one input with the notebook-level model.

    Args:
        sample: A row/mapping with an ``'incorrect'`` entry when ``pd_input`` is
            True, otherwise the raw text to correct.
        num_return_sequences: Number of candidates requested from beam sampling.
            Only used when ``beam`` is True; only the first candidate is returned.
        pd_input: Whether ``sample`` is a row (True) or plain text (False).
        beam: If True, decode with 4 beams plus sampling at temperature 1.5
            (non-deterministic); if False, use the model's default decoding.

    Returns:
        str: The first decoded sequence, with special tokens removed.
    """
    text = sample['incorrect'] if pd_input else sample

    # Pad only to the longest sequence in the call rather than to 300 tokens:
    # padded positions are masked out anyway, so fixed-length padding of a single
    # sentence only adds encoder work.
    encoded = tokenizer(text,
                        truncation=True,
                        padding=True,
                        max_length=300,
                        return_tensors="pt").to(torch_device)

    generate_kwargs = {"max_length": 300}
    if beam:
        generate_kwargs.update(
            num_beams=4,
            num_return_sequences=num_return_sequences,
            do_sample=True,   # use sampling instead of greedy decoding
            temperature=1.5,  # set together with do_sample=True; rescales next-token probabilities
        )

    tgt_code = model.generate(**encoded, **generate_kwargs)

    # batch_decode because num_return_sequences may be greater than 1
    tgt_text = tokenizer.batch_decode(tgt_code, skip_special_tokens=True)

    # Only the first candidate is returned (a single string, not the list).
    return tgt_text[0]

In [None]:
# # testing on simple sentences
# i = 82
# incorrect = test.iloc[i]['incorrect']
# correct = test.iloc[i]['correct']
# print(f'prediction: {correct_grammar(test.iloc[i], num_return_sequences=1)}')

incorrect = 'I am looking forway to see you soon.'
correct = 'I am looking forward to seeing you soon.'

# Plain-string input (not a dataframe row), decoded without beam sampling.
prediction = correct_grammar(
    incorrect,
    num_return_sequences=1,
    pd_input=False,
    beam=False,
)

print(
    f'original: {incorrect}',
    f'reference: {correct}',
    f'prediction: {prediction}',
    sep='\n',
)

original: I am looking forway to see you soon.
reference: I am looking forward to seeing you soon.
prediction: I am looking forward to seeing you soon.


In [None]:
### gleu metric, better than bleu because it takes into account structural similarity
import nltk
nltk.download('punkt')
from nltk.translate.gleu_score import sentence_gleu
import numpy as np

# takes lists of decoded prediction and reference labels and compute gleu
def compute_metrics(eval_pred):
    """Compute the mean sentence-level GLEU score, scaled to [0, 100].

    Args:
        eval_pred: Pair ``(preds, labels)`` of equally ordered iterables of decoded
            prediction strings and reference strings.

    Returns:
        dict: ``{"gleu": score}`` with the score rounded to 4 decimals.
    """
    predictions, references = eval_pred

    predictions = [text.strip() for text in predictions]
    references = [text.strip() for text in references]

    scores = []
    for hypothesis, reference in zip(predictions, references):
        # sentence_gleu expects a list of tokenized references and one tokenized
        # hypothesis; tokens here are whitespace-separated words.
        reference_tokens = reference.split()
        hypothesis_tokens = hypothesis.split()
        scores.append(sentence_gleu([reference_tokens], hypothesis_tokens))

    # *100 so that the score is reported in [0, 100] instead of [0, 1]
    metrics = {"gleu": np.mean(scores) * 100}
    return {name: round(value, 4) for name, value in metrics.items()}

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
# Drop incomplete rows BEFORE generation: a missing 'incorrect' value would
# otherwise be handed to the tokenizer as NaN, and generation would be spent on
# rows that are discarded afterwards anyway.
test.dropna(inplace = True)

# text generation
preds = test.apply(correct_grammar, axis = 1)

# gleu
labels = test['correct']
# add prediction to dataframe (plain list, so values are assigned by position)
test['preds'] = list(preds)

after = compute_metrics((test['preds'], labels))
print(f'gleu between predicted correct sentences and true correct sentences: {after}')



In [None]:
# Persist the test rows together with their predictions.
# NOTE(review): this writes under './wi+locness/' while the input CSVs are read
# from './wi+locness_data/' — confirm the target directory is intended and exists.
test.to_csv('./wi+locness/test_preds.csv')