In [None]:
!pip install transformers text-hammer pyreadstat

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
import torch.nn.functional as F
from transformers import RobertaTokenizer, RobertaForSequenceClassification, AdamW, get_linear_schedule_with_warmup , AutoTokenizer, TFAutoModel
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix,classification_report,accuracy_score, f1_score
import matplotlib.pyplot as plt
import os
import io
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, KFold
from sklearn import svm
from collections import defaultdict
import text_hammer as th
import pandas as pd
import tensorflow as tf , keras
import transformers
import random as rd
import keras.backend as K
from numpy.random import seed
from tensorflow.keras import layers
from keras.utils import plot_model
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn import metrics
from tensorflow.keras.models import load_model
import warnings
warnings.filterwarnings("ignore")
%matplotlib inline

In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
# n_gpu records how many CUDA devices torch can see.
n_gpu = torch.cuda.device_count()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
def compute_validation_loss(model, validation_dataloader):
    """Return the mean per-batch loss of ``model`` over ``validation_dataloader``.

    The model is switched to evaluation mode (and left in it) and gradient
    tracking is disabled while iterating. Every batch is unpacked as
    ``(input_ids, attention_mask, labels)`` and moved to the module-level
    ``device`` before the forward pass; the loss is read from ``outputs.loss``.

    Raises:
        ZeroDivisionError: if the dataloader has length zero.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for input_ids, attention_mask, labels in validation_dataloader:
            outputs = model(
                input_ids.to(device),
                attention_mask=attention_mask.to(device),
                labels=labels.to(device),
            )
            batch_losses.append(outputs.loss.item())

    # Average over the number of batches, not the number of samples.
    return sum(batch_losses) / len(validation_dataloader)

def metric2(y_true, y_pred_classes):
    """Return 1 minus the share of predictions that miss by exactly one class.

    Both arguments are 1-D tensors of class ids of equal length. Exact hits and
    misses by two or more classes do not lower the score; only ``|true - pred| == 1``
    counts as an error here. The result is a Python float.
    """
    adjacent_miss = (y_true - y_pred_classes).abs().eq(1)
    n_adjacent = adjacent_miss.sum(dtype=torch.float32)
    score = 1 - n_adjacent / y_true.size(0)
    return score.item()

def metric2_2(y_true, y_pred):
    """NumPy version of the off-by-one metric.

    Accepts any array-likes of class ids with matching length and returns
    1 minus the fraction of positions where the prediction differs from the
    true label by exactly one class.
    """
    truth = np.array(y_true)
    guess = np.array(y_pred)
    # Boolean mask of the positions that miss by exactly one class.
    adjacent_miss = np.abs(truth - guess) == 1
    return 1 - adjacent_miss.sum() / len(truth)


In [None]:
def get_clean(x):
    """Normalise one raw tweet into lower-case, ASCII-only text.

    Steps, in order: cast to ``str`` and lower-case; drop backslashes and
    underscores; replace non-ASCII runs with a space; run the text_hammer
    helpers (contraction expansion, e-mail / URL removal); blank out
    ``@mentions`` and ``#hashtags``; run further text_hammer helpers (HTML tags,
    "RT" markers, accented and special characters — behaviour per their names);
    squeeze any character repeated three or more times down to one; remove every
    token that contains a digit; finally collapse whitespace.

    Args:
        x: the raw tweet; any object, it is converted with ``str()``.

    Returns:
        The cleaned string, with single spaces between tokens and no
        leading/trailing whitespace.
    """
    mention_pattern = r'@\w+'
    hashtag_pattern = r'#\w+'  # was named `hash`, which shadowed the built-in
    x = str(x).lower().replace('\\', '').replace('_', '')
    x = re.sub(r'[^\x00-\x7F]+', ' ', x)
    x = th.cont_exp(x)
    x = th.remove_emails(x)
    x = th.remove_urls(x)
    x = re.sub(mention_pattern, ' ', x)
    x = re.sub(hashtag_pattern, ' ', x)
    x = th.remove_html_tags(x)
    x = th.remove_rt(x)
    x = th.remove_accented_chars(x)
    x = th.remove_special_chars(x)
    # Squeeze runs of 3+ identical characters ("sooooo" -> "so").
    x = re.sub(r'(.)\1{2,}', r'\1', x)
    # Drop every token containing a digit.
    x = re.sub(r'\w*\d+\w*', ' ', x)
    # Collapse whitespace last: the substitutions above insert spaces, and the
    # previous order (collapse first, then remove digit tokens) left runs of
    # spaces inside the result.
    x = re.sub(r'\s+', ' ', x).strip()
    return x

In [None]:
# Load the labelled tweets from the SPSS export on the mounted Drive.
df = pd.read_spss("/content/drive/MyDrive/VA_EN_TU_2012-2020_3000_tweets_relevant_V03_labeled_1200_cleaned.sav")

# Recode the sentiment column to the target scheme used for the model:
#   0 - negative, 1 - neutral, 2 - positive
# Going by the mapping keys, the source codes appear to be
#   1 - positive, 2 - negative, 3 - neutral   (TODO confirm against the codebook)
label_mapping = {1: 2, 2: 0, 3: 1}
df['Label_B_emotion'] = df['Label_B_emotion'].replace(label_mapping).astype(int)

# Working frame with only the tweet text and its recoded label.
dff = df[['text', 'Label_B_emotion']].copy()
dff['Label_B_emotion'].unique()

In [None]:
# Add a `cleaned_data` column holding the output of get_clean for every tweet.
# Note: the train/validation/test split in this notebook reads the raw `text`
# column, not `cleaned_data`.
dff['cleaned_data'] = dff['text'].apply(get_clean)

In [None]:
# Hold out 30% of the raw tweets (`text` column) as a temporary set;
# the remaining 70% is the training set. The fixed random_state keeps the split repeatable.
data_train, data_temp, labels_train, labels_temp = train_test_split(
    dff["text"],
    dff["Label_B_emotion"],
    test_size=0.3,
    random_state=42
)

# Split the temporary set in half into validation and test sets
# (about 15% of all tweets each).
data_val, data_test, labels_val, labels_test = train_test_split(
    data_temp,
    labels_temp,
    test_size=0.5,
    random_state=42
)


In [None]:
# Optional domain vocabulary for the tokenizer. The whole vocabulary extension
# (this list and the add_tokens call below) is currently disabled.
# new_tokens = ['self-driving', 'autopilot', 'driverless', 'lidar', 'driver-less','automated','mobility','autonomous',
#                'traffic-law','safety-standards','smart-car','hands-free','AI-powered','selfdriving','IoT'
#                ]

# Same device selection as in the earlier setup cell (GPU if available, else CPU).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Tokenizer of the pretrained checkpoint that is fine-tuned later in the notebook.
tokenizer = RobertaTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment')

# num_added_toks = tokenizer.add_tokens(new_tokens)

# print('Added for : twitter-roberta-base-sentiment ', num_added_toks, 'tokens')

In [None]:
def encode_split(texts, labels, max_length=128):
    """Tokenize one data split and move the resulting tensors to ``device``.

    Replaces three copy-pasted tokenization blocks (train / validation / test)
    so all splits are guaranteed to be encoded with the same settings.

    Args:
        texts: pandas Series of tweet strings (anything exposing ``.tolist()``).
        labels: pandas Series of integer class ids aligned with ``texts``.
        max_length: truncation limit, in tokens, passed to the tokenizer.

    Returns:
        A 4-tuple ``(encoding, input_ids, attention_mask, label_tensor)`` where
        ``encoding`` is the raw tokenizer output and the other three are tensors
        already moved to the module-level ``device``.
    """
    encoding = tokenizer(
        text=texts.tolist(),
        padding=True,
        truncation=True,
        return_tensors='pt',
        max_length=max_length,
    )
    return (
        encoding,
        encoding["input_ids"].to(device),
        encoding["attention_mask"].to(device),
        torch.tensor(labels.tolist()).to(device),
    )


# Tokenize and encode training data
encoded_train, train_input_ids, train_attention_mask, train_labels = encode_split(data_train, labels_train)

# Tokenize and encode validation data
encoded_val, val_input_ids, val_attention_mask, val_labels = encode_split(data_val, labels_val)

# Tokenize and encode test data
encoded_test, test_input_ids, test_attention_mask, test_labels = encode_split(data_test, labels_test)


In [None]:
# Wrap each split's tensors as (input ids, attention mask, labels) triples.
train_data = TensorDataset(train_input_ids, train_attention_mask, train_labels)
val_data = TensorDataset(val_input_ids, val_attention_mask, val_labels)
test_data = TensorDataset(test_input_ids, test_attention_mask, test_labels)

# Batches of 40 everywhere; only the training loader shuffles.
train_dataloader = DataLoader(train_data, shuffle=True, batch_size=40)
val_dataloader = DataLoader(val_data, batch_size=40)
test_dataloader = DataLoader(test_data, batch_size=40)


In [None]:
# Model initialization and training setup
num_labels = 3
# Defined before the scheduler so the schedule length always follows the epoch
# count (it used to be hard-coded as `* 2` next to a separate num_epochs).
num_epochs = 2
model = RobertaForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment', num_labels=num_labels)
#model.resize_token_embeddings(len(tokenizer))

# Raise the dropout probability inside the attention block of every encoder layer.
for layer in model.roberta.encoder.layer:
    layer.attention.self.dropout.p = 0.5
    layer.attention.output.dropout.p = 0.5

# Set dropout rate in classifier layer
model.classifier.dropout.p = 0.2

optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
# The training loop calls scheduler.step() once per batch, so the total number
# of scheduler steps is (batches per epoch) * (epochs).
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(train_dataloader) * num_epochs,
)
model.to(device)

print("Attention dropout rate:", model.roberta.encoder.layer[0].attention.self.dropout.p)
print("Classifier dropout rate:", model.classifier.dropout.p)


In [None]:
best_val_acc = 0.0

# Training loop
for epoch in range(num_epochs):
    # Switch back to training mode at the start of EVERY epoch. The validation
    # pass at the end of each epoch calls model.eval(); with a single
    # model.train() before the loop, every epoch after the first would train
    # with dropout disabled.
    model.train()
    total_loss = 0
    for batch in train_dataloader:
        input_ids, attention_mask, labels = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        # Passing labels makes the model return the classification loss.
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        scheduler.step()  # learning-rate schedule advances once per batch

        total_loss += loss.item()

    average_loss = total_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss:.4f}")

    # Validation loop
    model.eval()
    val_predictions, val_true_labels = [], []
    with torch.no_grad():
        for batch in val_dataloader:

            input_ids, attention_mask, labels = batch
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)

            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            val_predictions.extend(logits.argmax(dim=1).cpu().tolist())
            val_true_labels.extend(labels.cpu().tolist())

    val_acc = accuracy_score(val_true_labels, val_predictions)
    print(f"Epoch {epoch + 1}/{num_epochs}, Validation Accuracy: {val_acc:.4f}")

    # Only the best accuracy is tracked; checkpoint saving is currently disabled.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        #torch.save(model.state_dict(), "best_model.pt")

In [None]:
# Test loop: collect argmax predictions and gold labels over the test loader,
# with the model in eval mode and gradients disabled.
model.eval()
predictions, true_labels = [], []
with torch.no_grad():
    for input_ids, attention_mask, labels in test_dataloader:
        outputs = model(
            input_ids.to(device),
            attention_mask=attention_mask.to(device),
        )
        predictions += outputs.logits.argmax(dim=1).cpu().tolist()
        true_labels += labels.cpu().tolist()

In [None]:
# Class names in label-id order (0, 1, 2) for the report and the plots.
target_names = ["Negative", "Neutral", "Positive"]

# Custom off-by-one metric plus the standard per-class report on the test set.
metric2_value = metric2(torch.tensor(true_labels), torch.tensor(predictions))
report = classification_report(true_labels, predictions, target_names=target_names)

print(f"Test Metric 2: {metric2_value:.4f}")
print(report)

In [None]:
# Confusion matrix of the test predictions (rows: true class, columns: predicted class).
cm = confusion_matrix(true_labels, predictions)

fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=target_names,
    yticklabels=target_names,
    ax=ax,
)
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
# NOTE(review): the title says "extended vocabulary", but the token-extension
# code in this notebook is commented out — confirm the title is still accurate.
ax.set_title('Confusion Matrix [Twitter-Roberta-base extended vocabulary]: raw data')
plt.show()

#### Version 2 fine-tuning

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments,AutoConfig
import torch
from torch.utils.data import Dataset

class TweetDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with integer labels.

    ``encodings`` is a mapping from field name (e.g. ``input_ids``) to a
    per-example sequence; ``labels`` is a sequence of class ids of the same
    length. Each item is a dict of tensors with an extra ``labels`` entry of
    dtype ``torch.long``.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # One example per label.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Load tokenizer and model (a fresh copy of the pretrained checkpoint for this second run)
tokenizer = RobertaTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment')
num_labels = 3
model = RobertaForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment', num_labels=num_labels)

# Same dropout overrides as in the manual training run.
for layer in model.roberta.encoder.layer:
    layer.attention.self.dropout.p = 0.5
    layer.attention.output.dropout.p = 0.5

model.classifier.dropout.p = 0.2
# No manual optimizer is created here: the AdamW(lr=2e-5) instance that used to
# be built at this point was never handed to Trainer, which creates its own
# optimizer from TrainingArguments.
# NOTE(review): if lr=2e-5 was intended, set learning_rate=2e-5 in TrainingArguments.
model.to(device)

# Fix: `data_f` was never defined in this notebook; the frame holding the
# `text` / `Label_B_emotion` columns is `dff`.
train_df, test_df = train_test_split(dff[['text', 'Label_B_emotion']], test_size=0.2, random_state=42)

# Tokenize the input
encodings = tokenizer(train_df['text'].tolist(), truncation=True, padding=True, max_length=128)
labels = train_df['Label_B_emotion'].tolist()

# Create a dataset
dataset = TweetDataset(encodings, labels)

# Training arguments
# NOTE(review): warmup_steps=500 may exceed the total number of optimisation
# steps for a data set of this size (rows / 8 * 3 epochs) — confirm.
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # total number of training epochs
    per_device_train_batch_size=8,   # batch size per device during training
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
)

trainer = Trainer(
    model=model,                         # the instantiated Transformers model to be trained
    args=training_args,
    train_dataset=dataset,
)

trainer.train()


In [None]:
# Encode the held-out rows of test_df with the same tokenizer settings as the
# training rows, wrap them in a TweetDataset and let the Trainer score them.
test_labels = test_df['Label_B_emotion'].tolist()
test_encodings = tokenizer(
    test_df['text'].tolist(),
    truncation=True,
    padding=True,
    max_length=128,
)
test_dataset = TweetDataset(test_encodings, test_labels)

evaluation_results = trainer.evaluate(test_dataset)
print(evaluation_results)

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
import torch

def compute_metrics(pred):
    """Build the metric dictionary for a Trainer evaluation pass.

    Args:
        pred: prediction object exposing ``label_ids`` (true class ids) and
            ``predictions`` (per-class scores; the argmax over the last axis is
            taken as the predicted class).

    Returns:
        dict with the keys ``accuracy``, ``f1``, ``precision``, ``recall``
        (the last three weighted-averaged over classes), ``metric1`` and
        ``metric2``. Every value is a plain numeric scalar, not a tensor.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)

    # Convert to torch tensors for the custom metrics
    labels_tensor = torch.tensor(labels)
    preds_tensor = torch.tensor(preds)

    # Custom metrics. metric1_torch returns a 0-dim tensor, so it is converted
    # to a Python float; metric2 already returns a float. This keeps the
    # returned dict free of tensors (safe to log / serialise).
    metric_2 = metric2(labels_tensor, preds_tensor)
    metric_1 = float(metric1_torch(labels_tensor, preds_tensor))

    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted')
    acc = accuracy_score(labels, preds)

    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall,
        'metric1': metric_1,
        'metric2': metric_2
    }

def metric2(y_true, y_pred_classes):
    """Return 1 minus the share of predictions that miss by exactly one class.

    Both arguments are 1-D tensors of class ids of equal length. Exact hits and
    misses by two or more classes do not lower the score; only ``|true - pred| == 1``
    counts as an error here. The result is a Python float.

    Note: this repeats the ``metric2`` definition from earlier in the notebook.
    """
    adjacent_miss = (y_true - y_pred_classes).abs().eq(1)
    n_adjacent = adjacent_miss.sum(dtype=torch.float32)
    score = 1 - n_adjacent / y_true.size(0)
    return score.item()

def metric1_torch(y_true, y_pred):
    """Return the fraction of exact matches between two class-id tensors.

    The result is a 0-dim float tensor (not a Python float).
    """
    return (y_true == y_pred).float().mean()

# Re-create the Trainer around the already fine-tuned model, this time with
# compute_metrics attached so evaluate() reports the custom metrics as well.
# No training happens in this cell; train_dataset is only passed along.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)

# Score the held-out test set and show the resulting metric dictionary.
evaluation_results = trainer.evaluate(test_dataset)
print(evaluation_results)
