In [None]:
import torch
import numpy as np
import pandas as pd
from typing import Dict
import torch
from datasets import load_dataset
from transformers import DataCollatorWithPadding

from datasets import load_dataset
from transformers import (
    AutoConfig,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EvalPrediction,
    Trainer,
    TrainingArguments,
    set_seed,
)

import random

from copy import deepcopy
from tqdm import tqdm
from typing import List, Optional
from abc import ABC, abstractmethod

## WordNet


In [None]:

import nltk

# Download the WordNet corpus through NLTK's downloader; the `wordnet`
# corpus reader imported below is what `Synonimization.get_synonym` queries.
nltk.download('wordnet')

from nltk.corpus import wordnet as wn

## Schemas & Classes

In [None]:
# Generic classes for augmenting datasets

class DataAugmentationStep(ABC):
    """Abstract interface implemented by every augmentation step.

    Both ``__init__`` and ``apply`` are abstract, so a concrete step has to
    define the two of them before it can be instantiated.
    """

    @abstractmethod
    def __init__(self, probability: float):
        """Store the probability with which the step is applied.

        Parameters
        ----------
        probability : float
            Probability used by the concrete step to decide whether to act
        """
        self.probability = probability

    @abstractmethod
    def apply(self, samples: List[Dict]) -> Optional[List[Dict]]:
        """Transform a list of samples.

        Parameters
        ----------
        samples : List[Dict]
            The samples to process

        Returns
        -------
        Optional[List[Dict]]
            The processed samples; this base implementation returns None
        """
        return None

class DataAugmentationPipeline:
    """Ordered chain of augmentation steps that is run on one sample at a time."""

    def __init__(self, steps: List[DataAugmentationStep]):
        """Build the pipeline.

        Parameters
        ----------
        steps : List[DataAugmentationStep]
            Objects exposing an ``apply`` method; they are executed in list order
        """
        self.steps = steps

    def apply(self, sample: Dict) -> Optional[List[Dict]]:
        """Run every step of the pipeline on a single sample.

        The sample is wrapped in a one-element list and that list is handed
        from step to step, each step receiving the previous step's output.

        Parameters
        ----------
        sample : Dict
            A sample of our NLP dataset

        Returns
        -------
        Optional[List[Dict]]
            The list produced by the last step, or None as soon as any step
            returns None (the remaining steps are then not executed)
        """
        batch = [sample]
        for stage in self.steps:
            batch = stage.apply(batch)
            if batch is None:
                return None
        return batch

class DatasetAugmentation:
    """Runs an augmentation pipeline over a fraction of a pandas DataFrame."""

    # Columns copied from every pipeline output sample into the result frame.
    OUTPUT_COLUMNS = ("premise", "hypothesis", "label")

    def __init__(self, pipeline: DataAugmentationPipeline, percentage: float, random_sample: bool = False):
        """Constructor for the DatasetAugmentation class

        Parameters
        ----------
        pipeline : DataAugmentationPipeline
            A pipeline of data augmentation steps
        percentage : float
            The fraction (between 0 and 1) of the dataset rows to augment
        random_sample : bool, optional
            If False the first ``int(len(dataset) * percentage)`` rows are used,
            in order; if True the same number of row positions is drawn without
            replacement through ``np.random.choice``, by default False

        Raises
        ------
        ValueError
            If ``percentage`` is not within [0, 1]
        """
        # Fail early: a larger value would only surface later as an
        # out-of-bounds ``iloc`` access or a ``np.random.choice`` error.
        if not 0.0 <= percentage <= 1.0:
            raise ValueError(f"percentage must be within [0, 1], got {percentage}")

        self.pipeline = pipeline
        self.percentage = percentage
        self.random_sample = random_sample

    def augment(self, dataset) -> Optional[pd.DataFrame]:
        """Augment a dataset

        Parameters
        ----------
        dataset : pd.DataFrame
            The rows to augment; they are read positionally with ``.iloc`` and
            each row is passed to the pipeline as a dictionary

        Returns
        -------
        pd.DataFrame
            One row per sample returned by the pipeline, restricted to the
            ``premise``, ``hypothesis`` and ``label`` columns. The columns are
            present even when no sample survived the pipeline.
        """
        n_samples = len(dataset)
        n_samples_to_augment = int(n_samples * self.percentage)
        if self.random_sample:
            indices = np.random.choice(n_samples, n_samples_to_augment, replace=False)
        else:
            indices = np.arange(n_samples_to_augment)

        augmented_samples = []
        discarded = 0

        try:
            for i in tqdm(indices):
                sample = dataset.iloc[i].to_dict()
                out = self.pipeline.apply(sample)
                # A None or empty pipeline output means the source row was dropped.
                if out is None or len(out) == 0:
                    discarded += 1
                    continue
                augmented_samples.extend([
                    {column: augmented_sample[column] for column in self.OUTPUT_COLUMNS}
                    for augmented_sample in out
                ])
        except Exception as e:
            print(e)
            print(f"Discarded {discarded} samples")
            # Bare raise keeps the original traceback untouched.
            raise

        # Passing the columns explicitly keeps the schema stable for an empty result.
        augmented_dataset = pd.DataFrame(augmented_samples, columns=list(self.OUTPUT_COLUMNS))

        print(f"Augmentation done, discarded {discarded} samples")
        return augmented_dataset

In [None]:



# Augmentation steps

class Synonimization(DataAugmentationStep):
    """Augmentation step that replaces single tokens with WordNet synonyms."""

    def __init__(self, probability: float, apply_to: str, max_synonyms: int = 5):
        """Constructor for the Synonimization class

        Parameters
        ----------
        probability : float
            The probability of applying the step on each token
        apply_to : str
            The key of the sample to apply the step to, can be 'hypothesis' or 'premise'
        max_synonyms : int, optional
            Upper bound on the synonyms generated per token. Only the first
            ``max_synonyms`` candidates returned by ``get_synonym`` are examined,
            and a candidate equal to the original word still uses up one of
            those slots. By default 5
        """
        self.probability = probability
        self.apply_to = apply_to
        self.max_synonyms = max_synonyms

    def apply(self, samples: List[Dict]) -> Optional[List[Dict]]:
        """Apply the step to a set of samples

        Every input sample is kept as is. In addition, for each selected token
        one deep-copied variant per accepted synonym is appended, with the token
        replaced in both the WSD and the SRL token lists and the text under
        ``apply_to`` rebuilt from the WSD tokens.

        Parameters
        ----------
        samples : List[Dict]
            A set of samples from our NLP dataset

        Returns
        -------
        Optional[List[Dict]]
            The original samples together with their synonym variants

        Raises
        ------
        ValueError
            If a sample cannot be processed; the underlying exception is
            attached as ``__cause__``
        """
        result_set = []

        for sample in samples:
            try:
                result_set.append(sample)

                for i, token in enumerate(sample["wsd"][self.apply_to]):
                    if np.random.rand() >= self.probability:
                        continue

                    original_text = token["text"]
                    wsd_wnet = token["wnSynsetOffset"]
                    # "O" marks a token without a WordNet sense annotation.
                    if wsd_wnet == "O":
                        continue

                    for j, synonym in enumerate(self.get_synonym(original_text, wsd_wnet)):
                        # Compare against the ORIGINAL word on every iteration, so
                        # that the word itself (including the generator's trailing
                        # fallback) never becomes a duplicate "augmented" sample.
                        if synonym == original_text:
                            continue

                        if j >= self.max_synonyms:
                            break

                        new_sample = deepcopy(sample)
                        new_sample["wsd"][self.apply_to][i]["text"] = synonym
                        # Relies on WSD and SRL token lists being index-aligned.
                        new_sample["srl"][self.apply_to]["tokens"][i]["rawText"] = synonym
                        new_sample[self.apply_to] = " ".join(
                            wsd_token["text"] for wsd_token in new_sample["wsd"][self.apply_to]
                        )
                        result_set.append(new_sample)
            except Exception as error:
                self._print_diagnostics(sample)
                raise ValueError("Error in sample") from error

        return result_set

    def _print_diagnostics(self, sample: Dict) -> None:
        """Print as much of the failing sample as can be read without raising."""
        print(f"Error in sample: {sample}")
        try:
            wsd_tokens = sample["wsd"][self.apply_to]
            print(f"WSD: {wsd_tokens}")
            srl_entry = sample["srl"][self.apply_to]
            print(f"SRL: {srl_entry}")
            print(f"Lengths: {len(wsd_tokens)}, {len(srl_entry['tokens'])}")
        except (KeyError, IndexError, TypeError) as diagnostics_error:
            # The diagnostics must never hide the error that triggered them.
            print(f"Could not print full diagnostics: {diagnostics_error!r}")

    def get_synonym(self, text, wsd_wnet):
        """Generate candidate synonyms for a given word

        Parameters
        ----------
        text : str
            The word to find synonyms for
        wsd_wnet : str
            The WordNet synset offset annotation; all of its digits are
            concatenated and read as the integer offset (assumes the annotation
            embeds exactly one offset - TODO confirm against the dataset)

        Yields
        ------
        str
            The lemma names (underscores replaced by spaces, shuffled) of every
            synset of ``text`` whose offset matches, followed by ``text`` itself
            as a final fallback
        """
        synsets = wn.synsets(text)

        if synsets:
            # Parse the annotation once instead of once per synset.
            target_offset = int("".join(c for c in wsd_wnet if c.isdigit()))

            for synset in synsets:
                if synset.offset() != target_offset:
                    continue
                synonyms = [" ".join(lemma.name().split("_")) for lemma in synset.lemmas()]
                # random.sample over the full length acts as a shuffle
                for synonym in random.sample(synonyms, len(synonyms)):
                    yield synonym
        yield text

class CopulaInverter(DataAugmentationStep):
    """Augmentation step that swaps the two arguments of a copula in the hypothesis."""

    # Surface forms of the copula that trigger the inversion.
    COPULA_FORMS = ("is", "was", "were", "are")

    def __init__(self, probability: float):
        """Constructor for the CopulaInverter class

        Parameters
        ----------
        probability : float
            The probability of attempting the inversion on each sample

        """
        self.probability = probability

    def apply(self, samples: List[Dict]) -> Optional[List[Dict]]:
        """Apply the step to a list of samples

        A sample that is not selected is passed through untouched. A selected
        sample is deep-copied; if its hypothesis has a COPULA annotation with
        two usable role spans, the two spans are swapped in the WSD tokens, in
        the SRL tokens and in the hypothesis text, and every annotation's
        ``tokenIndex`` is remapped so that it keeps pointing at the same token.
        Otherwise the unmodified copy is returned.

        Role spans themselves are NOT rewritten and still describe the
        pre-inversion order.

        Parameters
        ----------
        samples : List[Dict]
            Samples of our NLP dataset (a list of dictionaries)

        Returns
        -------
        Optional[List[Dict]]
            One output sample per input sample
        """
        result_set = []

        for sample in samples:

            if np.random.rand() > self.probability:
                result_set.append(sample)
                continue

            new_sample = deepcopy(sample)
            srl_hypothesis = new_sample["srl"]["hypothesis"]
            wsd_tokens = new_sample["wsd"]["hypothesis"]

            copula = self._find_copula(srl_hypothesis)
            new_indices = None
            if copula is not None:
                new_indices = self._swapped_order(copula, len(wsd_tokens))

            if new_indices is None:
                # No copula, or no clean swap available: keep the sample as is.
                result_set.append(new_sample)
                continue

            new_wsd = [wsd_tokens[i] for i in new_indices]
            new_sample["wsd"]["hypothesis"] = new_wsd
            new_sample["hypothesis"] = " ".join(token["text"] for token in new_wsd)

            # Keep the SRL tokens index-aligned with the WSD tokens: later steps
            # address both lists with the same index and with `tokenIndex`.
            srl_tokens = srl_hypothesis["tokens"]
            if len(srl_tokens) == len(new_indices):
                srl_hypothesis["tokens"] = [srl_tokens[i] for i in new_indices]
                new_position = {old: new for new, old in enumerate(new_indices)}
                for ann in srl_hypothesis["annotations"]:
                    ann["tokenIndex"] = new_position.get(ann["tokenIndex"], ann["tokenIndex"])

            result_set.append(new_sample)

        return result_set

    def _find_copula(self, srl_hypothesis: Dict) -> Optional[Dict]:
        """Return the first COPULA annotation whose token is a known copula form."""
        tokens = srl_hypothesis["tokens"]
        for ann in srl_hypothesis["annotations"]:
            frame = ann["verbatlas"]["frameName"]
            surface = tokens[ann["tokenIndex"]]["rawText"]
            if frame == "COPULA" and surface in self.COPULA_FORMS:
                return ann
        return None

    def _swapped_order(self, copula: Dict, n_tokens: int) -> Optional[List[int]]:
        """Return the token order after swapping the copula's first two role spans.

        Returns None when the annotation has fewer than two roles/span bounds or
        when the spans do not yield a permutation of the token positions.
        """
        try:
            roles = copula["verbatlas"]["roles"]
            first, second = roles[0]["span"], roles[1]["span"]

            # `later` must be the span that starts further right: it is assigned
            # first below, so a change in its length cannot shift `earlier`.
            if first[0] < second[0]:
                first, second = second, first

            # Span bounds are used as Python slice bounds (end exclusive), as in
            # the original implementation - TODO confirm against the SRL schema.
            later = slice(first[0], first[1])
            earlier = slice(second[0], second[1])
        except IndexError:
            # Somehow the copula does not have the right spans
            return None

        order = list(range(n_tokens))
        order[later], order[earlier] = order[earlier], order[later]

        # Overlapping or out-of-range spans duplicate or drop positions; such a
        # swap would emit a corrupted sentence, so it is rejected.
        if sorted(order) != list(range(n_tokens)):
            print(f"Skipping copula inversion, invalid swap. Slice 1: {later}, Slice 2: {earlier}")
            return None
        return order
    
class LengthFilter(DataAugmentationStep):
    """Filter step that drops samples whose WSD and SRL token lists differ in length."""

    def __init__(self):
        pass

    @staticmethod
    def _is_aligned(sample: Dict, field: str) -> bool:
        """Tell whether ``field`` has as many WSD tokens as SRL tokens."""
        return len(sample["wsd"][field]) == len(sample["srl"][field]["tokens"])

    def apply(self, samples: List[Dict]) -> Optional[List[Dict]]:
        """Keep only the samples with aligned token lists

        Parameters
        ----------
        samples : List[Dict]
            Samples of our NLP dataset

        Returns
        -------
        List[Dict]
            The samples, in their input order, for which both the premise and
            the hypothesis have the same number of WSD and SRL tokens
        """
        return [
            candidate
            for candidate in samples
            if self._is_aligned(candidate, "premise") and self._is_aligned(candidate, "hypothesis")
        ]

class CopulaContradictor(DataAugmentationStep):
    """Augmentation step that negates an "is" copula in the hypothesis and flips the label."""

    def __init__(self, probability: float):
        """Constructor for the CopulaContradictor class

        Parameters
        ----------
        probability : float
            The probability of attempting the negation on each sample

        """
        self.probability = probability

    def apply(self, samples: List[Dict]) -> Optional[List[Dict]]:
        """Apply the step to a list of samples

        A sample that is not selected is passed through untouched. A selected
        sample is deep-copied; if the hypothesis contains a COPULA annotation on
        the token "is", that token becomes "is not" in both the SRL and the WSD
        token lists, the hypothesis text is rebuilt from the SRL tokens, and the
        ENTAILMENT / CONTRADICTION labels are exchanged (any other label is kept).

        Parameters
        ----------
        samples : List[Dict]
            Samples of our NLP dataset (a list of dictionaries)

        Returns
        -------
        Optional[List[Dict]]
            One output sample per input sample
        """
        processed = []

        for sample in samples:

            if np.random.rand() > self.probability:
                processed.append(sample)
                continue

            candidate = deepcopy(sample)
            srl_hypothesis = candidate["srl"]["hypothesis"]

            # first annotation that is a COPULA frame on the token "is"
            copula = None
            for annotation in srl_hypothesis["annotations"]:
                frame_name = annotation["verbatlas"]["frameName"]
                surface = srl_hypothesis["tokens"][annotation["tokenIndex"]]["rawText"]
                if frame_name == "COPULA" and surface == "is":
                    copula = annotation
                    break

            if copula is None:
                processed.append(candidate)
                continue

            copula_index = copula["tokenIndex"]
            srl_hypothesis["tokens"][copula_index]["rawText"] = "is not"

            # Negating the hypothesis exchanges entailment and contradiction.
            current_label = candidate["label"]
            if current_label == "CONTRADICTION":
                candidate["label"] = "ENTAILMENT"
            elif current_label == "ENTAILMENT":
                candidate["label"] = "CONTRADICTION"
            else:
                candidate["label"] = current_label

            candidate["hypothesis"] = " ".join(
                [srl_token["rawText"] for srl_token in srl_hypothesis["tokens"]]
            )
            candidate["wsd"]["hypothesis"][copula_index]["text"] = "is not"

            processed.append(candidate)

        return processed

In [None]:
def compare(sample):
    """Print a sample's raw texts next to the texts rebuilt from its WSD tokens.

    Parameters
    ----------
    sample : Dict
        A sample exposing ``premise``, ``hypothesis`` and, under ``wsd``, the
        ``premise`` and ``hypothesis`` token lists (each token has a ``text``)
    """
    raw_texts = (
        ("Premise", sample["premise"]),
        ("Hypothesis", sample["hypothesis"]),
    )
    token_lists = (
        ("WSD Premise", sample["wsd"]["premise"]),
        ("WSD Hypothesis", sample["wsd"]["hypothesis"]),
    )

    for heading, content in raw_texts:
        print(heading)
        print(content)

    for heading, wsd_tokens in token_lists:
        print(heading)
        print(" ".join(wsd_token["text"] for wsd_token in wsd_tokens))

## Augmentation pipeline configuration

In [None]:

# Fraction of each split handed to the augmentation pipeline (1.00 = every row).
SUBSET_PERCENT = 1.00
# The steps run in list order on every selected sample. LengthFilter comes first
# because the later steps index the WSD and SRL token lists with the same position.
augmentor = DatasetAugmentation(
    pipeline=DataAugmentationPipeline([
        LengthFilter(),
        Synonimization(probability=0.3, apply_to="premise", max_synonyms=2),
        CopulaInverter(probability=0.5),
        CopulaContradictor(probability=0.25),
        Synonimization(probability=0.35, apply_to="hypothesis", max_synonyms=3),
    ]),
    percentage=SUBSET_PERCENT,
    # False: rows are taken sequentially from the start instead of being sampled.
    random_sample=False
)

In [None]:
def print_sample(ds, idx):
    """Print the premise, hypothesis and label stored at position ``idx`` of ``ds``.

    Parameters
    ----------
    ds
        A column-indexable container: ``ds[name][idx]`` must return the value
        of column ``name`` at position ``idx``
    idx
        The position of the sample to print
    """
    for field in ("premise", "hypothesis", "label"):
        value = ds[field][idx]
        print(f"{field}: {value}")

# Identifier passed to `load_dataset` below.
DATASET_NAME = "tommasobonomo/sem_augmented_fever_nli"

# Seed the random generators before any augmentation runs: the steps draw from
# `np.random` and `random`, so the generated samples depend on this seed.
set_seed(42)

In [None]:
# Load every split of the dataset; the "train" and "validation" splits are used below.
ds = load_dataset(DATASET_NAME)

In [None]:
# DatasetAugmentation.augment reads rows with `.iloc`, hence the pandas conversion.
train_data = ds["train"].to_pandas()
val_data = ds["validation"].to_pandas()

In [None]:
# Separate augmentation for separate files, we do not want to leak information between train and validation
# Each call returns a DataFrame holding only the premise, hypothesis and label columns.

augmented_train_data = augmentor.augment(train_data)
augmented_val_data = augmentor.augment(val_data)

In [None]:
# Augmented row count relative to the number of training rows selected for
# augmentation (len(train_data) * SUBSET_PERCENT), expressed as a percentage.
print(f"Grow factor: {len(augmented_train_data) / (len(train_data) * SUBSET_PERCENT) * 100:.2f}%")

In [None]:
# Write both augmented splits as JSON Lines (one record per line) into the working directory.
augmented_train_data.to_json("augmented_train.jsonl", orient="records", lines=True)
augmented_val_data.to_json("augmented_val.jsonl", orient="records", lines=True)