# Quantitative Analysis of LLM Performance
This notebook performs comprehensive quantitative analysis on LLM performance across different test conditions. Functions are defined to load and process data, calculate scores, and generate statistics. The main execution block loads data, calculates scores, and saves the results to a CSV file.

## Imports and Setup

In [1]:
import glob
import os
from typing import Dict, List, Optional, Tuple

import pandas as pd

## Data Cleaning Functions

In [None]:
def clean_model_answer(answer: str) -> str:
    """
    Normalize a model answer by removing every space and comma.

    Example: 'A, B, C' -> 'ABC'

    Missing values (NaN / None) are returned unchanged. Non-string values
    (for example a numeric cell) are converted with str() before cleaning
    instead of raising AttributeError.
    """
    if pd.isna(answer):  # Leave missing values untouched
        return answer
    return str(answer).replace(' ', '').replace(',', '')

def clean_confidence_value(x):
    """
    Coerce a raw confidence cell to a float.

    Missing values yield None. For strings, every character that is not a
    digit or a decimal point is dropped before conversion (so '90%' becomes
    90.0). Anything that still cannot be converted yields None.
    """
    if pd.isna(x):
        return None

    candidate = x
    if isinstance(candidate, str):
        # Keep only digits and decimal points
        candidate = ''.join(filter(lambda ch: ch.isdigit() or ch == '.', candidate))

    try:
        return float(candidate)
    except (ValueError, TypeError):
        return None

## Data Loading Functions

In [2]:
def load_and_process_files(
    file_pattern: str = r'frontier_results_datafiles/results_*.csv',
) -> Dict[str, pd.DataFrame]:
    """
    Load result CSV files and combine them into one dataframe per condition.

    The file name (without '.csv') must split into exactly six
    underscore-separated parts. Part 1 is used as the exam type, part 5 as the
    question type, and the part following the literal 'condition' as the
    condition number, e.g. 'results_policy_qwen_condition_1_single.csv'.
    Files that do not match, and all 'condition_3' files, are skipped with a
    printed message.

    Args:
        file_pattern: Glob pattern selecting the CSV files to load.

    Returns:
        Dictionary mapping 'condition_<n>' to the concatenation of every
        dataframe loaded for that condition, with 'exam_type' and
        'question_type' columns added from the file name.
    """
    # Lists of per-file dataframes, keyed by condition name
    frames_by_condition = {}

    # Sorted so that row order does not depend on filesystem listing order
    for file in sorted(glob.glob(file_pattern)):
        filename = os.path.basename(file)
        parts = filename.replace('.csv', '').split('_')

        if len(parts) != 6:
            print(f"Skipping {file} - expected 6 underscore-separated parts in filename, found {len(parts)}")
            continue

        exam_type = parts[1]       # policy or comprehensive
        question_type = parts[5]

        # The condition number is the part right after the literal 'condition'.
        # IndexError covers 'condition' being the final part.
        try:
            condition_num = parts[parts.index('condition') + 1]
        except (ValueError, IndexError):
            print(f"Could not find condition number in {filename}")
            continue
        condition = f"condition_{condition_num}"

        # Skip reading files for condition_3
        if condition == 'condition_3':
            print(f"Skipping {file} - condition_3")
            continue

        print(f"Processing: {filename}")

        df = pd.read_csv(file)

        df['Model_Answer'] = df['Model_Answer'].apply(clean_model_answer)
        # Confidence is cleaned only when the file actually provides it
        if 'Model_Confidence' in df.columns:
            df['Model_Confidence'] = df['Model_Confidence'].apply(clean_confidence_value)

        df['exam_type'] = exam_type
        df["question_type"] = question_type

        frames_by_condition.setdefault(condition, []).append(df)

    # Combine dataframes within each condition
    condition_dfs = {}
    for condition, frames in frames_by_condition.items():
        condition_dfs[condition] = pd.concat(frames, ignore_index=True)
        print(f"Concatenated {condition}: {len(condition_dfs[condition])} rows")

    print("\nAvailable conditions:", list(condition_dfs.keys()))
    return condition_dfs

## Scoring Functions

In [None]:
def calculate_scores(condition_name: str, condition_dfs: Dict[str, pd.DataFrame], include_confidence: bool = True) -> Optional[pd.DataFrame]:
    """
    Calculate per-model, per-exam-type scores for one condition.

    Args:
        condition_name: Key of the condition to score (e.g. 'condition_1').
        condition_dfs: Mapping of condition name to its combined dataframe.
            The dataframe must provide the columns 'Model', 'exam_type',
            'question_type', 'Question_ID', 'Correct_Answer' and
            'Model_Answer'; 'Model_Confidence' is optional.
        include_confidence: When True and a 'Model_Confidence' column exists,
            add its mean as 'avg_confidence'.

    Returns:
        None (after printing an error) when condition_name is not in
        condition_dfs. Otherwise a dataframe with one row per (model, exam
        type) pair that has data, sorted by exam type and then by total score
        descending. The dataframe is empty when there is nothing to score.
        The input dataframe is not modified.
    """
    if condition_name not in condition_dfs:
        print(f"Error: {condition_name} not found in the data")
        return None

    df = condition_dfs[condition_name].copy()  # Work on a copy so the caller's data is untouched

    # Flag skipped answers for condition 2 ('跳过' is Chinese for "skip")
    if condition_name == 'condition_2':
        df['is_skipped'] = df['Model_Answer'].astype(str).str.contains('跳过')

    # Keep only the first run of digits of Question_ID and convert it to a number
    df['Question_ID'] = pd.to_numeric(df['Question_ID'].astype(str).str.extract(r'(\d+)', expand=False))

    def calculate_sata_score_and_correctness(row):
        """
        Score one select-all-that-apply (SATA) row.

        Answers are compared as sets of characters. Rules:
        - Any selection not in the correct answer: 0 points, 'incorrect'
        - Exact match: 2 points, 'exact'
        - Proper subset: 0.5 points per correct selection, 'partial'
        - No selection at all: 0 points, 'incorrect'
        """
        correct_ans = set(str(row['Correct_Answer']))
        model_ans = set(str(row['Model_Answer']))

        if any(ans not in correct_ans for ans in model_ans):
            return 0, 'incorrect'

        correct_selections = len(model_ans.intersection(correct_ans))

        if correct_selections == len(correct_ans) and len(model_ans) == len(correct_ans):
            return 2, 'exact'
        elif correct_selections > 0:
            return 0.5 * correct_selections, 'partial'
        else:
            return 0, 'incorrect'

    results = []

    for model in df['Model'].unique():
        for exam in df['exam_type'].unique():
            model_exam_mask = (df['Model'] == model) & (df['exam_type'] == exam)

            # A model that never took this exam has no rows; emitting an
            # all-zero row for it would distort downstream averages.
            if not model_exam_mask.any():
                continue

            # Skipped questions are only tracked for condition 2
            skipped_count = 0
            if condition_name == 'condition_2':
                skipped_count = df[model_exam_mask]['is_skipped'].sum()

            # Single select: one point per exact match
            single_df = df[model_exam_mask & (df['question_type'] == 'single')]
            single_total = len(single_df)
            single_exact_correct = single_df['Model_Answer'].eq(single_df['Correct_Answer']).sum()
            single_exact_percentage = (single_exact_correct / single_total * 100) if single_total > 0 else 0

            # SATA calculations
            multiple_df = df[model_exam_mask & (df['question_type'] == 'multiple')]
            multiple_results = [calculate_sata_score_and_correctness(row) for _, row in multiple_df.iterrows()]
            multiple_scores, multiple_correctness = zip(*multiple_results) if multiple_results else ([], [])

            multiple_total = len(multiple_df)
            multiple_exact_correct = multiple_correctness.count('exact')
            multiple_any_correct = multiple_exact_correct + multiple_correctness.count('partial')

            multiple_exact_percentage = (multiple_exact_correct / multiple_total * 100) if multiple_total > 0 else 0
            multiple_any_correct_percentage = (multiple_any_correct / multiple_total * 100) if multiple_total > 0 else 0

            total_single_score = single_exact_correct
            total_multiple_score = sum(multiple_scores)
            total_score = total_single_score + total_multiple_score

            result_dict = {
                'exam_type': exam,
                'Model': model,
                'total_score(out of 100)': total_score,
                # NOTE(review): 80 is a fixed question count, not the number of
                # rows actually present - confirm every exam has 80 questions.
                'correct_percentage': (single_exact_correct + multiple_exact_correct) / 80,
                'single_correct_percentage': single_exact_percentage/100,
                'multiple_exact_correct_percentage': multiple_exact_percentage/100,
                'multiple_include_partial_correct_percentage': multiple_any_correct_percentage/100,
                'single_score(out of 60)': total_single_score,
                'multiple_score(out of 40)': total_multiple_score,
                'single_questions_count': single_total,
                'multiple_questions_count': multiple_total,
                'single_correct_count': single_exact_correct,
                'multiple_exact_correct_count': multiple_exact_correct,
                'total_correct_count': single_exact_correct + multiple_exact_correct
            }

            if condition_name == 'condition_2':
                result_dict['skipped_questions_count'] = skipped_count

            # Confidence is best-effort: a failure is reported and the column omitted
            if include_confidence and 'Model_Confidence' in df.columns:
                try:
                    result_dict.update({
                        'avg_confidence': df[model_exam_mask]['Model_Confidence'].mean()
                    })
                except Exception as e:
                    print(f"Warning: Error calculating confidence for {model}, {exam}: {str(e)}")

            results.append(result_dict)

    scores_df = pd.DataFrame(results)

    # With nothing scored there are no columns to round or sort by
    if scores_df.empty:
        return scores_df

    # Round numeric columns
    numeric_columns = scores_df.select_dtypes(include=['float64', 'int64']).columns
    scores_df[numeric_columns] = scores_df[numeric_columns].round(3)

    # Sort by total score descending within each exam type
    scores_df = scores_df.sort_values(['exam_type', 'total_score(out of 100)'], ascending=[True, False])

    return scores_df


## Statistical Analysis Functions

In [4]:
def _summarize_score_rows(score_rows, analysis_type, condition, exam_type, region):
    """Build one summary-statistics record from a slice of a scores table."""
    total = score_rows['total_score(out of 100)']
    single = score_rows['single_score(out of 60)']
    multiple = score_rows['multiple_score(out of 40)']

    stats_dict = {
        'analysis_type': analysis_type,
        'condition': condition,
        'exam_type': exam_type,
        'region': region,
        'avg_total': total.mean(),
        'median_total': total.median(),
        'std_total': total.std(),
        'avg_single': single.mean(),
        'median_single': single.median(),
        'avg_multiple': multiple.mean(),
        'median_multiple': multiple.median(),
        'avg_single_percentage': score_rows['single_correct_percentage'].mean(),
        'avg_multiple_exact_percentage': score_rows['multiple_exact_correct_percentage'].mean(),
    }

    # Skipped-question statistics exist only for condition 2
    if condition == 'condition_2' and 'skipped_questions_count' in score_rows.columns:
        stats_dict.update({
            'avg_skipped': score_rows['skipped_questions_count'].mean(),
            'total_skipped': score_rows['skipped_questions_count'].sum()
        })

    return stats_dict


def calculate_detailed_statistics(condition_dfs):
    """
    Summarize model scores per condition, by exam type and by model region.

    For every condition in condition_dfs the scores are computed once with
    calculate_scores and then aggregated (mean / median / std) twice:
    first per exam type over all models ('analysis_type' == 'exam_type',
    'region' == 'all'), then per exam type within the Chinese and Western
    model groups ('analysis_type' == 'region').

    Args:
        condition_dfs: Mapping of condition name to its combined dataframe.

    Returns:
        DataFrame with one row per aggregate, float columns rounded to
        3 decimals. All exam-type rows come before all region rows.
    """
    exam_types = ['policy', 'comprehensive']
    region_models = [
        ('Chinese', ['qwen', 'ernie', 'deepseek', 'kimi']),
        ('Western', ['gpt', 'anthropic', 'mistral', 'gemini']),
    ]

    # Score each condition once and reuse the table for both analyses
    scores_by_condition = {}
    for condition in condition_dfs:
        df_scores = calculate_scores(condition, condition_dfs)
        if df_scores is None or df_scores.empty:
            print(f"Warning: no scores available for {condition}, skipping statistics")
            continue
        scores_by_condition[condition] = df_scores

    stats_data = []

    # 1. Stats by exam type and condition
    for condition, df_scores in scores_by_condition.items():
        for exam_type in exam_types:
            exam_df = df_scores[df_scores['exam_type'] == exam_type]
            stats_data.append(
                _summarize_score_rows(exam_df, 'exam_type', condition, exam_type, 'all')
            )

    # 2. Stats by model grouping (Chinese vs Western)
    for condition, df_scores in scores_by_condition.items():
        for region, models in region_models:
            for exam_type in exam_types:
                region_scores = df_scores[
                    (df_scores['Model'].isin(models)) &
                    (df_scores['exam_type'] == exam_type)
                ]
                stats_data.append(
                    _summarize_score_rows(region_scores, 'region', condition, exam_type, region)
                )

    stats_df = pd.DataFrame(stats_data)

    # Round numeric columns (nothing to round when no statistics were produced)
    if not stats_df.empty:
        numeric_cols = stats_df.select_dtypes(include=['float64']).columns
        stats_df[numeric_cols] = stats_df[numeric_cols].round(3)

    return stats_df


In [None]:
def transform_scores(df):
    """
    Reshape a scores table from wide to long format.

    The four per-model metrics (single/multiple percentage and score) are
    stacked into a 'metric_type' / 'score/percentage' column pair, and each
    model is labelled with its region group (NaN for unknown models).
    """
    key_columns = ['exam_type', 'Model', 'total_score(out of 100)']

    # Source column -> short metric label (order defines the melt order)
    metric_labels = {
        'single_correct_percentage': 'single_percentage',
        'multiple_exact_correct_percentage': 'multiple_percentage',
        'single_score(out of 60)': 'single_score',
        'multiple_score(out of 40)': 'multiple_score',
    }

    # Model name -> region group
    model_regions = {
        **dict.fromkeys(['gemini', 'gpt', 'mistral', 'anthropic'], 'Western'),
        **dict.fromkeys(['ernie', 'qwen', 'kimi', 'deepseek'], 'Chinese'),
    }

    # Narrow to the columns of interest before reshaping
    narrowed = df[[
        'exam_type', 'Model',
        'total_score(out of 100)',
        'single_correct_percentage',
        'multiple_exact_correct_percentage',
        'single_score(out of 60)',
        'multiple_score(out of 40)',
    ]]

    long_form = narrowed.melt(
        id_vars=key_columns,
        value_vars=list(metric_labels),
        var_name='metric_type',
        value_name='score/percentage',
    )

    long_form['region_group'] = long_form['Model'].map(model_regions)
    long_form['metric_type'] = long_form['metric_type'].map(metric_labels)

    final_order = ['exam_type', 'Model', 'region_group', 'metric_type',
                   'score/percentage', 'total_score(out of 100)']
    return long_form[final_order]




In [None]:
def combine_scores(scores_dict):
    """
    Combine the condition_1 and condition_2 score tables into one DataFrame.

    Each table is copied and tagged with a 'condition' column before being
    concatenated. Other conditions in scores_dict are ignored, as are entries
    whose value is None (calculate_scores returns None for a condition with
    no data).

    Returns:
        The concatenated DataFrame, or an empty DataFrame with only a
        'condition' column when neither condition is available.
    """
    dfs = []
    for condition in ['condition_1', 'condition_2']:
        scores = scores_dict.get(condition)
        if scores is None:
            continue
        df_copy = scores.copy()
        df_copy['condition'] = condition
        dfs.append(df_copy)

    # pd.concat raises on an empty list, so return an empty table instead
    if not dfs:
        return pd.DataFrame(columns=['condition'])

    combined_c1c2 = pd.concat(dfs, ignore_index=True)

    return combined_c1c2


In [None]:
def create_condition_2_table(df, total_questions=80):
    """
    Build the condition 2 summary table, including rank change vs condition 1.

    Args:
        df: Combined scores with a 'condition' column and the columns 'Model',
            'total_score(out of 100)', 'total_correct_count' and
            'skipped_questions_count'.
        total_questions: Number of questions per exam, used as the
            denominator of the percentages.

    Returns:
        DataFrame with one row per condition 2 row, sorted by
        'Total percent correct w/o Skip' descending and rounded to 3 decimals.
        'Change in Rank' is the condition 1 rank minus the condition 2 rank;
        it is NaN for a model that has no condition 1 row.
    """
    condition_2_df = df[df['condition'] == 'condition_2'].copy()

    condition_2_df['Total questions skipped'] = condition_2_df['skipped_questions_count']

    # Skipped questions are removed from the denominator. If every question
    # was skipped the denominator is zero and the result is NaN/inf.
    condition_2_df['Total percent correct-w/Skip'] = condition_2_df['total_correct_count'] / (total_questions - condition_2_df['skipped_questions_count'])

    # Skipped questions stay in the denominator, i.e. they count as incorrect
    condition_2_df['Total percent correct w/o Skip'] = condition_2_df['total_correct_count'] / total_questions

    # Rank both conditions with the 'min' method (ties share the lowest rank).
    # Condition 1 is ranked by total score, condition 2 by percent correct.
    condition_1_df = df[df['condition'] == "condition_1"].copy()
    condition_1_df['rank'] = condition_1_df['total_score(out of 100)'].rank(method='min', ascending=False)
    condition_2_df['rank'] = condition_2_df['Total percent correct w/o Skip'].rank(method='min', ascending=False)

    # Use the first row of each model in each condition. Subtracting the two
    # series aligns them on Model, so a model missing from either condition
    # gets NaN instead of raising IndexError.
    rank_1 = condition_1_df.drop_duplicates('Model').set_index('Model')['rank']
    rank_2 = condition_2_df.drop_duplicates('Model').set_index('Model')['rank']
    rank_changes = rank_1 - rank_2

    condition_2_df['Change in Rank'] = condition_2_df['Model'].map(rank_changes)

    # Select and order columns for final display
    result = condition_2_df[['Model',
                            'Total questions skipped',
                            'Total percent correct-w/Skip',
                            'Total percent correct w/o Skip',
                            'Change in Rank']]

    # Sort by the percentage that counts skipped questions as incorrect
    result = result.sort_values('Total percent correct w/o Skip', ascending=False)

    # Prepend the condition 2 rank (aligned on the row index)
    result.insert(0, 'Condition 2 Rank', condition_2_df['rank'])

    result = result.round(3)

    return result


## Data Export Function

In [None]:
def save_combined_csv(scores_dict, output_path='Frontier_all_scores.csv'):
    """
    Combine the score tables of all conditions and save them to a single CSV.

    Each table is copied and tagged with a 'condition' column holding its
    dictionary key. Entries whose value is None (calculate_scores returns None
    for a condition with no data) are skipped.

    Args:
        scores_dict: Mapping of condition name to its scores DataFrame or None.
        output_path: Destination CSV file.

    Nothing is written, and a warning is printed, when no table is available.
    """
    dfs = []
    for condition, df in scores_dict.items():
        if df is None:
            continue
        df_copy = df.copy()
        df_copy['condition'] = condition
        dfs.append(df_copy)

    # pd.concat raises on an empty list
    if not dfs:
        print(f"Warning: no scores to save, {output_path} not written")
        return

    combined_df = pd.concat(dfs, ignore_index=True)

    combined_df.to_csv(output_path, index=False)

## Main Execution

In [None]:
if __name__ == "__main__":
    # Load and process data
    condition_dfs = load_and_process_files()

    # Calculate scores only for the conditions that were actually loaded.
    # calculate_scores returns None for a missing condition, and a None entry
    # cannot be copied when the tables are combined for export.
    scores = {}
    for condition in ('condition_1', 'condition_2', 'condition_3'):
        if condition not in condition_dfs:
            continue
        scores[condition] = calculate_scores(
            condition,
            condition_dfs,
            # Confidence is not requested for condition_3
            include_confidence=(condition != 'condition_3'),
        )

    # Generate statistics and analysis
    stats_df = calculate_detailed_statistics(condition_dfs)

    # Save results
    if scores:
        save_combined_csv(scores)
    else:
        print("No scores calculated - nothing to save")