# Figure Generation for the paper

## Data Ingest

In [None]:
import os
from typing import List, Dict
import json
import pandas as pd
import sys
sys.path.append('/home/ubuntu/BioDSA') # this will need to be updated for other machines/folders
from src import (
    REPO_ROOT,
    TOP_LEVEL_LOG_DIR
)

logs_directory_path = os.path.join(TOP_LEVEL_LOG_DIR, "experiment_logs")

def get_experiment_results(logs_directory_path: str) -> List[Dict]:
    """
    Load every experiment log stored as JSON in a directory.

    Only files whose name ends with ".json" and contains "experiment|" are
    read. A file that cannot be parsed as JSON is reported on stdout and
    skipped.

    Args:
        logs_directory_path: Directory to scan (not recursive).

    Returns:
        The parsed content of each readable log, in directory-listing order.
    """
    loaded = []
    candidates = (
        name
        for name in os.listdir(logs_directory_path)
        if name.endswith(".json") and "experiment|" in name
    )
    for name in candidates:
        with open(os.path.join(logs_directory_path, name), "r") as handle:
            try:
                payload = json.load(handle)
            except json.JSONDecodeError:
                print(f"Error decoding JSON from {name}")
            else:
                loaded.append(payload)
    return loaded

# Load every experiment log found under logs_directory_path
results = get_experiment_results(logs_directory_path)

# create a dataframe from the results
df = pd.DataFrame(results)

# unpack the experiment_config column into separate columns
# (presumably each experiment_config value is a dict of settings - TODO confirm);
# the original experiment_config column is dropped and replaced by the expanded columns
df = pd.concat([df.drop('experiment_config', axis=1), df['experiment_config'].apply(pd.Series)], axis=1)

# extract the agent type, which is composed of the agent_type and its hyperparameters
def get_agent_type(row: Dict) -> str:
    """
    Build a readable label for an agent from its type and hyperparameters.

    Label layout per agent type:
        react:              (react, step_count, model_name)
        reasoning_coder:    (reasoning_coder, planning_model, coding_model)
        coder:              (coder, model_name)
        reasoning_react:    (reasoning_react, plan_model_name, agent_model_name, step_count)
        reasoning_react_v2: (reasoning_react_v2, plan_model_name, agent_model_name, step_count)

    Any other agent type is returned unchanged.
    """
    kind = row["agent_type"]

    # Hyperparameter fields that follow the agent type in the label, in order.
    if kind == "react":
        fields = ("step_count", "model_name")
    elif kind == "reasoning_coder":
        fields = ("planning_model", "coding_model")
    elif kind == "coder":
        fields = ("model_name",)
    elif kind == "reasoning_react" or kind == "reasoning_react_v2":
        fields = ("plan_model_name", "agent_model_name", "step_count")
    else:
        return kind

    parts = [kind]
    for field in fields:
        parts.append(f"{row[field]}")
    return "(" + ", ".join(parts) + ")"

# Label every row with its agent configuration, then count rows per label
df['agent_summary'] = df.apply(get_agent_type, axis=1)
df['agent_summary'].value_counts()

## Agent Result Standardization

In [None]:
# sanity check in case some of the llms outputted weird values
display(df['final_answer'].value_counts()) # generated by the agents
display(df['hypothesis_is_true'].value_counts()) # ground truth

# Coerce both columns to lower-case strings so they can be compared directly
df['coerced_final_answer'] = df['final_answer'].astype(str).str.lower()
df['hypothesis_is_true_coerced'] = df['hypothesis_is_true'].astype(str).str.lower()

# Inspect the coerced values: lower-cased, but not yet mapped onto
# 'true' / 'false' / 'not verifiable' (hence "before standardization")
print("Unique values in coerced_final_answer before standardization:")
display(df['coerced_final_answer'].value_counts())
print("Unique values in hypothesis_is_true_coerced before standardization:")
display(df['hypothesis_is_true_coerced'].value_counts())

def standardize_agent_response(response: str) -> str:
    """
    Map a free-form agent answer onto 'true', 'false' or 'not verifiable'.

    The check is a case-sensitive substring test, with 'true' taking
    precedence over 'false'; anything containing neither becomes
    'not verifiable'.
    """
    for label in ('true', 'false'):
        if label in response:
            return label
    return 'not verifiable'

# Create a cleaner version of final answer that handles variations
# (every coerced answer is mapped to 'true', 'false' or 'not verifiable')
df['clean_final_answer'] = df['coerced_final_answer'].apply(
    standardize_agent_response
)

print("Unique values in clean_final_answer after standardization:")
display(df['clean_final_answer'].value_counts())


## Executability Rate
Furthermore, we evaluate the technical quality of the generated code. For each hypothesis, let $C$
denote the total number of code cells generated and $C_{exec}$ the number of those that are executable
without error. The code executability rate is defined as
$$
\text{Executability Rate} = \frac{C_{exec}}{C}
$$



In [None]:
from typing import Dict, Tuple, List

def extract_error_metadata(observations: str) -> dict:
    """
    Detect a Python traceback in execution output and describe the error.

    Args:
        observations: String containing the execution output

    Returns:
        Dictionary with the keys:
        - has_error: True when the text contains 'Traceback'
        - error_type: Exception name (e.g. KeyError, _csv.Error), or None
          when no traceback was found or it could not be parsed
        - error_message: Text following the exception name, or None
    """
    import re

    if 'Traceback' not in observations:
        return {
            "has_error": False,
            "error_type": None,
            "error_message": None
        }

    # Preferred pattern: a (possibly module-qualified) name ending in
    # Error/Exception, followed by its message.
    error_pattern = r'Traceback \(most recent call last\):.*?([A-Za-z0-9_\.]+(?:Error|Exception)):\s*([^\n]+)'
    # Fallback pattern: any "Name: message" pair after the traceback header.
    alt_pattern = r'Traceback \(most recent call last\):.*?([A-Za-z0-9_\.]+):\s*([^\n]+)'

    found = re.search(error_pattern, observations, re.DOTALL)
    if found is None:
        # The warning is emitted before the fallback is attempted.
        print(f"WARNING: Traceback found but couldn't parse error type from: {observations[:200]}...")
        found = re.search(alt_pattern, observations, re.DOTALL)

    if found is None:
        error_type, error_message = None, None
    else:
        error_type = found.group(1)
        error_message = found.group(2).strip()

    return {
        "has_error": True,
        "error_type": error_type,
        "error_message": error_message
    }

def extract_react_code_cells(observations: str) -> List[Tuple[int, str]]:
    """
    Pull (observation number, stdout) pairs out of React agent observations.

    Each block is expected to look like a '## Observation N' heading, a
    '### Code: ' section holding a fenced code block, and a '### Stdout:'
    section that runs until the next observation heading or the end of text.

    Args:
        observations: String containing the React agent observations

    Returns:
        List of tuples (observation_number, stdout), one per code cell found
    """
    import re

    # Pattern to match observation blocks
    observation_pattern = r'## Observation (\d+)\n### Code: \n```.*?\n(.*?)\n```\n### Stdout:\n(.*?)(?=\n## Observation|\Z)'

    # The captured code (second group) is not part of the result.
    return [
        (int(number), stdout)
        for number, _code, stdout in re.findall(observation_pattern, observations, re.DOTALL)
    ]

def process_observations(row_dict: Dict) -> Dict:
    """
    Annotate one result row with code-cell counts and error metadata.

    For 'coder' and 'reasoning_coder' rows the whole observation text is
    treated as a single code cell, and has_error / error_type / error_message
    are scalars. For 'react', 'reasoning_react' and 'reasoning_react_v2' rows
    every extracted code cell is checked separately, and error_type /
    error_message are lists covering the failing cells. Rows of any other
    agent type only receive zero counts.

    The row is modified in place and also returned.
    """
    kind = row_dict['agent_type']
    raw_observations = row_dict['observations']
    n_cells = 0
    n_runnable = 0

    if kind in ('coder', 'reasoning_coder'):
        n_cells = 1
        meta = extract_error_metadata(raw_observations)
        for field in ('has_error', 'error_type', 'error_message'):
            row_dict[field] = meta[field]
        n_runnable = 0 if meta['has_error'] else 1

    elif kind in ('react', 'reasoning_react', 'reasoning_react_v2'):
        cells = extract_react_code_cells(raw_observations)
        n_cells = len(cells)

        # Keep the metadata of failing cells only; the rest count as runnable.
        failures = [
            meta
            for meta in (extract_error_metadata(stdout) for _, stdout in cells)
            if meta['has_error']
        ]
        n_runnable = n_cells - len(failures)

        row_dict['has_error'] = n_cells > n_runnable
        row_dict['error_type'] = [m['error_type'] for m in failures if m['error_type']]
        row_dict['error_message'] = [m['error_message'] for m in failures if m['error_message']]

    row_dict['total_code_cells'] = n_cells
    row_dict['executable_code_cells'] = n_runnable

    return row_dict

# Function to analyze executability rate by agent type
def analyze_executability_rate(df):
    """
    Compute the code executability rate for each agent type.

    Args:
        df: DataFrame of experiment results with 'agent_type' and
            'observations' columns

    Returns:
        DataFrame indexed by agent type with total_code_cells,
        executable_code_cells and executability_rate (0 when an agent type
        produced no code cells)
    """
    print(df.index)
    # Process all observations
    processed_df = df.apply(process_observations, axis=1)
    # check that the index is not duplicated
    assert not processed_df.index.duplicated().any(), "Index is duplicated"

    def _summarize(group):
        total = group['total_code_cells'].sum()
        runnable = group['executable_code_cells'].sum()
        return {
            'total_code_cells': total,
            'executable_code_cells': runnable,
            'executability_rate': runnable / total if total > 0 else 0
        }

    # One summary dict per agent type, expanded into columns afterwards
    per_agent = processed_df.groupby('agent_type').apply(_summarize, include_groups=False)
    executability = per_agent.apply(pd.Series)

    return executability

# Analyze error types across all agent types
def analyze_error_types(df):
    """
    Count how often each error type occurs across the given results.

    Args:
        df: DataFrame of experiment results with 'agent_type' and
            'observations' columns

    Returns:
        DataFrame with the columns 'Error Type' and 'Count'. It is empty when
        df has no rows, or when none of its rows belong to a recognised agent
        type.
    """
    column_names = ['Error Type', 'Count']

    # Process all observations to extract error metadata
    processed_df = df.apply(process_observations, axis=1)

    # DataFrame.apply on an empty frame returns a plain copy, and rows of
    # unrecognised agent types never get the error columns added. Either way
    # there is nothing to count, so return an empty table instead of failing
    # with a KeyError on 'has_error'.
    if not {'has_error', 'error_type'}.issubset(processed_df.columns):
        return pd.DataFrame(columns=column_names)

    failed = processed_df['has_error'] == True

    # For coder and reasoning_coder agents error_type is a single string
    standard_agents = processed_df[
        processed_df['agent_type'].isin(['coder', 'reasoning_coder']) & failed
    ]

    error_counts = standard_agents['error_type'].value_counts().reset_index()
    error_counts.columns = column_names

    # For react agents error_type is a list per row, so flatten it first
    react_agents = processed_df[
        processed_df['agent_type'].isin(['react', 'reasoning_react', 'reasoning_react_v2']) & failed
    ]
    react_error_types = []

    for error_types in react_agents['error_type'].dropna():
        react_error_types.extend(error_types)

    if react_error_types:
        react_error_counts = pd.Series(react_error_types).value_counts().reset_index()
        react_error_counts.columns = column_names

        # Combine the error counts, summing types seen in both groups
        error_counts = pd.concat([error_counts, react_error_counts]).groupby('Error Type').sum().reset_index()

    return error_counts

# Executability rate per agent type
executability_analysis = analyze_executability_rate(df)
display(executability_analysis)

# Error-type breakdown per agent type, then once more over all agents together
for agent in ['react', 'coder', 'reasoning_coder', 'reasoning_react', 'reasoning_react_v2', 'combined agents']:
    print(f"Error analysis for {agent} agents:")
    error_analysis = analyze_error_types(
        df if agent == 'combined agents' else df[df['agent_type'] == agent]
    )
    display(error_analysis.sort_values(by='Count', ascending=False))

## Computing Metrics
- Accuracy
- Precision
- Recall
- F1 Score
- True Positive
- False Positive
- True Negative
- False Negative


In [None]:
# Before calculating metrics, validate the data preparation
assert set(df['clean_final_answer'].unique()).issubset({'true', 'false', 'not verifiable'}), "Unexpected values in clean_final_answer"
assert set(df['hypothesis_is_true_coerced'].unique()).issubset({'true', 'false'}), "Unexpected values in hypothesis_is_true_coerced"

# accuracy: per-row flag, True when the cleaned answer equals the ground truth
# (a 'not verifiable' answer never matches, so it counts as incorrect)
df['accuracy'] = df['clean_final_answer'] == df['hypothesis_is_true_coerced']
print("Accuracy:")
display(df['accuracy'].value_counts())

# true positive indicator (per row, not a rate): answered 'true', truth is 'true'
df['true_positive'] = (df['clean_final_answer'] == 'true') & (df['hypothesis_is_true_coerced'] == 'true')
print("True Positive:")
display(df['true_positive'].value_counts())

# true negative indicator: answered 'false', truth is 'false'
df['true_negative'] = (df['clean_final_answer'] == 'false') & (df['hypothesis_is_true_coerced'] == 'false')
print("True Negative:")
display(df['true_negative'].value_counts())

# false positive indicator: answered 'true', truth is 'false'
df['false_positive'] = (df['clean_final_answer'] == 'true') & (df['hypothesis_is_true_coerced'] == 'false')
print("False Positive:")
display(df['false_positive'].value_counts())

# false negative indicator: answered 'false', truth is 'true'
df['false_negative'] = (df['clean_final_answer'] == 'false') & (df['hypothesis_is_true_coerced'] == 'true')
print("False Negative:")
display(df['false_negative'].value_counts())

# aggregate metrics
# ('not verifiable' answers fall into none of the four counts below)
summarized_metrics = {}
number_of_true_positives = df['true_positive'].sum()
number_of_false_positives = df['false_positive'].sum()
number_of_true_negatives = df['true_negative'].sum()
number_of_false_negatives = df['false_negative'].sum()

# precision = TP / (TP + FP)
summarized_metrics['precision'] = number_of_true_positives / (number_of_true_positives + number_of_false_positives)
print("Precision:", summarized_metrics['precision'])

# recall = TP / (TP + FN)
summarized_metrics['recall'] = number_of_true_positives / (number_of_true_positives + number_of_false_negatives)
print("Recall:", summarized_metrics['recall'])

# f1 score: harmonic mean of precision and recall
summarized_metrics['f1_score'] = 2 * (summarized_metrics['precision'] * summarized_metrics['recall']) / (summarized_metrics['precision'] + summarized_metrics['recall'])
print("F1 Score:", summarized_metrics['f1_score'])

# confusion matrix
# rows are indexed by the agent's answer, columns by the ground truth
confusion_matrix = pd.DataFrame(index=['true', 'false'], columns=['true', 'false'])
confusion_matrix.loc['true', 'true'] = number_of_true_positives
confusion_matrix.loc['true', 'false'] = number_of_false_positives
confusion_matrix.loc['false', 'true'] = number_of_false_negatives
confusion_matrix.loc['false', 'false'] = number_of_true_negatives
print("Confusion Matrix:")
display(confusion_matrix)


#### Basic Sanity Check on our Calculations

In [None]:
# Validate confusion matrix components add up correctly.
# The 'not verifiable' count is taken directly from the data rather than derived
# as total_samples - total_classified; a derived count would make the assertion
# below true by construction and unable to catch anything.
total_samples = len(df)
total_classified = number_of_true_positives + number_of_false_positives + number_of_true_negatives + number_of_false_negatives
not_verifiable_count = int((df['clean_final_answer'] == 'not verifiable').sum())
assert total_classified + not_verifiable_count == total_samples, "Confusion matrix components don't add up to total samples"

# Validate that confusion matrix components match their definitions
# (each count is recomputed from its indicator column and compared)
assert number_of_true_positives == df['true_positive'].sum(), "TP count mismatch"
assert number_of_false_positives == df['false_positive'].sum(), "FP count mismatch"
assert number_of_true_negatives == df['true_negative'].sum(), "TN count mismatch"
assert number_of_false_negatives == df['false_negative'].sum(), "FN count mismatch"

# Validate that no sample is counted in multiple categories
# (all six pairs of the four indicator columns are checked)
assert (df['true_positive'] & df['false_positive']).sum() == 0, "Sample counted as both TP and FP"
assert (df['true_positive'] & df['true_negative']).sum() == 0, "Sample counted as both TP and TN"
assert (df['true_positive'] & df['false_negative']).sum() == 0, "Sample counted as both TP and FN"
assert (df['false_positive'] & df['true_negative']).sum() == 0, "Sample counted as both FP and TN"
assert (df['false_positive'] & df['false_negative']).sum() == 0, "Sample counted as both FP and FN"
assert (df['true_negative'] & df['false_negative']).sum() == 0, "Sample counted as both TN and FN"

# Validate metric calculations
# Avoid division by zero: when a denominator is zero the stored metric is
# expected to be missing or NaN instead of being recomputed
if (number_of_true_positives + number_of_false_positives) > 0:
    calculated_precision = number_of_true_positives / (number_of_true_positives + number_of_false_positives)
    assert abs(calculated_precision - summarized_metrics['precision']) < 1e-10, "Precision calculation error"
else:
    assert 'precision' not in summarized_metrics or pd.isna(summarized_metrics['precision']), "Precision should be undefined when denominator is zero"

if (number_of_true_positives + number_of_false_negatives) > 0:
    calculated_recall = number_of_true_positives / (number_of_true_positives + number_of_false_negatives)
    assert abs(calculated_recall - summarized_metrics['recall']) < 1e-10, "Recall calculation error"
else:
    assert 'recall' not in summarized_metrics or pd.isna(summarized_metrics['recall']), "Recall should be undefined when denominator is zero"

# F1 score validation
# F1 is only recomputed when precision and recall are both strictly positive;
# in every other case (zero or NaN) the stored F1 must be missing or NaN
if 'precision' in summarized_metrics and 'recall' in summarized_metrics and summarized_metrics['precision'] > 0 and summarized_metrics['recall'] > 0:
    calculated_f1 = 2 * (summarized_metrics['precision'] * summarized_metrics['recall']) / (summarized_metrics['precision'] + summarized_metrics['recall'])
    assert abs(calculated_f1 - summarized_metrics['f1_score']) < 1e-10, "F1 score calculation error"
else:
    assert 'f1_score' not in summarized_metrics or pd.isna(summarized_metrics['f1_score']), "F1 score should be undefined when precision or recall is zero"

# Validate confusion matrix construction
# (rows are the agent's answer, columns the ground truth)
assert confusion_matrix.loc['true', 'true'] == number_of_true_positives, "TP mismatch in confusion matrix"
assert confusion_matrix.loc['true', 'false'] == number_of_false_positives, "FP mismatch in confusion matrix"
assert confusion_matrix.loc['false', 'true'] == number_of_false_negatives, "FN mismatch in confusion matrix"
assert confusion_matrix.loc['false', 'false'] == number_of_true_negatives, "TN mismatch in confusion matrix"

# Print validation success message
print("All metric calculations validated successfully!")

## Stratify Metrics by Agent Type

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# compute metrics per agent type
# (grouping is by agent_summary, i.e. agent type plus its hyperparameters)
grouped_by_agent = df.groupby('agent_summary')

metrics = {}
for agent, group in grouped_by_agent:
    metrics[agent] = {}
    
    # counts of TP, FP, TN and FN rows in this group
    number_of_true_positives_group = group['true_positive'].sum()
    number_of_false_positives_group = group['false_positive'].sum()
    number_of_true_negatives_group = group['true_negative'].sum()
    number_of_false_negatives_group = group['false_negative'].sum()
    metrics[agent]['true_positive_sum'] = number_of_true_positives_group
    metrics[agent]['true_negative_sum'] = number_of_true_negatives_group
    metrics[agent]['false_positive_sum'] = number_of_false_positives_group
    metrics[agent]['false_negative_sum'] = number_of_false_negatives_group

    # share of the group's rows that are TP / TN / FP / FN
    # (the denominator is every row in the group, including 'not verifiable' answers)
    metrics[agent]['true_positive_mean'] = group['true_positive'].mean()
    metrics[agent]['true_negative_mean'] = group['true_negative'].mean()
    metrics[agent]['false_positive_mean'] = group['false_positive'].mean()
    metrics[agent]['false_negative_mean'] = group['false_negative'].mean()

    # precision = TP / (TP + FP); NaN when the agent never answered 'true'
    if (number_of_true_positives_group + number_of_false_positives_group) > 0:
        group_precision = number_of_true_positives_group / (number_of_true_positives_group + number_of_false_positives_group)
    else:
        group_precision = float('nan')
    metrics[agent]['precision'] = group_precision
    
    # recall = TP / (TP + FN); NaN when the group has no TP and no FN rows
    if (number_of_true_positives_group + number_of_false_negatives_group) > 0:
        group_recall = number_of_true_positives_group / (number_of_true_positives_group + number_of_false_negatives_group)
    else:
        group_recall = float('nan')
    metrics[agent]['recall'] = group_recall
    
    # f1 score; NaN unless precision and recall are both strictly positive
    # (comparisons against NaN are False, so NaN inputs also land in the else branch)
    if group_precision > 0 and group_recall > 0:
        metrics[agent]['f1_score'] = 2 * (group_precision * group_recall) / (group_precision + group_recall)
    else:
        metrics[agent]['f1_score'] = float('nan')

# Convert metrics to DataFrame for easier visualization
# (one row per agent label, one column per metric)
metrics_df = pd.DataFrame.from_dict(metrics, orient='index')

# Create visualizations
import matplotlib.pyplot as plt
import seaborn as sns

# 1. Bar plots for key metrics
plt.figure(figsize=(15, 10))
metrics_to_plot = ['precision', 'recall', 'f1_score', 
                   'true_positive_mean', 'true_negative_mean', 
                   'false_positive_mean', 'false_negative_mean']

# Seven metrics laid out on a 2x4 grid; the eighth slot stays empty
for i, metric in enumerate(metrics_to_plot):
    plt.subplot(2, 4, i+1)
    sns.barplot(x=metrics_df.index, y=metrics_df[metric])
    plt.title(f'{metric}')
    plt.xticks(rotation=45)
    plt.ylim(0, 1)  # All these metrics are between 0 and 1
plt.tight_layout()
plt.show()

# 2. Confusion matrices for each agent
for agent in metrics:
    plt.figure(figsize=(8, 6))
    
    # Create confusion matrix
    # rows: actual (true, false); columns: predicted (true, false)
    cm = np.array([
        [metrics[agent]['true_positive_sum'], metrics[agent]['false_negative_sum']],
        [metrics[agent]['false_positive_sum'], metrics[agent]['true_negative_sum']]
    ])
    
    # Plot confusion matrix
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Predicted True', 'Predicted False'],
                yticklabels=['Actually True', 'Actually False'])
    
    plt.title(f'Confusion Matrix - {agent}')
    plt.tight_layout()
    plt.show()

# 3. Combined metrics comparison
# DataFrame.plot opens a figure of its own unless it is handed an Axes. Create
# the sized figure first and draw into it; otherwise an empty 12x6 figure is
# shown next to a default-sized bar chart.
comparison_fig, comparison_ax = plt.subplots(figsize=(12, 6))
metrics_df[['precision', 'recall', 'f1_score']].plot(kind='bar', ax=comparison_ax)
plt.title('Performance Metrics by Agent Type')
plt.ylabel('Score')
plt.xticks(rotation=45)
plt.legend(loc='best')
plt.tight_layout()
plt.show()

# 4. Radar chart for comprehensive comparison
def radar_plot(metrics_df):
    """
    Draw a radar (polar) chart comparing agents on five metrics.

    Args:
        metrics_df: DataFrame with one row per agent and the columns
            'precision', 'recall', 'f1_score', 'true_positive_mean' and
            'true_negative_mean'

    Returns:
        The matplotlib figure that holds the chart
    """
    # Select metrics for radar plot
    radar_metrics = ['precision', 'recall', 'f1_score', 
                     'true_positive_mean', 'true_negative_mean']
    
    # Number of variables
    categories = radar_metrics
    N = len(categories)
    
    # Create a figure
    fig = plt.figure(figsize=(10, 10))
    
    # Create angles for each metric (evenly spaced around the circle)
    angles = [n / float(N) * 2 * np.pi for n in range(N)]
    angles += angles[:1]  # Close the loop
    
    # Create subplot in polar projection
    ax = plt.subplot(111, polar=True)
    
    # Draw one axis per variable and add labels
    plt.xticks(angles[:-1], categories, size=12)
    
    # Draw ylabels
    ax.set_rlabel_position(0)
    plt.yticks([0.25, 0.5, 0.75], ["0.25", "0.5", "0.75"], size=10)
    plt.ylim(0, 1)
    
    # Plot each agent
    for agent in metrics_df.index:
        values = metrics_df.loc[agent, radar_metrics].values.flatten().tolist()
        values += values[:1]  # Close the loop
        
        # Plot values: outline plus a faint fill of the enclosed area
        ax.plot(angles, values, linewidth=2, linestyle='solid', label=agent)
        ax.fill(angles, values, alpha=0.1)
    
    # Add legend
    plt.legend(loc='upper right', bbox_to_anchor=(0.1, 0.1))
    plt.title('Agent Performance Comparison', size=15)
    
    return fig

# Draw the radar chart for all agents and show it
radar_fig = radar_plot(metrics_df)
plt.tight_layout()
plt.show()

# Leave the metrics DataFrame as the cell's last expression so the notebook renders it
metrics_df

In [None]:
# Rank agents by false-positive share, then by false-negative share (highest first)
display(metrics_df.sort_values(by='false_positive_mean', ascending=False))
display(metrics_df.sort_values(by='false_negative_mean', ascending=False))
# Alternative rankings, disabled:
# display(metrics_df.sort_values(by='precision', ascending=False))
# display(metrics_df.sort_values(by='recall', ascending=False))
# display(metrics_df.sort_values(by='f1_score', ascending=False))
# display(metrics_df.sort_values(by='true_positive_mean', ascending=False))
# display(metrics_df.sort_values(by='true_negative_mean', ascending=False))



In [None]:

# Show how often each raw final answer occurs
# (wrapped in display: a bare expression in the middle of a cell shows nothing)
display(df['final_answer'].value_counts())


# Draw one random 'Not Verifiable' sample for each agent type listed in the loop
agent_types = df['agent_type'].unique()
samples = pd.DataFrame()

for agent in ['reasoning_react']:
    agent_samples = []
    # agent_samples.append(df[(df['agent_type'] == agent) & (df['final_answer'] == 'True')].sample(2))
    # agent_samples.append(df[(df['agent_type'] == agent) & (df['final_answer'] == 'False')].sample(2))
    not_verifiable_rows = df[(df['agent_type'] == agent) & (df['final_answer'] == 'Not Verifiable')]
    if not_verifiable_rows.empty:
        # DataFrame.sample raises ValueError when asked for more rows than exist
        print(f"No 'Not Verifiable' rows found for agent type {agent}; skipping")
    else:
        agent_samples.append(not_verifiable_rows.sample(1))
    samples = pd.concat([samples, *agent_samples])

# Display the samples
# display(samples.head(2))

# Print a human-readable report for every sampled row
for idx, sample in samples.iterrows():
    
    agent_type = sample['agent_type']
    
        
    print(f"\nSample {idx}:")
    print("hypothesis:", sample['hypothesis'])
    print("final_answer:", sample['final_answer'])
    print("hypothesis_is_true:", sample['hypothesis_is_true'])
    print("agent_type:", sample['agent_type'])
    print("model_name:", sample['model_name'])
    print("planning_model:", sample['planning_model'])
    print("coding_model:", sample['coding_model'])
    print("step_count:", sample['step_count'])
    
    # Agent-specific details.
    # NOTE(review): only react, coder and reasoning_coder are handled here; rows of
    # any other agent type (e.g. reasoning_react) print no code or observations.
    if agent_type == 'react':
        observations = sample['observations']
        print(observations)
        print(sample['code'])  
            
    elif agent_type == "coder":
        print("code:\n", sample['code'])
        print("observations:\n", sample['observations'])
    
    elif agent_type == "reasoning_coder":
        print("analysis_plan:\n", sample['analysis_plan'])
        print("code:\n", sample['code'])
        print("observations:\n", sample['observations'])
    
    # Answer and ground truth are repeated after the (potentially long) details
    print("final_answer:", sample['final_answer'])
    print("hypothesis_is_true:", sample['hypothesis_is_true'])
    
    print("evidence:")
    for evidence in sample['evidence']:
        print("-", evidence)
        
    print("\npmid:", sample['pmid'])
    print("hypothesis_index:", sample['hypothesis_index'])
    print("dataset_ids:", sample['dataset_ids'])
    
    print("------------------")
    


## Evidence Alignment Analysis

### Data Ingestion
- load the evidence alignment results
- unpack the experiment config results

In [None]:
import os
from typing import Dict
import json
import pandas as pd

import os
from typing import List, Dict
import json
import pandas as pd
import sys
sys.path.append('/home/ubuntu/BioDSA') # this will need to be updated for other machines/folders
from src import (
    REPO_ROOT,
    TOP_LEVEL_LOG_DIR,
    HYPOTHESIS_DIR
)

# Location of the evidence-alignment evaluation output
EVIDENCE_ALIGNMENT_RESULTS_FILE = os.path.join(
    TOP_LEVEL_LOG_DIR, "eval_evidence_alignment", "eval_results.json"
)

alignment_df = pd.read_json(EVIDENCE_ALIGNMENT_RESULTS_FILE)

# Expand experiment_config into separate columns, replacing the original column
alignment_df = pd.concat(
    [
        alignment_df.drop('experiment_config', axis=1),
        alignment_df['experiment_config'].apply(pd.Series),
    ],
    axis=1,
)

# Label each row with its agent configuration
alignment_df['agent_summary'] = alignment_df.apply(get_agent_type, axis=1)
# display(df['agent_summary'].value_counts())
# display(df['agent_summary'].value_counts())

### Compute Summarized Alignment Results

In [None]:
# create a summary of the alignment results
def summarize_alignment_results(row: Dict) -> Dict:
    """
    Count how many evidence alignments were supported, contradicted or missed.

    Args:
        row: Mapping (for example a DataFrame row) that holds
            'ground_truth_evidence' and 'eval_evidence_alignment'. Every
            alignment entry must carry an 'alignment' label; labels are
            compared after stripping whitespace and lower-casing.

    Returns:
        The same row, with 'alignment_eval_supported',
        'alignment_eval_contradicted' and 'alignment_eval_missed' added.

    Raises:
        ValueError: If a label is not 'supported', 'contradicted' or 'missed'.
    """
    ground_truth_evidence = row['ground_truth_evidence']
    alignment_results = row['eval_evidence_alignment']

    if len(ground_truth_evidence) != len(alignment_results):
        print(f"WARNING: Length mismatch between ground_truth_evidence and alignment_results for row {row}")

    values = {
        "supported": 0,
        "contradicted": 0,
        "missed": 0
    }
    # A length mismatch is only warned about, so counting must survive it: zip
    # stops at the shorter sequence instead of indexing past the end of
    # alignment_results. Entries beyond the ground-truth length are ignored,
    # as before.
    for _, result in zip(ground_truth_evidence, alignment_results):
        res = result['alignment'].strip().lower()

        if res not in values:
            raise ValueError(f"Invalid alignment result: {res}")

        values[res] += 1

    for key, value in values.items():
        row[f"alignment_eval_{key}"] = value

    return row

# Add the alignment_eval_* count columns to every row, then render the frame
alignment_df = alignment_df.apply(summarize_alignment_results, axis=1)
alignment_df

## Non-verifiable Hypothesis Analysis

In [None]:
# Load the experiment logs of the non-verifiable hypothesis runs
nv_logs_directory_path = os.path.join(TOP_LEVEL_LOG_DIR, 'experiment_logs_non-verifiable')
nv_results = get_experiment_results(nv_logs_directory_path)

# create a dataframe from the results
df_nv = pd.DataFrame(nv_results)

# unpack the experiment_config column into separate columns
# (the original experiment_config column is dropped and replaced by the expanded columns)
df_nv = pd.concat([df_nv.drop('experiment_config', axis=1), df_nv['experiment_config'].apply(pd.Series)], axis=1)

# extract the agent type, which is composed of the agent_type and its hyperparameters
def get_agent_type(row: Dict) -> str:
    """
    Build a readable label for an agent from its type and hyperparameters.

    Label layout per agent type:
        react:              (react, step_count, model_name)
        reasoning_coder:    (reasoning_coder, planning_model, coding_model)
        coder:              (coder, model_name)
        reasoning_react:    (reasoning_react, plan_model_name, agent_model_name, step_count)
        reasoning_react_v2: (reasoning_react_v2, plan_model_name, agent_model_name, step_count)

    Any other agent type is returned unchanged.
    """
    kind = row["agent_type"]

    # Hyperparameter fields that follow the agent type in the label, in order.
    if kind == "react":
        fields = ("step_count", "model_name")
    elif kind == "reasoning_coder":
        fields = ("planning_model", "coding_model")
    elif kind == "coder":
        fields = ("model_name",)
    elif kind == "reasoning_react" or kind == "reasoning_react_v2":
        fields = ("plan_model_name", "agent_model_name", "step_count")
    else:
        return kind

    parts = [kind]
    for field in fields:
        parts.append(f"{row[field]}")
    return "(" + ", ".join(parts) + ")"

# Label every row with its agent configuration and count rows per label
df_nv['agent_summary'] = df_nv.apply(get_agent_type, axis=1)
display(df_nv['agent_summary'].value_counts())

# sanity check in case some of the llms outputted weird values
display(df_nv['final_answer'].value_counts()) # generated by the agents
display(df_nv['hypothesis_is_true'].value_counts()) # ground truth

# Coerce both columns to lower-case strings so they can be compared directly
df_nv['coerced_final_answer'] = df_nv['final_answer'].astype(str).str.lower()
df_nv['hypothesis_is_true_coerced'] = df_nv['hypothesis_is_true'].astype(str).str.lower()

# Inspect the coerced values: lower-cased, but not yet mapped onto
# 'true' / 'false' / 'not verifiable' (hence "before standardization")
print("Unique values in coerced_final_answer before standardization:")
display(df_nv['coerced_final_answer'].value_counts())
print("Unique values in hypothesis_is_true_coerced before standardization:")
display(df_nv['hypothesis_is_true_coerced'].value_counts())

def standardize_agent_response(response: str) -> str:
    """
    Map a free-text agent answer onto 'true', 'false' or 'not verifiable'.

    The match is a plain substring test and 'true' is checked before
    'false', so a response containing both words maps to 'true'. A response
    containing neither maps to 'not verifiable'.
    """
    for label in ('true', 'false'):
        if label in response:
            return label
    return 'not verifiable'

# Collapse the lowercased agent answers onto the fixed label set
# ('true' / 'false' / 'not verifiable') so wording variations compare equal.
df_nv['clean_final_answer'] = df_nv['coerced_final_answer'].apply(standardize_agent_response)

print("Unique values in clean_final_answer after standardization:")
display(df_nv['clean_final_answer'].value_counts())


In [None]:
# Executability summary over all rows of df_nv.
executability_analysis_nv = analyze_executability_rate(df_nv)
display(executability_analysis_nv)

# Error breakdown per agent type, then once more over all rows together.
# 'combined agents' is a sentinel meaning "do not filter by agent_type".
for agent in ['react', 'coder', 'reasoning_coder', 'combined agents']:
    print(f"Error analysis for {agent} agents:")
    error_analysis = analyze_error_types(
        df_nv if agent == 'combined agents' else df_nv[df_nv['agent_type'] == agent]
    )
    # Most frequent error types first.
    display(error_analysis.sort_values(by='Count', ascending=False))

In [None]:
# Guard the metric computation: both label columns may only hold known values.
assert set(df_nv['clean_final_answer'].unique()) <= {'true', 'false', 'not verifiable'}, "Unexpected values in clean_final_answer"
assert set(df_nv['hypothesis_is_true_coerced'].unique()) <= {'true', 'false'}, "Unexpected values in hypothesis_is_true_coerced"

# Accuracy: per-row flag, True when the cleaned answer equals the ground truth.
# A 'not verifiable' answer never equals the ground truth, so it counts as wrong.
df_nv['accuracy'] = df_nv['clean_final_answer'].eq(df_nv['hypothesis_is_true_coerced'])
print("Accuracy:")
display(df_nv['accuracy'].value_counts())

# Per-row confusion-matrix indicators (boolean flags, not rates). Each entry is
# (column name, heading to print, predicted label, ground-truth label).
for outcome_column, heading, predicted_label, actual_label in (
    ('true_positive', "True Positive:", 'true', 'true'),
    ('true_negative', "True Negative:", 'false', 'false'),
    ('false_positive', "False Positive:", 'true', 'false'),
    ('false_negative', "False Negative:", 'false', 'true'),
):
    df_nv[outcome_column] = (
        (df_nv['clean_final_answer'] == predicted_label)
        & (df_nv['hypothesis_is_true_coerced'] == actual_label)
    )
    print(heading)
    display(df_nv[outcome_column].value_counts())

# aggregate metrics over all rows of df_nv
summarized_metrics_nv = {}
number_of_true_positives_nv = df_nv['true_positive'].sum()
number_of_false_positives_nv = df_nv['false_positive'].sum()
number_of_true_negatives_nv = df_nv['true_negative'].sum()
number_of_false_negatives_nv = df_nv['false_negative'].sum()

# precision = TP / (TP + FP); undefined (NaN) when nothing was predicted 'true'.
# The denominator is checked explicitly instead of relying on how the
# division itself behaves for a zero denominator.
predicted_positive_count_nv = number_of_true_positives_nv + number_of_false_positives_nv
if predicted_positive_count_nv > 0:
    summarized_metrics_nv['precision'] = number_of_true_positives_nv / predicted_positive_count_nv
else:
    summarized_metrics_nv['precision'] = float('nan')
print("Precision:", summarized_metrics_nv['precision'])

# recall = TP / (TP + FN); undefined (NaN) when no hypothesis is actually 'true'.
actual_positive_count_nv = number_of_true_positives_nv + number_of_false_negatives_nv
if actual_positive_count_nv > 0:
    summarized_metrics_nv['recall'] = number_of_true_positives_nv / actual_positive_count_nv
else:
    summarized_metrics_nv['recall'] = float('nan')
print("Recall:", summarized_metrics_nv['recall'])

# f1 score: harmonic mean of precision and recall. Reported as NaN unless both
# are strictly positive (comparisons against NaN are False, so an undefined
# precision or recall also lands in the NaN branch).
if summarized_metrics_nv['precision'] > 0 and summarized_metrics_nv['recall'] > 0:
    summarized_metrics_nv['f1_score'] = 2 * (summarized_metrics_nv['precision'] * summarized_metrics_nv['recall']) / (summarized_metrics_nv['precision'] + summarized_metrics_nv['recall'])
else:
    summarized_metrics_nv['f1_score'] = float('nan')
print("F1 Score:", summarized_metrics_nv['f1_score'])

# confusion matrix
# Orientation: row label = agent's answer, column label = ground truth
# (so row 'true' / column 'false' holds the false positives).
confusion_matrix_nv = pd.DataFrame(index=['true', 'false'], columns=['true', 'false'])
confusion_matrix_nv.loc['true', 'true'] = number_of_true_positives_nv
confusion_matrix_nv.loc['true', 'false'] = number_of_false_positives_nv
confusion_matrix_nv.loc['false', 'true'] = number_of_false_negatives_nv
confusion_matrix_nv.loc['false', 'false'] = number_of_true_negatives_nv
print("Confusion Matrix:")
display(confusion_matrix_nv)


## Sanity checking
# Every sample is either one of the four confusion-matrix outcomes or an answer
# standardized to 'not verifiable'. The 'not verifiable' count is taken from
# the data itself (not derived as total - classified), so that the assertion
# below is a real check rather than an identity that can never fail.
total_samples_nv = len(df_nv)
total_classified_nv = number_of_true_positives_nv + number_of_false_positives_nv + number_of_true_negatives_nv + number_of_false_negatives_nv
not_verifiable_count_nv = (df_nv['clean_final_answer'] == 'not verifiable').sum()
assert total_classified_nv + not_verifiable_count_nv == total_samples_nv, "Confusion matrix components don't add up to total samples"

# Validate that the stored counts still match the indicator columns
# (catches df_nv having been modified after the counts were taken)
assert number_of_true_positives_nv == df_nv['true_positive'].sum(), "TP count mismatch"
assert number_of_false_positives_nv == df_nv['false_positive'].sum(), "FP count mismatch"
assert number_of_true_negatives_nv == df_nv['true_negative'].sum(), "TN count mismatch"
assert number_of_false_negatives_nv == df_nv['false_negative'].sum(), "FN count mismatch"

# Validate that no sample is counted in multiple categories
assert (df_nv['true_positive'] & df_nv['false_positive']).sum() == 0, "Sample counted as both TP and FP"
assert (df_nv['true_positive'] & df_nv['true_negative']).sum() == 0, "Sample counted as both TP and TN"
assert (df_nv['true_positive'] & df_nv['false_negative']).sum() == 0, "Sample counted as both TP and FN"
assert (df_nv['false_positive'] & df_nv['true_negative']).sum() == 0, "Sample counted as both FP and TN"
assert (df_nv['false_positive'] & df_nv['false_negative']).sum() == 0, "Sample counted as both FP and FN"
assert (df_nv['true_negative'] & df_nv['false_negative']).sum() == 0, "Sample counted as both TN and FN"

# Validate metric calculations by recomputing them from the raw counts.
# When a denominator is zero the metric must be absent or NaN instead.
if (number_of_true_positives_nv + number_of_false_positives_nv) > 0:
    calculated_precision_nv = number_of_true_positives_nv / (number_of_true_positives_nv + number_of_false_positives_nv)
    assert abs(calculated_precision_nv - summarized_metrics_nv['precision']) < 1e-10, "Precision calculation error"
else:
    assert 'precision' not in summarized_metrics_nv or pd.isna(summarized_metrics_nv['precision']), "Precision should be undefined when denominator is zero"

if (number_of_true_positives_nv + number_of_false_negatives_nv) > 0:
    calculated_recall_nv = number_of_true_positives_nv / (number_of_true_positives_nv + number_of_false_negatives_nv)
    assert abs(calculated_recall_nv - summarized_metrics_nv['recall']) < 1e-10, "Recall calculation error"
else:
    assert 'recall' not in summarized_metrics_nv or pd.isna(summarized_metrics_nv['recall']), "Recall should be undefined when denominator is zero"

# F1 score validation: only comparable when precision and recall are both
# strictly positive; otherwise F1 is expected to be absent or NaN.
if 'precision' in summarized_metrics_nv and 'recall' in summarized_metrics_nv and summarized_metrics_nv['precision'] > 0 and summarized_metrics_nv['recall'] > 0:
    calculated_f1_nv = 2 * (summarized_metrics_nv['precision'] * summarized_metrics_nv['recall']) / (summarized_metrics_nv['precision'] + summarized_metrics_nv['recall'])
    assert abs(calculated_f1_nv - summarized_metrics_nv['f1_score']) < 1e-10, "F1 score calculation error"
else:
    assert 'f1_score' not in summarized_metrics_nv or pd.isna(summarized_metrics_nv['f1_score']), "F1 score should be undefined when precision or recall is zero"

# Validate confusion matrix construction (row = agent's answer, column = ground truth)
assert confusion_matrix_nv.loc['true', 'true'] == number_of_true_positives_nv, "TP mismatch in confusion matrix"
assert confusion_matrix_nv.loc['true', 'false'] == number_of_false_positives_nv, "FP mismatch in confusion matrix"
assert confusion_matrix_nv.loc['false', 'true'] == number_of_false_negatives_nv, "FN mismatch in confusion matrix"
assert confusion_matrix_nv.loc['false', 'false'] == number_of_true_negatives_nv, "TN mismatch in confusion matrix"

# Print validation success message
print("All metric calculations validated successfully!")

In [None]:
# Per-agent metrics: one entry in metrics_nv for every agent_summary label.
grouped_by_agent_nv = df_nv.groupby('agent_summary')

metrics_nv = {}
for agent, group in grouped_by_agent_nv:
    # Number of rows flagged with each outcome for this agent.
    outcome_counts = {
        outcome: group[outcome].sum()
        for outcome in ('true_positive', 'true_negative', 'false_positive', 'false_negative')
    }

    # Counts first, then the share of this agent's rows with each outcome.
    agent_metrics = {f'{outcome}_sum': count for outcome, count in outcome_counts.items()}
    agent_metrics.update(
        {f'{outcome}_mean': group[outcome].mean() for outcome in outcome_counts}
    )

    predicted_positives = outcome_counts['true_positive'] + outcome_counts['false_positive']
    actual_positives = outcome_counts['true_positive'] + outcome_counts['false_negative']

    # Precision and recall are NaN when their denominator is zero.
    precision = (
        outcome_counts['true_positive'] / predicted_positives
        if predicted_positives > 0
        else float('nan')
    )
    recall = (
        outcome_counts['true_positive'] / actual_positives
        if actual_positives > 0
        else float('nan')
    )
    agent_metrics['precision'] = precision
    agent_metrics['recall'] = recall

    # F1 is only reported when both precision and recall are strictly
    # positive; a zero or NaN value on either side yields NaN.
    if precision > 0 and recall > 0:
        agent_metrics['f1_score'] = 2 * (precision * recall) / (precision + recall)
    else:
        agent_metrics['f1_score'] = float('nan')

    metrics_nv[agent] = agent_metrics

# One row per agent, one column per metric.
metrics_df_nv = pd.DataFrame.from_dict(metrics_nv, orient='index')

# Create visualizations
import matplotlib.pyplot as plt
import seaborn as sns

# 1. Bar plots for key metrics: one subplot per metric, one bar per agent.
plt.figure(figsize=(15, 10))
metrics_to_plot = ['precision', 'recall', 'f1_score', 
                   'true_positive_mean', 'true_negative_mean', 
                   'false_positive_mean', 'false_negative_mean']

for i, metric in enumerate(metrics_to_plot):
    # 2x4 grid holds the seven metrics; the last slot stays empty.
    plt.subplot(2, 4, i+1)
    # Plot from metrics_df_nv, the per-agent table for the df_nv data
    # (not the un-suffixed metrics_df, which belongs to a different table).
    sns.barplot(x=metrics_df_nv.index, y=metrics_df_nv[metric])
    plt.title(f'{metric}')
    plt.xticks(rotation=45)
    plt.ylim(0, 1)  # All these metrics are between 0 and 1
plt.tight_layout()
plt.show()

# 2. Confusion matrices for each agent
# Layout of the 2x2 matrix: rows are the ground truth (true, false) and
# columns are the agent's prediction (true, false).
confusion_cell_keys = [
    ['true_positive_sum', 'false_negative_sum'],
    ['false_positive_sum', 'true_negative_sum'],
]

for agent, agent_metrics in metrics_nv.items():
    plt.figure(figsize=(8, 6))

    # Pull this agent's four counts into the layout above.
    cm = np.array([[agent_metrics[key] for key in row] for row in confusion_cell_keys])

    # Annotated heatmap with integer cell labels.
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=['Predicted True', 'Predicted False'],
        yticklabels=['Actually True', 'Actually False'],
    )

    plt.title(f'Confusion Matrix - {agent}')
    plt.tight_layout()
    plt.show()

# 3. Combined metrics comparison
# DataFrame.plot opens its own figure when no `ax` is given, so the size is
# passed through `figsize` here. A separate plt.figure(figsize=...) call before
# it would only leave an empty extra figure behind and not size this chart.
metrics_df_nv[['precision', 'recall', 'f1_score']].plot(kind='bar', figsize=(12, 6))
plt.title('Performance Metrics by Agent Type')
plt.ylabel('Score')
plt.xticks(rotation=45)
plt.legend(loc='best')
plt.tight_layout()
plt.show()

# 4. Radar chart for comprehensive comparison
def radar_plot(metrics_df):
    """
    Draw one radar (polar) polygon per row of metrics_df and return the figure.

    metrics_df is indexed by agent and must contain the columns 'precision',
    'recall', 'f1_score', 'true_positive_mean' and 'true_negative_mean'.
    The radial axis is fixed to the range 0..1.
    """
    radar_metrics = ['precision', 'recall', 'f1_score', 
                     'true_positive_mean', 'true_negative_mean']
    axis_count = len(radar_metrics)

    fig = plt.figure(figsize=(10, 10))

    # One spoke per metric, evenly spaced around the circle; the first angle
    # is repeated at the end so each polygon closes on itself.
    angles = [spoke / float(axis_count) * 2 * np.pi for spoke in range(axis_count)]
    angles = angles + angles[:1]

    ax = fig.add_subplot(111, polar=True)

    # Label each spoke with its metric name (the repeated closing angle is skipped).
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(radar_metrics, size=12)

    # Radial gridline labels.
    ax.set_rlabel_position(0)
    ax.set_yticks([0.25, 0.5, 0.75])
    ax.set_yticklabels(["0.25", "0.5", "0.75"], size=10)
    ax.set_ylim(0, 1)

    # Outline and lightly fill one polygon per agent.
    for agent in metrics_df.index:
        values = metrics_df.loc[agent, radar_metrics].values.flatten().tolist()
        closed_values = values + values[:1]  # repeat first value to close the loop

        ax.plot(angles, closed_values, linewidth=2, linestyle='solid', label=agent)
        ax.fill(angles, closed_values, alpha=0.1)

    ax.legend(loc='upper right', bbox_to_anchor=(0.1, 0.1))
    ax.set_title('Agent Performance Comparison', size=15)

    return fig

radar_fig = radar_plot(metrics_df_nv)
plt.tight_layout()
plt.show()

# Last expression of the cell: show the metrics DataFrame for further analysis
metrics_df_nv