# **FinBERT Model**

### Installing required libraries

In [None]:
!pip install transformers
!pip install datasets
!pip install sentencepiece
!pip install torch

In [1]:
import pandas as pd
import numpy as np

In [None]:
from transformers import AutoTokenizer
from datasets import Dataset
from transformers import DataCollatorWithPadding
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

In [None]:
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

In [None]:
import torch
from torch.utils.checkpoint import checkpoint
import torch.nn as nn

# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")

In [None]:
# Show which accelerator this session is using. The call is guarded because
# torch.cuda.get_device_name(0) raises on a runtime without a CUDA device.
gpu_name = (
    torch.cuda.get_device_name(0)
    if torch.cuda.is_available()
    else "No CUDA device available (running on CPU)"
)
gpu_name

### Define configuration

[Models](https://huggingface.co/models)

In [None]:
# Hugging Face Hub identifier of the pretrained checkpoint; it is passed to
# both AutoTokenizer.from_pretrained and AutoModelForSequenceClassification.from_pretrained.
model_name = "ProsusAI/finbert"

### Prepare data

Get data and apply simple normalisation if necessary

In [None]:
# Mount Google Drive at /gdrive (Colab only); the dataset and the model
# output directory used in this notebook live under /gdrive/MyDrive.
from google.colab import drive
drive.mount('/gdrive')

In [None]:
# Load the labelled news CSV from the mounted Drive and display it.
DATA_CSV_PATH = "/gdrive/MyDrive/data/labelled_marketaux_news_combined_2022-10-15.csv"
df = pd.read_csv(DATA_CSV_PATH)
df

In [None]:
# Total number of rows, followed by the number of non-missing values in each
# of the "3m", "6m" and "1y" columns.
print(len(df))
for label_column in ("3m", "6m", "1y"):
    print(len(df[label_column].dropna()))

In [None]:
# Keep only the text and the "3m" label, drop rows whose "3m" label is
# missing, and renumber the remaining rows from 0.
df = (
    df.loc[:, ["Relevant Texts", "3m"]]
    .dropna(subset=["3m"])
    .reset_index(drop=True)
)
len(df)

In [None]:
def normalise(text):
    """Return *text* converted to lower case (the only normalisation applied)."""
    return text.lower()

# Replace every entry of the text column with its normalised form.
df['Relevant Texts'] = df['Relevant Texts'].map(normalise)

In [None]:
# Rename to the column names used further down: tokenize_function reads
# "text", and the trainer's loss reads "labels".
df = df.rename(columns={'3m': 'labels', 'Relevant Texts': 'text'})

In [None]:
# Store the labels as integers, then preview the first rows.
df = df.astype({"labels": int})
df.head()

In [None]:
# Swap label ids 0 and 2 (id 1 is left untouched). -1 serves as a temporary
# placeholder so that the second assignment does not overwrite the rows that
# were originally 0; the three statements must therefore run in this order.
# NOTE(review): the legend below presumably describes the ids AFTER the swap -
# confirm against the labelling scheme of the CSV.
#0: Positive
#1: Negative
#2: Neutral
df.loc[df['labels'] == 0, 'labels'] = -1 # temporary
df.loc[df['labels'] == 2, 'labels'] = 0
df.loc[df['labels'] == -1, 'labels'] = 2
# Show the number of rows per label id after the swap.
df.labels.value_counts()

Get the tokeniser that matches the pretrained model

In [None]:
# Load the tokenizer published with the checkpoint named by `model_name`.
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Cap the tokenizer's maximum sequence length at 512 tokens; this is the
# length that truncation=True cuts inputs down to in tokenize_function.
tokenizer.model_max_length = 512

Split into train / validation / test sets

In [None]:
# Two-stage stratified split giving train:valid:test = 60:20:20.
# Stage 1 holds out 20% of all rows as the test set; stage 2 takes 25% of the
# remaining 80% (i.e. 20% of the total) as the validation set.
train_valid_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=df["labels"],
)
train_df, valid_df = train_test_split(
    train_valid_df,
    test_size=0.25,
    random_state=42,
    stratify=train_valid_df["labels"],
)

### Create tokenised dataset

In [None]:
# Wrap each pandas split in a Hugging Face Dataset (train, valid, test order).
train_dataset, valid_dataset, test_dataset = (
    Dataset.from_pandas(split_df) for split_df in (train_df, valid_df, test_df)
)

In [None]:
def tokenize_function(examples):
    """Tokenise the "text" field of a batch of examples.

    Inputs longer than the tokenizer's maximum length are truncated. No
    padding is applied here: padding every example to the full maximum length
    would turn the DataCollatorWithPadding handed to the trainer into a no-op
    and make every batch as long as the longest possible input. Leaving the
    sequences unpadded lets the collator pad each batch only to its own
    longest sequence.

    Args:
        examples: A batch mapping with a "text" entry, as supplied by
            ``Dataset.map(..., batched=True)``.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(examples["text"], truncation=True)

# Tokenise every split. The test split is deliberately NOT shuffled: its
# predictions are later compared with test_df.labels, so its rows must stay
# in the same order as test_df.
tokenized_train_dataset = train_dataset.shuffle(seed=42).map(tokenize_function, batched=True)
tokenized_valid_dataset = valid_dataset.shuffle(seed=42).map(tokenize_function, batched=True)
tokenized_test_dataset = test_dataset.map(tokenize_function, batched=True)

In [None]:
# Print the summaries of the tokenised training and validation sets.
for tokenized_split in (tokenized_train_dataset, tokenized_valid_dataset):
    print(tokenized_split)

### Define Model

Dynamic Padding

In [None]:
# Collator built from the tokenizer; it is handed to the trainer below and
# pads the examples of a batch when the batch is assembled.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

Model

In [None]:
def model_init():
    """Build a 3-label sequence-classification model from `model_name`."""
    classifier = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=3,
    )
    return classifier

Training arguments

In [None]:
# Directory on the mounted Drive that receives checkpoints and the final model.
MODEL_PATH = "/gdrive/MyDrive/results/models/FinBERT_v2.0"

training_args = TrainingArguments(
    # Output location and reproducibility.
    output_dir=MODEL_PATH,
    seed=42,
    # Optimisation schedule.
    num_train_epochs=5,
    learning_rate=2e-5,
    lr_scheduler_type='cosine',
    warmup_ratio=0.1,
    weight_decay=0.01,
    # Batch sizes.
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    # Evaluate and checkpoint once per epoch, limit the number of checkpoints
    # kept on disk, and reload the best checkpoint when training finishes.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    # Optional optimisations, currently disabled:
    # auto_find_batch_size=True,
    # gradient_accumulation_steps=4,
    # fp16=True,
)

Combating class imbalance with class weights

In [None]:
# Class weights to counter class imbalance:
#     weight(c) = 1 - (number of samples labelled c) / (total number of samples)
# so the fewer samples a class has, the larger its weight.
total_samples = len(df)

# Variable names follow the label legend used earlier in the notebook:
# w_pos is derived from label id 0, w_neg from id 1 and w_neu from id 2.
w_pos = 1 - len(df[df['labels'] == 0]) / total_samples
w_neg = 1 - len(df[df['labels'] == 1]) / total_samples
w_neu = 1 - len(df[df['labels'] == 2]) / total_samples

# nn.CrossEntropyLoss reads weight[i] as the weight of class index i, so the
# tensor has to be ordered by label id (0, 1, 2). Listing the weights in
# reverse would give class 0 the weight computed for class 2 and vice versa.
# The tensor is placed on DEVICE so the cell also runs without a GPU.
class_weights = torch.tensor(
    [w_pos, w_neg, w_neu],
    dtype=torch.float,
).to(DEVICE)

class_weights

Define Trainer

In [None]:
# Override the compute_loss function of the Trainer and introduce our class weights.
class CustomTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross-entropy.

    The per-class weights are read from the module-level `class_weights`
    tensor each time the loss is computed.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the class-weighted cross-entropy loss for one batch.

        Args:
            model: The model to run the forward pass with.
            inputs: Batch mapping passed to the model as keyword arguments;
                its "labels" entry is used as the loss target.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: Accepted (and ignored) so the override stays
                call-compatible with Trainer versions that pass this argument.

        Returns:
            The loss tensor, or a ``(loss, outputs)`` tuple when
            `return_outputs` is true.
        """
        labels = inputs.get("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # The weight tensor must live on the same device as the logits.
        loss_fct = nn.CrossEntropyLoss(weight=class_weights.to(logits.device))
        # Flatten to (N, num_labels) logits and (N,) targets.
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [None]:
from sklearn.metrics import accuracy_score

def evaluation(eval_preds):
    """Compute the accuracy metric from an evaluation result.

    Args:
        eval_preds: A pair ``(logits, labels)``; the predicted class of each
            row is the index of its largest logit along the last axis.

    Returns:
        A dict with the single key 'accuracy'.
    """
    scores, gold_labels = eval_preds
    predicted_labels = np.argmax(scores, axis=-1)
    metrics = {'accuracy': accuracy_score(gold_labels, predicted_labels)}
    return metrics

In [None]:
# Assemble the class-weighted trainer used for fine-tuning.
trainer = CustomTrainer(
    # Model factory and hyper-parameters.
    model_init=model_init,
    args=training_args,
    # Tokenisation and batching.
    tokenizer=tokenizer,
    data_collator=data_collator,
    # Data and the metric reported at each evaluation.
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_valid_dataset,
    compute_metrics=evaluation,
)

### Training

In [None]:
# Resume from the newest checkpoint in the trainer's output directory when one
# exists; otherwise train from scratch. Passing resume_from_checkpoint=True
# unconditionally raises on a first run, when no checkpoint has been written.
import os
from transformers.trainer_utils import get_last_checkpoint

checkpoint_dir = trainer.args.output_dir
last_checkpoint = get_last_checkpoint(checkpoint_dir) if os.path.isdir(checkpoint_dir) else None
trainer.train(resume_from_checkpoint=last_checkpoint)

In [None]:
# Write the trainer's current model to MODEL_PATH so it can be reloaded with from_pretrained.
trainer.save_model(MODEL_PATH)

In [None]:
# Reload the fine-tuned model from the directory it was saved to.
trained_finbert = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)

In [None]:
# Display the tokenised test set summary.
tokenized_test_dataset

In [None]:
# Wrap the reloaded model in a trainer with default arguments, then run
# prediction over the tokenised test set.
trained_model = CustomTrainer(model=trained_finbert, tokenizer=tokenizer)
output = trained_model.predict(tokenized_test_dataset)

In [None]:
# Display the raw prediction result returned by predict().
output

### Evaluation

In [None]:
from sklearn.preprocessing import LabelEncoder

# Predicted class id per test example: the index of the largest logit in each
# row. Computed once, vectorised, instead of twice with a Python loop.
preds = np.argmax(output.predictions, axis=-1).tolist()

encoder = LabelEncoder()
df['labels'] = encoder.fit_transform(df['labels'])
# Map the predicted ids back to the label values the encoder was fitted on.
# The result is kept in a variable; as a bare expression it was discarded.
decoded_preds = encoder.inverse_transform(preds)

In [None]:
# Score against the labels returned by predict(): output.label_ids is in the
# same row order as output.predictions, whatever ordering or shuffling was
# applied to the test dataset. test_df.labels is only aligned with the
# predictions if the dataset kept test_df's row order.
accuracy_score(output.label_ids, preds)