In [440]:
import numpy as np
import numpy.typing as npt
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
from sklearn.metrics import r2_score
from dataclasses import dataclass
from typing import Dict, Iterable, Sequence, Tuple, Optional, Union
import warnings

# Suppress sklearn numerical warnings
warnings.filterwarnings('ignore', message='Ill-conditioned matrix')
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')

@dataclass
class PowerLawModel:
    """Power law regression model for algorithm performance prediction."""
    coefficients: np.ndarray
    intercept: float
    r2_score: float
    parameter_names: list
    
    def predict(self, **kwargs) -> Union[np.ndarray, float]:
        """Predict performance using parameter values."""
        # Ensure all required parameters are provided
        for param in self.parameter_names:
            if param not in kwargs:
                raise ValueError(f"Missing required parameter: {param}")
        
        # Create feature matrix
        features = np.column_stack([np.log(np.maximum(kwargs[param], 1e-8)) 
                                   for param in self.parameter_names])
        
        # Handle single prediction vs batch prediction
        if features.shape[0] == 1:
            log_prediction = self.intercept + np.dot(features[0], self.coefficients)
            return np.exp(log_prediction)
        else:
            log_predictions = self.intercept + np.dot(features, self.coefficients)
            return np.exp(log_predictions)
    
    def get_formula(self) -> str:
        """Return human-readable power law formula."""
        a = np.exp(self.intercept)
        terms = [f"{param}**{coef:.3f}" for param, coef in zip(self.parameter_names, self.coefficients)]
        return f"Performance = {a:.10f} * " + " * ".join(terms)

class PowerLawRegressor:
    """Robust power law regression with flexible parameter handling."""
    
    def __init__(self):
        self.model = None
        self.parameter_names = None
    
    def fit(self, data: Dict[str, np.ndarray], performance: np.ndarray, 
            parameters: Optional[list] = None) -> PowerLawModel:
        """
        Fit power law model to performance data.
        
        Args:
            data: Dictionary with parameter names as keys and arrays as values
            performance: Array of performance measurements
            parameters: List of parameter names to include in model (None = all)
        """
        if parameters is None:
            parameters = list(data.keys())
        
        self.parameter_names = parameters
        
        # Create log-transformed feature matrix
        epsilon = 1e-8
        features = np.column_stack([
            np.log(np.maximum(data[param], epsilon)) for param in parameters
        ])
        log_performance = np.log(np.maximum(performance, epsilon))
        
        # Fit ridge regression for numerical stability
        ridge = Ridge(alpha=1e-6)
        ridge.fit(features, log_performance)
        
        # Calculate R² score
        log_predictions = ridge.predict(features)
        r2 = r2_score(log_performance, log_predictions)
        
        self.model = PowerLawModel(
            coefficients=ridge.coef_,
            intercept=ridge.intercept_,
            r2_score=r2,
            parameter_names=parameters
        )
        
        return self.model
    
    def cross_validate(self, data: Dict[str, np.ndarray], performance: np.ndarray, 
                      parameters: Optional[list] = None, cv: int = 5) -> Dict:
        """Perform cross-validation for power law model."""
        if parameters is None:
            parameters = list(data.keys())
        
        epsilon = 1e-8
        features = np.column_stack([
            np.log(np.maximum(data[param], epsilon)) for param in parameters
        ])
        log_performance = np.log(np.maximum(performance, epsilon))
        
        ridge = Ridge(alpha=1e-6)
        scores = cross_val_score(ridge, features, log_performance, cv=cv, scoring='r2')
        
        return {
            'mean_r2': scores.mean(),
            'std_r2': scores.std(),
            'individual_scores': scores
        }

def generate_realistic_test_data(
        n_samples: int = 1000, 
        o_fixed: Optional[int] = None, 
        g_fixed: Optional[int] = None, 
        random_seed: int = 42,
    ) -> Dict[str, npt.NDArray[np.float64]]:
    """
    Generate synthetic performance data with consistent power law relationship.
    
    Args:
        n_samples: Number of base samples for n and k
        o_fixed: If specified, use this fixed value for o
        g_fixed: If specified, use this fixed value for g  
        random_seed: Random seed for reproducibility
        
    Returns:
        Dictionary containing parameter arrays and performance measurements
    """
    np.random.seed(random_seed)
    
    # Generate base parameter samples
    n_base = np.random.randint(10, 1000, n_samples)
    k_base = np.random.randint(1, 100, n_samples)
    
    # Determine o and g values
    if o_fixed is not None and g_fixed is not None:
        # Fixed o and g case
        o_values = [o_fixed]
        g_values = [g_fixed]
    elif o_fixed is not None:
        # Fixed o, vary g
        o_values = [o_fixed]
        g_values = list(range(1, 6))
    elif g_fixed is not None:
        # Fixed g, vary o
        o_values = list(range(1, 6))
        g_values = [g_fixed]
    else:
        # Full grid of o and g values
        o_values = list(range(1, 6))
        g_values = list(range(1, 6))
    
    # Generate complete dataset
    all_n, all_k, all_o, all_g, all_performance = [], [], [], [], []
    
    # True power law parameters - consistent across all data
    true_a = 0.001
    true_n_exp = 1.2
    true_k_exp = 0.8
    true_o_exp = 0.6
    true_g_exp = 0.4
    
    for o_val in o_values:
        for g_val in g_values:
            # Use same n,k samples for each (o,g) combination
            n_samples_og = n_base.copy()
            k_samples_og = k_base.copy()
            o_samples_og = np.full(n_samples, o_val)
            g_samples_og = np.full(n_samples, g_val)
            
            # Calculate true performance using consistent power law
            true_performance = (true_a * 
                              (n_samples_og ** true_n_exp) * 
                              (k_samples_og ** true_k_exp) * 
                              (o_samples_og ** true_o_exp) * 
                              (g_samples_og ** true_g_exp))
            
            # Add realistic multiplicative noise
            noise = np.random.lognormal(0, 0.1, n_samples)
            noisy_performance = true_performance * noise
            
            all_n.extend(n_samples_og)
            all_k.extend(k_samples_og)
            all_o.extend(o_samples_og)
            all_g.extend(g_samples_og)
            all_performance.extend(noisy_performance)
    
    result = {
        'n': np.array(all_n),
        'k': np.array(all_k),
        'o': np.array(all_o),
        'g': np.array(all_g),
        'performance': np.array(all_performance)
    }
    
    # Print generation summary
    print(f"Generated {len(all_n)} total data points")
    print(f"True relationship: Performance = {true_a} × n^{true_n_exp} × k^{true_k_exp} × o^{true_o_exp} × g^{true_g_exp}")
    print(f"Parameter ranges: n=[{result['n'].min()}-{result['n'].max()}], k=[{result['k'].min()}-{result['k'].max()}], o=[{result['o'].min()}-{result['o'].max()}], g=[{result['g'].min()}-{result['g'].max()}]")
    print(f"Performance range: [{result['performance'].min():.2e} - {result['performance'].max():.2e}]")
    
    return result

def analyze_power_law_regression(data: Dict[str, npt.NDArray[np.float64]], 
                               scenarios: list) -> Dict:
    """
    Comprehensive power law regression analysis for different parameter scenarios.
    
    Args:
        data: Dictionary containing parameter arrays and performance
        scenarios: List of parameter combinations to analyze
    """
    
    regressor = PowerLawRegressor()
    results = {}
    
    performance = data['performance']
    
    for scenario in scenarios:
        # print(f"\n=== Analyzing scenario: {scenario} ===")
        
        # Fit model
        model = regressor.fit(data, performance, parameters=scenario)
        print(f"Fitted model: {model.get_formula()}")
        print(f"R² score: {model.r2_score:.4f}", end = '; ')
        
        # Cross-validation
        cv_results = regressor.cross_validate(data, performance, parameters=scenario)
        print(f"Cross-validation R²: {cv_results['mean_r2']:.4f} ± {cv_results['std_r2']:.4f}")
        
        results[tuple(scenario)] = {
            'model': model,
            'cv_results': cv_results
        }
    
    return results

# Example usage scenarios
def demonstrate_analysis():
    """Demonstrate different analysis scenarios."""
    
    print("=== Scenario 1: Full 4-parameter analysis ===")
    data_full = generate_realistic_test_data(n_samples=500)
    results_full = analyze_power_law_regression(data_full, [['n', 'k', 'o', 'g']])
    return results_full #, results_fixed, results_subset

def read_data(fn: str, function_name: str, parameter_names: dict[str, int]) -> dict[str, npt.NDArray[np.float64]]:
    import json
    with open(fn, 'r') as f:
        json_data = json.load(f)
    assert function_name in json_data.keys()
    samples = json_data[function_name]
    # sample1 = samples[0]
    # assert len(sample1) == len(parameter_names) + 1
    samples_arr = np.array(samples, dtype=np.float64)
    
    data = {}
    for parameter_name, i in parameter_names.items():
        data[parameter_name] = samples_arr[:,i]
    data['performance'] = samples_arr[:,-1]
    
    from pprint import pprint
    # print('data:')
    # pprint(data)
    return data


alg_names = ['var_direct_np', 'var_hypo']


print('*'*80)
print(f'* fit k: performance of var_direct_np')
data = read_data('running_time_data_compute_var.json', 'var_direct_np', {'k': 1})
results_full = analyze_power_law_regression(data, [['k']])

print(f'* fit o: performance of var_hypo')
data = read_data('running_time_data_compute_var.json', 'var_hypo', {'o': 2})
results_full = analyze_power_law_regression(data, [['o']])

print('\n' + '*'*80)
for alg_name in alg_names:
    print(f'* fit k, o: performance of {alg_name}')
    data = read_data('running_time_data_compute_var.json', alg_name, {'k': 1, 'o': 2})
    results_full = analyze_power_law_regression(data, [['k', 'o']])

print('\n' + '*'*80)
for alg_name in alg_names:
    print(f'* fit n, k, o, g: performance of {alg_name}')
    data = read_data('running_time_data_compute_var.json', alg_name, {'n': 0, 'k': 1, 'o': 2, 'g': 3})
    results_full = analyze_power_law_regression(data, [['n', 'k', 'o', 'g']])

********************************************************************************
* fit k: performance of var_direct_np
Fitted model: Performance = 0.0000006664 * k**0.769
R² score: 0.9239; Cross-validation R²: 0.9238 ± 0.0015
* fit o: performance of var_hypo
Fitted model: Performance = 0.0003939771 * o**1.954
R² score: 0.8363; Cross-validation R²: 0.8347 ± 0.0264

********************************************************************************
* fit k, o: performance of var_direct_np
Fitted model: Performance = 0.0000006062 * k**0.769 * o**0.099
R² score: 0.9241; Cross-validation R²: 0.9240 ± 0.0015
* fit k, o: performance of var_hypo
Fitted model: Performance = 0.0007118038 * k**-0.064 * o**1.954
R² score: 0.9005; Cross-validation R²: 0.8967 ± 0.0066

********************************************************************************
* fit n, k, o, g: performance of var_direct_np
Fitted model: Performance = 0.0000005710 * n**0.035 * k**0.705 * o**0.099 * g**0.012
R² score: 0.9246; Cross-

In [237]:
import numpy as np
import numpy.typing as npt
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
from sklearn.metrics import r2_score
from scipy.optimize import minimize, differential_evolution
from dataclasses import dataclass
from typing import Dict, Iterable, Sequence, Tuple, Optional, Union
import warnings

# Suppress sklearn numerical warnings
warnings.filterwarnings('ignore', message='Ill-conditioned matrix')
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')

@dataclass
class PowerLawModel:
    """Power law regression model for algorithm performance prediction."""
    coefficients: np.ndarray
    intercept: float
    additive_constant: float
    r2_score: float
    parameter_names: list
    has_additive_constant: bool = False
    fitting_summary: str = ""
    
    def predict(self, **kwargs) -> Union[np.ndarray, float]:
        """Predict performance using parameter values."""
        # Ensure all required parameters are provided
        for param in self.parameter_names:
            if param not in kwargs:
                raise ValueError(f"Missing required parameter: {param}")
        
        if self.has_additive_constant:
            # Direct calculation for additive constant model
            # Performance = a × n^b1 × k^b2 × ... + c
            a = self.coefficients[0]  # First coefficient is 'a'
            exponents = self.coefficients[1:]  # Rest are exponents
            
            # Handle single vs batch prediction
            param_values = [kwargs[param] for param in self.parameter_names]
            param_arrays = [np.atleast_1d(val) for val in param_values]
            
            # Calculate power law part
            power_law_part = a * np.ones(len(param_arrays[0]))
            for i, param_array in enumerate(param_arrays):
                power_law_part *= param_array ** exponents[i]
            
            result = power_law_part + self.additive_constant
            
            # Return scalar if input was scalar
            if all(len(arr) == 1 for arr in param_arrays):
                return float(result[0])
            return result
            
        else:
            # Original log-space calculation
            features = np.column_stack([np.log(np.maximum(kwargs[param], 1e-8)) 
                                       for param in self.parameter_names])
            
            # Handle single prediction vs batch prediction
            if features.shape[0] == 1:
                log_prediction = self.intercept + np.dot(features[0], self.coefficients)
                return np.exp(log_prediction)
            else:
                log_predictions = self.intercept + np.dot(features, self.coefficients)
                return np.exp(log_predictions)
    
    def get_formula(self) -> str:
        """Return human-readable power law formula."""
        if self.has_additive_constant:
            a = self.coefficients[0]
            exponents = self.coefficients[1:]
            terms = [f"{param}**{exp:.3f}" for param, exp in zip(self.parameter_names, exponents)]
            power_part = f"{a:.10f} * " + " * ".join(terms)
            return f"Performance = {power_part} + {self.additive_constant}"
        else:
            a = np.exp(self.intercept)
            terms = [f"{param}**{coef:.3f}" for param, coef in zip(self.parameter_names, self.coefficients)]
            return f"Performance = {a:.10f} * " + " * ".join(terms)

class PowerLawRegressor:
    """Robust power law regression with flexible parameter handling and additive constant option."""
    
    def __init__(self):
        self.model = None
        self.parameter_names = None
    
    def fit(self, data: Dict[str, np.ndarray], performance: np.ndarray, 
            parameters: Optional[list] = None, 
            include_additive_constant: bool = False) -> PowerLawModel:
        """
        Fit power law model to performance data.
        
        Args:
            data: Dictionary with parameter names as keys and arrays as values
            performance: Array of performance measurements
            parameters: List of parameter names to include in model (None = all)
            include_additive_constant: If True, fit model with additive constant
        """
        if parameters is None:
            parameters = list(data.keys())
        
        self.parameter_names = parameters
        
        if include_additive_constant:
            return self._fit_with_additive_constant(data, performance, parameters)
        else:
            return self._fit_standard(data, performance, parameters)
    
    def _fit_standard(self, data: Dict[str, np.ndarray], performance: np.ndarray,
                     parameters: list) -> PowerLawModel:
        """Fit standard power law model without additive constant."""
        # Create log-transformed feature matrix
        epsilon = 1e-8
        features = np.column_stack([
            np.log(np.maximum(data[param], epsilon)) for param in parameters
        ])
        log_performance = np.log(np.maximum(performance, epsilon))
        
        # Fit ridge regression for numerical stability
        ridge = Ridge(alpha=1e-6)
        ridge.fit(features, log_performance)
        
        # Calculate R² score
        log_predictions = ridge.predict(features)
        r2 = r2_score(log_performance, log_predictions)
        
        return PowerLawModel(
            coefficients=ridge.coef_,
            intercept=ridge.intercept_,
            additive_constant=0.0,
            r2_score=r2,
            parameter_names=parameters,
            has_additive_constant=False,
            fitting_summary=""
        )
    
    def _fit_with_additive_constant(self, data: Dict[str, np.ndarray], performance: np.ndarray,
                                   parameters: list) -> PowerLawModel:
        """Fit power law model with additive constant using non-linear optimization."""
        
        # First get the standard power law fit as baseline
        standard_model = self._fit_standard(data, performance, parameters)
        standard_predictions = standard_model.predict(**{param: data[param] for param in parameters})
        standard_sse = np.sum((performance - standard_predictions) ** 2)
        
        summary_parts = []
        summary_parts.append(f"Standard power law R²: {standard_model.r2_score:.4f}")
        
        def objective_function(params):
            """Objective function to minimize: sum of squared residuals."""
            try:
                a = params[0]  # Multiplicative coefficient
                exponents = params[1:-1]  # Exponents for each parameter
                c = params[-1]  # Additive constant
                
                # Calculate predicted performance: a × n^b1 × k^b2 × ... + c
                predicted = a * np.ones(len(performance))
                for i, param in enumerate(parameters):
                    predicted *= data[param] ** exponents[i]
                predicted += c
                
                # Return sum of squared errors
                residuals = performance - predicted
                return np.sum(residuals ** 2)
                
            except (OverflowError, ValueError, FloatingPointError):
                return 1e15  # Return large value for invalid parameters
        
        # Strategy 1: Start from standard power law solution (c=0)
        a_init = np.exp(standard_model.intercept)
        exponents_init = standard_model.coefficients.copy()
        initial_guess_1 = [a_init] + list(exponents_init) + [0.0]
        
        # Strategy 2: Try small positive additive constant
        mean_performance = np.mean(performance)
        initial_guess_2 = [a_init] + list(exponents_init) + [0.01 * mean_performance]
        
        # Strategy 3: Try negative additive constant
        initial_guess_3 = [a_init] + list(exponents_init) + [-0.01 * mean_performance]
        
        # Set reasonable bounds
        bounds = []
        bounds.append((1e-15, 1e15))  # Bounds for coefficient 'a'
        for _ in parameters:
            bounds.append((-10.0, 10.0))  # Bounds for exponents
        bounds.append((-2*np.max(performance), 2*np.max(performance)))  # Bounds for additive constant
        
        # Try multiple optimization approaches with different starting points
        best_result = None
        best_objective = standard_sse  # Start with standard model performance
        best_method = "none"
        
        optimization_attempts = [
            (initial_guess_1, "standard_start"),
            (initial_guess_2, "positive_constant_start"),
            (initial_guess_3, "negative_constant_start")
        ]
        
        for initial_guess, description in optimization_attempts:
            # L-BFGS-B optimization
            try:
                result = minimize(objective_function, initial_guess, method='L-BFGS-B', 
                                bounds=bounds, options={'maxiter': 1000})
                if result.success and result.fun < best_objective:
                    best_result = result
                    best_objective = result.fun
                    best_method = f"L-BFGS-B with {description}"
            except:
                pass
            
            # Try differential evolution from this starting point
            try:
                # Use tighter bounds around the initial guess for more focused search
                tight_bounds = []
                for i, val in enumerate(initial_guess):
                    if i < len(bounds):
                        lower = max(bounds[i][0], val * 0.1)
                        upper = min(bounds[i][1], val * 10.0)
                        tight_bounds.append((lower, upper))
                    else:
                        tight_bounds.append(bounds[i])
                
                result = differential_evolution(objective_function, tight_bounds, 
                                              seed=42, maxiter=200)
                if result.success and result.fun < best_objective:
                    best_result = result
                    best_objective = result.fun
                    best_method = f"Differential evolution with {description}"
            except:
                pass
        
        # If no improvement found, return standard model
        if best_result is None or best_objective >= standard_sse * 0.999:  # Allow tiny numerical differences
            summary_parts.append("No improvement found with additive constant")
            standard_model.fitting_summary = "; ".join(summary_parts)
            return standard_model
        
        # Extract fitted parameters
        a_fitted = best_result.x[0]
        exponents_fitted = best_result.x[1:-1]
        c_fitted = best_result.x[-1]
        
        # Calculate R² score for the fitted model
        predicted_performance = a_fitted * np.ones(len(performance))
        for i, param in enumerate(parameters):
            predicted_performance *= data[param] ** exponents_fitted[i]
        predicted_performance += c_fitted
        
        r2 = r2_score(performance, predicted_performance)
        
        # Verify this actually improved over standard model
        if r2 < standard_model.r2_score * 0.999:  # Allow tiny numerical differences
            summary_parts.append(f"Additive constant R² ({r2:.4f}) worse than standard, using standard model")
            standard_model.fitting_summary = "; ".join(summary_parts)
            return standard_model
        
        improvement = r2 - standard_model.r2_score
        summary_parts.append(f"Additive constant R²: {r2:.4f} (improvement: +{improvement:.4f}, method: {best_method})")
        
        # Package results
        coefficients = np.array([a_fitted] + list(exponents_fitted))
        
        return PowerLawModel(
            coefficients=coefficients,
            intercept=0.0,  # Not used for additive constant model
            additive_constant=c_fitted,
            r2_score=r2,
            parameter_names=parameters,
            has_additive_constant=True,
            fitting_summary="; ".join(summary_parts)
        )
    
    def cross_validate(self, data: Dict[str, np.ndarray], performance: np.ndarray, 
                      parameters: Optional[list] = None, cv: int = 5,
                      include_additive_constant: bool = False) -> Dict:
        """Perform cross-validation for power law model."""
        if parameters is None:
            parameters = list(data.keys())
        
        if include_additive_constant:
            # For additive constant models, we need custom CV since sklearn doesn't support our model
            return self._cross_validate_additive_constant(data, performance, parameters, cv)
        else:
            # Standard CV for log-linear models
            epsilon = 1e-8
            features = np.column_stack([
                np.log(np.maximum(data[param], epsilon)) for param in parameters
            ])
            log_performance = np.log(np.maximum(performance, epsilon))
            
            ridge = Ridge(alpha=1e-6)
            scores = cross_val_score(ridge, features, log_performance, cv=cv, scoring='r2')
            
            return {
                'mean_r2': scores.mean(),
                'std_r2': scores.std(),
                'individual_scores': scores
            }
    
    def _cross_validate_additive_constant(self, data: Dict[str, np.ndarray], 
                                         performance: np.ndarray, parameters: list, 
                                         cv: int) -> Dict:
        """Custom cross-validation for additive constant models."""
        from sklearn.model_selection import KFold
        
        kf = KFold(n_splits=cv, shuffle=True, random_state=42)
        scores = []
        
        for train_idx, val_idx in kf.split(performance):
            # Split data
            train_data = {param: data[param][train_idx] for param in parameters}
            train_performance = performance[train_idx]
            val_data = {param: data[param][val_idx] for param in parameters}
            val_performance = performance[val_idx]
            
            # Fit model on training data
            try:
                train_model = self._fit_with_additive_constant(train_data, train_performance, parameters)
                
                # Predict on validation data
                val_predictions = train_model.predict(**val_data)
                
                # Calculate R² score
                r2 = r2_score(val_performance, val_predictions)
                scores.append(r2)
                
            except:
                # If fitting fails, append a poor score
                scores.append(-1.0)
        
        scores = np.array(scores)
        return {
            'mean_r2': scores.mean(),
            'std_r2': scores.std(),
            'individual_scores': scores
        }

def analyze_power_law_regression(data: Dict[str, npt.NDArray[np.float64]], 
                               scenarios: Optional[list] = None,
                               include_additive_constant: bool = False,
                               show_fitting_details: bool = False) -> Dict:
    """
    Comprehensive power law regression analysis for different parameter scenarios.
    
    Args:
        data: Dictionary containing parameter arrays and performance
        scenarios: List of parameter combinations to analyze
        include_additive_constant: If True, fit models with additive constants
        show_fitting_details: If True, show detailed fitting information
    """
    if scenarios is None:
        scenarios = [
            ['k', 'o'],
        ]
    
    regressor = PowerLawRegressor()
    results = {}
    
    performance = data['performance']
    
    for scenario in scenarios:
        # Fit model
        model = regressor.fit(data, performance, parameters=scenario, 
                            include_additive_constant=include_additive_constant)
        print(f"Fitted model: {model.get_formula()}")
        print(f"R² score: {model.r2_score:.4f}", end = '; ')
        
        # Show fitting summary if requested or if using additive constants
        if (show_fitting_details or include_additive_constant) and model.fitting_summary:
            print(f"\nFitting details: {model.fitting_summary}")
        
        # Cross-validation
        cv_results = regressor.cross_validate(data, performance, parameters=scenario,
                                            include_additive_constant=include_additive_constant)
        print(f"Cross-validation R²: {cv_results['mean_r2']:.4f} ± {cv_results['std_r2']:.4f}")
        
        results[tuple(scenario)] = {
            'model': model,
            'cv_results': cv_results
        }
    
    return results

# Example usage comparing both approaches
def compare_models_with_and_without_constant(data, parameters):
    """Compare standard power law vs power law with additive constant."""
    
    print("=== Standard Power Law Model ===")
    results_standard = analyze_power_law_regression(data, [parameters], include_additive_constant=False)
    
    print("\n=== Power Law with Additive Constant ===")
    results_additive = analyze_power_law_regression(data, [parameters], include_additive_constant=True)
    
    return results_standard, results_additive

# For your existing workflow, you can now use:
# results = analyze_power_law_regression(data, [['k', 'o']], include_additive_constant=True)


def read_data(fn: str, function_name: str, parameter_names: dict[str, int]) -> dict[str, npt.NDArray[np.float64]]:
    import json
    with open(fn, 'r') as f:
        json_data = json.load(f)
    assert function_name in json_data.keys()
    samples = json_data[function_name]
    # sample1 = samples[0]
    # assert len(sample1) == len(parameter_names) + 1
    samples_arr = np.array(samples, dtype=np.float64)
    
    data = {}
    for parameter_name, i in parameter_names.items():
        data[parameter_name] = samples_arr[:,i]
    data['performance'] = samples_arr[:,-1]
    
    from pprint import pprint
    # print('data:')
    # pprint(data)
    return data


# alg_names = ['var_direct_np', 'var_hypo']

# print('*'*80)
# print(f'* fit k: performance of var_direct_np')
# data = read_data('running_time_data_compute_var.json', 'var_direct_np', {'k': 1})
# # results_full = analyze_power_law_regression(data, [['k']], include_additive_constant=False)
# results_full = analyze_power_law_regression(data, [['k']], include_additive_constant=True)

# print('\n' + '*'*80)
# print(f'* fit o: performance of var_hypo')
# data = read_data('running_time_data_compute_var.json', 'var_hypo', {'o': 2})
# # results_full = analyze_power_law_regression(data, [['o']], include_additive_constant=False)
# results_full = analyze_power_law_regression(data, [['o']], include_additive_constant=True)

# for alg_name in alg_names:
#     print('\n' + '*'*80)
#     print(f'* fit k, o: performance of {alg_name}')
#     data = read_data('running_time_data_compute_var.json', alg_name, {'k': 1, 'o': 2})
#     # results_full = analyze_power_law_regression(data, [['k', 'o']], include_additive_constant=False)
#     results_full = analyze_power_law_regression(data, [['k', 'o']], include_additive_constant=True)

# for alg_name in alg_names:
#     print('\n' + '*'*80)
#     print(f'* fit n, k, o, g: performance of {alg_name}')
#     data = read_data('running_time_data_compute_var.json', alg_name, {'n': 0, 'k': 1, 'o': 2, 'g': 3})
#     # results_full = analyze_power_law_regression(data, [['n', 'k', 'o', 'g']], include_additive_constant=False)
#     results_full = analyze_power_law_regression(data, [['n', 'k', 'o', 'g']], include_additive_constant=True)

alg_names = ['mean_direct_np', 'mean_hypo']

print('*'*80)
print(f'* fit k: performance of mean_direct_np')
data = read_data('running_time_data_compute_mean.json', 'mean_direct_np', {'k': 1})
# results_full = analyze_power_law_regression(data, [['k']], include_additive_constant=False)
results_full = analyze_power_law_regression(data, [['k']], include_additive_constant=True)

print('\n' + '*'*80)
print(f'* fit o: performance of mean_hypo')
data = read_data('running_time_data_compute_mean.json', 'mean_hypo', {'o': 2})
# results_full = analyze_power_law_regression(data, [['o']], include_additive_constant=False)
results_full = analyze_power_law_regression(data, [['o']], include_additive_constant=True)

for alg_name in alg_names:
    print('\n' + '*'*80)
    print(f'* fit k, o: performance of {alg_name}')
    data = read_data('running_time_data_compute_mean.json', alg_name, {'k': 1, 'o': 2})
    # results_full = analyze_power_law_regression(data, [['k', 'o']], include_additive_constant=False)
    results_full = analyze_power_law_regression(data, [['k', 'o']], include_additive_constant=True)

for alg_name in alg_names[1:]:
    print('\n' + '*'*80)
    print(f'* fit n, k, o: performance of {alg_name}')
    data = read_data('running_time_data_compute_mean.json', alg_name, {'n': 0, 'k': 1, 'o': 2})
    # results_full = analyze_power_law_regression(data, [['n', 'k', 'o', 'g']], include_additive_constant=False)
    results_full = analyze_power_law_regression(data, [['n', 'k', 'o']], include_additive_constant=True)

********************************************************************************
* fit k: performance of mean_direct_np
Fitted model: Performance = 0.0000001035 * k**0.951 + 0.0
R² score: 0.9826; 
Fitting details: Standard power law R²: 0.9151; Additive constant R²: 0.9826 (improvement: +0.0675, method: Differential evolution with standard_start)
Cross-validation R²: 0.9825 ± 0.0043

********************************************************************************
* fit o: performance of mean_hypo
Fitted model: Performance = 0.0002021981 * o**0.855
R² score: 0.8272; 
Fitting details: Standard power law R²: 0.8272; Additive constant R² (0.7051) worse than standard, using standard model
Cross-validation R²: 0.6933 ± 0.0181

********************************************************************************
* fit k, o: performance of mean_direct_np
Fitted model: Performance = 0.0000000927 * k**0.951 * o**0.103 + 0.0
R² score: 0.9861; 
Fitting details: Standard power law R²: 0.9155; Additive c

In [267]:
import time
import gamma
import math
import importlib
import gamma
importlib.reload(gamma)

def predicted_mean_direct_np(k: float) -> str:
    # return gamma.format_time(0.0000001010 * k**0.952)
    return gamma.format_time(0.0000000927 * k**0.951 * o**0.103)

def predicted_mean_hypo(k: float, o: float) -> str:
    return gamma.format_time(0.0002378857 * k**-0.017 * o**0.849)
    # return gamma.format_time(0.0001912085 * o**0.873)
    # return gamma.format_time(0.0000032289 * n**0.051 * k**0.274 * o**0.904)

def actual_mean_direct_np(n, k, o, g) -> str:
    start = time.perf_counter()
    gamma.mean_direct_np(n, k, o, g)
    end = time.perf_counter()
    return gamma.format_time(end - start)

def actual_mean_hypo(n, k, o, g) -> str:
    start = time.perf_counter()
    gamma.mean_hypo(n, k, o, g)
    end = time.perf_counter()
    return gamma.format_time(end - start)

n = 10**6
k = round(math.sqrt(n))
o = 1
g = 1

print(f'      {predicted_mean_hypo(k,o)=}')
print(f'     {actual_mean_hypo(n,k,o,g)=}')
print()
print(f'   {predicted_mean_direct_np(k)=}')
print(f'{actual_mean_direct_np(n,k,o,g)=}')

      predicted_mean_hypo(k,o)='212 μs'
     actual_mean_hypo(n,k,o,g)='258 μs'

   predicted_mean_direct_np(k)='66.1 μs'
actual_mean_direct_np(n,k,o,g)='359 μs'


In [522]:
import time
import gamma
import math
import importlib
import gamma
importlib.reload(gamma)

def predicted_direct(k: float) -> str:
    return gamma.format_time(0.0000000666 * k**0.974)

def predicted_hypo(k: float, o: float) -> str:
    # return gamma.format_time(0.0002419766 * o**2.341 + 0.000374)
    # return gamma.format_time(0.0007250757 * k**-0.065 * o**1.928)
    return gamma.format_time(0.0007118038 * k**-0.064 * o**1.954)
    # return gamma.format_time(0.0003820319 * o**1.958)
    # return gamma.format_time(0.0003939771 * o**1.954)

def actual_direct(n, k, o, g) -> str:
    start = time.perf_counter()
    gamma.var_direct_np(n, k, o, g)
    end = time.perf_counter()
    return gamma.format_time(end - start)

def actual_hypo(n, k, o, g) -> str:
    start = time.perf_counter()
    gamma.var_hypo(n, k, o, g)
    end = time.perf_counter()
    return gamma.format_time(end - start)

n = 10**12
k = round(math.sqrt(n))
o = 5
g = 1

print(f'   {predicted_hypo(k,o)=}')
print(f'  {actual_hypo(n,k,o,g)=}')
print()
print(f'   {predicted_direct(k)=}')
print(f'{actual_direct(n,k,o,g)=}')

   predicted_hypo(k,o)='6.83 ms'
  actual_hypo(n,k,o,g)='3.77 ms'

   predicted_direct(k)='46.5 ms'
actual_direct(n,k,o,g)='19.1 ms'


In [549]:
from scipy.special import comb as sp_comb
from scipy.special import binom
import math

def reciprocals(n: int, k: int, o: int, g: int, method: str) -> npt.NDArray[np.float64]:
    top_values = np.arange(k) * g + n
    if method == 'binom':
        binomial_values = binom(top_values, o)
    elif method == 'sp_comb_exact':
        binomial_values = np.array([sp_comb(top_val, o, exact=True) for top_val in top_values])
    elif method == 'sp_comb':
        binomial_values = sp_comb(top_values, o, exact=False)
    elif method == 'math_comb':
        binomial_values = np.array([math.comb(top_val, o) for top_val in top_values])
    else:
        raise NotImplementedError
    return 1.0 / binomial_values

n = 10**12
k = round(math.sqrt(n))
o = 5
g = 3
# print(f'{reciprocals(n, k, o, g, 'math_comb')=}')
# print(f'{reciprocals(n, k, o, g, 'binom')=}')
# print(f'{reciprocals(n, k, o, g, 'sp_comb')=}')
# print(f'{reciprocals(n, k, o, g, 'sp_comb_exact')=}')
# for method in ['math_comb', 'sp_comb', 'binom', 'sp_comb_exact']:
#     print(f'{method}: ', end = '')
#     %timeit reciprocals(n, k, o, g, method)
# sp_comb(np.array([5,10]), np.array([2,2]), exact=True)
%timeit sp_comb(10**3, 10**2, exact=True)
%timeit math.comb(10**3, 10**2)


11.5 μs ± 594 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
2.86 μs ± 44 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
