In [51]:
import os
import json
import glob
from typing import Dict, Tuple, List, Optional
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support, accuracy_score


In [52]:
# Portuguese emotion name (lower-case) -> canonical English label used for scoring.
EMOTION_MAPPING_PT_TO_EN = dict(
    felicidade='Happiness',
    tristeza='Sadness',
    medo='Fear',
    raiva='Anger',
    surpresa='Surprise',
    desgosto='Disgust',
    neutro='Neutral',
)

# The set of canonical English labels, derived from the mapping above.
CANONICAL_LABELS = {english_label for english_label in EMOTION_MAPPING_PT_TO_EN.values()}

In [53]:
def load_ground_truth(gt_path: str) -> Dict[str, Dict[str, str]]:
    """Load the ground-truth JSON into ``dict[output][segment] -> label``.

    The label is the value stored under ``'principal_emocao_detectada'`` and is
    returned verbatim (no translation or case normalisation happens here).

    Args:
        gt_path: Path to a UTF-8 JSON file shaped as
            ``{output_name: {segment_id: {...}}}``.

    Returns:
        Nested dict of labels. Outputs whose value is not a dict, segments
        whose value is not a dict, and segments with a missing or empty label
        are left out, so every returned label is a usable truth value.
    """
    with open(gt_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    gt: Dict[str, Dict[str, str]] = {}
    for output_name, segments in data.items():
        # Malformed outputs are skipped rather than crashing on `.items()`.
        if not isinstance(segments, dict):
            continue
        labels: Dict[str, str] = {}
        for seg_id, seg_data in segments.items():
            if not isinstance(seg_data, dict):
                continue
            label = seg_data.get('principal_emocao_detectada')
            # A segment without a label cannot be scored; storing None would
            # put a non-string into the true-label list downstream.
            if not label:
                continue
            labels[seg_id] = label
        gt[output_name] = labels
    return gt

In [54]:
# Lower-cased English spellings accepted from model output, mapped to the
# canonical label. Built once instead of on every call.
_ENGLISH_LABEL_ALIASES = {
    'happy': 'Happiness',
    'happiness': 'Happiness',
    'sad': 'Sadness',
    'sadness': 'Sadness',
    'fear': 'Fear',
    'afraid': 'Fear',
    'angry': 'Anger',
    'anger': 'Anger',
    'surprise': 'Surprise',
    'surprised': 'Surprise',
    'disgust': 'Disgust',
    'disgusted': 'Disgust',
    'neutral': 'Neutral',
}


def normalize_label(value: Optional[str]) -> Optional[str]:
    """Map a raw label to its canonical English form.

    The input is stringified, stripped and lower-cased, then looked up in the
    English alias table, the Portuguese mapping ``EMOTION_MAPPING_PT_TO_EN``,
    and finally checked (capitalised) against ``CANONICAL_LABELS``.

    Args:
        value: Raw label; any falsy value is treated as "no label".

    Returns:
        The canonical label, or ``None`` when the input is falsy or not
        recognised.
    """
    if not value:
        return None

    raw = str(value).strip().lower()

    # The alias keys and the Portuguese keys do not overlap, so the order of
    # these two lookups does not affect the result.
    if raw in _ENGLISH_LABEL_ALIASES:
        return _ENGLISH_LABEL_ALIASES[raw]

    if raw in EMOTION_MAPPING_PT_TO_EN:
        return EMOTION_MAPPING_PT_TO_EN[raw]

    # Last resort: the value is already a canonical label in a different case.
    cap = raw.capitalize()
    if cap in CANONICAL_LABELS:
        return cap

    return None

In [55]:
def load_splits(csv_path: str) -> Dict[Tuple[str, str], str]:
    """Read the splits CSV into ``{(output_name, segment): split}``.

    Uses the ``'File Path'`` and ``'Split'`` columns. The path is cut on either
    ``/`` or ``\\``; its first component is the output name and its second
    component, minus the file extension, is the segment. Rows whose path has
    fewer than two components are ignored. Split names are stripped and
    lower-cased.
    """
    table = pd.read_csv(csv_path)
    split_by_segment: Dict[Tuple[str, str], str] = {}
    for _, record in table.iterrows():
        raw_path = str(record['File Path'])
        split_name = str(record['Split']).strip().lower()
        # Treat both separator styles alike, whatever the host OS is.
        components = raw_path.replace('\\', '/').split('/')
        if len(components) < 2:
            continue
        segment_name = os.path.splitext(components[1])[0]
        split_by_segment[(components[0], segment_name)] = split_name
    return split_by_segment

In [56]:
def extract_pred_label(seg_obj: dict) -> Optional[str]:
    """Return the first non-empty label found in a prediction record.

    Candidate keys are tried in a fixed priority order; the first one that is
    present with a truthy value wins and is returned as a string. ``None`` is
    returned when no candidate key carries a truthy value.
    """
    candidate_keys = (
        'principal_emocao_detectada',
        'emoção',
        'emocao',
        'emotion',
        'label',
        'pred',
    )
    truthy_values = (
        seg_obj[name] for name in candidate_keys if name in seg_obj and seg_obj[name]
    )
    first_hit = next(truthy_values, None)
    return None if first_hit is None else str(first_hit)

In [57]:
def load_model_predictions(all_json_path: str) -> Dict[str, Dict[str, Optional[str]]]:
    """Load one model's predictions as ``dict[output][segment] -> label``.

    Each segment record is reduced to a label with ``extract_pred_label`` and
    then passed through ``normalize_label``, so a stored value of ``None``
    means no recognised label was found. Outputs and segments whose JSON value
    is not an object are skipped.
    """
    with open(all_json_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    return {
        output_name: {
            seg_id: normalize_label(extract_pred_label(seg_obj))
            for seg_id, seg_obj in segments.items()
            if isinstance(seg_obj, dict)
        }
        for output_name, segments in payload.items()
        if isinstance(segments, dict)
    }

In [58]:
def collect_pairs(gt: Dict[str, Dict[str, str]],
                  preds: Dict[str, Dict[str, Optional[str]]],
                  subset: Optional[str],
                  split_map: Dict[Tuple[str, str], str]) -> Tuple[List[str], List[str], int]:
    """Align ground truth with predictions.

    Only segments present in both ``gt`` and ``preds`` are considered. When
    ``subset`` is ``'test'``, segments whose ``split_map`` entry is not
    ``'test'`` are dropped; any other ``subset`` value applies no filter.

    Returns:
        ``(y_true, y_pred, hallucinations)`` where ``hallucinations`` counts
        the considered segments whose prediction is ``None``; those segments
        are excluded from both lists.
    """
    truths: List[str] = []
    guesses: List[str] = []
    unparsed = 0
    test_only = subset == 'test'

    for output_name, gt_segments in gt.items():
        if output_name in preds:
            pred_segments = preds[output_name]
            for seg_id, expected in gt_segments.items():
                if seg_id not in pred_segments:
                    continue
                if test_only and split_map.get((output_name, seg_id)) != 'test':
                    continue
                predicted = pred_segments[seg_id]
                if predicted is not None:
                    truths.append(expected)
                    guesses.append(predicted)
                else:
                    # Counted but kept out of the lists so the scorer never
                    # receives a None label.
                    unparsed += 1

    return truths, guesses, unparsed

In [59]:
def compute_metrics(y_true: List[str], y_pred: List[str],
                    hallucinations: int, total_seen: int) -> Dict[str, float]:
    """Compute classification metrics plus hallucination rate and coverage.

    Args:
        y_true: Ground-truth labels of the scored segments.
        y_pred: Predicted labels, aligned with ``y_true``.
        hallucinations: Number of seen segments without a usable prediction.
        total_seen: Number of segments for which a prediction entry exists.

    Returns:
        Dict with weighted and macro precision/recall/F1, accuracy, ``samples``
        (``len(y_true)``), ``hallucination_rate`` and ``coverage``. When
        ``y_true`` is empty the classification metrics are all 0.0, while the
        rate and coverage are still derived from the counts.
    """
    # Derived before the empty-input shortcut: if every seen prediction was
    # unusable, y_true is empty and the rate must read 1.0, not 0.0.
    if total_seen > 0:
        rate = hallucinations / total_seen
        coverage = (total_seen - hallucinations) / total_seen
    else:
        rate = 0.0
        coverage = 0.0

    if len(y_true) == 0:
        return {
            'precision_weighted': 0.0,
            'recall_weighted': 0.0,
            'f1_weighted': 0.0,
            'precision_macro': 0.0,
            'recall_macro': 0.0,
            'f1_macro': 0.0,
            'accuracy': 0.0,
            'samples': 0,
            'hallucination_rate': rate,
            'coverage': coverage,
        }

    # zero_division=0 makes classes with no predicted/true samples score 0.
    pw, rw, fw, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted', zero_division=0)
    pm, rm, fm, _ = precision_recall_fscore_support(y_true, y_pred, average='macro', zero_division=0)
    acc = accuracy_score(y_true, y_pred)
    return {
        'precision_weighted': pw,
        'recall_weighted': rw,
        'f1_weighted': fw,
        'precision_macro': pm,
        'recall_macro': rm,
        'f1_macro': fm,
        'accuracy': acc,
        'samples': len(y_true),
        'hallucination_rate': rate,
        'coverage': coverage,
    }

In [60]:
def evaluate_one(all_json_path: str, gt: Dict[str, Dict[str, str]],
                 split_map: Dict[Tuple[str, str], str], subset: Optional[str]) -> Dict[str, float]:
    """Score one model's prediction file against the ground truth.

    Beyond the values returned by ``compute_metrics`` this adds:

    * ``total_expected``: ground-truth segments in the requested subset;
    * ``total_seen``: those for which the model has a prediction entry;
    * ``missing_predictions``: those with no entry at all;
    * ``hallucination_rate_inclusive`` / ``coverage_inclusive``: rates over
      ``total_expected`` that count missing entries as failures as well.

    ``subset == 'test'`` restricts evaluation to segments that ``split_map``
    marks as ``'test'``; any other value evaluates every segment.
    """
    predictions = load_model_predictions(all_json_path)
    test_only = subset == 'test'

    expected_count = 0
    seen_count = 0
    absent_count = 0
    for output_name, gt_segments in gt.items():
        pred_segments = predictions.get(output_name)
        for seg_id in gt_segments:
            if test_only and split_map.get((output_name, seg_id)) != 'test':
                continue
            expected_count += 1
            # The whole output, or just this segment, may be absent.
            if pred_segments is None or seg_id not in pred_segments:
                absent_count += 1
            else:
                seen_count += 1

    y_true, y_pred, unparsed = collect_pairs(gt, predictions, subset, split_map)
    metrics = compute_metrics(y_true, y_pred, unparsed, seen_count)

    failures = absent_count + unparsed
    if expected_count > 0:
        inclusive_rate = failures / expected_count
        inclusive_coverage = (expected_count - failures) / expected_count
    else:
        inclusive_rate = 0.0
        inclusive_coverage = 0.0

    metrics['hallucination_rate_inclusive'] = inclusive_rate
    metrics['coverage_inclusive'] = inclusive_coverage
    metrics['total_expected'] = expected_count
    metrics['total_seen'] = seen_count
    metrics['missing_predictions'] = absent_count

    return metrics

In [61]:
def main():
    """Evaluate every model result folder and write the metric/ranking CSVs.

    Reads ``resultado_manual.json`` (ground truth) and ``data_splits.csv`` from
    ``../results``, then scans the ``resultados_com_contexto`` and
    ``resultados_sem_contexto`` sub-folders. Each model directory that contains
    an ``all_*.json`` file is scored on the ``test`` subset and on the full
    data. Results are printed and saved as:

    * ``llm_models_metrics.csv``: one row per model/context/subset;
    * ``llm_models_metrics_ranking_<context>_<subset>.csv``: per-category
      rankings ordered by ``f1_macro`` (descending);
    * ``llm_models_metrics_rankings_all.csv``: all rankings concatenated.

    Raises:
        FileNotFoundError: If the ground truth or the splits CSV is missing.
    """
    base_results_dir = '../results'
    gt_path = os.path.join(base_results_dir, 'resultado_manual.json')
    splits_csv = os.path.join(base_results_dir, 'data_splits.csv')

    if not os.path.exists(gt_path):
        raise FileNotFoundError(f"Ground truth not found at {gt_path}")
    if not os.path.exists(splits_csv):
        raise FileNotFoundError(f"Splits CSV not found at {splits_csv}")

    gt = load_ground_truth(gt_path)
    split_map = load_splits(splits_csv)

    rows = []

    # (sub-folder name, short context tag used in the output tables)
    search_specs = [
        ('resultados_com_contexto', 'context'),
        ('resultados_sem_contexto', 'nocontext'),
    ]

    for subdir, ctx in search_specs:
        ctx_dir = os.path.join(base_results_dir, subdir)
        if not os.path.isdir(ctx_dir):
            continue

        for name in sorted(os.listdir(ctx_dir)):
            model_dir = os.path.join(ctx_dir, name)
            if not os.path.isdir(model_dir):
                continue

            # glob returns matches in arbitrary order; sort so the file chosen
            # is the same on every run when a folder holds several candidates.
            json_candidates = sorted(glob.glob(os.path.join(model_dir, 'all_*.json')))
            if not json_candidates:
                continue

            all_json = json_candidates[0]

            model_name = name

            for subset in ('test', 'full'):
                # evaluate_one only filters when given 'test'; None means all data.
                subset_arg = None if subset == 'full' else 'test'
                metrics = evaluate_one(all_json, gt, split_map, subset_arg)

                row_id = f"{model_name}-{ctx}-{subset}"
                row = {
                    'model_id': row_id,
                    'model': model_name,
                    'context': ctx,
                    'subset': subset,
                }
                for k, v in metrics.items():
                    # Round float metrics only; integer counts are kept as-is.
                    row[k] = round(float(v), 6) if isinstance(v, float) else v
                rows.append(row)

    if not rows:
        print('No results found. Ensure result folders contain all_*.json files.')
        return

    df = pd.DataFrame(rows)
    # Temporary helper column so 'test' sorts before 'full' within each model.
    subset_order = {'test': 0, 'full': 1}
    df['subset_ord'] = df['subset'].map(subset_order)
    df = df.sort_values(by=['context', 'model', 'subset_ord']).drop(columns=['subset_ord'])

    base_cols = ['model_id', 'model', 'context', 'subset']
    metric_cols = [
        'precision_weighted', 'recall_weighted', 'f1_weighted',
        'precision_macro', 'recall_macro', 'f1_macro',
        'accuracy', 'samples', 'hallucination_rate', 'coverage',
        'hallucination_rate_inclusive', 'coverage_inclusive',
        'total_expected', 'total_seen', 'missing_predictions'
    ]
    out_cols = base_cols + metric_cols
    # Guarantee a stable column layout even if a metric key is absent.
    for c in metric_cols:
        if c not in df.columns:
            df[c] = 0.0
    df = df[out_cols]

    out_csv = os.path.join(base_results_dir, 'llm_models_metrics.csv')
    df.to_csv(out_csv, index=False, encoding='utf-8')
    print(f"Saved: {out_csv}")
    print(len(df))
    print(df.to_string(index=False))

    print("\nRankings by context and subset:")

    # Ensure numeric type for samples in case it was parsed as string
    df['samples'] = pd.to_numeric(df['samples'], errors='coerce')

    # (human-readable title, context tag, subset)
    categories = [
        ("with context and full data", 'context', 'full'),
        ("with context and test data", 'context', 'test'),
        ("without context and full data", 'nocontext', 'full'),
        ("without context and test data", 'nocontext', 'test'),
    ]

    all_rankings = []

    for title, ctx_val, subset in categories:
        # NOTE: a stricter filter on an exact expected sample count per subset
        # (test: 154, full: 769) was left disabled here; only context and
        # subset are matched.
        mask = (
            (df['context'] == ctx_val) &
            (df['subset'] == subset)
        )
        df_cat = df.loc[mask].sort_values(by='f1_macro', ascending=False)
        print(f"\nRanking - {title} (rows={len(df_cat)})")
        if len(df_cat) == 0:
            print("No results found for this category.")
        else:
            print(df_cat.to_string(index=False))

        # Save per-category ranking CSV
        out_csv_cat = os.path.join(base_results_dir, f"llm_models_metrics_ranking_{ctx_val}_{subset}.csv")
        df_cat.to_csv(out_csv_cat, index=False, encoding='utf-8')
        print(f"Saved ranking CSV: {out_csv_cat}")

        # Accumulate for combined rankings file
        tmp = df_cat.copy()
        tmp['ranking_title'] = title
        tmp['ranking_context'] = ctx_val
        tmp['ranking_subset'] = subset
        all_rankings.append(tmp)

    # Save combined rankings CSV
    if all_rankings:
        df_all = pd.concat(all_rankings, ignore_index=True)
        out_all_csv = os.path.join(base_results_dir, 'llm_models_metrics_rankings_all.csv')
        df_all.to_csv(out_all_csv, index=False, encoding='utf-8')
        print(f"Saved combined rankings CSV: {out_all_csv}")


# Run the evaluation only when executed as a script / notebook cell.
if __name__ == '__main__':
    main()

Saved: ../results\llm_models_metrics.csv
60
                              model_id                   model   context subset  precision_weighted  recall_weighted  f1_weighted  precision_macro  recall_macro  f1_macro  accuracy  samples  hallucination_rate  coverage  hallucination_rate_inclusive  coverage_inclusive  total_expected  total_seen  missing_predictions
          deepseek-r1_14b-context-test         deepseek-r1_14b   context   test            0.751245         0.824675     0.786250         0.124266      0.136412  0.130056  0.824675      154            0.000000  1.000000                      0.000000            1.000000             154         154                    0
          deepseek-r1_14b-context-full         deepseek-r1_14b   context   full            0.772031         0.843953     0.803660         0.199586      0.178267  0.180558  0.843953      769            0.000000  1.000000                      0.000000            1.000000             769         769                    0