In [1]:
import numpy as np
import pandas as pd
from SimulationConfig import SimulationConfig
from simulation import Simulation
import itertools
from typing import Dict, List, Optional

In [2]:

class FlexibleSensitivityAnalyzer:
    def __init__(self):
        self.base_config = SimulationConfig()
        self.results = []
        
    def define_parameter_ranges(self) -> Dict[str, Dict[str, float]]:
        """Define all available parameter variations to test"""
        return {
            'prevalence': {
                'low': 26,      # per 100,000
                'base': 53,
                'high': 95
            },
            'treatment_access': {
                'low': 0.25,
                'base': 0.43,
                'high': 0.60
            },
            'chronic_fraction': {
                'low': 0.15,
                'base': 0.20,
                'high': 0.25
            },
            'instensity_scale_factor': {
                'low': 0.8,
                'base': 0.9,
                'high': 1.0
            }
        }
    
    def create_config_variant(self, **kwargs) -> SimulationConfig:
        """Create a configuration with modified parameters"""
        config = SimulationConfig()
        
        # Apply parameter changes
        if 'prevalence' in kwargs:
            config.annual_prevalence_per_100k = kwargs['prevalence']
        if 'treatment_access' in kwargs:
            config.prop_treated = kwargs['treatment_access']
            config.prop_untreated = 1 - kwargs['treatment_access']
        if 'chronic_fraction' in kwargs:
            config.prop_chronic = kwargs['chronic_fraction']
            config.prop_episodic = 1 - kwargs['chronic_fraction']
        if 'instensity_scale_factor' in kwargs:
            import stats_utils
            stats_utils.INTENSITY_SCALE_FACTOR = kwargs['instensity_scale_factor']
        
        # Use smaller simulation size for speed
        config.percent_of_patients_to_simulate = 0.1
        return config
    
    def calculate_dles(self, simulation: Simulation) -> Dict[str, float]:
        """Calculate DLES and other key metrics from simulation results"""
        results = simulation.get_results()
        
        # Calculate total person-years at ≥9/10 intensity (index 90+)
        total_extreme_pain = sum(
            sum(group_data[90:]) for group_data in results['global_person_years'].values()
        )
        
        # Convert to days (DLES)
        dles = total_extreme_pain * 365
        
        # Also calculate ≥7/10 intensity (YLSS equivalent)
        total_severe_pain = sum(
            sum(group_data[70:]) for group_data in results['global_person_years'].values()
        )
        ylss = total_severe_pain * 365
        
        # Total person-years in any pain
        total_pain = sum(
            sum(group_data) for group_data in results['global_person_years'].values()
        )
        
        return {
            'dles': dles,
            'ylss': ylss,
            'total_person_years': total_pain,
            'total_ch_sufferers': simulation.get_total_ch_sufferers()
        }
    
    def run_flexible_sensitivity_analysis(self, vary_parameters: List[str]) -> pd.DataFrame:
        """
        Run sensitivity analysis for specified parameters only
        
        Args:
            vary_parameters: List of parameter names to vary (e.g., ['prevalence', 'treatment_access'])
        """
        param_ranges = self.define_parameter_ranges()
        
        # Filter to only the parameters we want to vary
        selected_ranges = {param: param_ranges[param] for param in vary_parameters if param in param_ranges}
        
        if not selected_ranges:
            raise ValueError(f"No valid parameters selected. Available: {list(param_ranges.keys())}")
        
        # Get all combinations of the selected parameters
        param_names = list(selected_ranges.keys())
        param_combinations = list(itertools.product(*[selected_ranges[param].items() for param in param_names]))
        
        print(f"Running {len(param_combinations)} scenarios for parameters: {vary_parameters}")
        print(f"Total combinations: {' × '.join([f'{len(selected_ranges[p])} {p}' for p in param_names])}")
        
        base_case_dles = None
        
        for i, combination in enumerate(param_combinations):
            # Build parameter dictionary for this scenario
            scenario_params = {}
            scenario_labels = {}
            scenario_values = {}
            
            for j, (param_name, (label, value)) in enumerate(zip(param_names, combination)):
                scenario_params[param_name] = value
                scenario_labels[f'{param_name}_label'] = label
                scenario_values[f'{param_name}_value'] = value
            
            # Create description
            desc_parts = [f"{label} {param}" for param, (label, _) in zip(param_names, combination)]
            description = ", ".join(desc_parts)
            print(f"Scenario {i+1}/{len(param_combinations)}: {description}")
            
            # Create configuration for this scenario
            config_params = {k: v for k, v in scenario_params.items()}
            config = self.create_config_variant(**config_params)
            
            # Run simulation
            simulation = Simulation(config)
            
            simulation.run()
            
            # Calculate metrics
            metrics = self.calculate_dles(simulation)
            
            # Store results
            result = {
                'scenario': i + 1,
                'dles': metrics['dles'],
                'ylss': metrics['ylss'],
                'total_person_years': metrics['total_person_years'],
                'total_ch_sufferers': metrics['total_ch_sufferers'],
            }
            
            # Add parameter info
            result.update(scenario_labels)
            result.update(scenario_values)
            
            # Check if this is base case (all parameters at 'base')
            result['is_base_case'] = all(label == 'base' for label, _ in combination)
            
            if result['is_base_case']:
                base_case_dles = metrics['dles']
            
            self.results.append(result)
        
        # Convert to DataFrame and calculate percentage changes
        df = pd.DataFrame(self.results)
        
        if base_case_dles:
            df['dles_pct_change'] = ((df['dles'] - base_case_dles) / base_case_dles) * 100
        
        return df
    
    def format_parameter_labels(self, df: pd.DataFrame, vary_parameters: List[str]) -> pd.DataFrame:
        """Format parameter labels with values and nice names"""
        param_ranges = self.define_parameter_ranges()
        
        df_formatted = df.copy()
        
        # Format each parameter that was varied
        for param in vary_parameters:
            if f'{param}_label' in df.columns:
                if param == 'prevalence':
                    df_formatted[f'{param}_formatted'] = df_formatted.apply(
                        lambda row: f"{int(row[f'{param}_value'])} ({row[f'{param}_label']})", axis=1
                    )
                elif param in ['treatment_access', 'chronic_fraction', 'instensity_scale_factor']:
                    df_formatted[f'{param}_formatted'] = df_formatted.apply(
                        lambda row: f"{int(row[f'{param}_value'] * 100)}% ({row[f'{param}_label']})", axis=1
                    )
        
        return df_formatted
    
    def create_flexible_detailed_table(self, df: pd.DataFrame, vary_parameters: List[str]) -> pd.DataFrame:
        """Create detailed results table for flexible parameter sets"""
        
        # Format the labels
        df_formatted = self.format_parameter_labels(df, vary_parameters)
        
        # Create sorting categories for each varied parameter
        order = ['low', 'base', 'high']
        for param in vary_parameters:
            if f'{param}_label' in df.columns:
                df_formatted[f'{param}_cat'] = pd.Categorical(
                    df_formatted[f'{param}_label'], categories=order, ordered=True
                )
        
        # Sort by all categorical columns
        sort_columns = [f'{param}_cat' for param in vary_parameters if f'{param}_cat' in df_formatted.columns]
        df_sorted = df_formatted.sort_values(sort_columns)
        
        # Create the final table
        table_data = {}
        
        # Add columns for each varied parameter
        param_name_map = {
            'prevalence': 'Prevalence',
            'treatment_access': 'Treatment access',
            'chronic_fraction': 'Chronic %',
            'instensity_scale_factor': 'Intensity scale'
        }
        
        for param in vary_parameters:
            if f'{param}_formatted' in df_sorted.columns:
                table_data[param_name_map.get(param, param.title())] = df_sorted[f'{param}_formatted']
        
        # Add result columns
        table_data['DLES'] = df_sorted['dles'].apply(lambda x: f"{int(x):,}")
        table_data['DLES (% change)'] = df_sorted['dles_pct_change'].apply(lambda x: f"{int(x):+d}%")
        
        detailed_table = pd.DataFrame(table_data)
        detailed_table.reset_index(drop=True, inplace=True)
        
        return detailed_table
    
    def create_summary_stats(self, df: pd.DataFrame) -> Dict:
        """Create summary statistics"""
        base_case = df[df['is_base_case']].iloc[0]
        
        return {
            'total_scenarios': len(df),
            'min_dles': df['dles'].min(),
            'max_dles': df['dles'].max(),
            'base_dles': base_case['dles'],
            'min_pct_change': df['dles_pct_change'].min(),
            'max_pct_change': df['dles_pct_change'].max(),
            'q25_dles': df['dles'].quantile(0.25),
            'q75_dles': df['dles'].quantile(0.75),
            'coefficient_variation': (df['dles'].std() / df['dles'].mean()) * 100
        }

def main_flexible(vary_parameters: Optional[List[str]] = None):
    """
    Run flexible sensitivity analysis
    
    Args:
        vary_parameters: List of parameters to vary. Options:
                        ['prevalence', 'treatment_access', 'chronic_fraction', 'instensity_scale_factor']
                        If None, defaults to the original three parameters
    """
    if vary_parameters is None:
        vary_parameters = ['prevalence', 'treatment_access', 'chronic_fraction']
    
    analyzer = FlexibleSensitivityAnalyzer()
    
    # Run analysis
    results_df = analyzer.run_flexible_sensitivity_analysis(vary_parameters)
    
    # Calculate summary statistics
    summary_stats = analyzer.create_summary_stats(results_df)
    
    # Create detailed table
    detailed_table = analyzer.create_flexible_detailed_table(results_df, vary_parameters)
    
    # Print results
    print("\n" + "="*80)
    print(f"FLEXIBLE SENSITIVITY ANALYSIS RESULTS")
    print(f"Varied parameters: {', '.join(vary_parameters)}")
    print("="*80)
    
    print(f"\nScenarios tested: {summary_stats['total_scenarios']}")
    print(f"Base case DLES: {summary_stats['base_dles']:,.0f} days")
    print(f"Range: {summary_stats['min_dles']:,.0f} - {summary_stats['max_dles']:,.0f} days")
    print(f"Percentage change from base: {summary_stats['min_pct_change']:.1f}% to +{summary_stats['max_pct_change']:.1f}%")
    
    print("\n" + "="*80)
    print("DETAILED RESULTS")
    print("="*80)
    print(detailed_table.to_string(index=False))
    
    # Save to CSV
    filename = f"sensitivity_{'_'.join(vary_parameters)}.csv"
    detailed_table.to_csv("./csv/" + filename, index=False)
    print(f"\nDetailed table saved to: {filename}")

In [3]:
# Choose from 'prevalence', 'treatment_access', 'chronic_fraction', 'instensity_scale_factor'
params = ['prevalence', 'treatment_access', 'chronic_fraction', 'instensity_scale_factor']
# params = ['instensity_scale_factor']
main_flexible(params)

Running 81 scenarios for parameters: ['prevalence', 'treatment_access', 'chronic_fraction', 'instensity_scale_factor']
Total combinations: 3 prevalence × 3 treatment_access × 3 chronic_fraction × 3 instensity_scale_factor
Scenario 1/81: low prevalence, low treatment_access, low chronic_fraction, low instensity_scale_factor
Scenario 2/81: low prevalence, low treatment_access, low chronic_fraction, base instensity_scale_factor
Scenario 3/81: low prevalence, low treatment_access, low chronic_fraction, high instensity_scale_factor
Scenario 4/81: low prevalence, low treatment_access, base chronic_fraction, low instensity_scale_factor
Scenario 5/81: low prevalence, low treatment_access, base chronic_fraction, base instensity_scale_factor
Scenario 6/81: low prevalence, low treatment_access, base chronic_fraction, high instensity_scale_factor
Scenario 7/81: low prevalence, low treatment_access, high chronic_fraction, low instensity_scale_factor
Scenario 8/81: low prevalence, low treatment_acce