In [None]:
# Mount Google Drive at /content/gdrive so files stored there are reachable
# from this Colab session.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
%cd ./gdrive/MyDrive/DOD\ Dataset

In [None]:
!pip install transformers
!pip install sentence-transformers
!pip install datasets
!pip install evaluate

In [None]:
from torch.utils.data import Dataset, random_split
from pathlib import Path
from pandas import read_csv, concat
import numpy as np



class TypeDataset(Dataset):
    """Map-style dataset backed by the rows of ``type_dataset.csv``.

    Indexing yields a dict with the row's ``text`` and its ``label``. The three
    known label strings are encoded as integers (see ``_LABEL_TO_ID``); any
    other label value is returned unchanged.
    """

    # Integer id assigned to each known label string.
    _LABEL_TO_ID = {"Policy": 0, "Responsibility": 1, "Procedures": 2}

    def __init__(self, data_dir: str):
        """Read ``type_dataset.csv`` from ``data_dir`` into a DataFrame."""
        frame = read_csv(Path(data_dir) / "type_dataset.csv")
        self.examples = frame
        self.len = len(frame)

    def __getitem__(self, i):
        """Return ``{"text": ..., "label": ...}`` for the row at position ``i``."""
        row = self.examples.iloc[i]
        raw_label = row.label
        return {
            "text": row.text,
            # Fall back to the raw value when the label is not a known string.
            "label": self._LABEL_TO_ID.get(raw_label, raw_label),
        }

    def __len__(self):
        """Return the number of rows read from the CSV."""
        return len(self.examples)

In [None]:
# Load type_dataset.csv from the current working directory.
dataset = TypeDataset('./')

In [None]:
# Number of rows read from the CSV.
len(dataset)

In [None]:
# Spot-check a single example; this index requires more than 5000 rows in the CSV.
dataset[5000]

In [None]:
# Walk the dataset once and append every example to the list that matches its
# integer label id; examples with any other label are left out of all three.
policies, responsibility, procedures = [], [], []
_lists_by_label = {0: policies, 1: responsibility, 2: procedures}
for _example in dataset:
    _target = _lists_by_label.get(_example['label'])
    if _target is not None:
        _target.append(_example)

In [None]:
# Class sizes in label-id order: policies, responsibility, procedures.
print(*map(len, (policies, responsibility, procedures)))

In [None]:
# Per-class example lists keyed by their integer label id.
m_datasets = {0: policies, 1: responsibility, 2: procedures}

In [None]:
import random

# Accumulators for the three splits; each one is its own independent list.
train_dataset, val_dataset, test_dataset = [], [], []


def getRandomFromData(data, percent):
    """Randomly move ``percent`` percent of ``data`` into a new list.

    Args:
        data: list to sample from. It is modified in place: the sampled
            elements are removed and the remaining ones keep their relative
            order.
        percent: share of ``len(data)`` to sample, in percent. The resulting
            count is truncated to an integer and clamped to
            ``[0, len(data)]``, so values above 100 take everything instead of
            failing on an exhausted list.

    Returns:
        ``(output, data)``: the sampled elements in random order, and the same
        ``data`` list object, now holding only the elements that were not
        sampled.
    """
    total = len(data)
    size = max(0, min(total, int((percent * total) / 100)))

    # Shuffle positions instead of drawing values one at a time. This samples
    # without replacement in a single pass and avoids list.remove(), which
    # rescans the list and matches by equality on every draw.
    order = list(range(total))
    random.shuffle(order)

    output = [data[idx] for idx in order[:size]]
    kept_positions = sorted(order[size:])
    # Slice-assign so the caller's list object is updated, as before.
    data[:] = [data[idx] for idx in kept_positions]

    return output, data

# Target share of every class per split, in percent of the class's full size.
TRAIN_PERCENT, VAL_PERCENT, TEST_PERCENT = 60, 15, 25
SPLIT_SEED = 42

# Seed the RNG so the split is the same on every run of this cell.
random.seed(SPLIT_SEED)

num_types = len(m_datasets.keys())

for i in range(num_types):
    # Work on a copy so the per-class lists stored in m_datasets are not
    # drained; re-running this cell then splits the full data again.
    dat = list(m_datasets[i])

    # getRandomFromData takes a percentage of the list it is handed, and that
    # list shrinks after every call. Passing 60 / 15 / 25 directly would yield
    # roughly 60% / 6% / 8.5% of the class and drop the rest, so the validation
    # share is rescaled to the remainder and the test split takes what is left.
    train_d, dat = getRandomFromData(dat, TRAIN_PERCENT)
    val_d, dat = getRandomFromData(dat, 100 * VAL_PERCENT / (VAL_PERCENT + TEST_PERCENT))
    test_d, dat = getRandomFromData(dat, 100)

    train_dataset += train_d
    val_dataset += val_d
    test_dataset += test_d

In [None]:
# Sizes of the train, validation and test splits, in that order.
print(*(len(split) for split in (train_dataset, val_dataset, test_dataset)))

In [None]:
# Per-class view of the test split. This rebinds the three names that earlier
# held the per-class lists of the full dataset.
policies = list(filter(lambda example: example['label'] == 0, test_dataset))
responsibility = list(filter(lambda example: example['label'] == 1, test_dataset))
procedures = list(filter(lambda example: example['label'] == 2, test_dataset))

In [None]:
# Class sizes of the lists currently bound to these names, in label-id order.
print(*map(len, (policies, responsibility, procedures)))

In [None]:
from datasets import load_dataset, Dataset, DatasetDict

# mydataset = Dataset.from_generator(dataset)

# Wrap each list-of-dicts split in a `datasets.Dataset`, then bundle the three
# under their split names.
myTraindataset, myValdataset, myTestdataset = (
    Dataset.from_list(split) for split in (train_dataset, val_dataset, test_dataset)
)

mydataset = DatasetDict(
    {
        "train": myTraindataset,
        "validation": myValdataset,
        "test": myTestdataset,
    }
)

In [None]:
from datasets import load_dataset
from transformers import AutoTokenizer, DataCollatorWithPadding

#raw_datasets = load_dataset("glue", "mrpc")

# facebook/bart-base
#"bert-base-uncased"
# roberta-base
# nlpaueb/legal-bert-base-uncased
# casehold/custom-legalbert


# Checkpoint name shared by the tokenizer here and the classification model
# loaded later in the notebook.
checkpoint = "mukund/privbert"
tokenizer = AutoTokenizer.from_pretrained(checkpoint)


def tokenize_function(example):
    """Tokenize the ``text`` field of ``example`` with the module-level tokenizer.

    Truncation is enabled; padding is not requested here.
    """
    texts = example["text"]
    return tokenizer(texts, truncation=True)


# Tokenize every split in batches, and build a padding collator from the same
# tokenizer.
tokenized_datasets = mydataset.map(tokenize_function, batched=True)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Display the DatasetDict holding the train / validation / test splits.
mydataset

In [None]:
# Inspect the first example of the (untokenized) train split.
mydataset['train'][0]

In [None]:
from transformers import TrainingArguments, Trainer, logging

# training_args = TrainingArguments("test-trainer")

# Keyword arguments forwarded to TrainingArguments.
default_args = dict(
    output_dir="./",
    evaluation_strategy="epoch",
    num_train_epochs=6,
    logging_steps=1,
    log_level="info",
    report_to="none",
)

# Training configuration: the shared defaults plus the batch size and
# gradient-checkpointing settings for this run.
training_args = TrainingArguments(
    **default_args,
    per_device_train_batch_size=64,
    gradient_checkpointing=True,
)

In [None]:
# Display the resolved training configuration.
training_args

In [None]:
from transformers import AutoModelForSequenceClassification

# Load the checkpoint as a sequence classifier with 3 labels, matching the
# label ids 0-2 produced by TypeDataset.
model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=3)

In [None]:
from datasets import load_metric
from sklearn.metrics import mean_squared_error
import evaluate
import numpy as np


# Metric objects from the `evaluate` library. The active compute_metrics below
# uses `f1`; `accuracy` is only referenced by the commented-out variant.
accuracy = evaluate.load("accuracy")

f1 = evaluate.load("f1")

# imdb = load_dataset("imdb")


#F1
def compute_metrics(eval_pred):
    """Compute micro-, macro- and weighted-averaged F1 for an evaluation pass.

    Args:
        eval_pred: pair ``(predictions, labels)``. ``predictions`` holds
            per-class scores and is reduced to class ids with an argmax over
            axis 1; ``labels`` holds the reference class ids.

    Returns:
        dict mapping ``"micro-F1"``, ``"macroF1"`` and ``"weightF1"`` to the
        corresponding scalar F1 score.
    """
    predictions, labels = eval_pred
    # NOTE(review): some model types return a tuple of outputs; this assumes
    # the scores are its first element -- confirm for the checkpoint in use.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    predictions = np.argmax(predictions, axis=1)

    def _f1_score(average):
        # f1.compute() returns a dict of the form {"f1": value}; unwrap it so
        # every reported metric is a plain number rather than a nested dict.
        result = f1.compute(predictions=predictions, references=labels, average=average)
        return result["f1"]

    return {
        "micro-F1": _f1_score("micro"),
        "macroF1": _f1_score("macro"),
        "weightF1": _f1_score("weighted"),
    }


# def compute_metrics(eval_pred):
#     predictions, labels = eval_pred
#     predictions = np.argmax(predictions, axis=1)
#     return accuracy.compute(predictions=predictions, references=labels)

In [None]:
from transformers import Trainer, TrainerCallback
import torch


# Trainer used for fine-tuning: trains on the train split and evaluates on the
# validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

In [None]:
# Show the GPU status reported by the NVIDIA driver tool.
!nvidia-smi

In [None]:
import torch
# Release unoccupied cached GPU memory held by PyTorch before training.
torch.cuda.empty_cache()

In [None]:
# Run fine-tuning with the configuration in training_args.
trainer.train()

In [None]:
# Evaluate on the test split instead of the trainer's default validation split.
trainer.evaluate(eval_dataset=tokenized_datasets["test"])

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, r2_score, explained_variance_score
import numpy as np

In [None]:
# Despite its name, `eval_dataset` holds the tokenized *test* split here.
eval_dataset = tokenized_datasets["test"]
# Run the model over that split and keep the prediction output object.
predictions = trainer.predict(eval_dataset)

In [None]:
# Metrics computed during the predict call on the test split.
predictions.metrics

In [None]:
# Ground truth is the label ids stored with the dataset; the predicted class is
# the argmax over the model's per-class scores. (These two were previously
# assigned the other way round, which transposes precision and recall in any
# report built from them.)
y_true = predictions.label_ids
y_pred = np.argmax(predictions.predictions, axis=1)

In [None]:
# Display names listed in label-id order (0, 1, 2).
target_names = ['policies', 'responsibility', 'procedures']

# Per-class precision / recall / F1 table for the test split.
print(classification_report(y_true, y_pred, target_names=target_names))

In [None]:
# Persist the fine-tuned model; with no argument the trainer's configured
# output directory ("./" in training_args) is used.
trainer.save_model()

In [None]:
from transformers import Trainer, TrainerCallback
import torch


# Rebuild the Trainer around the same model, this time with the test split as
# its default evaluation dataset.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

In [None]:
# Evaluate on the trainer's default eval_dataset, which the cell above set to
# the test split.
trainer.evaluate()