# Bagheri Lab Tutorial | Parameter Estimation
### Augest 05 2025
Objectives:

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize, differential_evolution
from scipy.stats import norm, uniform
import pandas as pd
from collections import defaultdict
import random
import warnings
warnings.filterwarnings('ignore')

In [None]:
! git clone https://github.com/BIOL359A-FoundationsOfQBio-Spr25/week10_parameterEstimation.git
! cp -r week10_parameterEstimation/* .
! ls

In [None]:
from sir.sir import SIR_ABM

class ParameterEstimator:
    """Simple parameter estimation methods for SIR ABM"""
    
    def __init__(self, observed_data, lattice_size=40):
        """
        observed_data: dict with 'time', 'S', 'I', 'R' arrays
        """
        self.observed_data = observed_data
        self.lattice_size = lattice_size
        self.results = {}
        
    def simulate_sir(self, params):
        """Run SIR simulation with given parameters"""
        Pm, PI, PR = params
        
        # Bounds check
        if any(p < 0 or p > 2 for p in params):
            return None
            
        try:
            model = SIR_ABM(
                lattice_size=self.lattice_size,
                initial_infected_fraction=0.01,
                initial_susceptible_fraction=0.49,
                Pm=Pm, PI=PI, PR=PR
            )
            
            max_time = max(self.observed_data['time'])
            model.run(max_time=max_time, record_interval=0.1)
            
            return model.history
        except:
            return None
    
    def calculate_mse(self, params):
        """Calculate Mean Squared Error between simulation and observed data"""
        sim_data = self.simulate_sir(params)
        if sim_data is None:
            return 1e6
            
        # Interpolate simulation data to match observed time points
        obs_times = np.array(self.observed_data['time'])
        sim_times = np.array(sim_data['time'])
        
        if len(sim_times) == 0:
            return 1e6
            
        # Simple linear interpolation for I (infected fraction)
        sim_I = np.interp(obs_times, sim_times, sim_data['I'])
        obs_I = np.array(self.observed_data['I'])
        
        mse = np.mean((sim_I - obs_I)**2)
        return mse
    
    def least_squares(self, initial_guess=None):
        """Least squares parameter estimation"""
        print("Running Least Squares estimation...")
        
        if initial_guess is None:
            initial_guess = [1.0, 0.05, 0.005]
            
        bounds = [(0.1, 2.0), (0.01, 0.2), (0.001, 0.05)]
        
        result = minimize(
            self.calculate_mse,
            initial_guess,
            method='L-BFGS-B',
            bounds=bounds
        )
        
        self.results['least_squares'] = {
            'params': result.x,
            'mse': result.fun,
            'success': result.success
        }
        
        print(f"LS Result: Pm={result.x[0]:.3f}, PI={result.x[1]:.4f}, PR={result.x[2]:.4f}, MSE={result.fun:.6f}")
        return result.x, result.fun
    
    def mle_estimation(self, initial_guess=None):
        """Maximum Likelihood Estimation (assuming Gaussian errors)"""
        print("Running MLE estimation...")
        
        def neg_log_likelihood(params):
            sim_data = self.simulate_sir(params)
            if sim_data is None:
                return 1e6
                
            obs_times = np.array(self.observed_data['time'])
            sim_times = np.array(sim_data['time'])
            
            if len(sim_times) == 0:
                return 1e6
                
            sim_I = np.interp(obs_times, sim_times, sim_data['I'])
            obs_I = np.array(self.observed_data['I'])
            
            # Assume constant variance (could be estimated)
            sigma = 0.05
            nll = -np.sum(norm.logpdf(obs_I, sim_I, sigma))
            return nll
        
        if initial_guess is None:
            initial_guess = [1.0, 0.05, 0.005]
            
        bounds = [(0.1, 2.0), (0.01, 0.2), (0.001, 0.05)]
        
        result = minimize(
            neg_log_likelihood,
            initial_guess,
            method='L-BFGS-B',
            bounds=bounds
        )
        
        mse = self.calculate_mse(result.x)
        
        self.results['mle'] = {
            'params': result.x,
            'nll': result.fun,
            'mse': mse,
            'success': result.success
        }
        
        print(f"MLE Result: Pm={result.x[0]:.3f}, PI={result.x[1]:.4f}, PR={result.x[2]:.4f}, MSE={mse:.6f}")
        return result.x, result.fun
    
    def bayesian_mcmc(self, n_samples=1000, burn_in=200):
        """Simple Bayesian estimation using Metropolis-Hastings"""
        print(f"Running Bayesian MCMC ({n_samples} samples)...")
        
        # Prior distributions (uniform)
        def log_prior(params):
            Pm, PI, PR = params
            if 0.1 <= Pm <= 2.0 and 0.01 <= PI <= 0.2 and 0.001 <= PR <= 0.05:
                return 0.0  # log(1) for uniform prior
            return -np.inf
        
        def log_likelihood(params):
            sim_data = self.simulate_sir(params)
            if sim_data is None:
                return -np.inf
                
            obs_times = np.array(self.observed_data['time'])
            sim_times = np.array(sim_data['time'])
            
            if len(sim_times) == 0:
                return -np.inf
                
            sim_I = np.interp(obs_times, sim_times, sim_data['I'])
            obs_I = np.array(self.observed_data['I'])
            
            sigma = 0.05
            return np.sum(norm.logpdf(obs_I, sim_I, sigma))
        
        # Initialize chain
        current_params = np.array([1.0, 0.05, 0.005])
        current_ll = log_likelihood(current_params)
        
        samples = []
        accepted = 0
        
        # Proposal covariance
        prop_cov = np.diag([0.1, 0.01, 0.001])**2
        
        for i in range(n_samples + burn_in):
            # Propose new parameters
            proposal = np.random.multivariate_normal(current_params, prop_cov)
            
            # Calculate acceptance probability
            prior_ratio = log_prior(proposal) - log_prior(current_params)
            if prior_ratio == -np.inf:
                continue
                
            prop_ll = log_likelihood(proposal)
            ll_ratio = prop_ll - current_ll
            
            alpha = min(0, prior_ratio + ll_ratio)
            
            if np.log(np.random.random()) < alpha:
                current_params = proposal
                current_ll = prop_ll
                accepted += 1
            
            if i >= burn_in:
                samples.append(current_params.copy())
            
            if (i + 1) % 200 == 0:
                acc_rate = accepted / (i + 1)
                print(f"  Iteration {i+1}, acceptance rate: {acc_rate:.3f}")
        
        samples = np.array(samples)
        
        # Calculate posterior means
        posterior_means = np.mean(samples, axis=0)
        posterior_stds = np.std(samples, axis=0)
        mse = self.calculate_mse(posterior_means)
        
        self.results['bayesian'] = {
            'params': posterior_means,
            'std': posterior_stds,
            'samples': samples,
            'mse': mse,
            'acceptance_rate': accepted / (n_samples + burn_in)
        }
        
        print(f"Bayesian Result: Pm={posterior_means[0]:.3f}±{posterior_stds[0]:.3f}, "
              f"PI={posterior_means[1]:.4f}±{posterior_stds[1]:.4f}, "
              f"PR={posterior_means[2]:.4f}±{posterior_stds[2]:.4f}, MSE={mse:.6f}")
        
        return posterior_means, samples
    
    def abc_estimation(self, n_samples=500, tolerance=0.01):
        """Approximate Bayesian Computation"""
        print(f"Running ABC estimation ({n_samples} samples, tolerance={tolerance})...")
        
        accepted_params = []
        n_attempts = 0
        max_attempts = n_samples * 50  # Prevent infinite loops
        
        while len(accepted_params) < n_samples and n_attempts < max_attempts:
            n_attempts += 1
            
            # Sample from prior
            Pm = np.random.uniform(0.1, 2.0)
            PI = np.random.uniform(0.01, 0.2)
            PR = np.random.uniform(0.001, 0.05)
            params = [Pm, PI, PR]
            
            # Calculate distance
            mse = self.calculate_mse(params)
            
            if mse < tolerance:
                accepted_params.append(params)
            
            if (n_attempts % 1000) == 0:
                print(f"  Attempted {n_attempts}, accepted {len(accepted_params)}")
        
        if len(accepted_params) == 0:
            print("No parameters accepted! Try increasing tolerance.")
            return None, None
        
        accepted_params = np.array(accepted_params)
        posterior_means = np.mean(accepted_params, axis=0)
        posterior_stds = np.std(accepted_params, axis=0)
        mse = self.calculate_mse(posterior_means)
        
        self.results['abc'] = {
            'params': posterior_means,
            'std': posterior_stds,
            'samples': accepted_params,
            'mse': mse,
            'acceptance_rate': len(accepted_params) / n_attempts,
            'tolerance': tolerance
        }
        
        print(f"ABC Result: Pm={posterior_means[0]:.3f}±{posterior_stds[0]:.3f}, "
              f"PI={posterior_means[1]:.4f}±{posterior_stds[1]:.4f}, "
              f"PR={posterior_means[2]:.4f}±{posterior_stds[2]:.4f}, MSE={mse:.6f}")
        
        return posterior_means, accepted_params

def generate_synthetic_data(true_params=[1.2, 0.08, 0.01], noise_level=0.02, lattice_size=40):
    """Generate synthetic observed data"""
    print("Generating synthetic observed data...")
    
    Pm, PI, PR = true_params
    model = SIR_ABM(
        lattice_size=lattice_size,
        initial_infected_fraction=0.01,
        initial_susceptible_fraction=0.49,
        Pm=Pm, PI=PI, PR=PR
    )
    
    model.run(max_time=3.0, record_interval=0.1)
    
    # Add noise to infected fraction
    times = np.array(model.history['time'])
    I_clean = np.array(model.history['I'])
    I_noisy = I_clean + np.random.normal(0, noise_level, len(I_clean))
    I_noisy = np.clip(I_noisy, 0, 1)  # Keep in [0,1]
    
    observed_data = {
        'time': times,
        'S': np.array(model.history['S']),
        'I': I_noisy,
        'R': np.array(model.history['R'])
    }
    
    print(f"Generated data with {len(times)} time points")
    return observed_data, true_params

def compare_all_methods(observed_data, lattice_size=40):
    """Run all estimation methods and compare results"""
    estimator = ParameterEstimator(observed_data, lattice_size)
    
    print("="*60)
    print("PARAMETER ESTIMATION COMPARISON")
    print("="*60)
    
    # Run all methods
    estimator.least_squares()
    print()
    
    estimator.mle_estimation()
    print()
    
    estimator.bayesian_mcmc(n_samples=800, burn_in=200)
    print()
    
    estimator.abc_estimation(n_samples=300, tolerance=0.015)
    print()
    
    return estimator

def plot_results(estimator, true_params=None):
    """Plot comparison of results"""
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # Plot 1: Parameter estimates
    methods = []
    param_estimates = []
    param_errors = []
    
    for method, result in estimator.results.items():
        methods.append(method.replace('_', ' ').title())
        param_estimates.append(result['params'])
        if 'std' in result:
            param_errors.append(result['std'])
        else:
            param_errors.append([0, 0, 0])
    
    param_estimates = np.array(param_estimates)
    param_errors = np.array(param_errors)
    
    x = np.arange(len(methods))
    width = 0.25
    
    axes[0,0].bar(x - width, param_estimates[:, 0], width, yerr=param_errors[:, 0], 
                  label='Pm', alpha=0.8, capsize=5)
    axes[0,0].bar(x, param_estimates[:, 1]*10, width, yerr=param_errors[:, 1]*10, 
                  label='PI×10', alpha=0.8, capsize=5)
    axes[0,0].bar(x + width, param_estimates[:, 2]*100, width, yerr=param_errors[:, 2]*100, 
                  label='PR×100', alpha=0.8, capsize=5)
    
    if true_params:
        axes[0,0].axhline(true_params[0], color='red', linestyle='--', alpha=0.7, label='True Pm')
        axes[0,0].axhline(true_params[1]*10, color='green', linestyle='--', alpha=0.7, label='True PI×10')
        axes[0,0].axhline(true_params[2]*100, color='blue', linestyle='--', alpha=0.7, label='True PR×100')
    
    axes[0,0].set_xlabel('Method')
    axes[0,0].set_ylabel('Parameter Value')
    axes[0,0].set_title('Parameter Estimates Comparison')
    axes[0,0].set_xticks(x)
    axes[0,0].set_xticklabels(methods, rotation=45)
    axes[0,0].legend()
    axes[0,0].grid(True, alpha=0.3)
    
    # Plot 2: MSE comparison
    mse_values = [result['mse'] for result in estimator.results.values()]
    axes[0,1].bar(methods, mse_values, alpha=0.8, color='orange')
    axes[0,1].set_xlabel('Method')
    axes[0,1].set_ylabel('Mean Squared Error')
    axes[0,1].set_title('Model Fit Quality (MSE)')
    axes[0,1].tick_params(axis='x', rotation=45)
    axes[0,1].grid(True, alpha=0.3)
    
    # Plot 3: Fit visualization
    obs_times = estimator.observed_data['time']
    obs_I = estimator.observed_data['I']
    
    axes[1,0].plot(obs_times, obs_I, 'ko-', label='Observed', markersize=4)
    
    colors = ['red', 'blue', 'green', 'purple']
    for i, (method, result) in enumerate(estimator.results.items()):
        sim_data = estimator.simulate_sir(result['params'])
        if sim_data:
            sim_times = np.array(sim_data['time'])
            sim_I = np.array(sim_data['I'])
            axes[1,0].plot(sim_times, sim_I, color=colors[i%len(colors)], 
                          label=f"{method.replace('_', ' ').title()}", alpha=0.8)
    
    axes[1,0].set_xlabel('Time')
    axes[1,0].set_ylabel('Infected Fraction')
    axes[1,0].set_title('Model Fits')
    axes[1,0].legend()
    axes[1,0].grid(True, alpha=0.3)
    
    # Plot 4: Posterior distributions (if available)
    if 'bayesian' in estimator.results and 'samples' in estimator.results['bayesian']:
        samples = estimator.results['bayesian']['samples']
        axes[1,1].hist(samples[:, 1], bins=30, alpha=0.7, label='PI (Bayesian)', density=True)
        axes[1,1].axvline(samples[:, 1].mean(), color='red', linestyle='--', label='Mean')
        if true_params:
            axes[1,1].axvline(true_params[1], color='green', linestyle='--', label='True')
        axes[1,1].set_xlabel('PI Parameter')
        axes[1,1].set_ylabel('Density')
        axes[1,1].set_title('Posterior Distribution (PI)')
        axes[1,1].legend()
        axes[1,1].grid(True, alpha=0.3)
    else:
        axes[1,1].text(0.5, 0.5, 'No posterior\nsamples available', 
                      transform=axes[1,1].transAxes, ha='center', va='center')
        axes[1,1].set_title('Posterior Distribution')
    
    plt.tight_layout()
    plt.show()

def create_summary_table(estimator, true_params=None):
    """Create a summary table of results"""
    data = []
    
    for method, result in estimator.results.items():
        row = {
            'Method': method.replace('_', ' ').title(),
            'Pm': f"{result['params'][0]:.3f}",
            'PI': f"{result['params'][1]:.4f}",
            'PR': f"{result['params'][2]:.4f}",
            'MSE': f"{result['mse']:.6f}"
        }
        
        if 'std' in result:
            row['Pm'] += f" ± {result['std'][0]:.3f}"
            row['PI'] += f" ± {result['std'][1]:.4f}"
            row['PR'] += f" ± {result['std'][2]:.4f}"
        
        if 'acceptance_rate' in result:
            row['Accept Rate'] = f"{result['acceptance_rate']:.3f}"
        
        data.append(row)
    
    if true_params:
        data.append({
            'Method': 'TRUE VALUES',
            'Pm': f"{true_params[0]:.3f}",
            'PI': f"{true_params[1]:.4f}",
            'PR': f"{true_params[2]:.4f}",
            'MSE': 'N/A'
        })
    
    return pd.DataFrame(data)

# Example usage function
def run_comparison_example():
    """Run a complete comparison example"""
    print("SIMPLE SIR PARAMETER ESTIMATION COMPARISON")
    print("==========================================")
    
    # Generate synthetic data
    true_params = [1.2, 0.08, 0.01]  # Pm, PI, PR
    observed_data, _ = generate_synthetic_data(true_params, noise_level=0.02)
    
    # Run comparison
    estimator = compare_all_methods(observed_data)
    
    # Show results
    print("\n" + "="*60)
    print("SUMMARY TABLE")
    print("="*60)
    summary = create_summary_table(estimator, true_params)
    print(summary.to_string(index=False))
    
    # Plot results
    plot_results(estimator, true_params)
    
    return estimator, observed_data, true_params

# Quick test functions
def quick_test_ls(observed_data):
    """Quick least squares test"""
    estimator = ParameterEstimator(observed_data)
    return estimator.least_squares()

def quick_test_mle(observed_data):
    """Quick MLE test"""
    estimator = ParameterEstimator(observed_data)
    return estimator.mle_estimation()

if __name__ == "__main__":
    # Run the complete example
    estimator, data, true_params = run_comparison_example()