In [None]:
import pandas as pd
import os
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification, RobertaConfig

from smart_pytorch import SMARTLoss, kl_loss, sym_kl_loss
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from torch.utils.data import (
    Dataset,
    DataLoader,
    RandomSampler,
    SequentialSampler
)
import math
from transformers.optimization import (
    AdamW,
    get_linear_schedule_with_warmup
)
from sklearn.metrics import (
    confusion_matrix,
    matthews_corrcoef,
    accuracy_score,
    roc_curve,
    auc,
    average_precision_score,
    f1_score,
)
from scipy.special import softmax
from torch.nn import CrossEntropyLoss
from tqdm import tqdm

In [None]:
# Define parameters for the fine-tuning task

# Hugging Face checkpoint to fine-tune; the tokenizer name defaults to the same checkpoint.
model_name = "roberta-large"
tokenizer_name = model_name

# Number of output classes requested from the classifier.
num_labels = 3

# Use the GPU when one is available and fall back to the CPU otherwise,
# so the notebook still runs (slowly) on a machine without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Tokenization and batching
max_seq_length = 128
train_batch_size = 8
test_batch_size = 8

# Optimization schedule
warmup_ratio = 0.06  # fraction of the total optimizer steps used for linear warm-up
weight_decay=0.0
gradient_accumulation_steps = 1
num_train_epochs = 20
learning_rate = 1e-05
adam_epsilon = 1e-08

In [None]:
# Define a wrapper that adds the SMART regularization loss to the RoBERTa classifier, then load the pre-trained model and tokenizer

class SMARTRobertaClassificationModel(nn.Module):
    """Wrap a RoBERTa sequence classifier and add a SMART loss term to its loss.

    The wrapped ``model`` must expose ``model.roberta`` (callable with
    ``inputs_embeds`` / ``attention_mask`` and owning an ``embeddings``
    sub-module) and ``model.classifier``.

    Args:
        model: The sequence-classification model to wrap.
        weight: Multiplier applied to the SMART loss before it is added to the
            cross-entropy loss.
    """

    def __init__(self, model, weight = 0.02):
        super().__init__()
        self.model = model
        self.weight = weight

    def forward(self, input_ids, attention_mask, labels):
        """Compute logits and the combined training loss for one batch.

        Args:
            input_ids: Token ids passed to the embedding module.
            attention_mask: Attention mask forwarded to the RoBERTa backbone.
            labels: Target class ids; flattened before the cross-entropy call.

        Returns:
            A ``(logits, loss)`` tuple where ``loss`` is the cross-entropy loss
            plus ``weight`` times the SMART loss.
        """
        # Get initial embeddings
        # NOTE(review): this is the output of the whole embeddings module and it
        # is fed back below as ``inputs_embeds``; presumably the backbone runs its
        # embedding layer on ``inputs_embeds`` again -- confirm this is intended.
        embed = self.model.roberta.embeddings(input_ids)

        # Maps (possibly perturbed) embeddings to logits. Named ``eval_fn`` so the
        # builtin ``eval`` is not shadowed inside this method.
        def eval_fn(embed):
            outputs = self.model.roberta(inputs_embeds=embed, attention_mask=attention_mask)
            # First backbone output; the classifier module consumes it directly.
            sequence_output = outputs[0]
            return self.model.classifier(sequence_output)

        # Define SMART loss
        smart_loss_fn = SMARTLoss(eval_fn = eval_fn, loss_fn = kl_loss, loss_last_fn = sym_kl_loss)
        # Compute initial (unperturbed) state
        state = eval_fn(embed)
        # Apply classification loss. The class dimension is read from the logits
        # instead of being hard-coded, so any ``num_labels`` works.
        loss = F.cross_entropy(state.view(-1, state.size(-1)), labels.view(-1))
        # Apply smart loss
        loss = loss + self.weight * smart_loss_fn(embed, state)

        return state, loss


# Load the tokenizer from `tokenizer_name` and the classifier weights from
# `model_name`, so each setting controls the object it is named after.
tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

# Configure the classifier for `num_labels` output classes.
config = RobertaConfig.from_pretrained(model_name, num_labels=num_labels)
model = AutoModelForSequenceClassification.from_pretrained(model_name, config=config)

# Wrap the classifier so its forward pass returns logits plus the SMART-regularized loss.
model_smart = SMARTRobertaClassificationModel(model)

In [None]:
# Load the dataset for classification task

all_df = pd.read_csv('AllAgree.csv')

# Encode the sentiment strings as integer class ids:
# 'negative' -> 0, 'neutral' -> 1, any other value -> 2.
sentiment_to_class = {'negative': 0, 'neutral': 1}
label = [sentiment_to_class.get(sentiment, 2) for sentiment in all_df['Sentiment']]

# Attach the class ids as a new 'Class' column, aligned on the row index.
label_df = pd.DataFrame(label)
label_df.columns = ['Class']
all_df = all_df.join(label_df)

In [None]:
class MyClassificationDataset(Dataset):
    """Dataset of pre-tokenized sentences paired with integer class labels.

    Args:
        data: A ``(texts, labels)`` pair of equally ordered sequences.
        tokenizer: Callable that tokenizes ``texts``; it is asked to truncate
            and pad every sentence to the module-level ``max_seq_length`` and
            to return PyTorch tensors.
    """

    def __init__(self, data, tokenizer):
        sentences, targets = data
        # Tokenize everything once up front; items are sliced out on demand.
        self.examples = tokenizer(
            text=sentences,
            text_pair=None,
            truncation=True,
            padding="max_length",
            max_length=max_seq_length,
            return_tensors="pt",
        )
        self.labels = torch.tensor(targets, dtype=torch.long)

    def __len__(self):
        """Number of tokenized examples."""
        return len(self.examples["input_ids"])

    def __getitem__(self, index):
        """Return ``(features, label)`` for the example at ``index``."""
        features = {}
        for field in self.examples:
            features[field] = self.examples[field][index]
        return features, self.labels[index]

# 80/20 train/test split of the rows of `all_df`, drawn with a fixed seed.
train_set_size = int(len(all_df) * 0.8)
test_set_size = len(all_df) - train_set_size
train, test = data.random_split(
    all_df,
    [train_set_size, test_set_size],
    generator=torch.Generator().manual_seed(0),
)


def _frame_to_examples(frame):
    """Return the (sentences, class ids) pair expected by MyClassificationDataset."""
    sentences = frame['Sentence'].astype(str).tolist()
    class_ids = frame['Class'].tolist()
    return sentences, class_ids


# Both subsets index into the same underlying frame (`all_df`).
train_df = all_df.iloc[train.indices]
train_examples = _frame_to_examples(train_df)
train_dataset = MyClassificationDataset(train_examples, tokenizer)

test_df = all_df.iloc[test.indices]
test_examples = _frame_to_examples(test_df)
test_dataset = MyClassificationDataset(test_examples, tokenizer)

In [None]:
def get_inputs_dict(batch):
    """Turn a ``(features, labels)`` batch into one dict of tensors on ``device``.

    Args:
        batch: Pair whose first item maps input names to tensors and whose
            second item is the label tensor.

    Returns:
        Dict with every feature tensor (dimension 1 squeezed when it has size
        one) and a ``"labels"`` entry, all moved to the module-level ``device``.
    """
    inputs = {}
    for name, tensor in batch[0].items():
        inputs[name] = tensor.squeeze(1).to(device)
    inputs["labels"] = batch[1].to(device)
    return inputs

# Training batches are drawn through a random sampler.
train_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=train_batch_size,
    sampler=train_sampler,
)

# The test set is read sequentially, so batch i always covers the same rows.
test_sampler = SequentialSampler(test_dataset)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=test_batch_size,
    sampler=test_sampler,
)

In [None]:
# Define parameters for optimizer and scheduler

# Total number of optimizer steps over the whole run.
t_total = len(train_dataloader) // gradient_accumulation_steps * num_train_epochs

# Parameters listed here would be excluded from both groups; none are configured.
custom_parameter_names = set()
# Parameters whose name contains one of these substrings get no weight decay.
no_decay = ["bias", "LayerNorm.weight"]

# Split the model parameters into a decayed and a non-decayed group in one pass,
# keeping the order in which named_parameters() yields them.
decayed_params = []
undecayed_params = []
for param_name, param in model_smart.named_parameters():
    if param_name in custom_parameter_names:
        continue
    if any(marker in param_name for marker in no_decay):
        undecayed_params.append(param)
    else:
        decayed_params.append(param)

optimizer_grouped_parameters = [
    {"params": decayed_params, "weight_decay": weight_decay},
    {"params": undecayed_params, "weight_decay": 0.0},
]

# Linear warm-up over the first `warmup_ratio` share of the steps, then linear decay.
warmup_steps = math.ceil(t_total * warmup_ratio)
# NOTE(review): this AdamW comes from transformers.optimization; it is reportedly
# deprecated in favour of torch.optim.AdamW -- confirm against the installed version.
optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate, eps=adam_epsilon)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=t_total,
)

In [None]:
# Methods to compute accuracy of predictions

def compute_metrics(preds, model_outputs, labels, eval_examples=None, multi_label=False, class_labels=None):
    """Score predicted class ids against the ground-truth labels.

    Args:
        preds: Predicted class ids, one per example.
        model_outputs: Unused; kept for interface compatibility.
        labels: Ground-truth class ids, same length as ``preds``.
        eval_examples: Unused; kept for interface compatibility.
        multi_label: Unused; kept for interface compatibility.
        class_labels: Class ids used to index the confusion matrix, in order.
            Defaults to ``[0, 1, 2]``.

    Returns:
        A ``(metrics, confusion)`` tuple. ``metrics`` is a dict with the keys
        ``"mcc"``, ``"acc"`` and ``"f1"`` (macro-averaged F1); ``confusion`` is
        the value returned by ``confusion_matrix`` for ``class_labels``.

    Raises:
        AssertionError: If ``preds`` and ``labels`` differ in length.
    """
    assert len(preds) == len(labels)
    if class_labels is None:
        class_labels = [0, 1, 2]

    metrics = {
        "mcc": matthews_corrcoef(labels, preds),
        "acc": accuracy_score(labels, preds),
        "f1": f1_score(labels, preds, average='macro'),
    }
    con_m = confusion_matrix(labels, preds, labels=list(class_labels))
    return metrics, con_m

def print_confusion_matrix(result):
    """Print a 2x2 confusion matrix as a text table.

    Args:
        result: Mapping with the integer counts ``'tn'``, ``'fp'``, ``'fn'``
            and ``'tp'``. Rows are the ground truth (``gt``) class, columns
            the predicted class.
    """
    # One table row: class id, then the two counts right-aligned to width 5.
    row_template = '   {} |  {:5d}  |  {:5d}'

    print('confusion matrix:')
    print('            predicted    ')
    print('          0     |     1')
    print('    ----------------------')
    print(row_template.format(0, result['tn'], result['fp']))
    print('gt -----------------------')
    print(row_template.format(1, result['fn'], result['tp']))
    print('---------------------------------------------------')

In [None]:
# Training and evaluation

torch.cuda.empty_cache()

model_smart.to(device)

model_smart.zero_grad()

# Create the checkpoint directory up front so that torch.save cannot fail on a
# missing folder after a full epoch of training has already been spent.
checkpoint_dir = "SMART_RoBERTa_Large_FinancialPhraseBank/AllAgree"
os.makedirs(checkpoint_dir, exist_ok=True)


# Training
for epoch in range(num_train_epochs):

    model_smart.train()
    # Drop gradients left over from an incomplete accumulation window.
    model_smart.zero_grad()
    epoch_loss = []

    for step, batch in enumerate(tqdm(train_dataloader)):
        # get_inputs_dict already moves every tensor to `device`.
        batch = get_inputs_dict(batch)
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels = batch['labels']
        logits, loss = model_smart(input_ids, attention_mask=attention_mask, labels=labels)
        # Log the unscaled loss; only the back-propagated value is scaled.
        epoch_loss.append(loss.item())
        (loss / gradient_accumulation_steps).backward()
        # Step once per accumulation window so the number of optimizer and
        # scheduler steps matches the `t_total` the scheduler was built with.
        if (step + 1) % gradient_accumulation_steps == 0:
            optimizer.step()
            scheduler.step()
            model_smart.zero_grad()


    # Evaluate model with test_df at the end of the epoch.
    eval_loss = 0.0
    nb_eval_steps = 0
    preds = np.empty((len(test_dataset), num_labels))
    # Integer buffer: class ids are compared against argmax predictions below.
    out_label_ids = np.empty(len(test_dataset), dtype=np.int64)
    model_smart.eval()

    # NOTE(review): the forward pass also computes the SMART term, which
    # presumably relies on autograd, so this loop is not wrapped in
    # torch.no_grad() -- confirm before changing.
    start_index = 0
    for test_batch in test_dataloader:
        test_batch = get_inputs_dict(test_batch)
        input_ids = test_batch['input_ids']
        attention_mask = test_batch['attention_mask']
        labels = test_batch['labels']
        logits, tmp_eval_loss = model_smart(input_ids, attention_mask=attention_mask, labels=labels)
        eval_loss += tmp_eval_loss.item()

        nb_eval_steps += 1
        # The loader is sequential, so each batch fills the next contiguous slice;
        # using the actual batch size also covers a shorter final batch.
        end_index = start_index + logits.shape[0]
        preds[start_index:end_index] = logits.detach().cpu().numpy()
        out_label_ids[start_index:end_index] = labels.detach().cpu().numpy()
        start_index = end_index

    eval_loss = eval_loss / nb_eval_steps
    model_outputs = preds
    preds = np.argmax(preds, axis=1)
    result, con_m = compute_metrics(preds, model_outputs, out_label_ids)

    # SAVE
    PATH = checkpoint_dir + "/epoch" + str(epoch)
    torch.save(model_smart.state_dict(), PATH)

    print('epoch',epoch,'Training avg loss',np.mean(epoch_loss))
    print('epoch',epoch,'Testing  avg loss',eval_loss)
    print(result)
    print(con_m)
    print('---------------------------------------------------\n')