In [8]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
from tqdm import tqdm
import optuna
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Turn off the Hugging Face `tokenizers` library's internal parallelism; set here,
# in the import cell, so it is in place before any tokenizer is created.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [21]:
# Dataset to run on. Values recognised by the loading cell:
#   "my_personality", "idiap", "idiap_chunked".
# (A single assignment replaces the previous pair, whose first value was
# immediately overwritten and therefore never used.)
dataset = "idiap_chunked"

In [23]:
# Load the selected dataset and expose two things to the rest of the notebook:
#   target_vars : numpy array of the target columns, min-max scaled per column
#   texts       : the text column, renamed to "text" regardless of the source
if dataset == "my_personality":
    target_vars_names = [
        "sEXT",
        "sNEU",
        "sAGR",
        "sCON",
        "sOPN",
    ]
    train = pd.read_csv(
        "data/my_personality/my_personality.csv",
        encoding="ISO-8859-1",
    )
    text_column = "STATUS"
elif dataset in ("idiap", "idiap_chunked"):
    # Both IDIAP variants share the same target columns; only the file and
    # the name of the text column differ.
    target_vars_names = [
        "hones16",
        "emoti16",
        "extra16",
        "agree16",
        "consc16",
        "openn16",
        "icar_hat0",
        "icar_hat1",
        "icar_hat2",
    ]
    if dataset == "idiap":
        train = pd.read_excel("data/idiap/dataset.xlsx")
        text_column = "final_text"
    else:
        train = pd.read_csv("data/idiap_chunked/chunked_dataset.csv")
        text_column = "chunk_text"
else:
    # Fail loudly instead of silently loading the chunked IDIAP data for a
    # misspelled or unsupported dataset name.
    raise ValueError(
        f"Unknown dataset {dataset!r}; expected 'my_personality', 'idiap' or 'idiap_chunked'"
    )

train = train.rename(columns={text_column: "text"})

# Per-column min-max scaling to [0, 1].
target_frame = train[target_vars_names]
target_min = target_frame.min()
target_range = target_frame.max() - target_min
# A constant column has range 0; dividing by it would yield NaN targets (and a
# NaN training loss). Dividing by 1 instead maps such a column to all zeros.
target_range = target_range.replace(0, 1)
target_vars = ((target_frame - target_min) / target_range).to_numpy()
texts = train["text"]

In [None]:
# Checkpoint used as a frozen feature extractor: its class probabilities over
# the emotion labels become the input features of the regressor trained below.
model_name = "bhadresh-savani/bert-base-go-emotion"

# The tokenizer and the classifier are loaded from the same checkpoint name.
emotion_model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

def extract_features(texts, batch_size=32):
    """Turn texts into emotion-probability feature vectors.

    Each text is tokenized (truncated to the tokenizer's maximum length), run
    through the module-level ``emotion_model`` and converted to class
    probabilities with a softmax over the logits.

    Args:
        texts: Iterable of strings (e.g. a pandas Series or a list).
        batch_size: Number of texts per forward pass. Texts in a batch are
            padded to the longest one; the attention mask keeps the padding
            from influencing the result beyond float rounding.

    Returns:
        ``numpy.ndarray`` of shape ``(len(texts), num_labels)``, one row of
        probabilities per text, in input order. An empty input yields an
        array of shape ``(0, num_labels)``.
    """
    texts = list(texts)
    if not texts:
        # Keep the feature dimension even when there is nothing to encode.
        return np.empty((0, emotion_model.config.num_labels), dtype=np.float32)

    # Inference only: make sure dropout is disabled (no-op if already in eval mode).
    emotion_model.eval()

    batch_features = []
    for start in tqdm(range(0, len(texts), batch_size)):
        batch = texts[start : start + batch_size]
        inputs = tokenizer(batch, return_tensors="pt", truncation=True, padding=True)
        with torch.no_grad():
            logits = emotion_model(**inputs).logits
            probabilities = torch.softmax(logits, dim=-1)
        batch_features.append(probabilities.numpy())
    return np.concatenate(batch_features, axis=0)

# Run the emotion classifier over every text once; `features` holds one row of
# class probabilities per text and is the regressor's input matrix.
features = extract_features(texts)

In [None]:
# Hold out 20% of the rows for testing; the fixed random_state keeps the split
# identical across runs.
# NOTE(review): the split is row-level. For "idiap_chunked", several rows
# presumably originate from the same source text; if so, chunks of one source
# can land in both train and test. Confirm, and consider a group-aware split.
X_train, X_test, y_train, y_test = train_test_split(features, target_vars, test_size=0.2, random_state=42)

# float32 tensor copies of the four arrays, for the PyTorch model.
X_train_tensor, y_train_tensor, X_test_tensor, y_test_tensor = (
    torch.tensor(array, dtype=torch.float32)
    for array in (X_train, y_train, X_test, y_test)
)


class EmotionRegressor(nn.Module):
    """Fully connected regressor on top of the emotion-probability features.

    The network is ``num_layers`` repetitions of ``Linear -> ReLU -> Dropout``
    followed by a final ``Linear`` layer producing ``output_size`` values.

    Args:
        input_size: Number of input features.
        output_size: Number of regression targets.
        hidden_units: Width of every hidden layer.
        num_layers: Number of hidden blocks. ``0`` gives a plain linear model
            mapping ``input_size`` directly to ``output_size``.
        dropout_rate: Dropout probability applied after each hidden ReLU.

    Raises:
        ValueError: If ``num_layers`` is negative.
    """

    def __init__(self, input_size, output_size, hidden_units, num_layers, dropout_rate):
        super().__init__()
        if num_layers < 0:
            raise ValueError(f"num_layers must be >= 0, got {num_layers}")

        layers = []
        # Width of whatever feeds the next Linear layer. Tracking it explicitly
        # keeps the output head correctly sized even when there are no hidden
        # layers (previously the head always expected `hidden_units` inputs).
        in_features = input_size
        for _ in range(num_layers):
            layers.extend(
                [
                    nn.Linear(in_features, hidden_units),
                    nn.ReLU(),
                    nn.Dropout(dropout_rate),
                ]
            )
            in_features = hidden_units
        layers.append(nn.Linear(in_features, output_size))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the network to ``x`` and return the resulting predictions."""
        return self.network(x)


def objective(trial):
    """Optuna objective: train one candidate regressor and score it.

    Samples the architecture and learning rate from ``trial``, trains an
    ``EmotionRegressor`` for 50 full-batch epochs with Adam and MSE loss, and
    returns the mean (over targets) of the per-target MAE on a validation set.

    The validation set is carved out of ``X_train`` / ``y_train`` with a fixed
    seed, so hyperparameters are never chosen on data outside the training
    split and every trial is scored on the same rows.

    Args:
        trial: The ``optuna`` trial used to sample hyperparameters.

    Returns:
        Mean validation MAE as a float (lower is better).
    """
    hidden_units = trial.suggest_int("hidden_units", 32, 256, step=32)
    num_layers = trial.suggest_int("num_layers", 1, 3)
    dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)

    # Model selection uses an inner split of the training data only.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42
    )
    X_fit_tensor = torch.tensor(X_fit, dtype=torch.float32)
    y_fit_tensor = torch.tensor(y_fit, dtype=torch.float32)
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)

    model = EmotionRegressor(
        input_size=X_train.shape[1],
        output_size=y_train.shape[1],
        hidden_units=hidden_units,
        num_layers=num_layers,
        dropout_rate=dropout_rate,
    )
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Full-batch gradient descent: one optimizer step per epoch.
    epochs = 50
    model.train()
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = criterion(model(X_fit_tensor), y_fit_tensor)
        loss.backward()
        optimizer.step()

    model.eval()
    with torch.no_grad():
        val_predictions = model(X_val_tensor).numpy()

    mae_per_output = mean_absolute_error(y_val, val_predictions, multioutput="raw_values")
    return float(np.mean(mae_per_output))


# Seed PyTorch (weight init, dropout) and the Optuna sampler so that the
# hyperparameter search and the final fit give the same numbers on every run.
SEED = 42
torch.manual_seed(SEED)

# Hyperparameter search: minimise the value returned by `objective`.
# TPESampler is Optuna's default sampler; it is spelled out only to pass a seed.
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=SEED),
)
study.optimize(objective, n_trials=40)

best_params = study.best_params
print("Best hyperparameters:", best_params)

# Retrain a fresh model with the best hyperparameters on the training split.
final_model = EmotionRegressor(
    input_size=X_train.shape[1],
    output_size=y_train.shape[1],
    hidden_units=best_params["hidden_units"],
    num_layers=best_params["num_layers"],
    dropout_rate=best_params["dropout_rate"],
)
criterion = nn.MSELoss()
optimizer = optim.Adam(final_model.parameters(), lr=best_params["learning_rate"])

# Full-batch training: one optimizer step per epoch, loss logged every 10 epochs.
epochs = 100
final_model.train()
for epoch in range(epochs):
    optimizer.zero_grad()
    outputs = final_model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# Evaluate on the held-out test split with dropout disabled.
final_model.eval()
with torch.no_grad():
    predictions = final_model(X_test_tensor).numpy()

# Per-target errors, plus their unweighted mean across targets.
mae_per_output = mean_absolute_error(y_test, predictions, multioutput="raw_values")
rmse_per_output = np.sqrt(
    mean_squared_error(y_test, predictions, multioutput="raw_values")
)
mean_mae = np.mean(mae_per_output)
mean_rmse = np.mean(rmse_per_output)

print("MAE per output:", mae_per_output)
print("RMSE per output:", rmse_per_output)
print("Mean MAE:", mean_mae)
print("Mean RMSE:", mean_rmse)