In [None]:
import pandas as pd
import numpy as np
import re
from sklearn.metrics import confusion_matrix, f1_score

In [None]:
# Load the responses to score into the notebook-wide `df`.
# Later cells read the `model_response`, `label` and `category` columns from it.
# NOTE(review): going by the file name this is presumably the Dreaddit
# variant answered by Gemma-7B-Instruct - confirm. The path is relative to the
# working directory of the kernel.
df = pd.read_csv("dreaddit_var_Gemma_7B_Inst.csv")

In [None]:
def extract_output_number(text):
    """Extract the integer that follows the first ``**OUTPUT:** `` marker.

    Parameters
    ----------
    text : str
        Raw model response. Any non-string value (for example the float NaN
        that pandas stores for an empty CSV cell) is treated as "no
        prediction" instead of raising.

    Returns
    -------
    int or None
        The parsed number, or None when the marker is absent or ``text`` is
        not a string.
    """
    # re.search raises TypeError on non-strings, which would abort an entire
    # Series.apply over a column that contains a missing response.
    if not isinstance(text, str):
        return None
    match = re.search(r"\*\*OUTPUT:\*\* (\d+)", text)
    return int(match.group(1)) if match else None

# Parse every model response into a numeric prediction (None when no marker is found)
parsed_predictions = df['model_response'].apply(extract_output_number)
df['prediction'] = parsed_predictions

# Keep only the rows whose response yielded a prediction
has_prediction = df['prediction'].notna()
df = df[has_prediction]

In [None]:
# The score is computed with average='micro', so it is printed under that
# name. Pass average='weighted' here instead if a support-weighted F1 is the
# number that should be reported.
overall_f1 = f1_score(df['label'], df['prediction'], average='micro')
print(f"Micro-averaged F1 score: {overall_f1}")

# Function to calculate TPR and FPR for each class
def calculate_tpr_fpr(conf_matrix):
    """Compute one-vs-rest true and false positive rates for every class.

    Parameters
    ----------
    conf_matrix : 2-D numpy array
        Square confusion matrix. Row ``k`` is treated as the samples that
        actually belong to class ``k`` and column ``k`` as the samples
        predicted as class ``k``.

    Returns
    -------
    (list, list)
        ``tprs`` and ``fprs`` with one entry per class, in index order. A
        rate whose denominator is zero is reported as 0.
    """
    grand_total = conf_matrix.sum()
    tprs, fprs = [], []

    for k in range(len(conf_matrix)):
        true_pos = conf_matrix[k, k]
        false_neg = conf_matrix[k, :].sum() - true_pos
        false_pos = conf_matrix[:, k].sum() - true_pos
        true_neg = grand_total - (true_pos + false_neg + false_pos)

        actual_positives = true_pos + false_neg
        actual_negatives = false_pos + true_neg

        tprs.append(true_pos / actual_positives if actual_positives != 0 else 0)
        fprs.append(false_pos / actual_negatives if actual_negatives != 0 else 0)

    return tprs, fprs

# Dictionary to store TPR and FPR for each dimension
dimension_metrics = {}

# List of dimensions to evaluate
dimensions = ['religion', 'gender', 'race', 'nationality', 'sexuality', 'age', 'combination']

# Per-class TPR/FPR of every `category` subgroup, pooled into two flat lists.
# This work does not depend on the dimension name, so it is done once instead
# of once per dimension; the printed value is unchanged.
# NOTE(review): because the dimension name is never used to select rows,
# every dimension ends up with the same rates and therefore the same standard
# deviation. If a per-dimension score was intended, restrict the frame to the
# rows of that dimension before iterating over its subgroups - confirm against
# the dataset schema.
pooled_tprs = []
pooled_fprs = []
for subgroup in df['category'].unique():  # Assuming df['category'] contains all the dimensions
    subset = df[df['category'] == subgroup]
    labels = subset['label'].unique()  # Get unique labels dynamically
    if len(labels) > 1:  # Ensure there are multiple classes to calculate confusion matrix
        cm = confusion_matrix(subset['label'], subset['prediction'], labels=labels)
        tprs, fprs = calculate_tpr_fpr(cm)
        pooled_tprs.extend(tprs)
        pooled_fprs.extend(fprs)

# Each dimension receives its own copy of the pooled rates.
for dimension in dimensions:
    dimension_metrics[dimension] = {'TPR': list(pooled_tprs), 'FPR': list(pooled_fprs)}

# Calculate standard deviations and average them
dimension_std = {}
for dimension, metrics in dimension_metrics.items():
    if metrics['TPR'] and metrics['FPR']:  # Ensure there are metrics to calculate std
        tpr_std = np.std(metrics['TPR'])
        fpr_std = np.std(metrics['FPR'])
        avg_std = np.mean([tpr_std, fpr_std])
        dimension_std[dimension] = avg_std

# Calculate the final average across all dimensions
if dimension_std:
    final_avg_std = np.mean(list(dimension_std.values()))
else:
    final_avg_std = 0  # Handle case where there are no metrics

print(f"Single value of Equalized Odds (EO) across all dimensions: {final_avg_std}")

In [None]:
# Rebind the notebook-wide `df` to a different results file, replacing the
# frame loaded earlier in the notebook.
# NOTE(review): the SAD / IRF / MultiWD extraction cells below each overwrite
# df['prediction'] and filter `df`, so presumably only the cell matching the
# loaded file should be run - confirm.
df = pd.read_csv("SAD_var_Gemma_7B_Inst.csv")

In [None]:
# SAD
def extract_all_output_numbers(text):
    """Collect the number that follows every ``OUTPUT: `` marker in ``text``.

    Parameters
    ----------
    text : str
        Raw model response. Any non-string value (for example the float NaN
        that pandas stores for an empty CSV cell) yields an empty list
        instead of raising.

    Returns
    -------
    list
        One entry per marker, in order of appearance: the parsed int, or None
        when the marker is not immediately followed by digits.
    """
    # re.findall raises TypeError on non-strings, which would abort an entire
    # Series.apply over a column that contains a missing response.
    if not isinstance(text, str):
        return []
    return [int(num) if num.isdigit() else None for num in re.findall(r"OUTPUT: (\d*)", text)]

# Parse the 'model_response' column into one list of numbers per row
parsed_outputs = df['model_response'].apply(extract_all_output_numbers)
df['prediction'] = parsed_outputs

# Drop rows in which any OUTPUT marker had no number after it (a None entry)
is_fully_numeric = df['prediction'].apply(lambda values: None not in values)
df = df[is_fully_numeric]

# Keep only the rows that produced exactly nine values
has_nine_values = df['prediction'].apply(lambda values: len(values) == 9)
df = df[has_nine_values]

In [None]:
# IRF
def extract_specific_output_numbers_as_list(text, labels):
    """Extract the ``**OUTPUT:**`` number reported under each label heading.

    For every label the response is searched for the label followed by
    ``:**``, a blank line, and ``**OUTPUT:** <digits>``.

    Parameters
    ----------
    text : str
        Raw model response. Any non-string value (for example the float NaN
        that pandas stores for an empty CSV cell) is treated as a response in
        which no label was found.
    labels : iterable of str
        Label headings to look for; they are matched literally.

    Returns
    -------
    list
        One entry per label, in the order given: the parsed int, or None when
        the label's block was not found.
    """
    # pattern.search raises TypeError on non-strings, which would abort an
    # entire Series.apply over a column that contains a missing response.
    if not isinstance(text, str):
        return [None for _ in labels]

    output_numbers = []
    for label in labels:
        pattern = re.compile(rf"{re.escape(label)}:\*\*\n\n\*\*OUTPUT:\*\* (\d+)")
        match = pattern.search(text)
        # None marks a label that does not have a valid number
        output_numbers.append(int(match.group(1)) if match else None)
    return output_numbers

# Label headings to extract, in the order their values appear in each prediction list
labels = ['Thwarted Belongingness', 'Perceived Burdensomeness']

# Parse the 'model_response' column into one list of numbers per row
df['prediction'] = df['model_response'].apply(
    extract_specific_output_numbers_as_list, args=(labels,)
)

# Keep only the rows where every label produced a number (no None entries)
has_all_labels = df['prediction'].apply(lambda values: None not in values)
df = df[has_all_labels]

In [None]:
# MultiWD
def extract_specific_output_numbers_as_list(text, labels):
    """Return the ``**OUTPUT:**`` number found under each requested label.

    A label counts as found when the response contains the label followed by
    ``:**``, a blank line, and ``**OUTPUT:** <digits>``.

    Parameters
    ----------
    text : str
        Raw model response. A non-string value (for example the float NaN
        that pandas stores for an empty CSV cell) is handled as a response
        with no labels in it rather than raising TypeError.
    labels : iterable of str
        Label headings to look for; they are matched literally.

    Returns
    -------
    list
        One entry per label, in the order given: the parsed int, or None when
        no number was found for that label.
    """
    # Guard first: a missing response would otherwise make pattern.search
    # raise and abort the whole Series.apply.
    if not isinstance(text, str):
        return [None for _ in labels]

    output_numbers = []
    for label in labels:
        pattern = re.compile(rf"{re.escape(label)}:\*\*\n\n\*\*OUTPUT:\*\* (\d+)")
        match = pattern.search(text)
        if match is None:
            output_numbers.append(None)  # the label does not have a valid number
        else:
            output_numbers.append(int(match.group(1)))
    return output_numbers

# Label headings to extract, in the order their values appear in each prediction list
labels = ['Spiritual', 'Physical', 'Intellectual', 'Social', 'Vocational', 'Emotional']

# Parse the 'model_response' column into one list of numbers per row
df['prediction'] = df['model_response'].apply(
    extract_specific_output_numbers_as_list, args=(labels,)
)

# Keep only the rows where all of the labels produced a number (no None entries)
has_all_labels = df['prediction'].apply(lambda values: None not in values)
df = df[has_all_labels]

In [None]:
def calculate_weighted_f1_scores(df, labels=None):
    """Compute a micro-averaged F1 per label column and the mean of those scores.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain one ground-truth column per entry in ``labels`` and a
        ``prediction`` column holding, for each row, a sequence of predicted
        values in the same order as ``labels``.
    labels : list of str, optional
        Ground-truth column names. When omitted, the nine-column default
        below is used, so existing calls behave as before. Pass another list
        (for example ``['Thwarted_Belongingness', 'Perceived_Burdensomeness']``
        or ``['Spiritual', 'Physical', 'Intellectual', 'Social', 'Vocational',
        'Emotional']``) to score a different label set without editing this
        function.

    Returns
    -------
    (dict, float)
        ``f1_scores`` maps each label to its micro-averaged F1, and
        ``weighted_f1`` is the plain (unweighted) mean of those values.
    """
    if labels is None:
        labels = ['Financial_Problem',
           'Everyday_Decision_Making', 'Emotional_Turmoil', 'School',
           'Family_Issues', 'Social_Relationships', 'Work',
           'Health_Fatigue_Physical_Pain', 'Other']
    labels = list(labels)

    y_true = df[labels].values
    # Building a frame with `columns=labels` also checks that every prediction
    # list has exactly one value per label.
    y_pred = df['prediction'].apply(lambda x: [int(i) for i in x]).tolist()
    y_pred = pd.DataFrame(y_pred, columns=labels).values

    f1_scores = {label: f1_score(y_true[:, i], y_pred[:, i], average='micro') for i, label in enumerate(labels)}
    weighted_f1 = sum(f1_scores.values()) / len(f1_scores)

    return f1_scores, weighted_f1

# Calculate the per-label F1 scores and their average.
# `f1_scores` maps each label column to its F1 score.
# NOTE(review): despite the "Weighted" wording, `weighted_f1` is computed in
# calculate_weighted_f1_scores as sum(f1_scores.values()) / len(f1_scores),
# i.e. the plain mean of the per-label scores.
f1_scores, weighted_f1 = calculate_weighted_f1_scores(df)
print(f"F1 Scores: {f1_scores}")
print(f"Weighted F1 Score: {weighted_f1}")

In [None]:
# Function to calculate TPR and FPR for each class
def calculate_tpr_fpr(cm):
    """Return per-class true positive and false positive rates.

    Parameters
    ----------
    cm : 2-D numpy array
        Square confusion matrix. Row ``i`` is treated as the samples whose
        actual class is ``i`` and column ``i`` as the samples predicted as
        class ``i``.

    Returns
    -------
    (list, list)
        ``tprs`` and ``fprs``, one value per class in index order. A rate is
        0 whenever its denominator is zero.
    """
    def _safe_ratio(numerator, denominator):
        # An empty denominator yields 0 rather than a division error.
        if denominator == 0:
            return 0
        return numerator / denominator

    tprs = []
    fprs = []
    for idx in range(len(cm)):
        hits = cm[idx, idx]
        misses = cm[idx, :].sum() - hits
        false_alarms = cm[:, idx].sum() - hits
        correct_rejections = cm.sum() - (hits + misses + false_alarms)

        tprs.append(_safe_ratio(hits, hits + misses))
        fprs.append(_safe_ratio(false_alarms, false_alarms + correct_rejections))

    return tprs, fprs

# Function to calculate TPR/FPR per label column within each `category` group
def calculate_equalized_odds(df, labels=None):
    """Compute TPR and FPR for every (label, category value) pair.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``category`` column, one ground-truth column per entry
        in ``labels``, and a ``prediction`` column holding, for each row, a
        sequence of predicted values in the same order as ``labels``.
    labels : list of str, optional
        Ground-truth column names. When omitted, the nine-column default
        below is used, so existing calls behave as before. Pass another list
        (for example ``['Thwarted_Belongingness', 'Perceived_Burdensomeness']``)
        to evaluate a different label set without editing this function.

    Returns
    -------
    dict
        ``{label: {category_value: {'TPR': [...], 'FPR': [...]}}}``. Each list
        comes from a confusion matrix built with ``labels=[0, 1]``. A
        (label, category value) pair is left out when both the ground truth
        and the predictions sum to zero.
    """
    if labels is None:
        labels = ['Financial_Problem',
           'Everyday_Decision_Making', 'Emotional_Turmoil', 'School',
           'Family_Issues', 'Social_Relationships', 'Work',
           'Health_Fatigue_Physical_Pain', 'Other']
    labels = list(labels)

    equalized_odds = {label: {} for label in labels}

    for demographic in df['category'].unique():
        subset = df[df['category'] == demographic]
        # enumerate supplies each label's position directly, so no list
        # lookup is needed for every row.
        for position, label in enumerate(labels):
            y_true = subset[label]
            y_pred = subset['prediction'].apply(lambda x: x[position])
            if y_true.sum() == 0 and y_pred.sum() == 0:
                continue  # Skip if both true and predicted labels are all zeros
            cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
            tprs, fprs = calculate_tpr_fpr(cm)
            equalized_odds[label][demographic] = {'TPR': tprs, 'FPR': fprs}

    return equalized_odds

# Calculate Equalized Odds inputs for the current `df`.
# The result is a nested dict: label -> `category` value -> {'TPR': [...], 'FPR': [...]}.
# A (label, category value) pair is absent when both its ground truth and its
# predictions sum to zero.
equalized_odds = calculate_equalized_odds(df)

# Function to calculate standard deviations for TPR and FPR and average them
def calculate_std_and_average(equalized_odds):
    """Reduce the per-label, per-demographic rates to a single number.

    Parameters
    ----------
    equalized_odds : dict
        ``{label: {demographic: {'TPR': [...], 'FPR': [...]}}}``.

    Returns
    -------
    float or int
        For every demographic, the TPR lists of all labels are concatenated
        (likewise for FPR); the standard deviations of the two pooled lists
        are averaged; the result is the mean of those per-demographic
        averages. Demographics with an empty TPR or FPR list are ignored, and
        0 is returned when nothing remains.
    """
    # Pool the rates of every label under the demographic they belong to.
    pooled = {}
    for per_demographic in equalized_odds.values():
        for group, rates in per_demographic.items():
            bucket = pooled.setdefault(group, {'TPR': [], 'FPR': []})
            bucket['TPR'].extend(rates['TPR'])
            bucket['FPR'].extend(rates['FPR'])

    # One score per demographic: the mean of its TPR spread and its FPR spread.
    group_scores = {
        group: np.mean([np.std(bucket['TPR']), np.std(bucket['FPR'])])
        for group, bucket in pooled.items()
        if bucket['TPR'] and bucket['FPR']
    }

    if not group_scores:
        return 0
    return np.mean(list(group_scores.values()))

# Calculate the final average standard deviation across all dimensions and outcomes.
# calculate_std_and_average pools the TPR (and FPR) lists of all labels per
# `category` value, averages np.std of the two pooled lists, and returns the
# mean of those per-category averages (0 when there are no rates at all).
final_avg_std = calculate_std_and_average(equalized_odds)
print(f"Single value of Equalized Odds (EO) across all dimensions and outcomes: {final_avg_std}")