**Simulation de données hiérarchiques avec erreurs de mesure**

Ce bloc simule des données selon un modèle de régression linéaire avec :

- **Effets aléatoires hiérarchiques** sur l’intercept et la pente ;
- **Erreurs de mesure** sur les variables explicatives ($X$) et la variable dépendante ($Y$).

*Structure des données simulées*

- Il y a $G$ groupes (défini par `n_groups`), chacun avec `group_size` individus.
- Les effets aléatoires par groupe sont tirés selon :

$$
\alpha_g = \beta_0 + \epsilon_{\text{int},g}, \quad \epsilon_{\text{int},g} \sim \mathcal{N}(0, \sigma_{\text{intercept}}^2)
$$

$$\
\beta_g = \beta_1 + \epsilon_{\text{slope},g}, \quad \epsilon_{\text{slope},g} \sim \mathcal{N}(0, \sigma_{\text{slope}}^2)
$$

- Pour chaque individu $i$ dans le groupe $g$ :

$$
X_{gi}^{\text{true}} \sim \mathcal{N}(0, 1), \quad
Y_{gi}^{\text{true}} = \alpha_g + \beta_g X_{gi}^{\text{true}} + \varepsilon_{gi}, \quad \varepsilon_{gi} \sim \mathcal{N}(0, 1)
$$

- Les observations bruitées sont :

$$
X_{gi}^{\text{obs}} = X_{gi}^{\\text{true}} + \delta_{gi}^x, \quad \delta_{gi}^x \sim \mathcal{N}(0, \sigma_x^2)
$$

$$
Y_{gi}^{\text{obs}} = Y_{gi}^{\\text{true}} + \delta_{gi}^y, \quad \delta_{gi}^y \sim \mathcal{N}(0, \sigma_y^2)
$$

In [None]:
# Importation des librairies
import numpy as np
import pandas as pd
import pymc as pm
import arviz as az
import matplotlib.pyplot as plt
import seaborn as sns
import ipywidgets as widgets

# Reproductibilité
np.random.seed(30)

# Paramètres de simulation par défaut
def simulate_data(sigma_x = 0.5, sigma_y = 1, n_groups = 10,
                  group_size = 20, sigma_intercept = 2, sigma_slope = 1):
    N = n_groups * group_size
    groups = np.repeat(np.arange(n_groups), group_size)
    # génère un vecteur de taille N qui donne le groupe d'appartenance
    # de chaque observation

    # Effets aléatoires
    beta_0 = 2
    beta_1 = 2
    random_intercepts = np.random.normal(0, sigma_intercept,
                                         size = n_groups)
    random_slopes = np.random.normal(0, sigma_slope, size = n_groups)
    intercepts = beta_0 + random_intercepts[groups]
    slopes = beta_1 + random_slopes[groups]

    # Génération des données
    X_true = np.random.normal(0, 1, size = N)
    Y_true = intercepts + slopes * X_true + np.random.normal(0, 1.0,
                                                             size = N)
    # Erreurs avec sigma_eps = 1.0
    X_obs = X_true + np.random.normal(0, sigma_x, size = N)
    Y_obs = Y_true + np.random.normal(0, sigma_y, size = N)

    data = pd.DataFrame({"group": pd.Categorical(groups), "X_obs": X_obs,
                         "Y_obs": Y_obs})
    return data

**Modélisation bayésienne avec PyMC**

Ce bloc définit le modèle hiérarchique bayésien utilisé pour inférer les effets aléatoires et reconstruire les vraies valeurs latentes $X$.

*Paramètres du modèle*

- Moyennes globales des effets :

$$
\mu_\alpha \sim \mathcal{N}(0, 1), \quad \mu_\beta \sim \mathcal{N}(0, 1)
$$

- Variabilité entre groupes :

$$
\sigma_\alpha \sim \text{HalfNormal}(1), \quad \sigma_\beta \sim \text{HalfNormal}(1)
$$

- Centrés réduits pour les effets :

$$
z_\alpha^{(g)} \sim \mathcal{N}(0,1), \quad z_\beta^{(g)} \sim \mathcal{N}(0,1)
$$

- Effets par observation :

$$
\alpha^{(i)} = \mu_\alpha + \sigma_\alpha z_\alpha^{(g[i])}, \quad
\beta^{(i)} = \mu_\beta + \sigma_\beta z_\beta^{(g[i])}
$$

*Partie latente*

- Variables explicatives réelles mais inconnues :

$$
X_i^{\text{latent}} \sim \mathcal{N}(0,1)
$$

- Observation de $X$ :

$$
X_i^{\text{obs}} \sim \mathcal{N}(X_i^{\text{latent}}, \sigma_x^{\text{latent}})
$$

- Observation de $Y$ :

$$
Y_i^{\text{obs}} \sim \mathcal{N}(\alpha^{(i)} + \beta^{(i)} X_i^{\text{latent}}, \sigma_y^{\text{latent}})
$$

*Echantillonnage*

L’échantillonnage MCMC est lancé avec :

- `draws = 2000`, `tune = 2000` ;
- `target_accept = 0.99` (favorise la stabilité des chaînes, au prix de temps de calcul accru) ;
- `progressbar = True` pour le suivi.

---

Les variables latentes permettent d’inférer la structure réelle des données bruitées, tout en capturant l’hétérogénéité entre groupes via une structure hiérarchique.

In [None]:
def run_bayesian_model(sigma_x, sigma_y, n_groups, sigma_intercept,
                       sigma_slope, group_size = 10):
    # Générer de nouvelles données
    data = simulate_data(sigma_x, sigma_y, n_groups, group_size,
                         sigma_intercept, sigma_slope)

    # Encoder les groupes
    group_idx = data["group"].cat.codes.values

    # Définition du modèle bayésien
    with pm.Model() as model:
        # Hyperpriors pour les effets aléatoires
        mu_alpha = pm.Normal("mu_alpha", mu = 2, sigma = 5)
        mu_beta = pm.Normal("mu_beta", mu = 2, sigma = 5)

        sigma_alpha = pm.HalfCauchy("sigma_alpha", beta = 2)
        sigma_beta = pm.HalfCauchy("sigma_beta", beta = 2)


        # Effets aléatoires sur intercepts et slopes
        alpha_raw = pm.Normal("alpha_raw", 0, 1, shape = n_groups)
        beta_raw = pm.Normal("beta_raw", 0, 1, shape = n_groups)

        alpha = pm.Deterministic("alpha", mu_alpha + sigma_alpha *
                                 alpha_raw)
        beta = pm.Deterministic("beta", mu_beta + sigma_beta * beta_raw)


        # Erreurs de mesure
        sigma_x_latent = sigma_x # paramètre fixe
        sigma_y_latent = sigma_y # paramètre fixe

        # X latent (non observé, mais bruité dans X_obs)
        X_latent = data["X_obs"].values
        - np.random.normal(0, sigma_x_latent, size = len(data))

        # Modèle sur X_obs
        # pm.Normal("X_obs", mu = X_latent, sigma = sigma_x_latent,
         observed = data["X_obs"].values)


        # Modèle sur Y_obs
        mu_y = alpha[group_idx] + beta[group_idx] * X_latent
        pm.Normal("Y_obs", mu = mu_y, sigma = sigma_y_latent,
                  observed = data["Y_obs"].values)

        # Échantillonnage MCMC avec NumPyro
        trace = pm.sample(draws = 2000, tune = 2000,
                          target_accept = 0.99, chains = 4, cores = 4,
                          nuts_sampler = "numpyro", init = "adat_diag",
                          return_inferencedata = True,
                          random_seed = 42, progressbar = True)

        print(az.summary(trace, var_names = ["mu_alpha", "mu_beta",
                                             "sigma_alpha",
                                             "sigma_beta"]))
        summary = az.summary(trace)
        summary.head()
        display(az.summary(trace, round_to=2))
        az.summary(trace, var_names = ["mu_alpha", "mu_beta",
                                       "sigma_alpha", "sigma_beta"],
                    round_to = 2)
        az.plot_energy(trace)
        # explore aussi si divergences associées
        # à transitions énergétiques
        az.plot_pair(trace, var_names = ["mu_alpha", "sigma_alpha"],
                     divergences = True)
        az.plot_pair(trace, var_names = ["mu_beta", "sigma_beta"],
                     divergences = True)

    return trace, data

Visualisation des résultats

In [None]:
def plot_results(trace, data, n_groups):
    # Traces postérieures globales
    az.plot_trace(trace, var_names = ["mu_alpha", "mu_beta",
                                      "sigma_alpha", "sigma_beta"])
    plt.tight_layout()
    plt.show()

    # Moyennes postérieures par groupe
    alpha_mean = trace.posterior["alpha"].mean(
        dim = ("chain", "draw")).values
    beta_mean = trace.posterior["beta"].mean(
        dim = ("chain", "draw")).values

    # Figure de régression + scatter
    fig, ax = plt.subplots(figsize = (10, 6))
    palette = sns.color_palette("tab10", n_colors = n_groups)

    # Tracer les groupes
    sns.scatterplot(data = data, x = "X_obs", y = "Y_obs", hue = "group",
                    palette = palette, ax = ax, alpha = 0.7,
                    legend = False)

    # Définir l’intervalle X de la régression estimée
    x_vals = np.linspace(data["X_obs"].min(), data["X_obs"].max(), 100)

    # Tracer les droites estimées par groupe
    for g in range(n_groups):
        y_vals = alpha_mean[g] + beta_mean[g] * x_vals
        ax.plot(x_vals, y_vals, color = palette[g], linewidth = 2,
                label = f"Groupe {g}")

    ax.set_xlabel("X_obs")
    ax.set_ylabel("Y_obs")
    ax.set_title(
        "Régressions estimées par groupe (avec moyennes postérieures)")
    ax.legend(bbox_to_anchor = (1.05, 1), loc = 'upper left')
    plt.tight_layout()
    plt.show()

Widgets interactifs pour ajuster les paramètres du modèle

In [None]:
# Fonction de création simplifiée avec description Unicode et mise en
# forme
def make_slider(desc, value, min_, max_, step, is_int = False):
    cls = widgets.IntSlider if is_int else widgets.FloatSlider
    return cls(
        description = desc,
        value = value,
        min = min_,
        max = max_,
        step = step,
        style = {'description_width': '100px'}
    )

# Dictionnaire des sliders avec clé = nom logique du paramètre,
 valeur = widget
sliders = {
    "sigma_x": make_slider("sigma_x", 0.5, 0.1, 2.0, 0.1),
    "sigma_y": make_slider("sigma_y", 1.0, 0.1, 2.0, 0.1),
    "n_groups": make_slider("n_groups", 10, 2, 20, 1, is_int = True),
    "sigma_intercept": make_slider("sigma_intercept",
                                   1.0, 0.1, 3.0, 0.1),
    "sigma_slope": make_slider("sigma_slope", 0.5, 0.1, 3.0, 0.1),
    "group_size": make_slider("group_size", 20, 5, 50, 1, is_int = True)
}

# Bouton de lancement
run_button = widgets.Button(description="Lancer la simulation",
                            button_style='success', icon='play')
output = widgets.Output()

# Fonction déclenchée au clic
def on_button_click(b):
    with output:
        output.clear_output(wait=True)
        print("Simulation en cours...")
    # Récupérer les valeurs actuelles des sliders sous forme de dict
        params = {name: slider.value for name, slider in sliders.items()}

    # Lancer la simulation
        trace, data = run_bayesian_model(**params)
        plot_results(trace, data, params["n_groups"])

# Attacher l'événement
run_button.on_click(on_button_click)

# Affichage de tous les sliders + bouton
display(widgets.VBox(list(sliders.values()) + [run_button, output]))