In [None]:
from utils.preprocessor import preprocess_data, remap_class
from sklearn.model_selection import train_test_split
from utils.Dataset import CustomDataset

from utils.eval import plot_confusion_matrix, plot_data_distribution
from utils.helper import create_dir_not_exist, file_exist

import pandas as pd
from tqdm import tqdm
import numpy as np
import random

import torch
from torch import nn
from torch.utils.data import DataLoader
from datasets import Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline, AutoConfig, Trainer, TrainingArguments
from commons.modeleval import f1metrics

In [None]:
# Notebook configuration: checkpoint name, filesystem layout, label maps,
# hyper-parameters, output directories and RNG seeding.

# --- Model identity ---------------------------------------------------------
model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
model_alternate_name = "roberta2022"

# --- Resource locations -----------------------------------------------------
resources_dir = "resources"

model_dir = f"{resources_dir}/models"
model_bin_path = f"{model_dir}/bin"
tokenizer_path = f"{model_dir}/tokenizer"

other_dir = f"{resources_dir}/other" #for other constraints, instructions, blacklists, etc
path_blacklist_sentence = f"{other_dir}/blacklist_sentence.txt"

dataset_dir = "dataset"
input_data = f"{dataset_dir}/input.csv"
output_cache = f"{dataset_dir}/output.csv"

all_plot_data_distribution_path = f"{output_cache}/eval/all_plot_data_distribution.png"
train_plot_data_distribution_path = f"{output_cache}/eval/train_plot_data_distribution.png"
test_plot_data_distribution_path = f"{output_cache}/eval/test_plot_data_distribution.png"
test_plot_confusion_matrix_path = f"{output_cache}/eval/test_plot_confusion_matrix.png"
multi_Score_fscore_path = f"{output_cache}/plot_multi_fscore.png"

# --- Names the rest of the notebook reads -----------------------------------
# These were referenced (here and in later cells) without ever being assigned,
# so a fresh kernel stopped with NameError. The first four are bound to the
# sibling values above that were otherwise unused.
# The eval plots above are written under f"{output_cache}/eval", which is the
# same sub-directory this cell creates under model_output.
# NOTE(review): output_cache ends in ".csv" but is used as a directory - confirm.
model_output = output_cache
pretrain_save_path = model_bin_path
tokenizer_save_path = tokenizer_path
all_data = input_data
# NOTE(review): nothing in the notebook shows the intended values for the
# three paths below; these are placeholders - confirm before running.
config_file = f"{model_output}/configs.txt"
train_data_path = f"{dataset_dir}/train.csv"
test_data_path = f"{dataset_dir}/test.csv"

# --- Columns and label maps -------------------------------------------------
text_column = "text"
label_column = "label"

# Dataset labels 1..5 are shifted to the 0..4 ids used for training.
label_mapping = {1: 0, 2: 1, 3: 2, 4: 3, 5: 4}
vis_label = ['Label1', 'Label2', 'Label3', 'Label4', 'Label5']
# Pipeline label strings -> 1..5 -> human-readable sentiment.
infer_mapping_firstlevel = {'LABEL_4': 5, 'LABEL_3': 4, 'LABEL_2': 3, 'LABEL_1': 2, 'LABEL_0': 1}
infer_mapping_secondlevel = {5:'VERY POSITIVE', 4:'POSITIVE', 3:'NEUTRAL', 2:'NEGATIVE', 1:'VERY NEGATIVE'}

# --- Hyper-parameters -------------------------------------------------------
seed = 42
max_length = 256
train_batch_size = 8
val_batch_size = 8
lr = 2e-5
num_epochs = 5
num_labels = 5
weight_decay = 0.01

# Plain-text snapshot of the run settings, written to config_file below.
configs = f"""
preprocessing_steps = remove_blacklist_sentences, remove_url, filter_non_english_words, remove_square_bracket
model_name = {model_name}
manual_seed = {seed}
max_length = {max_length}
train_batch_size = {train_batch_size}
val_batch_size = {val_batch_size}
lr = {lr}
num_epochs = {num_epochs}
num_labels = {num_labels}
weight_decay = {weight_decay}
"""

# --- Output directories -----------------------------------------------------
create_dir_not_exist(model_output)
create_dir_not_exist(f"{model_output}/eval")
create_dir_not_exist(pretrain_save_path)
create_dir_not_exist(tokenizer_save_path)

with open(config_file, 'w') as file:
    file.write(configs)

# --- Reproducibility --------------------------------------------------------
# Seed every RNG the notebook imports: torch, numpy and the stdlib.
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [None]:
# Main
df = pd.read_csv(all_data)
train_df = pd.read_csv(train_data_path)
test_df = pd.read_csv(test_data_path)

# plot_data_distribution(df, label_column, save_path=all_plot_data_distribution_path)
# plot_data_distribution(train_df, label_column, save_path=train_plot_data_distribution_path)
# plot_data_distribution(test_df, label_column, save_path=test_plot_data_distribution_path)

In [None]:
def _clean_split(frame):
    """Preprocess the text column of one split, then remap its label column.

    The text column of ``frame`` is overwritten in place; the value returned
    by ``remap_class`` is handed back to the caller.
    """
    frame[text_column] = frame[text_column].apply(preprocess_data)
    return remap_class(frame, label_column, label_mapping)


# Both splits go through the same two steps, train first.
# NOTE(review): the names are rebound to their own cleaned versions, so
# re-running this cell would apply the steps a second time.
train_df = _clean_split(train_df)
test_df = _clean_split(test_df)

In [None]:
# Wrap each cleaned DataFrame in a `datasets.Dataset` for the Trainer.
train_dataset, test_dataset = map(Dataset.from_pandas, (train_df, test_df))

In [None]:
import numpy as np
import evaluate
from sklearn.metrics import accuracy_score, f1_score

# NOTE(review): `metric` is loaded here but nothing in the visible notebook
# reads it; compute_metrics below relies on scikit-learn instead.
metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Score one evaluation pass.

    Args:
        eval_pred: a (predictions, labels) pair; the predicted class is the
            argmax of ``predictions`` along axis 1.

    Returns:
        dict with ``'accuracy'`` and the weighted-average ``'f1'``.
    """
    scores, references = eval_pred
    predicted_ids = scores.argmax(axis=1)
    return {
        'accuracy': accuracy_score(references, predicted_ids),
        # Weighted averaging for the F1 score.
        'f1': f1_score(references, predicted_ids, average='weighted'),
    }

In [None]:
class WeigtedLossTrainer(Trainer):
    """Trainer whose loss is class-weighted cross-entropy.

    The weights come from the notebook-level ``class_weights`` tensor, looked
    up each time the loss is computed, so that tensor must exist (on the same
    device as the logits) before ``train()`` is called.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run a forward pass and return weighted cross-entropy on the logits.

        Args:
            model: the model being trained.
            inputs: keyword arguments for ``model``; must contain ``"labels"``.
            return_outputs: when true, return ``(loss, outputs)`` instead of
                the loss alone.
            num_items_in_batch: accepted because newer ``transformers``
                Trainers pass it to ``compute_loss``; without the parameter
                the override fails with TypeError there. It is not used, and
                the default keeps older versions working.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        outputs = model(**inputs)
        logits = outputs.get("logits")
        labels = inputs.get("labels")
        loss_fn = nn.CrossEntropyLoss(weight=class_weights)
        loss = loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
# Tokenizer, config and weights all come from the same checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Resize the classification head to `num_labels`; ignore_mismatched_sizes lets
# the checkpoint load even though its head has a different shape.
config = AutoConfig.from_pretrained(model_name)
config.num_labels = num_labels
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    config=config,
    ignore_mismatched_sizes=True,
)

# Prefer the GPU when one is visible, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = model.to(device)

# NOTE(review): this optimizer is not handed to the trainer built later in the
# notebook - confirm whether it is still needed.
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

In [None]:
df_concat = pd.concat([train_df, test_df], ignore_index=True)

# Weight for each class = 1 - (its share of the training rows), ordered by
# label id so position i lines up with class i.
label_counts = train_df[label_column].value_counts().sort_index()
label_share = label_counts / len(train_df)
class_weights = torch.tensor((1 - label_share).values, dtype=torch.float).to(device)

In [None]:
def preprocess_function(examples):
    """Tokenize the text column of a batch.

    Every sequence is truncated and padded to exactly ``max_length`` tokens.
    """
    texts = examples[text_column]
    return tokenizer(texts, truncation=True, padding='max_length', max_length=max_length)


# Tokenize both splits in batched mode, train first.
encoded_train_dataset, encoded_test_dataset = (
    split.map(preprocess_function, batched=True)
    for split in (train_dataset, test_dataset)
)

In [None]:
# Training configuration. Evaluation and checkpointing both happen once per
# epoch; logs go under the model output directory.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the installed version.
training_args = TrainingArguments(
    num_train_epochs=num_epochs,
    learning_rate=lr,
    per_device_train_batch_size=train_batch_size,
    per_device_eval_batch_size=val_batch_size,
    weight_decay=weight_decay,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    output_dir=model_output,
    logging_dir=f'{model_output}/logs',
    logging_steps=10,
    # Mixed precision only when CUDA is present: the notebook falls back to
    # CPU when no GPU is visible, and fp16=True is rejected there.
    fp16=torch.cuda.is_available()
)

# The test split doubles as the evaluation set for the per-epoch metrics.
trainer = WeigtedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=encoded_train_dataset,
    eval_dataset=encoded_test_dataset,
    compute_metrics=compute_metrics
)

# Fine-tune the model
trainer.train()

In [None]:
# Run one evaluation pass on the trainer's eval dataset (the encoded test
# split) and print the resulting metrics.
eval_results = trainer.evaluate()
print(f"Evaluation Results: {eval_results}")

In [None]:
# Persist the fine-tuned model and its tokenizer to their save directories.
# The tokenizer call is the cell's last expression, so its return value is
# what the notebook displays.
model.save_pretrained(pretrain_save_path)
tokenizer.save_pretrained(tokenizer_save_path)

In [None]:
# Score the encoded test split and pull out raw scores and gold label ids.
predictions_output = trainer.predict(encoded_test_dataset)
predictions = predictions_output.predictions
true_labels_list = predictions_output.label_ids

# Predicted class = index of the highest score in each row.
predicted_labels_list = predictions.argmax(axis=1)

f1metrics(true_labels_list, predicted_labels_list)

# Save evals
# 'difference' is the absolute gap between gold and predicted label ids.
data = dict(
    true_labels=true_labels_list,
    predicted_labels=predicted_labels_list,
    text=test_df[text_column],
    difference=np.abs(true_labels_list - predicted_labels_list),
)
labels_true_predicted_df = pd.DataFrame(data)
labels_true_predicted_df.to_csv(f"{model_output}/labels_true_predicted.csv", index=False)
plot_confusion_matrix(
    labels_true_predicted_df['true_labels'],
    labels_true_predicted_df['predicted_labels'],
    vis_label,
    save_path=test_plot_confusion_matrix_path,
)

In [None]:
def predict(textlist):
    """Classify each text with the notebook's model and tokenizer.

    Args:
        textlist: iterable of strings; each one is sent through the pipeline
            separately, with a progress bar over the whole iterable.

    Returns:
        A list of dicts with keys ``"text"``, ``"label"`` and ``"score"``,
        one per input, taken from the first result the pipeline returns.
    """
    classifier = pipeline(
        "text-classification",
        model=model,
        tokenizer=tokenizer,
        device=device,
        # Tokenizer options forwarded through the pipeline.
        padding=True,
        truncation=True,
        max_length=max_length,
    )

    def _classify(text):
        top = classifier(text)[0]
        return {"text": text, "label": top["label"], "score": top["score"]}

    return [_classify(text) for text in tqdm(textlist)]
        
# Smoke-test inference on a hand-written example.
# A stray opening quote before the sentence left an unterminated string
# literal here (a SyntaxError); it has been removed.
new_df = pd.DataFrame({'text': 
    [
     "Commuting for me involves traveling between home and work, and I can choose between driving, taking public transportation, or biking"
     ]})

# Apply the same text preprocessing that the training data went through.
new_df['text'] = new_df['text'].apply(preprocess_data)
result = predict(new_df['text'].values)

# Translate the pipeline label string to 1..5, then to a readable sentiment.
for res in result:
    print(f"""
    text: {res['text']}
    label: {infer_mapping_secondlevel[infer_mapping_firstlevel[res['label']]]}
    """)