### Install Dependencies

In [None]:
!pip3 install pandas
!pip3 install numpy
!pip3 install torch
!pip3 install transformers
!pip3 install accelerate -U
!pip3 install ray[tune]
!pip3 install hyperopt
!pip3 install sklearn
!pip3 install datasets

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the data paths used in later cells are relative ('drive/MyDrive/...'),
# so they presumably rely on the working directory being /content — confirm.
from google.colab import drive
drive.mount('/content/drive')

### Import libraries

In [None]:
from transformers import AutoTokenizer, Trainer, TrainingArguments, AutoModelForSequenceClassification, BertTokenizerFast, BertForSequenceClassification
from transformers.trainer_callback import EarlyStoppingCallback
import pandas as pd
import torch
import accelerate
import numpy as np
from datasets import Dataset

### Load data

In [None]:
# Locations of the VUA train / validation / test CSV files, relative to the
# current working directory (they live under the mounted Drive folder).
train_path = 'drive/MyDrive/data/VUA/VUA_formatted_train.csv'
eval_path = 'drive/MyDrive/data/VUA/VUA_formatted_val.csv'
test_path = 'drive/MyDrive/data/VUA/VUA_formatted_test.csv'

def load_dataset(train, eval, test, encoding):
    """Read the train, validation and test CSV files into DataFrames.

    Args:
        train: Path of the training CSV.
        eval: Path of the validation CSV.
        test: Path of the test CSV.
        encoding: Text encoding forwarded to ``pd.read_csv`` for every file.

    Returns:
        A ``(train_df, eval_df, test_df)`` tuple of DataFrames, in that order.
    """
    frames = [pd.read_csv(csv_path, encoding=encoding) for csv_path in (train, eval, test)]
    return tuple(frames)

# Read the three VUA splits; the files are decoded as ISO-8859-1 (Latin-1).
train_df, eval_df, test_df = load_dataset(train_path, eval_path, test_path, encoding='ISO-8859-1')

### Preprocess Data and Tokenize input

In [None]:
# Tokenizer for the cased BERT base checkpoint; the preprocessing functions below
# read this notebook-level name.
# NOTE(review): `tokenizer` is rebound again in later cells of this notebook, so
# re-running cells out of order changes which tokenizer object is used.
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

def preprocess_data(df):
    """Tokenize each sentence and build a per-wordpiece label sequence for its verb.

    NOTE(review): a second ``preprocess_data`` defined in a later cell replaces
    this one, and the variables filled from this version are reassigned there.

    Args:
        df: DataFrame with ``sentence``, ``verb``, ``verb_idx`` and ``label`` columns.

    Returns:
        Three parallel lists with one entry per row: wordpiece ids, attention
        masks (all ones; no padding is applied) and label sequences that hold
        the row's ``label`` on the verb's wordpiece positions and 0 elsewhere.
    """
    all_ids, all_masks, all_labels = [], [], []

    for _, record in df.iterrows():
        text = record['sentence']
        verb_offset = record['verb_idx']
        row_label = record['label']

        wordpieces = tokenizer.tokenize(text)
        ids = tokenizer.convert_tokens_to_ids(wordpieces)
        ones = [1] * len(ids)

        # The verb's first wordpiece position is the wordpiece count of the text
        # that precedes it; its last position follows from the verb's own length.
        # NOTE(review): `verb_idx` is applied as a character offset into the
        # sentence here — confirm it is not a word index in the CSV.
        first = len(tokenizer.tokenize(text[:verb_offset]))
        last = first + len(tokenizer.tokenize(record['verb'])) - 1

        per_token = [0] * len(wordpieces)
        for position in range(first, last + 1):
            per_token[position] = row_label

        all_ids.append(ids)
        all_masks.append(ones)
        all_labels.append(per_token)

    return all_ids, all_masks, all_labels

# Encode the three VUA splits with the token-level preprocessing defined above.
# NOTE(review): the same nine variables are assigned again by the next cell, so
# these results are overwritten when the notebook is run top to bottom.
train_encodings, train_masks, train_labels = preprocess_data(train_df)
eval_encodings, eval_masks, eval_labels = preprocess_data(eval_df)
test_encodings, test_masks, test_labels = preprocess_data(test_df)

In [None]:
MAX_LEN = 128

def preprocess_data(df, max_len=MAX_LEN, tok=None):
    """Encode each sentence as a fixed-length BERT input for sequence classification.

    Every sentence is wordpiece-tokenized, truncated so the two special tokens
    still fit, wrapped as ``[CLS] ... [SEP]`` and right-padded to ``max_len``.
    The previous version reserved room for ``[CLS]``/``[SEP]`` but never inserted
    them, so the model received sequences without the special tokens.

    Args:
        df: DataFrame with ``sentence`` and ``label`` columns.
        max_len: Length of every returned id/mask sequence, special tokens and
            padding included. Must be at least 2.
        tok: Tokenizer to use. Defaults to the notebook-level ``tokenizer``,
            looked up when the function is called.

    Returns:
        Three parallel lists with one entry per row: padded input ids, attention
        masks (1 for real tokens, 0 for padding) and the row's ``label``.

    Raises:
        ValueError: If ``max_len`` is smaller than 2.
    """
    if max_len < 2:
        raise ValueError("max_len must be at least 2 to hold [CLS] and [SEP]")
    if tok is None:
        tok = tokenizer

    tokenized_sentences = []
    attention_masks = []
    verb_labels = []

    for _, row in df.iterrows():
        # Keep two positions free for the [CLS] and [SEP] tokens added below.
        tokens = tok.tokenize(row['sentence'])[:max_len - 2]
        input_ids = [tok.cls_token_id] + tok.convert_tokens_to_ids(tokens) + [tok.sep_token_id]

        # Right-pad ids with the tokenizer's pad id and mask the padding out.
        padding = max_len - len(input_ids)
        mask = [1] * len(input_ids) + [0] * padding
        input_ids = input_ids + [tok.pad_token_id] * padding

        tokenized_sentences.append(input_ids)
        attention_masks.append(mask)
        verb_labels.append(row['label'])

    return tokenized_sentences, attention_masks, verb_labels

# Encode the three VUA splits with the sentence-level preprocessing defined in
# this cell; these assignments replace the values produced by the previous cell.
train_encodings, train_masks, train_labels = preprocess_data(train_df)
eval_encodings, eval_masks, eval_labels = preprocess_data(eval_df)
test_encodings, test_masks, test_labels = preprocess_data(test_df)


### Load data for training

In [None]:
def create_hf_dataset(encodings, masks, labels):
    """Bundle encoded inputs into a Hugging Face ``Dataset``.

    Args:
        encodings: Per-example input id sequences, stored as ``input_ids``.
        masks: Per-example attention masks, stored as ``attention_mask``.
        labels: Per-example labels, stored as ``labels``.

    Returns:
        The ``Dataset`` built by ``Dataset.from_dict`` from those three columns.
    """
    columns = dict(input_ids=encodings, attention_mask=masks, labels=labels)
    return Dataset.from_dict(columns)

# Wrap each encoded VUA split in a Dataset with input_ids / attention_mask / labels columns.
train_dataset = create_hf_dataset(train_encodings, train_masks, train_labels)
eval_dataset = create_hf_dataset(eval_encodings, eval_masks, eval_labels)
test_dataset = create_hf_dataset(test_encodings, test_masks, test_labels)

### Training

In [None]:
def compute_metrics(eval_pred):
    """Score classification predictions with F1 and Pearson correlation.

    ``f1_score`` and ``pearsonr`` are imported in a later cell of this notebook,
    so that cell must have run before this function is called.

    Args:
        eval_pred: A ``(logits, labels)`` pair. The predicted class of each
            example is the argmax of its logits over the last axis.

    Returns:
        Dict with ``"f1"`` (``f1_score`` called with its default settings) and
        ``"pearson_r"`` (the correlation coefficient returned by ``pearsonr``).
    """
    scores, gold = eval_pred
    predicted = np.argmax(scores, axis=-1)

    metrics = {"f1": f1_score(gold, predicted)}
    # pearsonr returns (coefficient, p-value); only the coefficient is reported.
    metrics["pearson_r"] = pearsonr(gold, predicted)[0]
    return metrics

In [None]:

from sklearn.metrics import f1_score
from scipy.stats import pearsonr

from transformers import BertForSequenceClassification
# Binary sequence classifier on top of the cased BERT base checkpoint.
model = BertForSequenceClassification.from_pretrained('bert-base-cased', num_labels=2)


# Uncased alternative, currently disabled:
# model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

from transformers import TrainingArguments

# Training configuration for the metaphor classifier.
# NOTE(review): per the TrainingArguments documentation a positive `max_steps`
# takes precedence over `num_train_epochs`, so the epoch count below is
# presumably ignored — confirm.
# NOTE(review): no `eval_steps` is given; evaluation presumably falls back to
# the `logging_steps` interval — confirm.
training_args = TrainingArguments(
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=1,
    max_steps=8000,
    logging_dir='./logs',
    logging_steps=2000, 
    save_steps=2000,
    evaluation_strategy="steps",
    # Keep at most two checkpoints on disk.
    save_total_limit=2, 
    learning_rate=2e-5,
    remove_unused_columns=False,
    output_dir="./metaphor_detection_model",
)

# NOTE(review): DataCollatorWithPadding is imported but not used in the cells visible here.
from transformers import DataCollatorWithPadding

# Trainer that fits `model` on the VUA training split, evaluates on the VUA
# validation split and reports the metrics from `compute_metrics`.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
     compute_metrics=compute_metrics, 
)



In [None]:
# Fine-tune the metaphor classifier with the Trainer configured above.
trainer.train()

In [None]:
# Evaluate with the Trainer's default evaluation dataset (the one passed as `eval_dataset`).
# NOTE(review): `results` is not read again in the cells visible here.
results = trainer.evaluate()

# Evaluate on the held-out VUA test split and show the metric dict.
results_on_test_data = trainer.evaluate(test_dataset)
print(results_on_test_data)

In [None]:
from datasets import load_dataset
import numpy as np
from scipy.stats import pearsonr
from transformers import BertConfig, BertForSequenceClassification, BertTokenizer, Trainer, TrainingArguments

# Load the EmoBank CSV; with a single data file all rows are exposed under the 'train' key.
dataset = load_dataset('csv', data_files='drive/MyDrive/data/EMOBANK/emobank.csv')

# Partition the rows by the CSV's own `split` column.
# NOTE(review): `train_dataset` and `test_dataset` previously held the VUA
# datasets; from here on they refer to EmoBank data.
train_dataset = dataset['train'].filter(lambda example: example['split'] == 'train')
test_dataset = dataset['train'].filter(lambda example: example['split'] == 'test')
dev_dataset = dataset['train'].filter(lambda example: example['split'] == 'dev')


def normalize_values(dataset):
    """Rescale the ``V``, ``A`` and ``D`` entries of one example in place.

    Each of the three values ``x`` is replaced by ``(x - 1) / 4``.

    Args:
        dataset: Mutable mapping for a single example (this is what
            ``Dataset.map`` passes in) containing ``V``, ``A`` and ``D``.

    Returns:
        The same mapping object, after modification.
    """
    for axis in ('V', 'A', 'D'):
        raw_score = dataset[axis]
        dataset[axis] = (raw_score - 1) / 4
    return dataset

# Apply the V/A/D rescaling to every example of each EmoBank split.
# NOTE(review): each name is rebound to its own mapped result, so re-running
# this cell rescales already-rescaled values.
train_dataset = train_dataset.map(normalize_values)
test_dataset = test_dataset.map(normalize_values)
dev_dataset = dev_dataset.map(normalize_values)




In [None]:
# Drop dev examples whose text is missing before tokenization.
# NOTE(review): only the dev split is filtered; the train and test splits are
# presumably free of missing text — confirm.
dev_dataset = dev_dataset.filter(lambda example: example['text'] is not None)

# Tokenizer used for the EmoBank text; this rebinds the notebook-level `tokenizer`.
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")

def tokenize_function(examples):
    """Tokenize a batch of examples into fixed-length (128) BERT inputs.

    Args:
        examples: Batch mapping as passed by ``Dataset.map(..., batched=True)``;
            its ``text`` entry holds the strings to tokenize.

    Returns:
        A plain dict mapping each tokenizer output name to a NumPy array with
        one row per input text.
    """
    # Ask the tokenizer for NumPy arrays directly instead of building torch
    # tensors and converting each one with `.numpy()`.
    tokenized = tokenizer(examples['text'], padding="max_length", truncation=True, max_length=128, return_tensors="np")
    # Return a plain dict rather than the tokenizer's own result object (the
    # original cell marked this conversion as needed "for Python 3.9").
    return {key: value for key, value in tokenized.items()}

# Tokenize every EmoBank split in batches; the tokenizer outputs are added as new columns.
train_dataset = train_dataset.map(tokenize_function, batched=True)
test_dataset = test_dataset.map(tokenize_function, batched=True)
dev_dataset = dev_dataset.map(tokenize_function, batched=True)

from transformers import BertTokenizer

# NOTE(review): this repeats the dev-split filter applied at the top of this cell.
dev_dataset = dev_dataset.filter(lambda example: example['text'] is not None)

# NOTE(review): rebinds `tokenizer` once more (now via AutoTokenizer); no later
# cell visible here reads it — confirm whether this line is still needed.
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

In [None]:
def format_dataset(example):
    """Attach a ``labels`` entry holding the example's ``[V, A, D]`` values.

    Args:
        example: Mutable mapping for one example containing ``V``, ``A`` and ``D``.

    Returns:
        The same mapping, with ``labels`` set to a three-element list in
        V, A, D order.
    """
    example['labels'] = [example[axis] for axis in ('V', 'A', 'D')]
    return example

# Add the three-value `labels` column to every EmoBank split.
train_dataset = train_dataset.map(format_dataset)
test_dataset = test_dataset.map(format_dataset)
dev_dataset = dev_dataset.map(format_dataset)

# Have the datasets return torch tensors and expose only the columns listed here.
train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
test_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
dev_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])


In [None]:
from transformers import BertConfig, BertModel, Trainer, TrainingArguments
import torch.nn as nn


class VADRegressor(nn.Module):
    """BERT encoder followed by a single linear layer that outputs three values (V, A, D)."""

    def __init__(self, pretrained_model_name, config):
        """Load the pretrained encoder and create the regression head.

        Args:
            pretrained_model_name: Checkpoint name or path given to
                ``BertModel.from_pretrained``.
            config: Encoder configuration; its ``hidden_size`` is the input
                width of the regression head.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(pretrained_model_name, config=config)
        # Three outputs for V, A, D
        self.regressor = nn.Linear(config.hidden_size, 3)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Run the encoder and the regression head, optionally computing the loss.

        Args:
            input_ids: Token ids passed to the encoder.
            attention_mask: Optional attention mask passed to the encoder.
            token_type_ids: Optional segment ids passed to the encoder.
            labels: Optional targets; when given, an MSE loss is computed over
                the flattened predictions and flattened targets.

        Returns:
            ``(loss, logits)`` when ``labels`` is given, otherwise ``logits`` alone.
        """
        encoded = self.bert(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        # The second encoder output (the pooled output) feeds the regression head.
        logits = self.regressor(encoded[1])

        if labels is None:
            return logits

        criterion = nn.MSELoss()
        loss = criterion(logits.view(-1), labels.view(-1))
        return loss, logits

# Build the regressor on top of the encoder stored in the fine-tuned metaphor checkpoint.
config = BertConfig.from_pretrained("kangela/Metaphor-FineTuned-BERT")
model = VADRegressor("kangela/Metaphor-FineTuned-BERT", config=config)

# Training configuration for the V/A/D regressor: evaluate and checkpoint once
# per epoch, and restore the checkpoint with the lowest evaluation loss at the end.
training_args = TrainingArguments(
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=12,
    learning_rate=5e-5,
    output_dir='./metaphor_on_emotion_results2',
    logging_dir='./logs',
    logging_steps=1000,
    do_train=True,
    do_eval=True,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    # Lower eval loss is better, hence greater_is_better=False.
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)

# Trainer for the regressor; the EmoBank dev split is used for evaluation.
# This rebinds `model`, `training_args` and `trainer` from the classification part above.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
)

In [None]:
# Train the V/A/D regressor with the Trainer configured in the previous cell.
trainer.train()

In [None]:
# Run inference once and take both the predictions and the gold labels from the
# same PredictionOutput. This keeps the two arrays aligned row by row and avoids
# relying on how `test_dataset['labels']` is materialised (and whether it can be
# sliced with `[:, i]`) by the installed `datasets` version.
test_output = trainer.predict(test_dataset)
predictions = test_output.predictions
ground_truth = np.asarray(test_output.label_ids)

# Columns follow the order of the `labels` vector: index 0 = V, 1 = A, 2 = D.
# pearsonr returns (coefficient, p-value); only the coefficient is kept.
pearson_v = pearsonr(predictions[:, 0], ground_truth[:, 0])[0]
pearson_a = pearsonr(predictions[:, 1], ground_truth[:, 1])[0]
pearson_d = pearsonr(predictions[:, 2], ground_truth[:, 2])[0]

print(f"Pearson r values: Valence: {pearson_v}, Arousal: {pearson_a}, Dominance: {pearson_d}")
