In [74]:
# For pre-trained models eval:
import pandas as pd
import ast

# Read the ground-truth test split and the model responses, then attach each
# response to the test frame (pandas aligns the two on their row index).
test_df = pd.read_csv("/Dataset/test_data/beauty_test.csv")
pred_df = pd.read_csv("./results/responses_beauty_w_keys.csv")
test_df = test_df.assign(prediction=pred_df["response"])

# Helper to safely parse dictionaries
def safe_parse_dict(text, wrap_braces=False):
    """Parse a dict literal stored as text, falling back to ``{}`` on failure.

    Args:
        text: Value to parse. Anything that is not a string (including
            ``NaN`` and ``None``) yields ``{}``.
        wrap_braces: When True, ``text`` is surrounded with ``{`` and ``}``
            before parsing, for inputs that hold only ``'key': 'value'``
            pairs without the enclosing braces.

    Returns:
        The parsed ``dict``. ``{}`` is returned when ``text`` is missing, is
        not a valid Python literal, or evaluates to something that is not a
        dict (e.g. a list or a set).
    """
    # Only strings can hold a literal; this also covers NaN / None cells.
    if not isinstance(text, str):
        return {}

    candidate = "{" + text + "}" if wrap_braces else text
    try:
        parsed = ast.literal_eval(candidate)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # These are the errors literal_eval raises for malformed input;
        # anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return {}

    # Wrapping "'a', 'b'" in braces produces a set, not a dict: reject it.
    return parsed if isinstance(parsed, dict) else {}

# Pair every test row's parsed ground truth with its parsed prediction.
# Predictions are wrapped in braces before parsing; ground truth is not.
comparison_data = [
    {
        "product_id": record["product_id"],
        "video_title": record["video_title"],
        "ground_truth": safe_parse_dict(record["aspects"]),
        "prediction": safe_parse_dict(record["prediction"], wrap_braces=True),
    }
    for _, record in test_df.iterrows()
]

# Write the table to disk (without the index column) for the evaluation cells.
comparison_df = pd.DataFrame(comparison_data)
comparison_df.to_csv(
    "./results/aspect_prediction_comparison.csv",
    index=False,
)
print("Exported to aspect_prediction_comparison.csv")


Exported to aspect_prediction_comparison.csv


In [None]:
# Attribute-Conditioned Evaluation
import pandas as pd
import ast
import os
from collections import defaultdict

# Read the exported comparison table and keep only the rows whose
# "prediction" cell is not NaN.
df = pd.read_csv("./results/aspect_prediction_comparison.csv")
has_prediction = df["prediction"].notna()
df_top10 = df.loc[has_prediction]
    
# Helper to safely parse dictionaries
def safe_parse(val):
    """Parse a dict literal read back from CSV, falling back to ``{}``.

    Args:
        val: Cell value to parse. Non-string values (including ``NaN``)
            yield ``{}``.

    Returns:
        The parsed ``dict``, or ``{}`` when ``val`` is not a valid Python
        literal or evaluates to something other than a dict. Callers use
        the result with ``.items()`` / ``.keys()``, so a dict is always
        returned.
    """
    if not isinstance(val, str):
        return {}
    try:
        parsed = ast.literal_eval(val)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # The errors literal_eval raises for malformed input; unrelated
        # exceptions (e.g. KeyboardInterrupt) are not swallowed.
        return {}
    return parsed if isinstance(parsed, dict) else {}

# Prefix-based fuzzy match: the shared leading run must cover >= 50% of the label
def custom_fuzzy_match(label, pred):
    """Return True when ``pred`` starts like ``label`` for at least half of it.

    Both arguments are converted with ``str`` and lower-cased. The length of
    their common prefix is then compared against half the length of the
    label. An empty label therefore matches any prediction.
    """
    target = str(label).lower()
    guess = str(pred).lower()

    # Count leading characters that agree in both strings.
    shared = 0
    for expected_char, actual_char in zip(target, guess):
        if expected_char != actual_char:
            break
        shared += 1

    return 2 * shared >= len(target)

# Compute F1 scores (overall and per attribute)
def _precision_recall_f1(tp, fp, fn):
    """Return (precision, recall, F1) for the counts; 0 when a denominator is empty."""
    precision = tp / (tp + fp) if (tp + fp) else 0
    recall = tp / (tp + fn) if (tp + fn) else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    return precision, recall, f1


def compute_fuzzy_f1_scores(df):
    """Compute micro-averaged and per-attribute fuzzy F1 over a comparison frame.

    Each row's ``ground_truth`` and ``prediction`` cells are parsed with
    ``safe_parse`` and compared key by key (exact key match, values compared
    with ``custom_fuzzy_match``):

    * key in both, values match            -> true positive
    * key in both, values do not match     -> false negative AND false positive
    * key only in the ground truth         -> false negative
    * key only in the prediction           -> false positive

    Args:
        df: Frame exposing ``iterrows()`` whose rows have ``ground_truth``
            and ``prediction`` entries.

    Returns:
        ``(precision, recall, f1, attr_f1_scores)`` where the first three are
        the overall scores rounded to 4 decimals and ``attr_f1_scores`` maps
        each attribute name to its F1 rounded to 4 decimals.
    """
    total_tp, total_fp, total_fn = 0, 0, 0
    attr_stats = defaultdict(lambda: {"tp": 0, "fp": 0, "fn": 0})

    for _, row in df.iterrows():
        gt = safe_parse(row["ground_truth"])
        pred = safe_parse(row["prediction"])
        # Guard against a parser handing back a non-dict literal.
        if not isinstance(gt, dict):
            gt = {}
        if not isinstance(pred, dict):
            pred = {}

        for key, gt_val in gt.items():
            if key in pred and custom_fuzzy_match(gt_val, pred[key]):
                total_tp += 1
                attr_stats[key]["tp"] += 1
                continue

            # The true value was not recovered: missing or wrong.
            total_fn += 1
            attr_stats[key]["fn"] += 1

            if key in pred:
                # A wrong value was emitted for this key, so it is also a
                # false positive. Previously this case was never counted
                # because the branch meant to handle it was unreachable.
                total_fp += 1
                attr_stats[key]["fp"] += 1

        # Predicted attributes that the ground truth does not contain.
        for key in pred:
            if key not in gt:
                total_fp += 1
                attr_stats[key]["fp"] += 1

    # Overall scores
    precision, recall, f1 = _precision_recall_f1(total_tp, total_fp, total_fn)

    # Attribute-level scores
    attr_f1_scores = {}
    for attr, stats in attr_stats.items():
        _, _, f1_attr = _precision_recall_f1(stats["tp"], stats["fp"], stats["fn"])
        attr_f1_scores[attr] = round(f1_attr, 4)

    return round(precision, 4), round(recall, 4), round(f1, 4), attr_f1_scores

# Score every row kept in df_top10.
overall_precision, overall_recall, overall_f1, attribute_f1s = compute_fuzzy_f1_scores(df_top10)

# Aggregate metrics.
for summary_line in (
    "\n🔹 Overall F1:",
    f"Precision: {overall_precision}",
    f"Recall:    {overall_recall}",
    f"F1 Score:  {overall_f1}\n",
):
    print(summary_line)

# Per-attribute F1, highest score first.
print("🔹 Attribute-level F1 scores (sorted by F1 descending):")
ranked_by_f1 = sorted(attribute_f1s.items(), key=lambda item: item[1], reverse=True)
for attr, f1_score in ranked_by_f1:
    print(f"{attr}: {f1_score}")

# Number of ground-truth rows in which each attribute appears.
attr_freq = defaultdict(int)
for raw_ground_truth in df_top10["ground_truth"]:
    for key in safe_parse(raw_ground_truth).keys():
        attr_freq[key] += 1

# Same F1 scores, ordered by how often the attribute occurs in the ground truth.
sorted_attrs_by_freq = sorted(
    attribute_f1s.items(),
    key=lambda item: attr_freq.get(item[0], 0),
    reverse=True,
)

print("🔹 Attribute-level F1 scores (sorted by attribute frequency in ground truth):")
for attr, f1_score in sorted_attrs_by_freq:
    print(f"{attr} (count={attr_freq.get(attr, 0)}): {f1_score}")


In [None]:
# Generalized Evaluation
import pandas as pd
import difflib
import ast
import re


# Read the comparison table (ground truth) and the generalized responses.
df = pd.read_csv("./results/aspect_prediction_comparison.csv")
df_pred = pd.read_csv("/Dataset/test_data/responses_beauty_generalized.csv")

# Drop comparison rows whose "prediction" cell is NaN.
df_top10 = df.dropna(subset=["prediction"])

# -------- Utility Functions --------

def normalize(text):
    """Return ``text`` as a trimmed, lower-cased string with whitespace runs collapsed to one space."""
    cleaned = str(text).strip().lower()
    return re.sub(r'\s+', ' ', cleaned)

def fuzzy_match(s1, s2, threshold=50):
    """Tell whether two values are similar enough after normalization.

    Both values go through ``normalize``; their ``difflib.SequenceMatcher``
    ratio, scaled to 0-100, must be at least ``threshold``.
    """
    left, right = normalize(s1), normalize(s2)
    similarity = 100 * difflib.SequenceMatcher(None, left, right).ratio()
    return similarity >= threshold

def match_key_fuzzy(pred_key, gt_keys, threshold=50):
    """Find the ground-truth key that best matches a predicted key.

    Keys are compared after ``normalize`` using the
    ``difflib.SequenceMatcher`` ratio scaled to 0-100.

    Args:
        pred_key: Predicted attribute name.
        gt_keys: Iterable of ground-truth attribute names.
        threshold: Minimum similarity (0-100) for a key to qualify.

    Returns:
        The qualifying ground-truth key with the highest similarity, or
        ``None`` when no key reaches ``threshold``. On ties the earliest key
        in ``gt_keys`` wins, so when only one key qualifies the result is the
        same as a first-match search.
    """
    pred_key_norm = normalize(pred_key)
    best_key = None
    best_score = -1.0

    for gt_key in gt_keys:
        score = difflib.SequenceMatcher(None, pred_key_norm, normalize(gt_key)).ratio() * 100
        # Strict ">" keeps the earliest candidate on equal scores. Picking
        # the best candidate (not the first one over the threshold) stops a
        # merely similar key from shadowing an exact match listed later.
        if score >= threshold and score > best_score:
            best_key = gt_key
            best_score = score

    return best_key

def compute_fuzzy_f1(pred_dict, gt_dict, val_threshold=50, key_threshold=50):
    """Score one predicted attribute dict against its ground truth.

    Every predicted key is mapped to a ground-truth key with
    ``match_key_fuzzy``. A ground-truth key can be claimed only once:

    * claimed key, values pass ``fuzzy_match``  -> TP
    * claimed key, values fail ``fuzzy_match``  -> FP and FN
    * no key found, or key already claimed      -> FP
    * ground-truth key never claimed            -> FN

    Returns:
        Dict with ``TP``, ``FP``, ``FN`` counts and ``precision``,
        ``recall``, ``f1`` (each 0 when its denominator is empty).
    """
    true_pos = false_pos = false_neg = 0
    claimed_gt_keys = set()

    for pred_key, pred_val in pred_dict.items():
        gt_key = match_key_fuzzy(pred_key, gt_dict.keys(), threshold=key_threshold)

        # Nothing to compare against: the prediction is spurious.
        if not gt_key or gt_key in claimed_gt_keys:
            false_pos += 1
            continue

        if fuzzy_match(pred_val, gt_dict[gt_key], threshold=val_threshold):
            true_pos += 1
        else:
            # A wrong value is both an incorrect prediction and a missed truth.
            false_pos += 1
            false_neg += 1
        claimed_gt_keys.add(gt_key)

    # Ground-truth attributes that no prediction claimed.
    false_neg += sum(1 for gt_key in gt_dict if gt_key not in claimed_gt_keys)

    if (true_pos + false_pos) > 0:
        precision = true_pos / (true_pos + false_pos)
    else:
        precision = 0

    if (true_pos + false_neg) > 0:
        recall = true_pos / (true_pos + false_neg)
    else:
        recall = 0

    if (precision + recall) > 0:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0

    return {
        "TP": true_pos,
        "FP": false_pos,
        "FN": false_neg,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }

# -------- Evaluation Loop --------

# Ground truth comes from the comparison table and predictions from the
# generalized-responses file; the two lists are paired purely by position.
gt = df_top10['ground_truth'].tolist()
pred = df_pred['response'].tolist()

# Only positions present in both lists can be scored. Using the same count
# for the loop and for the averages avoids an IndexError when there are more
# predictions than ground-truth rows, and a wrong denominator when there are
# fewer.
n = min(len(gt), len(pred))
if len(gt) != len(pred):
    print(
        f"Warning: {len(gt)} ground-truth rows vs {len(pred)} predictions; "
        f"evaluating the first {n} pairs only."
    )

TP_total = FP_total = FN_total = 0
precision_total = recall_total = f1_total = 0

for gt_text, pred_text in zip(gt, pred):
    # Parse prediction: pull every 'key': 'value' pair out of the raw response.
    pred_pairs = re.findall(r"'([^']+)': '([^']+)'", str(pred_text))
    pred_dict = dict(pred_pairs)

    # Parse ground truth
    gt_dict = ast.literal_eval(gt_text)

    # Compute per-instance scores
    result = compute_fuzzy_f1(pred_dict, gt_dict, val_threshold=50, key_threshold=50)
    TP_total += result['TP']
    FP_total += result['FP']
    FN_total += result['FN']
    precision_total += result['precision']
    recall_total += result['recall']
    f1_total += result['f1']

# -------- Results --------

# Precision / recall / F1 are macro averages over the evaluated instances;
# the TP / FP / FN lines are raw totals. With nothing evaluated the sums are
# all zero, so dividing by 1 reports 0 instead of raising ZeroDivisionError.
instances = n if n else 1
print("\n🔍 Evaluation Summary")
print("----------------------")
print("True Positives:", TP_total)
print("False Positives:", FP_total)
print("False Negatives:", FN_total)
print("Precision:", round(precision_total / instances, 4))
print("Recall:   ", round(recall_total / instances, 4))
print("F1 Score: ", round(f1_total / instances, 4))



from collections import defaultdict

# Step 1: Count ground-truth attribute frequency
attribute_frequency = defaultdict(int)
for gt_row in gt:
    gt_dict = ast.literal_eval(gt_row)
    for key in gt_dict:
        attribute_frequency[key] += 1

# Step 2: Collect performance per attribute
# Predictions and ground truth are paired by position; zip stops at the
# shorter list, so extra predictions can no longer index past the end of gt.
attribute_stats = defaultdict(lambda: {"TP": 0, "FP": 0, "FN": 0})
for pred_text, gt_text in zip(pred, gt):
    pred_pairs = re.findall(r"'([^']+)': '([^']+)'", str(pred_text))
    pred_dict = dict(pred_pairs)
    gt_dict = ast.literal_eval(gt_text)

    matched_gt_keys = set()
    for pred_key, pred_val in pred_dict.items():
        matched_gt_key = match_key_fuzzy(pred_key, gt_dict.keys(), threshold=50)
        if matched_gt_key and matched_gt_key not in matched_gt_keys:
            if fuzzy_match(pred_val, gt_dict[matched_gt_key], threshold=50):
                attribute_stats[matched_gt_key]["TP"] += 1
            else:
                # Wrong value: counted against the ground-truth attribute as
                # both a false positive and a false negative.
                attribute_stats[matched_gt_key]["FP"] += 1
                attribute_stats[matched_gt_key]["FN"] += 1
            matched_gt_keys.add(matched_gt_key)
        else:
            # No usable ground-truth key: charged to the predicted key itself.
            attribute_stats[pred_key]["FP"] += 1

    for gt_key in gt_dict:
        if gt_key not in matched_gt_keys:
            attribute_stats[gt_key]["FN"] += 1

# Step 3: Compute F1 per attribute and attach frequency
attribute_performance = []
for attr, stats in attribute_stats.items():
    TP = stats["TP"]
    FP = stats["FP"]
    FN = stats["FN"]
    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0
    # .get() reads the count without inserting zero entries for attributes
    # that only ever appeared in predictions.
    freq = attribute_frequency.get(attr, 0)
    attribute_performance.append((attr, freq, TP, FP, FN, precision, recall, f1))

# Step 4: Sort by attribute frequency (high to low)
attribute_performance.sort(key=lambda x: x[1], reverse=True)

# Step 5: Print results
print("\n📊 Attribute-level performance (sorted by frequency):")
for attr, freq, TP, FP, FN, precision, recall, f1 in attribute_performance:
    print(f"\nAttribute: {attr}")
    print(f"  Frequency: {freq}")
    print(f"  TP: {TP}, FP: {FP}, FN: {FN}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall:    {recall:.4f}")
    print(f"  F1 Score:  {f1:.4f}")
