# Implementation of the Gibbs sampler from the section 3

In [12]:
from particles import state_space_models as ssm
from particles import distributions as dists
import matplotlib.pyplot as plt
import numpy as np
import numpy as np
from scipy import linalg
import particles
from particles import distributions
from scipy import stats
from particles import mcmc

On creer notre state space modele lorsque conditionnellement à z et zeta. Deplus ce state space modèle est défini unique pour un actif car conditionnelement à z et zeta nos actif sont independant.

In [13]:
class IdentifiedLoadingSSM(ssm.StateSpaceModel):
    """
    Modèle espace d'état pour les chargements d'un actif 'i' avec contraintes d'identification.
    
    Règles d'identification :
    - Pour les p premières lignes (i < p):
        * La matrice est triangulaire inférieure (lambda_{ik} = 0 pour k > i).
        * Les éléments diagonaux (lambda_{ii}) sont stricts positifs -> modélisés en log.
    - Pour les lignes suivantes (i >= p):
        * Pas de contrainte de structure ou de signe.
        
    Attributs:
        row_idx (int): Indice de l'actif courant (i).
        p_factors (int): Nombre total de facteurs.
        dim_state (int): Dimension effective du vecteur d'état pour cet actif.
    """
    def __init__(self, row_idx, p_factors, mu, phi, sigma_eta, z, zeta, data_x):
        self.i = row_idx
        self.p = p_factors
        
        # Dimension effective: Pour i < p, on ne simule que les i+1 premiers éléments.
        self.dim_state = min(row_idx + 1, p_factors)
        
        # On ne conserve que les paramètres pertinents pour la dimension active
        self.mu = mu[:self.dim_state]         # (dim_state,)
        self.phi = phi[:self.dim_state]       # (dim_state,)
        self.sigma_eta = sigma_eta[:self.dim_state] # (dim_state,)
        
        self.z = z       # Facteurs communs (T, p)
        self.zeta = zeta # Variable de mélange (T,)
        self.data_x = data_x # Données observées (T,)

    def PX0(self):
        """Distribution initiale des états h_0 """
        # Variance stationnaire: sigma^2 / (1 - phi^2)
        var_0 = self.sigma_eta**2 / (1 - self.phi**2)
        
        # Distribution indépendante sur chaque composante du vecteur d'état
        return distributions.IndepProd(
            distributions.Normal(loc=self.mu, scale=np.sqrt(var_0))
        )

    def PX(self, t, xp):
        """Transition des états h_t | h_{t-1} """
        # Dynamique AR(1) Gaussienne sur h_t
        # Note: Si k=i (diagonale), h_t correspond au LOG-loading.
        mean = self.mu + self.phi * (xp - self.mu)
        return distributions.IndepProd(
            distributions.Normal(loc=mean, scale=self.sigma_eta)
        )

    def PY(self, t, xp, x):
        """
        Vraisemblance p(x_it | h_it, z_t, zeta_t) 
        Transforme l'état latent h_t en chargement lambda_t puis calcule la densité.
        """
        # x est de forme (N_particules, dim_state)
        N = x.shape[0]
        
        # 1. Reconstitution du vecteur de chargements complet (taille p)
        real_loadings = np.zeros((N, self.p)) 
        
        # Copie des états latents
        real_loadings[:, :self.dim_state] = x
        
        # 2. Application de la contrainte de positivité (Log -> Exp)
        # Uniquement pour les éléments diagonaux des p premiers actifs
        if self.i < self.p:
            # L'élément diagonal est le dernier de l'état actif (indice self.i)
            # h_{ii} = log(lambda_{ii})  =>  lambda_{ii} = exp(h_{ii})
            real_loadings[:, self.i] = np.exp(x[:, self.i])
            
            # Les éléments k > i restent à 0.0 (Triangulaire inf)
            
        # 3. Calcul de la composante factorielle et du scaling
        z_t = self.z[t] # (p,)
        
        # Produit scalaire lambda' * z
        factor_comp = np.dot(real_loadings, z_t) # (N,)
        
        # Norme au carré des lambda pour le scaling sigma_{it} 
        lam_sq = np.sum(real_loadings**2, axis=1) # (N,)
        
        # Scaling factor: sqrt(1 + lambda'lambda)
        scaling = np.sqrt(1.0 + lam_sq)
        
        # Écart-type résiduel: sigma = 1 / scaling
        sigma_eps = 1.0 / scaling
        
        # 4. Paramètres de la loi Normale de l'observation x_it
        # x_it = sqrt(zeta) * ( (lambda/scaling)'z + (1/scaling)*eps )
        zeta_sqrt = np.sqrt(self.zeta[t])
        
        mean_obs = zeta_sqrt * (factor_comp / scaling)
        scale_obs = zeta_sqrt * sigma_eps
        
        return distributions.Normal(loc=mean_obs, scale=scale_obs).logpdf(self.data_x[t])

In [14]:
class UnivariateLoadingSSM(ssm.StateSpaceModel):
    """
    Modèle espace d'état pour les chargements d'un seul actif i.
    Transition: Eq (2) [cite: 89]
    Observation: Eq (5) avec scaling Eq (6)-(7) [cite: 146, 156]
    """
    def __init__(self, mu, phi, sigma_eta, z, zeta, data_x):
        """
        :param mu: Moyenne long terme (scalaire ou vecteur taille p)
        :param phi: Autocorrélation (scalaire ou diag)
        :param sigma_eta: Variance du bruit de transition (scalaire ou diag)
        :param z: Facteurs communs (T, p) [cite: 150]
        :param zeta: Variable de mélange (T,) [cite: 220]
        :param data_x: Données observées transformées x_it (T,) [cite: 5]
        """
        self.mu = mu
        self.phi = phi
        self.sigma_eta = sigma_eta
        self.z = z
        self.zeta = zeta
        self.data_x = data_x
        
    def PX0(self):
        """Distribution initiale de lambda_{i,1} """
        # Stationnaire: N(mu, sigma^2 / (1-phi^2))
        var_0 = self.sigma_eta / (1 - self.phi**2)
        return distributions.Normal(loc=self.mu, scale=np.sqrt(var_0))

    def PX(self, t, xp):
        """Transition lambda_{i, t} | lambda_{i, t-1} (Eq 2) """
        # AR(1): mu + phi * (prev - mu) + noise
        mean = self.mu + self.phi * (xp - self.mu)
        return distributions.Normal(loc=mean, scale=np.sqrt(self.sigma_eta))

    def PY(self, t, xp, x):
        """
        Densité d'observation p(x_it | lambda_{it}, z_t, zeta_t)
        Utilise les équations de scaling (6) et (7).
        """
        # 1. Récupérer les covariables au temps t
        z_t = self.z[t]       # Facteurs (p,)
        zeta_t = self.zeta[t] # Variable de mélange (scalaire)
        obs = self.data_x[t]  # Donnée x_it
        
        # 2. Calcul du scaling 
        # Dans le cas p=1 (un seul facteur par loading pour simplifier l'exemple)
        # Si p > 1, x est un vecteur et il faut sommer x^2
        # Scaling factor: D = 1 + lambda'lambda
        # Note: x ici est la particule courante représentant lambda_{it}
        
        # Gestion vectorielle pour les particules (N_particles,)
        lam_sq = x**2 
        scaling_factor = np.sqrt(1.0 + lam_sq) 
        
        # 3. Chargements et variance transformés 
        # tilde_lambda = lambda / sqrt(1 + lambda^2)
        tilde_lambda = x / scaling_factor
        
        # sigma_eps = 1 / sqrt(1 + lambda^2)
        sigma_eps = 1.0 / scaling_factor
        
        # 4. Moyenne et Variance conditionnelles de x_it
        # x_it = sqrt(zeta) * (tilde_lambda * z_t + sigma_eps * epsilon)
        # Donc x_it ~ N( Mean, Var )
        
        # Moyenne = sqrt(zeta) * tilde_lambda * z_t
        mean_obs = np.sqrt(zeta_t) * tilde_lambda * z_t
        
        # Ecart-type = sqrt(zeta) * sigma_eps
        scale_obs = np.sqrt(zeta_t) * sigma_eps
        
        return distributions.Normal(loc=mean_obs, scale=scale_obs).logpdf(obs)

On ne pourra pas directement utilisé le mcmc.ParticleGibbs de particules car l'un des principales aventage de l'augmented Gibbs est que conditionnelement au facteur on peut faire chaque actif est indépendant

In [15]:
class FactorCopulaGibbs(mcmc.GenericGibbs):
    """
    Échantillonneur de Gibbs pour le modèle de Copule Factorielle Dynamique.
    Basé sur l'Appendix A de Creal & Tsay (2015).
    """
    
    def __init__(self, data, n_factors, prior, **kwargs):
        super().__init__(**kwargs)
        self.data = data # u_it
        self.n = data.shape[1]
        self.T = data.shape[0]
        self.p = n_factors
        
        # Initialisation des paramètres Theta 
        self.theta = prior.sample_initial() 
        # Contient: mu, phi, sigma, nu (degrees of freedom), beta
        
        # Initialisation des variables latentes 
        self.z = np.random.normal(size=(self.T, self.p)) # Facteurs communs
        self.zeta = np.ones((self.T, self.n))            # Variables de mélange
        self.lambdas = np.zeros((self.T, self.n, self.p)) # Chargements (State)
        
        # Pour le Particle Gibbs
        self.ssm_cls = StochasticLoadingsSSM
        # Stockage de la trajectoire des particules (pour conditional SMC)
        self.lambda_path = None 

    def step(self):
        """Une itération complète du Gibbs Sampler (Appendix A)"""
        
        # Step 1: Missing Data 
        # On travaille avec des données complètes pour cet exemple
        
        # Step 2: Variables de mélange (Zeta)
        self.update_zeta()
        
        # Step 3: Degrees of Freedom (nu)
        self.update_nu()
        
        # Step 4: Facteurs Communs (z_t)
        self.update_z()
        
        # Step 5: State Variables (Lambda)
        self.update_lambda_pg()
        
        # Step 6: Bruit VAR (Sigma) 
        self.update_sigma()
        
        # Step 7: Paramètres VAR (mu, Phi)
        self.update_mu_phi()
        
    def update_zeta(self):
        """
        Step 2: Independence Metropolis-Hastings pour zeta.
        
        """
        pass 

    def update_nu(self):
        """
        Step 3: Random-walk Metropolis pour nu.
        
        """
        pass

    def update_z(self):
        """
        Step 4: Gibbs standard pour z_t.
        Mise à jour des facteurs latents communs.
        
        Sources:
        - Distribution conditionnelle z_t ~ N(Mean, Var)
        - Calcul de sigma_it et lambda_tilde_it
        - Structure de l'inverse de la matrice de corrélation
        """
        T, n = self.data.shape
        p = self.p  # Nombre de facteurs
        
        # 1. Préparation des conteneurs
        new_z = np.zeros((T, p))
        I_p = np.eye(p)
        
        # On boucle sur le temps car les paramètres Lambda changent à chaque t
        for t in range(T):
            # --- A. Récupération des variables au temps t ---
            # Données x_it (imputées/transformées)
            x_t = self.data[t]  # (n,)
            
            # Chargements latents lambda_it 
            # self.lambdas est de forme (T, n, p)
            lambda_t = self.lambdas[t] # (n, p)
            
            # Variables de mélange zeta_t (pour Student-t/Grouped-t)
            # Si le modèle est Gaussien, zeta_t = 1 partout.
            zeta_t = self.zeta[t] # (n,)

            # --- B. Calcul des paramètres redimensionnés (Scaling) ---
            # Selon , les paramètres utilisés dans la copule 
            # (tilde_lambda et sigma) dépendent de l'état latent lambda.
            
            # Calcul de la norme au carré de lambda pour chaque série i
            # lambda_sq = sum(lambda_{it}^2)
            lam_sq_norm = np.sum(lambda_t**2, axis=1) # (n,)
            
            # Variance idiosyncratique sigma_{it}^2 
            # sigma^2 = 1 / (1 + lambda'lambda)
            sigma_sq_t = 1.0 / (1.0 + lam_sq_norm) # (n,)
            sigma_t = np.sqrt(sigma_sq_t)          # (n,)
            
            # Chargements redimensionnés tilde_lambda 
            # tilde_lambda = lambda / sqrt(1 + lambda'lambda) = lambda * sigma
            lambda_tilde_t = lambda_t * sigma_t[:, np.newaxis] # (n, p)
            
            # --- C. Construction de la "Donnée Transformée" ---
            # Selon Step 4 : x_dot = x / sqrt(zeta)
            # Cela normalise la variance induite par la variable de mélange
            x_dot_t = x_t / np.sqrt(zeta_t) # (n,)

            # --- D. Calcul de la Moyenne et Variance Postérieure ---
            # Le prior sur z_t est N(0, I_p).
            # La vraisemblance est x_dot ~ N(tilde_lambda * z, D)
            # où D est diagonale avec éléments sigma_sq_t.
            
            # Precision Matrix = I_p + C' D^-1 C
            # Ici C = lambda_tilde. D^-1 = diag(1/sigma^2).
            
            # Astuce numérique : tilde_lambda' D^-1 tilde_lambda
            # revient à : lambda' lambda (car tilde_lambda = lambda * sigma)
            # Preuve: (lambda*sigma)' * (1/sigma^2) * (lambda*sigma) = lambda' * lambda
            
            # Calcul de la matrice de précision du likelihood
            # precision_data = lambda_t.T @ lambda_t # Ce serait l'astuce
            # Mais restons fidèles à la notation de l'article pour la clarté :
            
            # D_inv est un vecteur (diagonale inverse)
            D_inv_diag = 1.0 / sigma_sq_t # (n,)
            
            # Terme C' D^-1
            # On multiplie chaque colonne de lambda_tilde par D_inv
            Ct_Dinv = lambda_tilde_t.T * D_inv_diag[np.newaxis, :] # (p, n)
            
            # Precision Postérieure = I + C' D^-1 C
            # terme entre crochets
            precision_post = I_p + Ct_Dinv @ lambda_tilde_t # (p, p)
            
            # Covariance Postérieure (Sigma_z)
            # On utilise Cholesky pour la stabilité numérique et pour le tirage ensuite
            # L = cholesky(Precision) -> Precision = L L.T
            try:
                L_prec = linalg.cholesky(precision_post, lower=True)
                # Pour inverser, on résout le système linéaire. 
                # Cov = Prec^-1.
                # Mais on a juste besoin de résoudre Mean = Cov * Terme_Lineaire
                # => Prec * Mean = Terme_Lineaire
            except linalg.LinAlgError:
                # Fallback en cas de problèmes numériques rares
                L_prec = linalg.cholesky(precision_post + 1e-6 * np.eye(p), lower=True)

            # Moyenne Postérieure (Mu_z)
            # Mean = Cov_post * (C' D^-1 x_dot)
            # terme de droite dans la moyenne
            linear_term = Ct_Dinv @ x_dot_t # (p,)
            
            # Résolution de : Precision * Mean = linear_term
            # On utilise cholesky_solve pour la rapidité
            mu_post = linalg.cho_solve((L_prec, True), linear_term)
            
            # --- E. Tirage aléatoire (Draw) ---
            # z_t = mu_post + chol(Cov_post) * epsilon
            # Note: chol(Cov) = inv(L_prec.T)
            
            epsilon = np.random.normal(size=p)
            
            # On résout L_prec.T * z_noise = epsilon pour obtenir z_noise ~ N(0, Cov)
            # C'est plus stable que d'inverser explicitement
            z_noise = linalg.solve_triangular(L_prec.T, epsilon, lower=False)
            
            new_z[t] = mu_post + z_noise

        # Mise à jour de l'attribut de la classe
        self.z = new_z

    def update_lambda_pg_old(self):
        """
        Step 5: Draw state variables Lambda using Particle Gibbs.
        [cite: 21, 302]
        
        Assumption: Loadings are independent across i conditional on z_t[cite: 23].
        """
        T, n = self.data.shape
        p = self.p
        
        # Conteneur pour les nouvelles trajectoires
        new_lambdas = np.zeros((T, n, p))
        
        # Paramètres courants (supposés diagonaux pour permettre le traitement par actif)
        mu_vec = self.theta['mu']      # (n, p) ou (n,)
        phi_vec = self.theta['phi']    # (n, p) ou (n,)
        sigma_vec = self.theta['sigma'] # (n, p) ou (n,)
        
        # Boucle séquentielle sur chaque actif (série temporelle)
        # [cite: 307] "we draw each path lambda_{i,1:T} separately"
        for i in range(n):
            
            # 1. Extraction des paramètres spécifiques à l'actif i
            # On suppose ici p=1 pour simplifier l'indexation, ou que les p facteurs sont indépendants
            mu_i = mu_vec[i]
            phi_i = phi_vec[i]
            sigma_i = sigma_vec[i]
            
            # Données spécifiques à i
            data_i = self.data_x[:, i] # Données transformées x_it (T,)
            
            # 2. Instanciation du SSM pour l'actif i
            # On passe les facteurs communs z (T, p) qui sont conditionnés 
            ssm_i = UnivariateLoadingSSM(
                mu=mu_i,
                phi=phi_i,
                sigma_eta=sigma_i,
                z=self.z,          # Facteurs communs fixes pour cette étape
                zeta=self.zeta[:, i], # Variables de mélange pour i
                data_x=data_i      # Observations
            )
            
            # 3. Exécution du Filtre Particulaire (SMC)
            # [cite: 305] "Discrete approximation through a set of particles"
            # N=100 selon le papier [cite: 345]
            fk_model = ssm.Bootstrap(ssm=ssm_i, data=data_i)
            pf = particles.SMC(fk=fk_model, N=100, store_history=True)
            pf.run()
            
            # 4. Backward Sampling (FFBSm)
            # [cite: 334] "Draw a path ... using backwards sampling algorithm"
            # [cite: 343] "The draw is a draw from the full-conditional distribution"
            # La méthode backward_sampling de particles retourne une liste de (T,)
            traj = pf.hist.backward_sampling(1)
            
            # 5. Stockage
            # traj est une liste de ndarrays, on convertit en array (T,)
            new_lambdas[:, i, 0] = np.array(traj).flatten()
            
        # Mise à jour de l'état global
        self.lambdas = new_lambdas

    def update_lambda_pg(self):
        """
        Step 5: Mise à jour des chargements factoriels via Particle Gibbs.
        Gère l'identification (Triangulaire + Diagonale positive).
        
        Sources:
        - Particle Gibbs sampler
        - Parallelizable across i (simulated sequentially here)
        - Backward sampling step
        """
        T, n = self.data.shape
        p = self.p
        
        # Conteneurs temporaires pour cette itération
        # new_lambdas: valeurs réelles (avec exp() appliquées) pour le modèle
        new_lambdas = np.zeros((T, n, p))
        
        # new_states: valeurs latentes (avec log()) pour la régression AR(1)
        # Note: On stocke des vecteurs de taille p, remplis de 0 pour les dimensions inactives
        new_states = np.zeros((T, n, p))
        
        # Paramètres courants (diffusés/broadcasted si scalaires)
        mu_vec = self.theta['mu']       # (n, p)
        phi_vec = self.theta['phi']     # (n, p)
        sigma_vec = self.theta['sigma'] # (n, p)
        
        # Boucle sur chaque actif i
        for i in range(n):
            # Données transformées pour l'actif i
            data_i = self.transform_u_to_x()[:, i] # (T,) ou pré-calculé
            
            # --- A. Instanciation du Modèle ---
            ssm_i = IdentifiedLoadingSSM(
                row_idx=i,
                p_factors=p,
                mu=mu_vec[i],
                phi=phi_vec[i],
                sigma_eta=sigma_vec[i],
                z=self.z,             # Facteurs communs conditionnels
                zeta=self.zeta[:, i], # Variable de mélange
                data_x=data_i
            )
            
            # --- B. Particle Gibbs (SMC + Backward) ---
            # Nombre de particules M=100
            fk_model = ssm.Bootstrap(ssm=ssm_i, data=data_i)
            pf = particles.SMC(fk=fk_model, N=100, store_history=True)
            pf.run()
            
            # Backward Sampling: tire une trajectoire d'états h_{1:T}
            # Retourne une liste de arrays (dim_state,)
            traj_list = pf.hist.backward_sampling(1)
            
            # Conversion en array numpy (T, dim_state)
            traj_arr = np.array(traj_list)[:, :, 0] # shape (T, dim_state)
            
            # --- C. Stockage et Post-Traitement ---
            dim = ssm_i.dim_state
            
            # 1. Sauvegarde des états latents (pour Update Mu/Phi/Sigma)
            # On remplit les 'dim' premières colonnes
            new_states[:, i, :dim] = traj_arr
            
            # 2. Sauvegarde des vrais chargements (pour Update Z / Likelihood)
            # Copie de base
            new_lambdas[:, i, :dim] = traj_arr
            
            # Application de la transformation Log -> Exp pour la diagonale
            if i < p:
                # L'élément à l'indice [:, i] est le log-loading
                new_lambdas[:, i, i] = np.exp(traj_arr[:, i])
                
                # Force explicitement les éléments triangulaires sup à 0
                new_lambdas[:, i, dim:] = 0.0
                new_states[:, i, dim:] = 0.0 # Optionnel (déjà 0)

        # Mise à jour des attributs de la classe
        self.lambdas = new_lambdas       # Utilisé dans update_z
        self.latent_states = new_states  # Utilisé dans update_mu_phi, update_sigma

    def update_sigma(self):
        """
        Step 6: Tirage de la variance d'état Sigma (Inverse Gamma).
        
        Met à jour la variance des chocs de transition Lambda conditionnellement
        à la trajectoire des états Lambda_{1:T}.
        
        Sources:
        [cite_start]- [cite: 25] Draw Sigma conditional on state variables Lambda and (mu, Phi).
        [cite_start]- [cite: 26] With Inverse Gamma prior, posterior is Inverse Gamma (Standard).
        [cite_start]- [cite: 363] Prior utilisé dans l'article: InvGamma(20, 0.25).
        [cite_start]- [cite: 89] Equation de transition: Lambda_{t+1} = mu + Phi(Lambda_t - mu) + eta
        """
        
        # 1. Calcul des résidus du processus AR(1)
        # [cite_start]L'équation (2) [cite: 89] définit la dynamique des états.
        # Nous calculons l'erreur de prédiction entre t et t+1.
        
        # lambda_{2:T} (Observed Next State)
        lambda_next = self.lambdas[1:] # Shape: (T-1, n, p)
        
        # lambda_{1:T-1} (Current State)
        lambda_curr = self.lambdas[:-1] # Shape: (T-1, n, p)
        
        # Paramètres courants (avec broadcasting pour la dimension temporelle)
        # mu et phi sont de forme (n, p) -> on ajoute l'axe temps: (1, n, p)
        mu = self.theta['mu'][np.newaxis, :, :]
        phi = self.theta['phi'][np.newaxis, :, :]
        
        # Prédiction : mu + phi * (lambda_t - mu)
        lambda_pred = mu + phi * (lambda_curr - mu)
        
        # Résidus eta_t ~ N(0, Sigma)
        residuals = lambda_next - lambda_pred # Shape: (T-1, n, p)
        
        # 2. Calcul de la Somme des Carrés des Erreurs (SSE)
        # On somme sur l'axe temporel (axis=0)
        sse = np.sum(residuals**2, axis=0) # Shape: (n, p)
        
        # 3. Paramètres du Postérieur (Inverse Gamma)
        # [cite_start]Prior donné dans l'application[cite: 363]: alpha=20, beta=0.25
        alpha_prior = 20.0
        beta_prior = 0.25
        
        # Nombre d'observations effectives pour la transition
        T_eff = self.lambdas.shape[0] - 1
        
        # Mise à jour des hyperparamètres (Règle standard conjuguée)
        # alpha_post = alpha_prior + T/2
        alpha_post = alpha_prior + (T_eff / 2.0)
        
        # beta_post = beta_prior + SSE/2
        beta_post = beta_prior + (sse / 2.0)
        
        # 4. Échantillonnage (Sampling)
        # NumPy n'a pas de np.random.invgamma direct paramétré en alpha/beta.
        # On utilise la relation: Si X ~ Gamma(alpha, beta_rate), alors 1/X ~ InvGamma(alpha, beta_rate).
        # Note: np.random.gamma prend (shape, scale). scale = 1/rate = 1/beta_post.
        
        gamma_draws = np.random.gamma(shape=alpha_post, scale=1.0/beta_post)
        
        # On inverse pour obtenir l'Inverse Gamma
        new_sigma = 1.0 / gamma_draws # Shape: (n, p)
        
        # Mise à jour du dictionnaire de paramètres
        self.theta['sigma'] = new_sigma

    def update_mu_phi(self):
        """
        Step 7: Tirage de mu et Phi conditionnellement aux états Lambda et Sigma.
        
        Met à jour la moyenne long terme (mu) et l'autocorrélation (Phi)
        des processus AR(1) des états latents.
        
        Transition Model: Lambda_{t+1} = mu + Phi * (Lambda_t - mu) + eta_t
        eta_t ~ N(0, Sigma)
        
        Sources:
        -  "Draw mu, Phi... acceptance sampling for truncated normal... standard."
        - [cite: 362] Prior mu ~ N(0.4, 2)
        - [cite: 363] Prior Phi ~ N(0.985, 0.001) truncated to stationarity region (-1, 1).
        - [cite: 362] Assumption: Phi and Sigma are diagonal (indépendance élément par élément).
        """
        
        # Données: Séparation des états en t (current) et t+1 (next)
        # Dimensions: lam_curr (T-1, n, p), lam_next (T-1, n, p)
        lam_curr = self.lambdas[:-1]
        lam_next = self.lambdas[1:]
        T_eff = lam_curr.shape[0] # T-1
        
        # Récupération de la variance courante sigma^2 (n, p)
        # Diffusée (broadcasted) pour les calculs
        sigma_sq = self.theta['sigma'] 
        
        # -------------------------------------------------------
        # A. Mise à jour de MU (Moyenne Long Terme)
        # -------------------------------------------------------
        # On réécrit l'équation: Lambda_{t+1} - Phi*Lambda_t = (1 - Phi)*mu + eta_t
        # Forme: Y = X*mu + eta. Ici X = (1-Phi) est constant dans le temps.
        
        # Paramètres courants de Phi (n, p)
        phi = self.theta['phi']
        
        # Priors pour Mu [cite: 362]
        mu_prior_mean = 0.4
        mu_prior_var = 2.0
        mu_prior_prec = 1.0 / mu_prior_var
        
        # 1. Construction des "pseudo-données"
        # Y_t = Lambda_{t+1} - Phi * Lambda_t
        y_mu = lam_next - phi[np.newaxis, :, :] * lam_curr # (T-1, n, p)
        
        # "Regresseur" constant A = (1 - Phi)
        A = (1.0 - phi) # (n, p)
        
        # 2. Calcul des statistiques suffisantes (Précision et Moyenne pondérée)
        # Likelihood Precision = sum(A^2 / sigma^2) = T * A^2 / sigma^2
        lik_prec_mu = T_eff * (A**2) / sigma_sq
        
        # Likelihood Mean term = sum(A * Y_t / sigma^2) = (A / sigma^2) * sum(Y_t)
        lik_mean_term_mu = (A / sigma_sq) * np.sum(y_mu, axis=0)
        
        # 3. Postérieur pour Mu (Conjugaison Normale-Normale)
        post_prec_mu = mu_prior_prec + lik_prec_mu
        post_var_mu = 1.0 / post_prec_mu
        post_mean_mu = post_var_mu * (mu_prior_prec * mu_prior_mean + lik_mean_term_mu)
        
        # 4. Tirage de Mu
        self.theta['mu'] = np.random.normal(loc=post_mean_mu, scale=np.sqrt(post_var_mu))
        
        # -------------------------------------------------------
        # B. Mise à jour de PHI (Autocorrélation) - Normale Tronquée
        # -------------------------------------------------------
        # On réécrit l'équation: (Lambda_{t+1} - mu) = Phi * (Lambda_t - mu) + eta_t
        # Forme: Y = Phi*X + eta (Régression sans constante passant par l'origine)
        
        # Nouvelle valeur de mu (n, p) venant d'être tirée
        mu = self.theta['mu']
        
        # Priors pour Phi [cite: 363]
        phi_prior_mean = 0.985
        phi_prior_var = 0.001
        phi_prior_prec = 1.0 / phi_prior_var
        
        # 1. Données centrées
        # X_t = Lambda_t - mu
        x_phi = lam_curr - mu[np.newaxis, :, :]
        # Y_t = Lambda_{t+1} - mu
        y_phi = lam_next - mu[np.newaxis, :, :]
        
        # 2. Statistiques suffisantes
        # sum_x_sq = sum(X_t^2)
        sum_x_sq = np.sum(x_phi**2, axis=0)
        # sum_xy = sum(X_t * Y_t)
        sum_xy = np.sum(x_phi * y_phi, axis=0)
        
        # Likelihood Precision = sum(X^2) / sigma^2
        lik_prec_phi = sum_x_sq / sigma_sq
        
        # Likelihood Mean term = sum(X*Y) / sigma^2
        lik_mean_term_phi = sum_xy / sigma_sq
        
        # 3. Postérieur non tronqué (Conjugaison Normale-Normale)
        post_prec_phi = phi_prior_prec + lik_prec_phi
        post_var_phi = 1.0 / post_prec_phi
        post_mean_phi = post_var_phi * (phi_prior_prec * phi_prior_mean + lik_mean_term_phi)
        post_std_phi = np.sqrt(post_var_phi)
        
        # 4. Tirage Tronqué sur (-1, 1) 
        # "We use acceptance sampling for the truncated normal... which is standard."
        # Pour l'efficacité et la robustesse vectorisée, on utilise scipy.stats.truncnorm
        # Paramétrage de truncnorm: a et b sont les bornes normalisées
        
        a_clip = (-1.0 - post_mean_phi) / post_std_phi
        b_clip = (1.0 - post_mean_phi) / post_std_phi
        
        self.theta['phi'] = stats.truncnorm.rvs(
            a_clip, 
            b_clip, 
            loc=post_mean_phi, 
            scale=post_std_phi, 
            size=mu.shape
        )


In [16]:
class UnivariateLoadingSSM(ssm.StateSpaceModel):
    """
    Modèle espace d'état pour les chargements d'un seul actif i.
    Transition: Eq (2) [cite: 89]
    Observation: Eq (5) avec scaling Eq (6)-(7) [cite: 146, 156]
    """
    def __init__(self, mu, phi, sigma_eta, z, zeta, data_x):
        """
        :param mu: Moyenne long terme (scalaire ou vecteur taille p)
        :param phi: Autocorrélation (scalaire ou diag)
        :param sigma_eta: Variance du bruit de transition (scalaire ou diag)
        :param z: Facteurs communs (T, p) [cite: 150]
        :param zeta: Variable de mélange (T,) [cite: 220]
        :param data_x: Données observées transformées x_it (T,) [cite: 5]
        """
        self.mu = mu
        self.phi = phi
        self.sigma_eta = sigma_eta
        self.z = z
        self.zeta = zeta
        self.data_x = data_x
        
    def PX0(self):
        """Distribution initiale de lambda_{i,1} """
        # Stationnaire: N(mu, sigma^2 / (1-phi^2))
        var_0 = self.sigma_eta / (1 - self.phi**2)
        return distributions.Normal(loc=self.mu, scale=np.sqrt(var_0))

    def PX(self, t, xp):
        """Transition lambda_{i, t} | lambda_{i, t-1} (Eq 2) """
        # AR(1): mu + phi * (prev - mu) + noise
        mean = self.mu + self.phi * (xp - self.mu)
        return distributions.Normal(loc=mean, scale=np.sqrt(self.sigma_eta))

    def PY(self, t, xp, x):
        """
        Densité d'observation p(x_it | lambda_{it}, z_t, zeta_t)
        Utilise les équations de scaling (6) et (7).
        """
        # 1. Récupérer les covariables au temps t
        z_t = self.z[t]       # Facteurs (p,)
        zeta_t = self.zeta[t] # Variable de mélange (scalaire)
        obs = self.data_x[t]  # Donnée x_it
        
        # 2. Calcul du scaling 
        # Dans le cas p=1 (un seul facteur par loading pour simplifier l'exemple)
        # Si p > 1, x est un vecteur et il faut sommer x^2
        # Scaling factor: D = 1 + lambda'lambda
        # Note: x ici est la particule courante représentant lambda_{it}
        
        # Gestion vectorielle pour les particules (N_particles,)
        lam_sq = x**2 
        scaling_factor = np.sqrt(1.0 + lam_sq) 
        
        # 3. Chargements et variance transformés 
        # tilde_lambda = lambda / sqrt(1 + lambda^2)
        tilde_lambda = x / scaling_factor
        
        # sigma_eps = 1 / sqrt(1 + lambda^2)
        sigma_eps = 1.0 / scaling_factor
        
        # 4. Moyenne et Variance conditionnelles de x_it
        # x_it = sqrt(zeta) * (tilde_lambda * z_t + sigma_eps * epsilon)
        # Donc x_it ~ N( Mean, Var )
        
        # Moyenne = sqrt(zeta) * tilde_lambda * z_t
        mean_obs = np.sqrt(zeta_t) * tilde_lambda * z_t
        
        # Ecart-type = sqrt(zeta) * sigma_eps
        scale_obs = np.sqrt(zeta_t) * sigma_eps
        
        return distributions.Normal(loc=mean_obs, scale=scale_obs).logpdf(obs)