# Problema:Insurance Risk Model


In [1]:
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import heapq
import seaborn as sns
from scipy.stats import chi2_contingency
from Insurance import InsuranceRiskSimulation
from statsmodels.stats.proportion import proportion_confint


### Parámetros de la simulación utilizados

In [12]:
params = {
    'nu': 0.2,    
    'mu': 0.05,     
    'lambd': 0.3,  
    'c': 11,      
    'a0': 100,   
    'n0': 5,      
    'T': 365,      
    'claim_dist': lambda: np.random.exponential(scale=30)  
}

###  Estimación de la probabilidad de bancarrota(capital negativo antes de T) con 95% de seguridad

In [11]:
def estimar_prob_precision_d(d, params, batch_size=100):
    resultados = []
    n = 0
    while True:
        # Ejecutar un lote de simulaciones
        for _ in range(batch_size):
            sim = InsuranceRiskSimulation(**params)
            resultados.append(sim.run())
            n += 1
        
        # Calcular estadísticas cada 30 simulaciones 
        if n >= 30:
            p_hat = np.mean(resultados)
            s_cuadrado = np.var(resultados, ddof=1)  # S² insesgado
            error_estandar = np.sqrt(s_cuadrado / n)
            
            if error_estandar < d:
                break
    ci_low, ci_high = proportion_confint(sum(resultados), n, method='wilson')
    return p_hat, (ci_low, ci_high), n

p_hat, (ci_low, ci_high), n = estimar_prob_precision_d(
    d=0.005, 
    params=params, 
    batch_size=100
)
print(f"Probabilidad estimada de no acabar en capital negativo: {p_hat:.2%}")
print(f"Intervalo de Confianza 95%: [{ci_low:.2%}, {ci_high:.2%}]")
print(f"Simulaciones realizadas: {n}")


Probabilidad estimada de no acabar en capital negativo: 100.00%
Intervalo de Confianza 95%: [96.30%, 100.00%]
Simulaciones realizadas: 100


### Función para correr y recolectar información de n simulaciones

In [4]:

def run_multiple_simulations(params, n_simulations=1000):
    results = []
    ruin_times = []
    for _ in range(n_simulations):
        sim = InsuranceRiskSimulation(**params)
        result = sim.run()
        results.append(result)
        if result == 0:
            ruin_times.append(sim.ruin_time)
    return results, ruin_times


## Análisis Exploratorio

### Análisis de sensibilidad para parámetros de forma independiente

In [None]:
# Análisis de sensibilidad para un parámetro
def sensitivity_analysis(param_name, param_values, base_params, n_simulations=1000):
    ruin_probs = []
    lowers = []
    uppers = []
    
    for value in param_values:
        modified_params = base_params.copy()
        modified_params[param_name] = value
        results, _ = run_multiple_simulations(modified_params, n_simulations)
        
        n_ruins = len([r for r in results if r == 0])
        ruin_prob = n_ruins / n_simulations
        ci_low, ci_upp = proportion_confint(n_ruins, n_simulations, method='wilson')
        
        ruin_probs.append(ruin_prob)
        lowers.append(ruin_prob - ci_low)
        uppers.append(ci_upp - ruin_prob)
    
    return ruin_probs, lowers, uppers


def full_sensitivity_analysis(base_params, n_simulations=500):
    """
    Realiza análisis de sensibilidad para todos los parámetros principales
    
    """
    results = {}
    
    # 1. Sensibilidad a ν (tasa de llegada de nuevos clientes)
    nu_values = np.linspace(0.05, 0.5, 10)
    results['nu'] = sensitivity_analysis('nu', nu_values, base_params, n_simulations)
    
    # 2. Sensibilidad a μ (tasa de abandono de clientes)
    mu_values = np.linspace(0.01, 0.3, 10)
    results['mu'] = sensitivity_analysis('mu', mu_values, base_params, n_simulations)
    
    # 3. Sensibilidad a λ (tasa de reclamos por cliente)
    lambda_values = np.linspace(0.01, 0.2, 10)
    results['lambda'] = sensitivity_analysis('lambd', lambda_values, base_params, n_simulations)
    
    # 4. Sensibilidad a c (ingreso por cliente)
    c_values = np.linspace(0.5, 2.0, 10)
    results['c'] = sensitivity_analysis('c', c_values, base_params, n_simulations)
    
    # 5. Sensibilidad a a0 (capital inicial)
    a0_values = np.linspace(10, 200, 10)
    results['a0'] = sensitivity_analysis('a0', a0_values, base_params, n_simulations)
    
    # 6. Sensibilidad a n0 (clientes iniciales)
    n0_values = np.linspace(1, 50, 10)
    results['n0'] = sensitivity_analysis('n0', n0_values, base_params, n_simulations)
    
    # 7. Sensibilidad a T (horizonte temporal)
    T_values = np.linspace(10, 200, 10)
    results['T'] = sensitivity_analysis('T', T_values, base_params, n_simulations)
    
    # Visualización de todos los resultados
    plot_all_sensitivity_results(results)
    
    return results

def plot_all_sensitivity_results(results):
    """Visualiza todos los resultados del análisis de sensibilidad"""
    plt.figure(figsize=(15, 12))
    
    params_order = ['nu', 'mu', 'lambda', 'c', 'a0', 'n0', 'T']
    titles = {
        'nu': 'Tasa de llegada de clientes (ν)',
        'mu': 'Tasa de abandono de clientes (μ)',
        'lambda': 'Tasa de reclamos por cliente (λ)',
        'c': 'Ingreso por cliente (c)',
        'a0': 'Capital inicial (a₀)',
        'n0': 'Clientes iniciales (n₀)',
        'T': 'Horizonte temporal (T)'
    }
    
    for i, param in enumerate(params_order, 1):
        plt.subplot(3, 3, i)
        values = np.linspace(0, 1, 10) 
        ruin_probs, lowers, uppers = results[param]
        
        plt.errorbar(values, ruin_probs, yerr=[lowers, uppers], 
                    fmt='-o', capsize=5, label=param)
        plt.title(titles[param])
        plt.xlabel('Valor del parámetro (normalizado)')
        plt.ylabel('Probabilidad de ruina')
        plt.grid(True)
    
    plt.tight_layout()
    plt.show()
    
sensitivity_results = full_sensitivity_analysis(params, n_simulations=500)

### Análisis de sensibilidad para la distribución del parámetro de reclamos

In [None]:
def claim_distribution_sensitivity(base_params, n_simulations=500):
    """Analiza sensibilidad a diferentes distribuciones de reclamos"""
    distributions = {
        'Exponencial (media=10)': lambda: np.random.exponential(scale=10),
        'Pareto (forma=2, escala=5)': lambda: np.random.pareto(2) * 5,
        'Lognormal (μ=2, σ=0.5)': lambda: np.random.lognormal(2, 0.5),
        'Normal truncada (μ=10, σ=3)': lambda: max(0, np.random.normal(10, 3))
    }
    
    results = {}
    for name, dist in distributions.items():
        params = base_params.copy()
        params['claim_dist'] = dist
        sim_results, _ = run_multiple_simulations(params, n_simulations)
        ruin_prob = sum(1 for r in sim_results if r == 0)/len(sim_results)
        results[name] = ruin_prob
    
    # Visualización
    plt.figure(figsize=(10, 6))
    names = list(results.keys())
    values = list(results.values())
    plt.barh(names, values, color=sns.color_palette("husl", len(names)))
    
    for i, v in enumerate(values):
        plt.text(v + 0.01, i, f"{v:.3f}", color='black', va='center')
    
    plt.title('Probabilidad de Ruina por Distribución de Reclamos')
    plt.xlabel('Probabilidad de Ruina')
    plt.ylabel('Distribución de Reclamos')
    plt.xlim(0, max(values) * 1.2)
    plt.grid(axis='x')
    plt.show()
    
    return results

claim_results = claim_distribution_sensitivity(params)

### Análisis de tiempos de bancarrota 

In [None]:
def ruin_time_analysis(base_params, n_simulations=1000):
    
    results, ruin_times = run_multiple_simulations(base_params, n_simulations)
    ruin_prob = len(ruin_times)/n_simulations
    
    if not ruin_times:
        print(f"No se observaron ruinas en {n_simulations} simulaciones")
        return
    
    print(f"Probabilidad de ruina: {ruin_prob:.3f}")
    print(f"Tiempo medio de ruina: {np.mean(ruin_times):.1f} (mediana: {np.median(ruin_times):.1f})")
    
    plt.figure(figsize=(15, 5))
    
    # 1. Histograma
    plt.subplot(1, 3, 1)
    sns.histplot(ruin_times, kde=True, bins=20, color='skyblue')
    plt.title('Distribución de Tiempos de Ruina')
    plt.xlabel('Tiempo')
    plt.ylabel('Frecuencia')
    
    # 2. Función de supervivencia empírica
    plt.subplot(1, 3, 2)
    sorted_times = np.sort(ruin_times)
    survival = 1 - np.arange(1, len(sorted_times)+1)/len(sorted_times)
    plt.step(sorted_times, survival, where='post', color='orange')
    plt.title('Función de Supervivencia Empírica')
    plt.xlabel('Tiempo')
    plt.ylabel('Probabilidad de Supervivencia')
    plt.axhline(y=1-ruin_prob, color='r', linestyle='--')
    
    # 3. Hazard rate empírico
    plt.subplot(1, 3, 3)
    time_bins = np.linspace(0, base_params['T'], 20)
    bin_counts, _ = np.histogram(ruin_times, bins=time_bins)
    hazard_rate = bin_counts / (n_simulations * (time_bins[1]-time_bins[0]))
    plt.bar(time_bins[:-1], hazard_rate, width=time_bins[1]-time_bins[0], alpha=0.7, color='green')
    plt.title('Tasa de Riesgo Empírica')
    plt.xlabel('Tiempo')
    plt.ylabel('Tasa de Ruina Instantánea')
    
    plt.tight_layout()
    plt.show()

ruin_time_analysis(params)

## Exploración y comprobación de otros resultados

### Calcular constante de ingreso(c) necesaria para tener probabilidad de bancarrota 0%(dado la configuración del resto de parámetros)

In [None]:
def c_minimo(target_prob, params_base, c_min, c_max, tol=0.1, n_sim=500):
    while c_max - c_min > tol:
        c_medio = (c_min + c_max) / 2
        params = params_base.copy()
        params['c'] = c_medio
        prob = sum(InsuranceRiskSimulation(**params).run() for _ in range(n_sim)) / n_sim
        if prob >= target_prob:
            c_max = c_medio
        else:
            c_min = c_medio
    return c_max

c_min = c_minimo(0.95, params, 50, 200)
print(f"Ingreso mínimo requerido: {c_min:.2f}")

### Prueba de Hipótesis #1 La distribución de reclamos es influyente en la probabilidad de ruina

In [None]:
def claim_distribution_hypothesis_test(base_params, n_simulations=10000):
    """
    Prueba si diferentes distribuciones de reclamos producen diferentes probabilidades de ruina.
    """
    distributions = {
        'Exponencial': lambda: np.random.exponential(scale=10),
        'Pareto': lambda: np.random.pareto(2) * 5,
        'Normal': lambda: max(0, np.random.normal(10, 3))
    }
    
    results = {}
    for name, dist in distributions.items():
        params = base_params.copy()
        params['claim_dist'] = dist
        sim_results, _ = run_multiple_simulations(params, n_simulations)
        results[name] = sim_results
    
    # Prueba chi-cuadrado para proporciones
    contingency_table = []
    for name, res in results.items():
        ruins = sum(1 for r in res if r == 0)
        non_ruins = len(res) - ruins
        contingency_table.append([ruins, non_ruins])
    
    chi2, p, dof, expected = chi2_contingency(contingency_table)
    print(f"\nChi-square Test: χ² = {chi2:.3f}, p-value = {p:.4f}")
    
    if p < 0.05:
        print("Existen diferencias significativas entre las distribuciones")
        
        # Comparaciones por pares con corrección Bonferroni
        print("\nComparaciones por pares (Chi-square ajustado):")
        dist_names = list(distributions.keys())
        for i in range(len(dist_names)):
            for j in range(i+1, len(dist_names)):
                obs = np.array([contingency_table[i], contingency_table[j]])
                chi2, p, _, _ = chi2_contingency(obs)
                print(f"{dist_names[i]} vs {dist_names[j]}: p = {p:.4f}")
    else:
        print("No hay diferencias significativas entre las distribuciones")
    
    return results

claim_dist_results = claim_distribution_hypothesis_test(params)

### Prueba de Hipótesis #2 La probabilidad del tiempo de bancarrota sigue distribución exponencial o lognormal

In [None]:
from scipy.stats import chisquare, kstest


def goodness_of_fit_ruin_times_final(base_params, n_simulations=1000, theoretical_dist='expon'):
    # Obtener tiempos de ruina empíricos
    _, ruin_times = run_multiple_simulations(base_params, n_simulations)
    ruin_times = np.array(ruin_times)
    
    if len(ruin_times) < 30:
        print(f"Error: Solo {len(ruin_times)} eventos de ruina. Mínimo requerido: 30")
        return None
    
    # Ajustar parámetros de la distribución teórica
    if theoretical_dist == 'expon':
        loc, scale = 0, np.mean(ruin_times)
        cdf = lambda x: stats.expon.cdf(x, loc, scale)
    elif theoretical_dist == 'weibull':
        params = stats.weibull_min.fit(ruin_times, floc=0)
        cdf = lambda x: stats.weibull_min.cdf(x, *params)
    elif theoretical_dist == 'lognorm':
        params = stats.lognorm.fit(ruin_times, floc=0)
        cdf = lambda x: stats.lognorm.cdf(x, *params)
    
    # Determinar bins usando cuantiles empíricos
    n_bins = max(3, min(10, len(ruin_times)//20))  # Entre 3 y 10 bins
    bins = np.unique(np.quantile(ruin_times, np.linspace(0, 1, n_bins + 1)))
    
    # Calcular frecuencias observadas y esperadas
    observed, _ = np.histogram(ruin_times, bins=bins)
    expected = len(ruin_times) * np.diff(cdf(bins))
    
    # Combinar bins con frecuencias esperadas <5 de forma segura
    i = 0
    while i < len(expected):
        if expected[i] < 5 and len(expected) > 1:
            # Combinar con el siguiente bin
            new_obs = observed[i] + observed[i+1]
            new_exp = expected[i] + expected[i+1]
            
            observed = np.concatenate([observed[:i], [new_obs], observed[i+2:]])
            expected = np.concatenate([expected[:i], [new_exp], expected[i+2:]])
            bins = np.concatenate([bins[:i+1], bins[i+2:]])
        else:
            i += 1
    
    # Asegurar suma exacta de frecuencias esperadas
    total_obs = np.sum(observed)
    expected = expected * total_obs / np.sum(expected)  # Normalizar
    
    # Verificación final de condiciones
    if len(expected) < 3 or np.any(expected < 5):
        print("No se cumplen los supuestos del test chi-cuadrado")
        print("Usando solo prueba KS")
        chi2_stat, p_value = np.nan, np.nan
    else:
        try:
            chi2_stat, p_value = chisquare(observed, expected)
        except ValueError as e:
            print(f"Error en chi-cuadrado: {str(e)}")
            print("Realizando ajuste de precisión...")
            # Ajuste final de precisión
            observed = observed.astype(np.float64)
            expected = expected.astype(np.float64)
            observed_sum = observed.sum()
            expected = expected * observed_sum / expected.sum()
            chi2_stat, p_value = chisquare(observed, expected)
    
    # Resultados
    print(f"\nChi-square test ({theoretical_dist}):")
    print(f"χ² = {chi2_stat:.3f}, p = {p_value:.4f}")
    print(f"Bins: {len(bins)-1}, Observados: {observed}, Esperados: {expected.round(2)}")
    
    # Prueba KS
    ks_stat, ks_p = kstest(ruin_times, cdf)
    print(f"\nKS test: D = {ks_stat:.3f}, p = {ks_p:.4f}")
    
    # Visualización
    plt.figure(figsize=(10, 5))
    plt.bar(np.arange(len(observed)), observed, width=0.4, label='Observado')
    plt.bar(np.arange(len(expected)) + 0.4, expected, width=0.4, label='Esperado')
    plt.xticks(np.arange(len(expected)), [f"Bin {i+1}" for i in range(len(expected))])
    plt.title('Distribución de Frecuencias')
    plt.legend()
    plt.show()
    
    return {
        'chi2': (chi2_stat, p_value),
        'ks': (ks_stat, ks_p),
        'bins': bins,
        'observed': observed,
        'expected': expected
    }


gof_results = goodness_of_fit_ruin_times_final(params, theoretical_dist='expon')
gof_results = goodness_of_fit_ruin_times_final(params, theoretical_dist='lognorm')