In [1]:
#!pip install lightning



In [2]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import pytorch_lightning as pl
from transformers import AutoModelForSequenceClassification, AutoTokenizer

import pandas as pd
from scipy.stats import spearmanr, pearsonr
from sklearn.model_selection import train_test_split
import os
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
import random
from sklearn.model_selection import KFold
from pytorch_lightning.callbacks import EarlyStopping

import re
import string

import nltk
from nltk.corpus import wordnet as wn

In [3]:
class str_dataset(torch.utils.data.Dataset):
    """Torch dataset of sentence pairs with a relatedness score.

    The frame must provide a ``Text`` column in which the two sentences are
    separated by a newline, and a ``Score`` column with the regression target.
    The newline is replaced by ``sep`` and the result is stored in an ``input``
    column, which is what gets tokenized.

    Two optional text augmentations can be applied on the fly in
    ``__getitem__``: synonym replacement of one adjective/adverb and a
    single-letter typo in one word of the first sentence.

    NOTE(review): '[SEP]' is the BERT-family separator. For checkpoints whose
    tokenizer uses a different separator (e.g. RoBERTa) the literal is presumably
    tokenized as ordinary text - consider passing ``tokenizer.sep_token``.
    """

    # Penn Treebank tags (adjectives and adverbs) eligible for synonym replacement.
    REPLACEABLE_TAGS = ('JJ', 'JJR', 'JJS', 'RB', 'RBR', 'RBS')

    def __init__(self, dataframe, syn_replace = False, change_random_letter = False, sep = '[SEP]',seed = None, model_type : str = 'distilbert-base-uncased'):
        """
        Args:
            dataframe: pandas DataFrame with ``Text`` and ``Score`` columns.
            syn_replace: enable synonym-replacement augmentation.
            change_random_letter: enable random-letter (typo) augmentation.
            sep: separator string that replaces the newline between the sentences.
            seed: stored for reference only; the augmentations draw from the
                global ``random`` module.
            model_type: Hugging Face checkpoint name used to load the tokenizer.
        """
        # Work on a copy so the caller's frame (often a slice) is not mutated.
        self.dataframe = dataframe.copy()
        self.tokenizer = AutoTokenizer.from_pretrained(model_type)
        self.syn_replace = syn_replace
        self.change_random_letter = change_random_letter
        self.seed = seed
        self.sep = sep

        # Download the NLTK resources used by the augmentations; quiet=True keeps
        # repeated instantiations from flooding the notebook output.
        for package in ('wordnet', 'averaged_perceptron_tagger', 'punkt'):
            nltk.download(package, quiet=True)

        # Replace the dataset separator (newline) with the custom separator token.
        self.dataframe['input'] = self.dataframe['Text'].str.replace('\n', sep, regex=False)

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(input_ids, attention_mask, score)`` for the row at position ``idx``."""
        # Positional access: the DataLoader hands out positions 0..len-1, which
        # only coincide with labels when the index has been reset.
        features = self.dataframe['input'].iloc[idx]

        if self.syn_replace:
            features = self.apply_syn_replacement(features, sep = self.sep, p = 0.5)
        if self.change_random_letter:
            features = self.apply_change_letter(features, sep = self.sep, p = 0.5)

        token = self.tokenizer(features, return_tensors='pt', truncation='longest_first', padding='max_length',max_length=265)

        # Drop only the leading batch dimension added by return_tensors='pt'.
        input_ids = token['input_ids'].squeeze(0)
        attention_mask = token['attention_mask'].squeeze(0)

        score = self.dataframe['Score'].iloc[idx]
        score = torch.tensor(score, dtype=torch.float32).unsqueeze(dim=0)

        return input_ids, attention_mask, score

    @staticmethod
    def _replace_word(text : str, old : str, new : str) -> str:
        """Replace whole-word occurrences of ``old`` in ``text`` with ``new``.

        Unlike ``str.replace`` this does not touch ``old`` when it is merely a
        substring of a longer word (e.g. "very" inside "every").
        """
        pattern = r'(?<!\w)' + re.escape(old) + r'(?!\w)'
        # A callable replacement keeps backslashes in ``new`` literal.
        return re.sub(pattern, lambda _match: new, text)

    def get_random_word(self,seq : str, min_len : int = 3,seed : int = None, sep = '[SEP]', syn_replace : bool = False) -> tuple:
        """Pick a random ``(word, pos_tag)`` pair from ``seq``.

        Args:
            seq: text to sample from; occurrences of ``sep`` are ignored.
            min_len: minimum token length for a word to be eligible.
            seed: unused; kept for interface compatibility.
            sep: separator string to strip before tokenizing.
            syn_replace: if True, only adjectives/adverbs are eligible.

        Returns:
            A ``(word, tag)`` tuple, or ``(None, None)`` when nothing is eligible.
        """
        # Swap the separator for a space; deleting it would glue the last word of
        # one sentence to the first word of the next.
        tokens = nltk.word_tokenize(seq.replace(sep, ' '))
        tagged = nltk.pos_tag(tokens)
        if syn_replace:
            tagged = self.syn_replace_choice(tagged)

        # De-duplicate, then sort so the candidate order does not depend on
        # hash randomisation.
        candidates = sorted({pair for pair in tagged if len(pair[0]) >= min_len})
        if not candidates:
            return (None, None)

        return random.choice(candidates)

    def syn_replace_choice(self,pos_tags : list):
        """Keep only the ``(word, tag)`` pairs whose tag is an adjective or adverb tag."""
        return [item for item in pos_tags if item[1] in self.REPLACEABLE_TAGS]

    def get_synonym(self,word : str, pos : str, seed : int = None) -> str:
        """Return a random WordNet synonym of ``word`` for the given WordNet ``pos``.

        Falls back to ``word`` itself when ``pos`` is None or WordNet offers no
        alternative. ``seed`` is unused and kept for interface compatibility.
        """
        if pos is None:
            return word
        # WordNet joins multi-word lemmas with underscores; turn them into spaces.
        names = {
            lemma.name().replace('_', ' ')
            for syn in wn.synsets(word, pos = pos)
            for lemma in syn.lemmas()
        }
        # The word itself is not a useful replacement.
        synonyms = sorted(name for name in names if name.lower() != word.lower())
        if not synonyms:
            return word
        return random.choice(synonyms)

    def find_word_type(self,target_tag):
        """Map a Penn Treebank tag to the matching WordNet POS constant, or None."""
        word_type_dict = {wn.ADJ : ['JJ', 'JJR', 'JJS'],wn.ADV : ['RB', 'RBR', 'RBS']}
        for key, tag_list in word_type_dict.items():
            if target_tag in tag_list:
                return key
        return None

    def apply_syn_replacement(self,seq : str, sep : str = '[SEP]', seed : int = None, p : float = 0.3):
        """With probability ``p``, replace one adjective/adverb in ``seq`` by a synonym.

        Returns ``seq`` unchanged when the augmentation is skipped or no eligible
        word is found.
        """
        # Draw first so the (expensive) tagging is skipped when not augmenting.
        if random.random() >= p:
            return seq
        word, tag = self.get_random_word(seq, sep = sep, syn_replace= True, seed = seed)
        if word is None:
            return seq
        synonym = self.get_synonym(word, pos = self.find_word_type(tag), seed = seed)
        return self._replace_word(seq, word, synonym)

    def replace_letter(self,word : str, seed : int = None)-> str:
        """Replace one random inner letter of ``word`` with a random lowercase letter.

        The first and last characters are never changed; words of length <= 2 are
        returned untouched. ``seed`` is unused and kept for interface compatibility.
        """
        if len(word) <=2:
            return word
        idx = random.randint(1,len(word)-2)
        return word[:idx] + random.choice(string.ascii_lowercase) + word[idx + 1:]

    def apply_change_letter(self,seq : str, p : float = 0.3, sep :str = '[SEP]', seed = None):
        """With probability ``p``, introduce a one-letter typo in the first sentence.

        Only the text before the first ``sep`` is modified; everything after it is
        kept verbatim. A sequence without ``sep`` is treated as a single sentence.
        """
        if random.random() >= p:
            return seq
        # partition() tolerates a missing separator and keeps any further segments.
        first, found_sep, rest = seq.partition(sep)
        word = self.get_random_word(first, sep = sep)[0]
        if word is None:
            return seq
        typo = self.replace_letter(word, seed = seed)
        return self._replace_word(first, word, typo) + found_sep + rest


class STR_DataModule(pl.LightningDataModule):
    """Lightning data module that splits one DataFrame into train/validation sets.

    The frame passed as ``train_data`` is split 80/20 with a fixed random state.
    Augmentations are applied to the training split only.
    """

    def __init__(self, train_data, batch_size=32, syn_replace = False, change_random_letter = False,model_type : str = 'distilbert-base-uncased'):
        """
        Args:
            train_data: pandas DataFrame with the labelled sentence pairs.
            batch_size: batch size for both dataloaders.
            syn_replace: enable synonym replacement on the training split.
            change_random_letter: enable random-letter typos on the training split.
            model_type: Hugging Face checkpoint name forwarded to the datasets.
        """
        super().__init__()
        self.batch_size = batch_size
        self.train_data = train_data
        self.syn_replace = syn_replace
        self.change_random_letter = change_random_letter
        self.model_type = model_type

        # Built lazily in setup().
        self.train_dataset = None
        self.val_dataset = None

        # Lightning reads this as a boolean attribute that super().__init__()
        # sets; a method of the same name is shadowed and never consulted.
        self.prepare_data_per_node = False

    def prepare_data(self):
        """No-op: state must not be assigned here because Lightning runs this hook
        on a single process only. The datasets are built in ``setup``."""

    def setup(self, stage = None):
        """Create the train/validation datasets once (runs on every process)."""
        if self.train_dataset is not None and self.val_dataset is not None:
            # Already built, e.g. by the learning-rate finder before fit().
            return
        train_df, val_df = train_test_split(self.train_data, test_size=0.2, random_state=42)
        self.train_dataset = str_dataset(train_df.reset_index(drop=True),syn_replace = self.syn_replace, change_random_letter = self.change_random_letter,model_type = self.model_type)
        # Validation data is never augmented.
        self.val_dataset = str_dataset(val_df.reset_index(drop=True),syn_replace = False, change_random_letter = False,model_type = self.model_type)

    def train_dataloader(self):
        """Shuffled dataloader over the training split."""
        if self.train_dataset is None:
            # Allows use outside a Trainer, where setup() is not called for us.
            self.setup('fit')
        return DataLoader(self.train_dataset, batch_size=self.batch_size,num_workers = 2, shuffle=True)

    def val_dataloader(self):
        """Dataloader over the validation split, in fixed order."""
        if self.val_dataset is None:
            self.setup('fit')
        return DataLoader(self.val_dataset, batch_size=self.batch_size,num_workers = 2)

In [4]:
#1.737e-6 LR for DistillBERT
class SentenceSimilarityModel(pl.LightningModule):
    """Transformer regressor that predicts a relatedness score for a sentence pair.

    Wraps a Hugging Face sequence-classification model with a single output
    (``num_labels=1``), which makes it a regression head. Logged metrics:
    ``train_loss``, ``train_spearman`` (mean of per-batch values), ``val_loss``,
    ``val_spearman``, ``test_loss`` and ``test_spearman``. The validation/test
    Spearman values are computed once per epoch over all predictions.
    """

    def __init__(self,model_type = 'distilbert-base-uncased', learning_rate=8.912509381337456e-06):
        """
        Args:
            model_type: Hugging Face checkpoint name.
            learning_rate: initial AdamW learning rate.
        """
        super().__init__()
        self.model = AutoModelForSequenceClassification.from_pretrained(model_type,ignore_mismatched_sizes=True,num_labels=1)

        self.loss_fn = torch.nn.MSELoss()
        self.learning_rate = learning_rate

        # (predictions, targets) collected per evaluation batch, keyed by stage.
        self._eval_buffers = {'val': [], 'test': []}

    def forward(self, input_ids, attention_mask, scores=None):
        """Run the wrapped model; when ``scores`` is given the output carries a loss."""
        return self.model(input_ids, attention_mask=attention_mask, labels=scores)

    @staticmethod
    def _spearman(preds, targets):
        """Spearman correlation between two tensors, or None when it is undefined
        (fewer than two observations, or constant input)."""
        # .float() so half/bfloat16 outputs from mixed precision convert to numpy.
        preds = preds.detach().float().cpu().flatten().numpy()
        targets = targets.detach().float().cpu().flatten().numpy()
        if preds.size < 2:
            return None
        rho = float(spearmanr(preds, targets).statistic)
        # NaN is the only value that differs from itself.
        return None if rho != rho else rho

    def training_step(self, batch, batch_idx):
        """Compute the training loss and log loss and per-batch Spearman."""
        input_ids, attention_mask, score = batch
        outputs = self(input_ids, attention_mask, score)
        loss = outputs.loss

        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True)

        spearman = self._spearman(outputs.logits, score)
        if spearman is not None:
            # Undefined batches are skipped so they cannot turn the epoch mean into NaN.
            self.log("train_spearman", spearman,on_step = False, on_epoch=True, prog_bar=True)

        return loss

    def _eval_step(self, batch, stage):
        """Shared validation/test step: log the MSE loss and buffer predictions."""
        input_ids, attention_mask, score = batch
        logits = self(input_ids, attention_mask, score).logits

        loss = F.mse_loss(logits, score)
        self.log(f"{stage}_loss", loss, on_epoch=True, prog_bar=True)

        # Keep CPU copies so the buffers do not hold GPU memory.
        self._eval_buffers[stage].append((logits.detach().float().cpu(), score.detach().float().cpu()))

    def _eval_epoch_end(self, stage):
        """Log the Spearman correlation over everything buffered for ``stage``."""
        buffered = self._eval_buffers[stage]
        if not buffered:
            return
        preds = torch.cat([pred for pred, _ in buffered])
        targets = torch.cat([target for _, target in buffered])
        buffered.clear()

        spearman = self._spearman(preds, targets)
        # Always log so the key exists in trainer.callback_metrics.
        self.log(f"{stage}_spearman", float('nan') if spearman is None else spearman, prog_bar=True)

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` and collect predictions for ``val_spearman``."""
        self._eval_step(batch, 'val')

    def on_validation_epoch_end(self):
        """Log ``val_spearman`` over the whole validation set."""
        self._eval_epoch_end('val')

    def test_step(self, batch, batch_idx):
        """Log ``test_loss`` and collect predictions for ``test_spearman``."""
        self._eval_step(batch, 'test')

    def on_test_epoch_end(self):
        """Log ``test_spearman`` over the whole test set."""
        self._eval_epoch_end('test')

    def configure_optimizers(self):
        """AdamW with a ReduceLROnPlateau schedule driven by ``val_loss``."""
        # The fused implementation is only requested when every parameter is on CUDA.
        optimizer_kwargs = {}
        if all(param.is_cuda for param in self.parameters()):
            optimizer_kwargs['fused'] = True
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate, **optimizer_kwargs)

        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, min_lr=1e-8)

        return {'optimizer': optimizer, 'lr_scheduler': {'scheduler': scheduler, 'monitor': 'val_loss'}}



In [None]:
if __name__ == '__main__':
    # 5-fold cross-validation: for each fold, tune the learning rate, train with
    # early stopping, then evaluate the best checkpoint on the held-out fold.

    # Seed python/numpy/torch (and dataloader workers) so runs are repeatable.
    pl.seed_everything(42, workers=True)

    kf = KFold(n_splits=5, shuffle=True, random_state=42)

    train_data = pd.read_csv('/content/eng_train.csv')

    all_spearman_corrs = []

    model_type = "roberta-base"

    for fold, (train_idx, test_idx) in enumerate(kf.split(train_data)):

        str_datamodule = STR_DataModule(train_data = train_data.iloc[train_idx].reset_index(drop = True), batch_size = 16, syn_replace = False, change_random_letter = False,model_type = model_type)

        model = SentenceSimilarityModel(model_type = model_type)

        trainer = pl.Trainer(max_epochs=40,precision="16-mixed",callbacks=[EarlyStopping(monitor="val_loss", patience=5, mode="min")],accelerator='gpu')

        tuner = pl.tuner.Tuner(trainer)
        lr_finder = tuner.lr_find(model, datamodule = str_datamodule, min_lr = 1e-7,max_lr= 1e-2)

        # Pick point based on plot, or get suggestion. The suggestion can be
        # None when no suitable point is found; keep the default LR in that case.
        new_lr = lr_finder.suggestion() if lr_finder is not None else None

        # The model reads `learning_rate` in configure_optimizers (not `lr`).
        if new_lr is not None:
            model.learning_rate = new_lr

        trainer.fit(model, datamodule=str_datamodule)

        # The held-out fold must be tokenized with the same checkpoint as the model.
        test_set = str_dataset(train_data.iloc[test_idx].reset_index(drop = True), model_type = model_type)

        # Evaluation does not need shuffling.
        test_dataloader = DataLoader(test_set, batch_size = 16, num_workers = 2,shuffle = False)

        trainer.test(model, ckpt_path="best", dataloaders = test_dataloader)

        # Spearman correlation logged by the test loop for this fold.
        average_spearman_corr = float(trainer.callback_metrics["test_spearman"])
        print(
            f"Average Spearman Correlation for Fold {fold + 1}: {average_spearman_corr}"
        )

        all_spearman_corrs.append(average_spearman_corr)

    # Calculate and print the overall average Spearman correlation
    overall_average_spearman_corr = sum(all_spearman_corrs) / len(all_spearman_corrs)
    print(
        f"Overall Average Spearman Correlation across all folds: {overall_average_spearman_corr}"
    )


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.out_proj.weight', 'classifier.dense.weight', 'classifier.dense.bias', 'classifier.out_proj.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
INFO:pytorch_lightning.utilities.rank_zero:Using 16bit Automatic Mixed Precision (AMP)
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger 

Finding best initial lr:   0%|          | 0/100 [00:00<?, ?it/s]

INFO:pytorch_lightning.tuner.lr_finder:LR finder stopped early after 99 steps due to diverging loss.
INFO:pytorch_lightning.tuner.lr_finder:Learning rate set to 8.912509381337456e-06
INFO:pytorch_lightning.utilities.rank_zero:Restoring states from the checkpoint path at /content/.lr_find_ac1d619f-e4d6-47d9-85d0-e60a708ea3fd.ckpt
INFO:pytorch_lightning.utilities.rank_zero:Restored all states from the checkpoint at /content/.lr_find_ac1d619f-e4d6-47d9-85d0-e60a708ea3fd.ckpt
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-da

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]