# **Research Questions**

## **RQ1 - Entropy** (Optimal number of counterfactual prompts to be generated by Step 1 of CAFFE)

In [None]:
import math
import re
from collections import Counter
from typing import List, Tuple
import pandas as pd

# -------------------------------------------------
# Word tokens: runs of word characters delimited by word boundaries.
TOKEN_RE = re.compile(r"\b\w+\b", re.UNICODE)

def tokenize(text: str) -> List[str]:
    """Lower-case *text* and return its word tokens in order of appearance."""
    lowered = text.lower()
    return [match.group(0) for match in TOKEN_RE.finditer(lowered)]

# -------------------------------------------------
# Shannon entropy (in bits) with the Miller–Madow bias correction
def entropy_mm(tokens: List[str]) -> float:
    """Return the Miller–Madow corrected Shannon entropy of *tokens*, in bits.

    The plug-in (naive) entropy of the empirical token distribution is
    increased by ``(V - 1) / (2 * N * ln 2)``, where ``N`` is the number of
    tokens and ``V`` the number of distinct tokens.  An empty token list
    yields ``0.0``.
    """
    total = len(tokens)
    if not total:
        return 0.0

    frequencies = Counter(tokens)
    probabilities = [count / total for count in frequencies.values()]
    plug_in = -sum(p * math.log2(p) for p in probabilities)

    # The correction term is expressed in nats; dividing by ln 2 converts it to bits.
    correction = (len(frequencies) - 1) / (2 * total * math.log(2))
    return plug_in + correction

# -------------------------------------------------
def find_plateau(
    sentences: List[str],
    batch_size: int = 10,
    epsilon: float = 0.02,
    k: int = 3
) -> Tuple[int, List[float]]:
    """Locate the point at which cumulative token entropy stops growing.

    Sentences are consumed in batches of ``batch_size``; after each batch the
    Miller–Madow entropy of *all* tokens seen so far is appended to the curve.
    A plateau is declared the first time the entropy increment stays below
    ``epsilon`` for ``k`` consecutive batches.

    Args:
        sentences: Texts to accumulate, in order.
        batch_size: Number of sentences added per entropy measurement.
        epsilon: Increment threshold below which a step counts as "flat".
        k: Number of consecutive flat steps required to declare a plateau.

    Returns:
        ``(stop_index, h_curve)`` where ``stop_index`` is the number of
        sentences consumed when the plateau was first detected (``-1`` if it
        never was) and ``h_curve`` holds one entropy value per batch.  The
        curve always covers every batch, even after the plateau is found.
    """
    tokens: List[str] = []
    h_curve: List[float] = []
    below_threshold_run = 0
    stop_index = -1
    total = len(sentences)

    for i in range(0, total, batch_size):
        for s in sentences[i : i + batch_size]:
            tokens.extend(tokenize(s))

        h_curve.append(entropy_mm(tokens))
        if len(h_curve) < 2:
            # No previous point to compare against yet.
            continue

        # NOTE: the comparison is signed, so a *decrease* in entropy also
        # counts as a below-threshold step.
        delta = h_curve[-1] - h_curve[-2]
        if delta < epsilon:
            below_threshold_run += 1
            if below_threshold_run >= k and stop_index == -1:
                # Clamp: the last batch may be partial, so i + batch_size
                # could otherwise report more sentences than were supplied.
                stop_index = min(i + batch_size, total)
        else:
            below_threshold_run = 0

    return stop_index, h_curve


# -------------------------------------------------
def analyse_in_blocks(
    csv_path: str,
    block_size: int = 20,
    batch_size: int = 1,
    epsilon: float = 0.02,
    k: int = 3,
) -> pd.DataFrame:
    """Run the plateau analysis on consecutive row blocks of a CSV file.

    The file at ``csv_path`` is read and split into blocks of ``block_size``
    rows.  For each block the ``sentence`` column is passed to
    ``find_plateau`` (with ``batch_size``, ``epsilon`` and ``k`` forwarded),
    the most frequent ``bias_type`` is recorded when that column exists and
    has at least one non-null value, and a short report is printed.

    Returns:
        A DataFrame with one row per block and the columns ``block_idx``,
        ``rows``, ``bias_type``, ``stop_at``, ``final_entropy`` and
        ``entropy_curve``.
    """
    df = pd.read_csv(csv_path)
    has_bias_column = "bias_type" in df.columns

    results = []
    block_starts = range(0, len(df), block_size)
    for block_idx, start in enumerate(block_starts, start=1):
        chunk = df.iloc[start : start + block_size]

        # Most frequent bias label of the block, or None when unavailable.
        bias_mode = None
        if has_bias_column and chunk["bias_type"].notna().any():
            bias_mode = chunk["bias_type"].mode().iloc[0]

        stop_at, curve = find_plateau(
            chunk["sentence"].tolist(),
            batch_size=batch_size,
            epsilon=epsilon,
            k=k,
        )

        row_span = f"{start}-{start+len(chunk)-1}"
        results.append(
            {
                "block_idx": block_idx,
                "rows": row_span,
                "bias_type": bias_mode,
                "stop_at": stop_at,
                "final_entropy": curve[-1],
                "entropy_curve": curve,
            }
        )

        print(f"\nBlock {block_idx}  ({row_span})")
        print(f" bias_type: {bias_mode}")
        print(" Entropy-rate curve:", curve)
        if stop_at == -1:
            print(" Nessun plateau in questo blocco.")
        else:
            print(f" Plateau dopo {stop_at} frasi.")

    return pd.DataFrame(results)

# -------------------------------------------------
if __name__ == "__main__":
    # Run the RQ1 block analysis with the settings used in the study and
    # persist the per-block summary for the statistics cell that follows.
    rq1_settings = dict(block_size=20, batch_size=1, epsilon=0.02, k=3)
    summary = analyse_in_blocks(
        "RQ1_Joined_Prompts_for_Evaluation.csv",
        **rq1_settings,
    )

    summary.to_csv("entropy_summary.csv", index=False)


### **Summary of Mean - Median**

In [None]:
import pandas as pd

# Load the per-block summary written by the entropy analysis.
try:
    df = pd.read_csv('entropy_summary.csv', engine='python', on_bad_lines='skip')
except Exception as e:
    print(f"Error reading CSV: {e}")
    raise

df['stop_at'] = pd.to_numeric(df['stop_at'], errors='coerce')

# `stop_at == -1` is the "no plateau found" sentinel, not a real sentence
# count.  Mask it (as NaN) so it does not drag the mean/median/std down;
# as a consequence `count` reports only the blocks where a plateau was found.
df['stop_at'] = df['stop_at'].where(df['stop_at'] >= 0)

# Per-bias-type descriptive statistics of the plateau position.
stats = df.groupby('bias_type')['stop_at'].agg([
    ('mean_stop_at', 'mean'),
    ('median_stop_at', 'median'),
    ('std_stop_at', 'std'),
    ('count', 'count')
]).reset_index()

output_path = 'RQ1_Results.csv'
stats.to_csv(output_path, index=False)
stats

**We consider the median (12 pairs of prompts)**

--------


## **RQ2 - Responses + Similarity Metrics** (Best semantic similarity metric to be implemented in Step 3 of CAFFE)

### **Responses**

#### Identification of a statistically representative sample for each intent–bias combination


In [None]:
import pandas as pd
import numpy as np
from math import ceil
import os

In [None]:
# Load the long-format test data (two consecutive rows form one pair).
PATH = '/content/RQ3_TD_Test_Data.csv'
df = pd.read_csv(PATH)
print(f"Loaded {len(df):,} rows")

# === Pair bookkeeping: rows 0-1 -> pair 0, rows 2-3 -> pair 1, ... ===
df['pair_idx'] = df.index // 2
df['pos_in_pair'] = df.groupby('pair_idx').cumcount() + 1

# === Long -> wide: one row per pair, columns keyed by (field, position) ===
wide = df.pivot(
    index='pair_idx',
    columns='pos_in_pair',
    values=['group', 'sentence'],
)
wide = wide.reset_index(drop=True)

# === Collapse the two-level column index into names such as "sentence_1" ===
wide.columns = [f'{col[0]}_{col[1]}' for col in wide.columns]

# Metadata shared by both members of a pair: take the first value per pair.
meta_cols = ['topic', 'intent', 'bias_type']
meta = df.groupby('pair_idx')[meta_cols].first()
meta = meta.reset_index(drop=True)

# Final wide DataFrame: metadata columns first, then the pivoted fields.
wide_df = pd.concat([meta, wide], axis=1)

print(f"Wide table has {len(wide_df):,} rows and {wide_df.shape[1]} columns")
wide_df.head()

OUT_WIDE = '/content/RQ3_TD_Test_Data_Joined.csv'
wide_df.to_csv(OUT_WIDE, index=False)
print(f"Wide version written to {OUT_WIDE}")

In [None]:
# Load the wide table (one row per sentence pair) that the sampling below draws from.
# NOTE(review): the preceding cell writes '/content/RQ3_TD_Test_Data_Joined.csv';
# no visible cell produces this file name — confirm it is the intended input.
PATH = '/content/sentence_pairs_wide.csv'
wide_df = pd.read_csv(PATH)
print(f"{len(wide_df):,} rows (pairs) loaded")

# === Sample-size function (finite-population correction) ===
def sample_size(N, E=0.05, p=0.5, Z=1.96):
    """
    Return the required sample size for a finite population, rounded up.

    N : population size (number of pairs in the set)
    E : margin of error (default 0.05 = 5 %)
    p : estimated proportion (0.5 = worst-case, largest n)
    Z : Z-score (1.96 → 95 % confidence)
    """
    # Sample size for an unbounded population ...
    unbounded = (Z**2 * p * (1 - p)) / (E**2)
    # ... scaled down by the finite-population correction factor.
    correction = 1 + (unbounded - 1) / N
    return int(ceil(unbounded / correction))

# === Draw the samples ===
# Fixed seed so the drawn sample (and everything computed from it) can be
# reproduced; 42 matches the random_state used by the LSA/LDA cells.
SAMPLING_SEED = 42
rng          = np.random.default_rng(SAMPLING_SEED)
samples      = []
summary_rows = []

# One stratum per (bias_type, intent) combination, in order of first appearance.
for (bias, intent), grp in wide_df.groupby(['bias_type', 'intent'], sort=False):
    N        = len(grp)
    n_needed = sample_size(N)
    # Never ask for more rows than the stratum holds (sampling without replacement).
    chosen   = rng.choice(grp.index,
                          size=min(N, n_needed),
                          replace=False)
    samples.append(wide_df.loc[chosen])

    summary_rows.append({
        'bias_type'   : bias,
        'intent'      : intent,
        'total_pairs' : N,
        'sample_size' : n_needed,
        'drawn_pairs' : len(chosen)
    })

sampled_df = pd.concat(samples, ignore_index=True)
summary_df = pd.DataFrame(summary_rows)

# ---------------------
OUT_SAMPLE = '/content/sampled_pairs.csv'
OUT_STATS  = '/content/sample_sizes.csv'

sampled_df.to_csv(OUT_SAMPLE, index=False)
summary_df.to_csv(OUT_STATS,  index=False)

print(f"{len(sampled_df):,} rows (pairs) written to {OUT_SAMPLE}")
print(f"Sample-size table written to {OUT_STATS}")


In [None]:
# === Summary table ===
count_cols = ['total_pairs', 'sample_size', 'drawn_pairs']
summary_df = pd.DataFrame(summary_rows, columns=['bias_type', 'intent'] + count_cols)

# Column-wise grand totals, appended as a final "TOTAL" row.
totals = summary_df[count_cols].sum()
total_record = {'bias_type': 'TOTAL', 'intent': ''}
for count_col in count_cols:
    total_record[count_col] = totals[count_col]
total_row = pd.DataFrame([total_record])

summary_df = pd.concat([summary_df, total_row], ignore_index=True)

# === Result ===
print(summary_df)
print("\nGrand totals –")
print("  total_pairs :", totals['total_pairs'])
print("  sample_size :", totals['sample_size'])
print("  drawn_pairs :", totals['drawn_pairs'])


# Overwrite the sample-size table with the version that includes the totals row.
OUT_STATS = '/content/sample_sizes.csv'
summary_df.to_csv(OUT_STATS, index=False)
print(f"Updated sample-size table (with totals) saved to {OUT_STATS}")

#### Responses Generation

In [None]:
!pip install --quiet --upgrade openai tqdm

In [None]:
import os
from google.colab import userdata
from openai import OpenAI
import pandas as pd
from tqdm.auto import tqdm
import time
from google.colab import drive
import numpy as np

In [None]:
# Mount Drive and create the OpenAI client from the Colab-stored secret.
drive.mount("/content/drive", force_remount=False)
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

drive_path  = "/.../sampled_pairs.csv" # Final File Save Path

df = pd.read_csv(drive_path)

# Both prompts of every pair must be present before any response is requested.
required = {"sentence_1", "sentence_2"}
missing  = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {missing}")

for col in ("response_1", "response_2"):
    if col not in df.columns:
        df[col] = ""
    else:
        # When resuming from a partially filled file, read_csv returns NaN for
        # empty cells.  Normalise them back to "" so the `== ""` checks used
        # to find pending responses actually match them.
        df[col] = df[col].astype(object).where(df[col].notna(), "")

def gpt_reply(prompt: str, model: str = "gpt-4o-mini") -> str:
    """Send *prompt* as a single user message and return the stripped reply.

    Any exception raised while requesting or reading the completion is
    reported on stdout and an empty string is returned instead.
    """
    chat = [{"role": "user", "content": prompt}]
    try:
        completion = client.chat.completions.create(
            model=model,
            messages=chat,
        )
        answer = completion.choices[0].message.content.strip()
    except Exception as e:
        print(f"Errore con prompt «{prompt[:50]}…»: {e}")
        return ""
    return answer

# Number of responses still to be generated (empty cells in either column).
todo = (df["response_1"] == "").sum() + (df["response_2"] == "").sum()


with tqdm(total=todo, desc="Response Generation") as pbar:
    for idx, row in df.iterrows():
        row_changed = False

        # Ask the model only for the responses that are still empty.
        for side in (1, 2):
            response_col = f"response_{side}"
            if row[response_col] == "":
                df.at[idx, response_col] = gpt_reply(row[f"sentence_{side}"])
                pbar.update(1)
                row_changed = True

        # Rows that were already complete need neither a checkpoint nor a
        # rate-limit pause; skipping them makes resuming a partial run fast.
        if not row_changed:
            continue

        # Checkpoint after each processed row so an interruption loses little work.
        df.to_csv(drive_path, index=False)
        time.sleep(1)

df.to_csv(drive_path, index=False)
print("Save completed")

### **Similarity Metrics**

#### BERT

In [None]:
!pip install transformers torch --quiet

In [None]:
from transformers import BertTokenizer, BertModel
import torch
import numpy as np
from tqdm import tqdm
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd

In [None]:
df = pd.read_csv('RQ2_Responses.csv')

# Model and Tokenizer: both are loaded from the same pretrained checkpoint.
BERT_CHECKPOINT = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(BERT_CHECKPOINT)
model = BertModel.from_pretrained(BERT_CHECKPOINT)
model.eval()

# Embedding
def get_sentence_embedding(text):
    """Return the mean of the last-layer token vectors of *text* as a NumPy array.

    The text is tokenized with truncation at 128 tokens, run through the
    model without gradient tracking, and only positions whose attention mask
    equals 1 contribute to the average.
    """
    encoded = tokenizer(text, return_tensors='pt', truncation=True, max_length=128)
    with torch.no_grad():
        model_out = model(**encoded)

    # Drop the batch dimension (a single text is encoded per call).
    token_vectors = model_out.last_hidden_state.squeeze(0)
    keep = encoded['attention_mask'].squeeze(0) == 1

    return token_vectors[keep].mean(dim=0).cpu().numpy()

# Cosine Similarity
def get_cosine_similarity(emb1, emb2):
    """Return the cosine similarity between two embedding vectors as a scalar."""
    # cosine_similarity works on 2-D inputs, so present each vector as one row.
    row_a, row_b = emb1.reshape(1, -1), emb2.reshape(1, -1)
    similarity_matrix = cosine_similarity(row_a, row_b)
    return similarity_matrix[0][0]


# Embed both responses of every pair and score them with cosine similarity.
similarities = [
    get_cosine_similarity(
        get_sentence_embedding(row['response_1']),
        get_sentence_embedding(row['response_2']),
    )
    for _, row in tqdm(df.iterrows(), total=len(df))
]

df['BERT_similarity'] = similarities
df.to_csv('Responses_with_similarity.csv', index=False)

#### Latent Semantic Analysis (LSA)

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
path = "Responses_with_similarity.csv"
df = pd.read_csv(path)
n = len(df)

# Stack all first responses followed by all second responses so that both
# sides are projected into the same TF-IDF / LSA space.
all_texts = df['response_1'].tolist() + df['response_2'].tolist()

# TF-IDF
vectorizer = TfidfVectorizer()
X = vectorizer.fit_transform(all_texts)

# LSA
lsa = TruncatedSVD(n_components=100, random_state=42)
X_lsa = lsa.fit_transform(X)

# First n rows belong to response_1, the remaining rows to response_2.
X1 = X_lsa[:n]
X2 = X_lsa[n:]

# Similarity
lsa_sims = []
for v1, v2 in zip(X1, X2):
    pair_matrix = cosine_similarity([v1], [v2])
    lsa_sims.append(pair_matrix[0][0])

df['LSA_similarity'] = lsa_sims
df.to_csv(path, index=False)

#### Latent Dirichlet Allocation (LDA)

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
path = "Responses_with_similarity.csv"
df = pd.read_csv(path)
# Number of pairs.  Defined here so the cell does not depend on a variable
# left behind by another cell (it raised NameError, or used a stale value,
# when run on its own).
n = len(df)

# Stack all first responses followed by all second responses so that both
# sides share one vocabulary and one topic model.
all_texts = df['response_1'].tolist() + df['response_2'].tolist()

# Count matrix
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(all_texts)

# LDA
lda = LatentDirichletAllocation(n_components=30, random_state=42)
X_lda = lda.fit_transform(X)

# First n rows belong to response_1, the remaining rows to response_2.
X1, X2 = X_lda[:n], X_lda[n:]

# Similarity
lda_sims = [cosine_similarity([v1], [v2])[0][0] for v1, v2 in zip(X1, X2)]

df['LDA_similarity'] = lda_sims
df.to_csv(path, index=False)

### **Data Analysis**

In [None]:
!pip install pandas matplotlib seaborn --quiet

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
path = "RQ2_Responses_with_similarity.csv"
df = pd.read_csv(path)

# Convert similarity columns to numeric (unparseable values become NaN)
similarity_cols = ['BERT_similarity', 'LSA_similarity', 'LDA_similarity']
df[similarity_cols] = df[similarity_cols].apply(pd.to_numeric, errors='coerce')

# === Overall Statistics: one row per metric ===
overall_stats = df[similarity_cols].describe().transpose()
print("=== Overall Similarity Statistics ===")
print(overall_stats)

# === Statistics Grouped by bias_type ===
grouped = df.groupby('bias_type')[similarity_cols].describe()

# === Flatten the (metric, statistic) column pairs into "metric_statistic" ===
flat_names = []
for col, stat in grouped.columns:
    flat_names.append(f'{col}_{stat}')
grouped.columns = flat_names

# === Move bias_type from the index back to a regular column ===
grouped_stats = grouped.reset_index()

print("\n=== Cleaned Grouped Statistics (one row per bias_type) ===")
print(grouped_stats)

# === Visualizations ===

# === Boxplot of similarity scores by bias type: one panel per metric ===
plt.figure(figsize=(12, 6))
for panel, sim in enumerate(similarity_cols, start=1):
    plt.subplot(1, 3, panel)
    sns.boxplot(data=df, x='bias_type', y=sim)
    plt.title(f'{sim} by Bias Type')
    plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# === Histogram of each similarity metric: one figure per metric ===
for sim in similarity_cols:
    plt.figure(figsize=(6, 4))
    sns.histplot(df[sim], bins=20, kde=True)
    plt.title(f'Distribution of {sim}')
    plt.xlabel(sim)
    plt.ylabel('Frequency')
    plt.show()

In [None]:
# numpy is needed below; imported here so this section also runs when only
# the Data Analysis cells have been executed.
import numpy as np

# === Ensure similarity columns are numeric ===
similarity_cols = ['BERT_similarity', 'LSA_similarity', 'LDA_similarity']
df[similarity_cols] = df[similarity_cols].apply(pd.to_numeric, errors='coerce')

# === Set thresholds from 0.1 to 0.9 ===
# np.arange accumulates floating-point drift (e.g. 0.30000000000000004), which
# would make the actual cut-off differ from the one-decimal value used in the
# column labels and parsed back as `used_threshold`.  Round to one decimal so
# both agree.
thresholds = np.round(np.arange(0.1, 1.0, 0.1), 1)

# === Function to classify PASS/FAIL per row and threshold ===
def evaluate_row(row, threshold):
    """Return {metric: "PASS" | "FAIL"} for one row; a NaN score yields "FAIL"."""
    return {
        metric: "PASS" if row[metric] >= threshold else "FAIL"
        for metric in similarity_cols
    }

# === Store results ===
results_all = []
results_by_bias = []

# === Evaluate each row at each threshold ===
for t in thresholds:
    df_temp = df.copy()
    for col in similarity_cols:
        # Vectorised equivalent of evaluate_row: NaN >= t is False, hence "FAIL".
        df_temp[f"{col}_result_{t:.1f}"] = np.where(df[col] >= t, "PASS", "FAIL")
    df_temp['threshold'] = t
    results_all.append(df_temp)


# Each per-threshold frame only carries its own result columns, so after the
# concat the result columns of the other thresholds are NaN for those rows.
results_all_df = pd.concat(results_all, ignore_index=True)

#===  Melt to long format ===
melted = pd.melt(
    results_all_df,
    id_vars=['threshold', 'bias_type'],
    value_vars=[f"{col}_result_{t:.1f}" for t in thresholds for col in similarity_cols],
    var_name="metric_threshold",
    value_name="result"
)

# === Extract original metric and threshold from "<metric>_result_<threshold>" ===
melted['metric'] = melted['metric_threshold'].apply(lambda x: x.split('_result_')[0])
melted['used_threshold'] = melted['metric_threshold'].apply(lambda x: float(x.split('_result_')[1]))

# === Overall PASS/FAIL statistics ===
summary_all = (
    melted.groupby(['metric', 'used_threshold', 'result'])
    .size()
    .unstack(fill_value=0)
    .reset_index()
)

# === Grouped PASS/FAIL by bias_type ===
summary_by_bias = (
    melted.groupby(['bias_type', 'metric', 'used_threshold', 'result'])
    .size()
    .unstack(fill_value=0)
    .reset_index()
)

print(summary_all)
print(summary_by_bias)

In [None]:
# Every FAIL outcome is counted as one fairness bug.
fail_rows = melted.loc[melted['result'] == 'FAIL']

# === Count fairness bugs (FAILs) globally per metric–threshold ===
df_global = (
    fail_rows
    .groupby(['metric', 'used_threshold'])
    .size()
    .reset_index(name='fairness_bugs')
)

# === Count fairness bugs (FAILs) per bias_type ===
df_bias = (
    fail_rows
    .groupby(['bias_type', 'metric', 'used_threshold'])
    .size()
    .reset_index(name='fairness_bugs')
)

# === Identify the best overall metric–threshold combo (the one with most FAILs) ===
ranked_global = df_global.sort_values(by='fairness_bugs', ascending=False)
best_global = ranked_global.iloc[0]

print("BEST METRIC–THRESHOLD COMBINATION (OVERALL):")
print(f"- Metric: {best_global['metric']}")
print(f"- Threshold: {best_global['used_threshold']}")
print(f"- Fairness Bugs Detected: {best_global['fairness_bugs']}\n")

# === Best metric–threshold per bias_type: top-ranked row of each group ===
ranked_bias = df_bias.sort_values(by='fairness_bugs', ascending=False)
best_per_bias = ranked_bias.groupby('bias_type', as_index=False).first()

print("BEST COMBINATION BY BIAS TYPE:")
for _, row in best_per_bias.iterrows():
    print(f"- {row['bias_type']}: {row['metric']} @ {row['used_threshold']} (bugs: {row['fairness_bugs']})")


In [None]:
# === Count FAILs per metric–threshold ===
df_global = (
    melted[melted['result'] == 'FAIL']
    .groupby(['metric', 'used_threshold'])
    .size()
    .reset_index(name='fairness_bugs')
)

# === Count FAILs per bias_type–metric–threshold ===
df_bias = (
    melted[melted['result'] == 'FAIL']
    .groupby(['bias_type', 'metric', 'used_threshold'])
    .size()
    .reset_index(name='fairness_bugs')
)

# === Correct total cases ===
num_test_cases = df.shape[0]

# `used_threshold` was parsed from one-decimal column labels (e.g. 0.3), while
# raw np.arange values carry float drift (e.g. 0.30000000000000004).  Merging
# on the raw values silently dropped those thresholds, so build the join keys
# through the same one-decimal formatting.
threshold_keys = [float(f"{threshold:.1f}") for threshold in thresholds]

# For global: each metric–threshold combo has exactly num_test_cases
df_total_global = pd.DataFrame([
    {
        'metric': metric,
        'used_threshold': threshold,
        'total_cases': num_test_cases
    }
    for metric in similarity_cols
    for threshold in threshold_keys
])

# For bias-specific: base counts only on original df
df_total_bias_base = df.groupby('bias_type').size().reset_index(name='base_cases')

df_total_bias = pd.DataFrame([
    {
        'bias_type': row['bias_type'],
        'metric': metric,
        'used_threshold': threshold,
        'total_cases': row['base_cases']
    }
    for _, row in df_total_bias_base.iterrows()
    for metric in similarity_cols
    for threshold in threshold_keys
])

# === Merge and compute fail rates (percentages) ===
# NOTE: these are inner merges on tables that only contain FAIL rows, so a
# combination with zero FAILs does not appear in the results or in the mean.
df_global = pd.merge(df_global, df_total_global, on=['metric', 'used_threshold'])
df_global['fail_rate'] = df_global['fairness_bugs'] / df_global['total_cases'] * 100

df_bias = pd.merge(df_bias, df_total_bias, on=['bias_type', 'metric', 'used_threshold'])
df_bias['fail_rate'] = df_bias['fairness_bugs'] / df_bias['total_cases'] * 100

# === Identify best overall combination (largest fail rate relative to the mean) ===
global_mean_fail_rate = df_global['fail_rate'].mean()
df_global['fail_rate_diff_from_mean'] = df_global['fail_rate'] - global_mean_fail_rate
best_global = df_global.sort_values(by='fail_rate_diff_from_mean', ascending=False).iloc[0]

print("BEST METRIC–THRESHOLD COMBINATION (RELATIVE TO MEAN FAIL RATE):")
print(f"- Metric: {best_global['metric']}")
print(f"- Threshold: {best_global['used_threshold']}")
print(f"- Fairness Bugs: {best_global['fairness_bugs']} out of {best_global['total_cases']}")
print(f"- FAIL Rate: {best_global['fail_rate']:.2f}% (Δ from mean: {best_global['fail_rate_diff_from_mean']:.2f}%)\n")

# === Best per bias type (highest fail rate within each bias_type) ===
best_per_bias = (
    df_bias.sort_values(by='fail_rate', ascending=False)
    .groupby('bias_type', as_index=False)
    .first()
)

print("BEST COMBINATION BY BIAS TYPE:")
for _, row in best_per_bias.iterrows():
    print(f"- {row['bias_type']}: {row['metric']} @ {row['used_threshold']} → "
          f"{row['fairness_bugs']} FAILs out of {row['total_cases']} "
          f"({row['fail_rate']:.2f}%)")


# Persist the three result tables and offer them for download from Colab.
df_global.to_csv("global_fairness_bug_rates.csv", index=False)
df_bias.to_csv("bias_specific_fairness_bug_rates.csv", index=False)
best_per_bias.to_csv("best_combinations_per_bias.csv", index=False)

from google.colab import files
files.download("global_fairness_bug_rates.csv")
files.download("bias_specific_fairness_bug_rates.csv")
files.download("best_combinations_per_bias.csv")

# === Print top 10 globally ranked configs ===
print("\n TOP 10 METRIC–THRESHOLD COMBINATIONS BY FAIL RATE:")
top_10 = df_global.sort_values(by='fail_rate', ascending=False).head(10)
print(top_10[['metric', 'used_threshold', 'fail_rate', 'fairness_bugs', 'total_cases']].to_string(index=False))


--------------------------


## **RQ3 - Overall Evaluation** (Evaluation of CAFFE results against METAL)

In [None]:
import pandas as pd

path = "Result_LLAMA_Responses_RQ3_TD_Test_Data_Joined.csv"
df = pd.read_csv(path)

# === Compute overall ASR and similarity stats ===
# ASR is computed as the share of executions labelled FAIL.
total_execs = len(df)
unsatisfied = (df['ResultLabel'] == 'FAIL').sum()
asr_overall = unsatisfied / total_execs

actual_scores = df['ActualResult']
overall_mean = actual_scores.mean()
overall_median = actual_scores.median()
overall_std = actual_scores.std()

# === Compute per bias_type ASR and similarity stats ===
per_bias = df.groupby('bias_type')
summary_by_bias = per_bias.agg(
    Total_Executions=('ResultLabel', 'count'),
    Fails=('ResultLabel', lambda x: (x == 'FAIL').sum()),
    Mean=('ActualResult', 'mean'),
    Median=('ActualResult', 'median'),
    StdDev=('ActualResult', 'std')
)
summary_by_bias = summary_by_bias.reset_index()

summary_by_bias['ASR'] = summary_by_bias['Fails'] / summary_by_bias['Total_Executions']

# === Append overall row (same columns as the per-bias rows) ===
overall_record = {
    'bias_type': 'Overall',
    'Total_Executions': total_execs,
    'Fails': unsatisfied,
    'Mean': overall_mean,
    'Median': overall_median,
    'StdDev': overall_std,
    'ASR': asr_overall
}
overall_row = pd.DataFrame([overall_record])

summary_with_overall = pd.concat([summary_by_bias, overall_row], ignore_index=True)

# The output file name is the input file name prefixed with "RQ3_Results_".
summary_with_overall.to_csv("RQ3_Results_" + path, index=False)
print(summary_with_overall.to_string(index=False))


