In [None]:
import os
import sys

# Sub-directories that must all exist in a directory for it to count as the project root.
PROJECT_MARKERS = ("src", "data", "prompts", "results")


def find_project_root(start_path, markers=None):
    """Walk upward from ``start_path`` to the first directory containing every marker.

    Args:
        start_path: File-system path to start from; it is made absolute first.
        markers: Optional name, or iterable of names, of sub-directories that
            must all exist directly inside the root. Defaults to
            ``PROJECT_MARKERS``.

    Returns:
        Absolute path of the closest ancestor (or ``start_path`` itself) that
        contains all marker directories.

    Raises:
        ValueError: If ``markers`` is given but empty (every directory would
            trivially match).
        RuntimeError: If the file-system root is reached without a match.
    """
    if markers is None:
        required = tuple(PROJECT_MARKERS)
    elif isinstance(markers, str):
        # A bare string would otherwise be split into single characters.
        required = (markers,)
    else:
        required = tuple(markers)

    if not required:
        raise ValueError("markers must contain at least one directory name")

    current = os.path.abspath(start_path)

    while True:
        if all(os.path.isdir(os.path.join(current, m)) for m in required):
            return current

        parent = os.path.dirname(current)
        if parent == current:
            # dirname() of the file-system root is the root itself: nothing left to try.
            raise RuntimeError("Project root not found")

        current = parent


# ---- execution directory (cwd) ----
cwd = os.getcwd()

# ---- safe starting point ----
# Prefer the directory of this file; when `__file__` is not defined
# (NameError, e.g. in an interactive kernel) fall back to the cwd.
try:
    start_path = os.path.dirname(os.path.abspath(__file__))
except NameError:
    start_path = cwd


# ---- resolve canonical paths ----
project_root = find_project_root(start_path)

# Make the project importable; the membership check keeps re-runs of this
# cell from stacking duplicate entries on sys.path.
if project_root not in sys.path:
    sys.path.insert(0, project_root)

src_root     = os.path.join(project_root, "src", "daniel", "gemini")
data_root    = os.path.join(project_root, "data", "MAMS-ACSA", "raw", "data_jsonl", "annotated")
schemas_root = os.path.join(project_root, "data", "MAMS-ACSA", "raw", "data_jsonl", "schema")
prompts_root = os.path.join(project_root, "prompts", "daniel", "llama")
utils_root   = os.path.join(project_root, "utils")
results_root = os.path.join(project_root, "results", "daniel")

# Summary of every resolved location (the folder emoji was previously stored
# as mis-decoded bytes, and schemas_root was missing from the listing).
print(
    f"📂 cwd          : {cwd}\n"
    f"📂 Project root : {project_root}\n"
    f"📂 Source root  : {src_root}\n"
    f"📂 Data root    : {data_root}\n"
    f"📂 Schemas root : {schemas_root}\n"
    f"📂 Prompts root : {prompts_root}\n"
    f"📂 Utils root   : {utils_root}\n"
    f"📂 Results root : {results_root}"
)

In [None]:
import json
from pathlib import Path

import pandas as pd
import numpy as np

import torch
from torch.utils.data import Dataset

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score

from transformers import (
    AutoTokenizer,
    AutoModel,
    Trainer,
    TrainingArguments
)

In [None]:
def load_jsonl(path):
    """Read a JSON Lines file into a list of decoded objects.

    Blank or whitespace-only lines (for example a trailing empty line at the
    end of the file) are skipped instead of being handed to ``json.loads``,
    which would raise on them.

    Args:
        path: Path of the UTF-8 encoded ``.jsonl`` file (anything ``open`` accepts).

    Returns:
        list: One decoded JSON value per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Load the three annotated splits from data_root, in train / validation / test order.
train_rows, val_rows, test_rows = map(
    load_jsonl,
    (Path(data_root) / file_name for file_name in ("train.jsonl", "validation.jsonl", "test.jsonl")),
)

In [None]:
def explode(rows):
    """Flatten annotated rows into one DataFrame record per output entry.

    Each element of ``rows`` is read through its ``"input"`` key (the text)
    and its ``"output"`` key (an iterable of dicts with ``"aspect"``,
    ``"emotion"`` and ``"polarity"`` keys). The text is repeated for every
    entry of ``"output"``.

    Args:
        rows: Iterable of dicts shaped as described above.

    Returns:
        pandas.DataFrame with the columns ``text``, ``aspect``, ``emotion``
        and ``polarity`` (in that order). The columns are present even when
        no record is produced, so downstream column access keeps working on
        an empty split.

    Raises:
        KeyError: If a row or an output entry lacks one of the expected keys.
    """
    columns = ["text", "aspect", "emotion", "polarity"]
    records = [
        {
            "text": r["input"],
            "aspect": o["aspect"],
            "emotion": o["emotion"],
            "polarity": o["polarity"],
        }
        for r in rows
        for o in r["output"]
    ]
    # Explicit columns: pd.DataFrame([]) alone would come back without any.
    return pd.DataFrame(records, columns=columns)

In [None]:
# One flattened frame per split, built in train / validation / test order.
train_df, val_df, test_df = (
    explode(split_rows) for split_rows in (train_rows, val_rows, test_rows)
)

# Display the number of records in each split.
tuple(map(len, (train_df, val_df, test_df)))

In [None]:
# Fold the "mentioned_only" emotion label into "neutral" in all three splits.
for df in [train_df, val_df, test_df]:
    merged_emotion = df["emotion"].replace(to_replace="mentioned_only", value="neutral")
    df["emotion"] = merged_emotion

In [None]:
# Display how many training records carry each emotion label.
train_df["emotion"].value_counts()

In [None]:
emotion_encoder, polarity_encoder = LabelEncoder(), LabelEncoder()

# (source column, encoded column, encoder) for each label type
label_specs = (
    ("emotion", "emotion_id", emotion_encoder),
    ("polarity", "polarity_id", polarity_encoder),
)

for source_col, id_col, encoder in label_specs:
    # The label vocabulary is learned from the TRAIN split only ...
    train_df[id_col] = encoder.fit_transform(train_df[source_col])

    # ... and then applied unchanged to VAL / TEST.
    for split_df in (val_df, test_df):
        split_df[id_col] = encoder.transform(split_df[source_col])

# Display the classes learned by each encoder.
emotion_encoder.classes_, polarity_encoder.classes_

In [None]:
class EmotionDataset(Dataset):
    """Torch dataset that turns one DataFrame row into one tokenized sample.

    Every row must provide the columns ``aspect``, ``text``, ``emotion_id``
    and ``polarity_id``. The model input is the string
    ``"ASPECT: <aspect> | TEXT: <text>"``, truncated / padded to ``max_len``.
    """

    def __init__(self, df, tokenizer, max_len=256):
        """Keep the tokenizer and a positionally re-indexed copy of ``df``.

        Args:
            df: DataFrame holding the samples.
            tokenizer: Callable invoked as ``tokenizer(text, truncation=...,
                padding=..., max_length=..., return_tensors="pt")``.
            max_len: Length every sample is truncated / padded to.
        """
        self.tokenizer = tokenizer
        self.max_len = max_len
        # A fresh 0..n-1 index lets __getitem__ look rows up by integer label.
        self.df = df.reset_index(drop=True)

    def __len__(self):
        """Return the number of rows in the dataset."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Tokenize row ``idx`` and return its tensors together with both label ids."""
        sample = self.df.loc[idx]
        prompt = f"ASPECT: {sample['aspect']} | TEXT: {sample['text']}"

        encoded = self.tokenizer(
            prompt,
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )

        # Drop the leading dimension of size 1 that the tokenizer output carries.
        item = {key: encoded[key].squeeze(0) for key in ("input_ids", "attention_mask")}
        item["emotion_labels"] = torch.tensor(sample["emotion_id"])
        item["polarity_labels"] = torch.tensor(sample["polarity_id"])
        return item

In [None]:
MODEL_NAME = "distilroberta-base"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Wrap each split in an EmotionDataset that shares the same tokenizer.
train_ds, val_ds, test_ds = (
    EmotionDataset(split_frame, tokenizer) for split_frame in (train_df, val_df, test_df)
)

# Display the number of samples per dataset.
tuple(map(len, (train_ds, val_ds, test_ds)))

In [None]:
class EmotionPolarityModel(torch.nn.Module):
    """Shared encoder with two linear classification heads (emotion and polarity).

    Both heads read the hidden state at sequence position 0 of the encoder's
    ``last_hidden_state``.
    """

    def __init__(self, model_name, num_emotions, num_polarity, polarity_loss_weight=0.3):
        """Load the encoder and create both heads.

        Args:
            model_name: Identifier passed to ``AutoModel.from_pretrained``.
            num_emotions: Output size of the emotion head.
            num_polarity: Output size of the polarity head.
            polarity_loss_weight: Factor applied to the polarity loss before
                it is added to the emotion loss. Defaults to 0.3, the value
                that used to be hard-coded in ``forward``.
        """
        super().__init__()

        self.encoder = AutoModel.from_pretrained(model_name)
        hidden_size = self.encoder.config.hidden_size

        self.emotion_head  = torch.nn.Linear(hidden_size, num_emotions)
        self.polarity_head = torch.nn.Linear(hidden_size, num_polarity)
        self.polarity_loss_weight = polarity_loss_weight

    def forward(
        self,
        input_ids,
        attention_mask,
        emotion_labels=None,
        polarity_labels=None
    ):
        """Run the encoder and both heads, adding a loss for any labels given.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.
            emotion_labels: Optional class indices for the emotion head.
            polarity_labels: Optional class indices for the polarity head.

        Returns:
            dict with the keys ``loss``, ``emotion_logits`` and
            ``polarity_logits`` (in that order). ``loss`` is ``None`` when no
            labels are given; otherwise it is the emotion cross-entropy plus
            ``polarity_loss_weight`` times the polarity cross-entropy, where
            each term is included only if its labels were provided.
        """
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # Hidden state of the first token of every sequence.
        cls_repr = outputs.last_hidden_state[:, 0]

        emotion_logits  = self.emotion_head(cls_repr)
        polarity_logits = self.polarity_head(cls_repr)

        loss = None
        if emotion_labels is not None:
            loss = torch.nn.functional.cross_entropy(emotion_logits, emotion_labels)

        # Guarded separately: previously polarity_labels was used whenever
        # emotion_labels was set, so passing only emotion labels crashed.
        if polarity_labels is not None:
            weighted_polarity = self.polarity_loss_weight * torch.nn.functional.cross_entropy(
                polarity_logits, polarity_labels
            )
            loss = weighted_polarity if loss is None else loss + weighted_polarity

        return {
            "loss": loss,
            "emotion_logits": emotion_logits,
            "polarity_logits": polarity_logits,
        }

In [None]:
def _select_output(container, key, position):
    """Return ``container[key]`` for mappings, otherwise ``container[position]``."""
    if isinstance(container, dict):
        return container[key]
    return container[position]


def compute_metrics(eval_pred):
    """Compute the macro-averaged F1 score of the emotion predictions.

    The Hugging Face ``Trainer`` hands multi-output predictions and
    multi-label targets over as positional tuples rather than dicts, so
    indexing them with string keys raises ``TypeError``. The emotion entries
    are therefore taken from position 0, which matches the order in which the
    model's ``forward`` lists ``emotion_labels`` / ``emotion_logits`` ahead of
    their polarity counterparts. Dict-shaped inputs are still accepted.

    Args:
        eval_pred: Pair ``(predictions, labels)``; each element is either a
            sequence whose first item belongs to the emotion head, or a dict
            with the keys ``"emotion_logits"`` / ``"emotion_labels"``.

    Returns:
        dict: ``{"emotion_f1_macro": <macro F1 over the emotion classes>}``.
    """
    logits, labels = eval_pred

    emotion_logits = _select_output(logits, "emotion_logits", 0)
    emotion_labels = _select_output(labels, "emotion_labels", 0)

    # Predicted class = index of the largest logit in each row.
    preds = np.argmax(emotion_logits, axis=1)

    return {
        "emotion_f1_macro": f1_score(
            emotion_labels,
            preds,
            average="macro"
        )
    }

In [None]:
SEED = 42

# Seed torch *before* the model is built: the two linear heads are randomly
# initialised in the constructor, whereas TrainingArguments.seed is only
# applied by the Trainer, which is created afterwards. Without this the head
# initialisation differs from run to run.
torch.manual_seed(SEED)

model = EmotionPolarityModel(
    model_name=MODEL_NAME,
    num_emotions=len(emotion_encoder.classes_),
    num_polarity=len(polarity_encoder.classes_)
)

# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy` -- confirm against the installed version.
training_args = TrainingArguments(
    output_dir=results_root,
    evaluation_strategy="epoch",   # evaluate on val_ds after every epoch
    save_strategy="no",            # no checkpoints are written
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_steps=50,
    seed=SEED,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Start fine-tuning with the trainer configured above.
trainer.train()