In [None]:
import pandas as pd
import numpy as np
import json
import os
from pathlib import Path
from scipy import stats
from collections import defaultdict
import warnings
import re
warnings.filterwarnings('ignore')


main_dir = "./NanoDesigner/NanoDesigner_assessment_experiment_2"

In [None]:
def mean_confidence_interval(data, confidence=0.95):
    """Calculate confidence interval for the mean"""
    if len(data) == 0 or np.all(np.isnan(data)):
        return np.nan, np.nan
    
    clean_data = data[~np.isnan(data)]
    if len(clean_data) == 0:
        return np.nan, np.nan
    
    n = len(clean_data)
    mean = np.mean(clean_data)
    std_err = stats.sem(clean_data)
    margin_error = std_err * stats.t.ppf((1 + confidence) / 2., n-1)
    
    return mean, margin_error

def success_rate_confidence_interval(successes, total, confidence=0.95):
    """Calculate confidence interval for success rate (binomial proportion)"""
    if total == 0:
        return 0, np.nan
    
    p = successes / total
    z = stats.norm.ppf((1 + confidence) / 2.)
    margin = z * np.sqrt(p * (1 - p) / total)
    
    return p * 100, margin * 100  # Convert to percentage

def load_json_lines(file_path):
    """Load JSON lines file"""
    data = []
    try:
        with open(file_path, 'r') as f:
            for line in f:
                if line.strip():
                    data.append(json.loads(line.strip()))
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
    return data

def discover_pdb_directories(main_dir, experiment_subdir):
    """Discover all PDB directories (4 character alphanumeric IDs) in the experiment directory"""
    experiment_path = Path(main_dir) / experiment_subdir
    pdb_dirs = []
    
    if experiment_path.exists():
        for item in experiment_path.iterdir():
            if item.is_dir():
                # Check if directory name matches 4 character alphanumeric pattern
                if re.match(r'^[a-zA-Z0-9]{4}$', item.name):
                    pdb_dirs.append(item.name)
    
    return sorted(pdb_dirs)

def process_pooled_iteration_data(all_experiments_data, iteration_num):
    """
    Process data for a specific iteration by pooling all designs across all PDPs
    """
    results = {}
    
    for exp_key, exp_data in all_experiments_data.items():
        if iteration_num not in exp_data:
            print(f"Warning: Iteration {iteration_num} not found for {exp_key}")
            continue
            
        iteration_data = exp_data[iteration_num]
        
        # Pool all candidates from all PDPs
        pooled_metrics = {
            'AAR H3': [],
            'RMSD(CA) aligned': [],
            'RMSD(CA) CDRH3 aligned': [],
            'TMscore': [],
            'LDDT': [],
            'FoldX_dG': [],
            'FoldX_ddG': [],
            'final_num_clashes': [],
            'DockQ': []
        }
        
        total_valid_ddg = 0
        total_negative_ddg = 0
        
        # Pool all individual candidate data
        for pdb_candidates in iteration_data['all_candidates']:
            for candidate in pdb_candidates:
                # Extract metrics from individual candidates
                for metric_key, metric_values in pooled_metrics.items():
                    if metric_key in candidate and not pd.isna(candidate[metric_key]):
                        metric_values.append(candidate[metric_key])
                
                # Count for success rate
                if 'FoldX_ddG' in candidate and not pd.isna(candidate['FoldX_ddG']):
                    total_valid_ddg += 1
                    if candidate['FoldX_ddG'] < 0:
                        total_negative_ddg += 1
        
        # Calculate statistics for pooled data
        exp_results = {}
        
        for metric, values in pooled_metrics.items():
            if values:
                mean, ci = mean_confidence_interval(np.array(values))
                exp_results[f'{metric}_mean'] = mean
                exp_results[f'{metric}_ci'] = ci
                exp_results[f'{metric}_n'] = len(values)
            else:
                exp_results[f'{metric}_mean'] = np.nan
                exp_results[f'{metric}_ci'] = np.nan
                exp_results[f'{metric}_n'] = 0
        
        # Success rate
        if total_valid_ddg > 0:
            success_rate, success_ci = success_rate_confidence_interval(total_negative_ddg, total_valid_ddg)
            exp_results['Success_Rate_mean'] = success_rate
            exp_results['Success_Rate_ci'] = success_ci
            exp_results['Success_Rate_n'] = total_valid_ddg
        else:
            exp_results['Success_Rate_mean'] = np.nan
            exp_results['Success_Rate_ci'] = np.nan
            exp_results['Success_Rate_n'] = 0
        
        results[exp_key] = exp_results
    
    return results

def load_all_experiment_data(main_dir, experiment_configs):
    """Load data from all experiments and organize by iteration"""
    
    all_experiments_data = {}
    
    for exp_name, design_type, subdir in experiment_configs:
        print(f"Loading {exp_name} - {design_type}...")
        
        # Discover PDB directories automatically
        pdbs = discover_pdb_directories(main_dir, subdir)
        print(f"Found PDBs: {pdbs}")
        
        exp_key = f"{exp_name}_{design_type}"
        exp_data = defaultdict(lambda: {'all_candidates': []})
        
        for pdb in pdbs:
            pdb_folder = Path(main_dir) / subdir / pdb
            if not pdb_folder.exists():
                continue
            
            # Process iterations 1-10
            for iteration in range(1, 11):
                json_file = pdb_folder / f"best_candidates_iter_{iteration}.json"
                
                if json_file.exists():
                    iteration_data = load_json_lines(json_file)
                    
                    if iteration_data:
                        # Take top 15 entries (or all if less than 15)
                        top_entries = iteration_data[:15]
                        
                        # Store all candidates for pooled analysis
                        exp_data[iteration]['all_candidates'].append(top_entries)
        
        all_experiments_data[exp_key] = exp_data
    
    return all_experiments_data

def format_metric_value(mean, ci, metric_key, is_best=False, is_second=False):
    """Format metric value with confidence interval and highlighting"""
    if pd.isna(mean) or pd.isna(ci):
        return "N/A"
    
    # Format based on metric type
    if metric_key == 'Success_Rate':
        formatted = f"{mean:.1f} ± {ci:.1f}"
    elif 'RMSD' in metric_key or 'FoldX' in metric_key:
        formatted = f"{mean:.2f} ± {ci:.2f}"
    else:
        formatted = f"{mean:.3f} ± {ci:.3f}"
    
    # Add highlighting
    if is_best:
        return f"\\textbf{{{formatted}}}"
    elif is_second:
        return f"\\underline{{{formatted}}}"
    else:
        return formatted

def determine_best_values_for_scenario(scenario_results, metric_key, higher_better):
    """Determine best and second best values for a specific scenario"""
    values = {}
    
    # Extract mean values for comparison
    for method in ['DiffAb', 'ADesigner']:
        if method in scenario_results:
            mean_val = scenario_results[method].get(f'{metric_key}_mean', np.nan)
            if not pd.isna(mean_val):
                values[method] = mean_val
    
    if len(values) < 2:
        return {}, {}
    
    # Sort based on whether higher is better
    sorted_methods = sorted(values.items(), key=lambda x: x[1], reverse=higher_better)
    
    best_tools = {sorted_methods[0][0]: True}
    second_tools = {sorted_methods[1][0]: True}
    
    return best_tools, second_tools

def generate_latex_table_for_scenario(baseline_results, ours_results, scenario_name, scenario_type):
    """Generate LaTeX table for a specific scenario comparing baseline vs ours"""
    
    # Define metrics with their display names and whether higher is better
    metrics_info = {
        'AAR H3': ('AAR H3', True),
        'RMSD(CA) aligned': ('RMSD', False),
        'RMSD(CA) CDRH3 aligned': ('RMSD CDRH3', False),
        'TMscore': ('TMscore', True),
        'LDDT': ('LDDT', True),
        'FoldX_dG': ('$\\Delta G$', False),
        'FoldX_ddG': ('$\\Delta \\Delta G$', False),
        'final_num_clashes': ('Clashes', False),
        'DockQ': ('DockQ', True),
        'Success_Rate': ('Success Rate \\%', True)
    }
    
    # Create caption and label
    caption_text = f"Performance metrics for NanoDesigner with {scenario_name} {scenario_type} scenario"
    label_text = f"nanodesigner_{scenario_name.lower()}_{scenario_type.lower()}_metrics"
    
    latex_lines = []
    
    # Table header
    latex_lines.extend([
        "\\begin{table}[h]",
        "\\centering",
        f"\\caption{{{caption_text}}}",
        f"\\label{{tab:{label_text}}}",
        "\\begin{tabular}{l c c}",
        "\\toprule",
        "\\textbf{Metric} & \\textbf{Baseline} & \\textbf{Ours} \\\\",
        "\\midrule"
    ])
    
    # Generate rows for each metric
    for metric_key, (display_name, higher_better) in metrics_info.items():
        # Get baseline and ours values
        baseline_mean = baseline_results.get(f'{metric_key}_mean', np.nan)
        baseline_ci = baseline_results.get(f'{metric_key}_ci', np.nan)
        ours_mean = ours_results.get(f'{metric_key}_mean', np.nan)
        ours_ci = ours_results.get(f'{metric_key}_ci', np.nan)
        
        # Determine which is better
        is_baseline_better = False
        is_ours_better = False
        
        if not pd.isna(baseline_mean) and not pd.isna(ours_mean):
            if higher_better:
                is_ours_better = ours_mean > baseline_mean
                is_baseline_better = baseline_mean > ours_mean
            else:
                is_ours_better = ours_mean < baseline_mean
                is_baseline_better = baseline_mean < ours_mean
        
        # Format values
        baseline_formatted = format_metric_value(
            baseline_mean, baseline_ci, metric_key, is_best=is_baseline_better
        )
        ours_formatted = format_metric_value(
            ours_mean, ours_ci, metric_key, is_best=is_ours_better
        )
        
        # Add direction indicator
        direction = "$\\uparrow$" if higher_better else "$\\downarrow$"
        display_name_with_direction = f"{display_name} {direction}"
        
        latex_lines.append(f"{display_name_with_direction} & {baseline_formatted} & {ours_formatted} \\\\")
    
    # Table footer
    latex_lines.extend([
        "\\bottomrule",
        "\\end{tabular}",
        "\\end{table}",
        ""
    ])
    
    return "\n".join(latex_lines)

def main():
    """Main function"""
    
    # Configuration
    # main_dir = ".rioszemm/NanoDesigner"
    
    experiment_configs = [
        ('ADesigner', 'Denovo', 'NanoDesigner_assessment_experiment_2/NanoDesigner_Adesigner_denovo'),
        ('ADesigner', 'Optimization', 'NanoDesigner_assessment_experiment_2/NanoDesigner_Adesigner_optimization'),
        ('DiffAb', 'Denovo', 'NanoDesigner_assessment_experiment_2/NanoDesigner_DiffAb_denovo'),
        ('DiffAb', 'Optimization', 'NanoDesigner_assessment_experiment_2/NanoDesigner_DiffAb_optimization'),
    ]
    
    # Load all data
    all_experiments_data = load_all_experiment_data(main_dir, experiment_configs)
    
    # Get first iteration results (baseline)
    baseline_results = process_pooled_iteration_data(all_experiments_data, 1)
    
    # Get last iteration results (ours)
    last_iter_results = {}
    last_iter_numbers = {}
    
    print("\nFinding last iterations for each experiment:")
    for exp_key, exp_data in all_experiments_data.items():
        available_iterations = [k for k in exp_data.keys() if isinstance(k, int)]
        if available_iterations:
            last_iteration = min(max(available_iterations), 10)  # Prefer 10th, but use highest available
            last_iter_numbers[exp_key] = last_iteration
            print(f"{exp_key}: Last iteration = {last_iteration}")
            
            last_results = process_pooled_iteration_data({exp_key: exp_data}, last_iteration)
            last_iter_results.update(last_results)
        else:
            last_iter_numbers[exp_key] = 0
            print(f"{exp_key}: No data found")
    
    # Generate LaTeX tables for each scenario
    scenarios = [
        ('DiffAb_Optimization', 'DiffAb', 'Optimization'),
        ('DiffAb_Denovo', 'DiffAb', 'Denovo'),
        ('ADesigner_Optimization', 'ADesigner', 'Optimization'),
        ('ADesigner_Denovo', 'ADesigner', 'Denovo')
    ]
    
    print("\n" + "="*100)
    print("LATEX TABLES")
    print("="*100)
    
    for exp_key, scenario_name, scenario_type in scenarios:
        if exp_key in baseline_results and exp_key in last_iter_results:
            table_latex = generate_latex_table_for_scenario(
                baseline_results[exp_key],
                last_iter_results[exp_key],
                scenario_name,
                scenario_type
            )
            print(f"\n% Table for {scenario_name} {scenario_type}")
            print(table_latex)
        else:
            print(f"\n% No data available for {scenario_name} {scenario_type}")
    
    # Save detailed results to CSV
    rows = []
    for exp_key, data in baseline_results.items():
        row = data.copy()
        row['experiment_key'] = exp_key
        row['iteration_type'] = 'baseline'
        rows.append(row)
    
    for exp_key, data in last_iter_results.items():
        row = data.copy()
        row['experiment_key'] = exp_key
        row['iteration_type'] = 'ours'
        rows.append(row)
    
    df = pd.DataFrame(rows)
    df.to_csv('nanodesigner_baseline_vs_ours_comparison.csv', index=False)
    print(f"\nDetailed results saved to: nanodesigner_baseline_vs_ours_comparison.csv")

if __name__ == "__main__":
    main()