In [1]:
import numpy as np
import pandas as pd
from scipy.stats import invgamma, multivariate_normal, t, gamma
from numpy.linalg import cholesky


np.random.seed(200)

# Nombre d'observations
nombre_observations = 50

# Matrice de modèle simulée
X = np.column_stack([np.ones(nombre_observations), np.random.normal(0, 1, nombre_observations),
                     np.random.normal(5, 10, nombre_observations), np.random.normal(100, 10, nombre_observations)])

# Vrais coefficients beta
vrais_coefficients_beta = np.array([1000, 50, -50, 10])

# Vraie valeur de phi
vraie_phi = 10000
matrice_identite = np.eye(nombre_observations)  # Matrice identité utilisée pour la matrice de covariance

# Simuler la variable dépendante pour la régression
y = multivariate_normal.rvs(mean=np.dot(X, vrais_coefficients_beta),
                            cov=vraie_phi * matrice_identite)

# valeurs initiales
n,p = X.shape

beta_sampled = np.zeros(p)
eta_sampled = np.zeros(p)
zeta_sampled = 0
sigma_sampled = 1

print(n,p)

# On fait une fonction d'échantillonnage pour chaque paramètre : 

50 4


**Fonction qui sample les eta :** 

In [2]:
def slice_sampling_eta(beta, sigma_sq, zeta, nu, eta_t, step_size=1.0):
    current_value_eta = eta_t
    # Étape de "slice"
    height_eta = np.random.uniform(0, target_density_eta(current_value_eta, beta, sigma_sq, zeta, nu))
    # Étape de réduction de la tranche
    left_eta = current_value_eta - np.random.exponential(scale=step_size)
    right_eta = left_eta + step_size
    while target_density_eta(left_eta, beta, sigma_sq, zeta, nu) < height_eta:
        left_eta -= step_size
    while target_density_eta(right_eta, beta, sigma_sq, zeta, nu) < height_eta:
        right_eta += step_size
    # Étape d'échantillonnage
    new_value_eta = np.random.uniform(left_eta, right_eta)
    samples_eta.append(new_value_eta)
        
    return samples_eta[1:]  # On retire la valeur initiale


def target_density_eta(eta, beta, sigma_sq, zeta, nu):
    p = len(beta)
    density = 1.0
    for j in range(p):
        m_tj = zeta * beta[j]**2 / (2 * sigma_sq)
        term1 = np.exp(-m_tj * eta[j])
        term2 = eta[j]**((1 - nu) / 2) * (1 + nu * eta[j])**(nu + 1)
        density *= term1 / term2
    return density


# test : 

if __name__ == "__main__":
    # Exécution de l'algorithme de slice sampling pour échantillonner eta
    samples_eta = slice_sampling_eta(beta_sampled, 1, zeta_sampled, 1, eta_sampled, 1)
    
    # Affichage des résultats
    print("éta_sampled:", samples_eta )
    print("Moyenne des échantillons de eta:", np.mean(samples_eta, axis=0))
    print("Écart type des échantillons de eta:", np.std(samples_eta, axis=0))




NameError: name 'samples_eta' is not defined

In [18]:
def sample_eta(beta, sigma_sq, zeta, nu):
    """
    Échantillonne les valeurs de eta_t+1 conditionnellement à beta, sigma_sq, zeta, et nu.

    Arguments :
    beta : Vecteur de coefficients beta_t.
    sigma_sq : Variance sigma_t^2.
    zeta : Valeur zeta_t.
    nu : Paramètre nu.

    Returns :
    eta_sampled : Valeurs échantillonnées de eta_t+1.
    """
    
    eta_sampled = np.zeros(p)

    for j in range(p):
        m_tj = zeta * beta[j]**2 / (2 * sigma_sq)
        # Calcul du terme de pondération
        weight = np.exp(-m_tj * eta_sampled[j])
        # Calcul du terme de normalisation
        normalization = eta_sampled[j]**((1 - nu) / 2) * (1 + nu * eta_sampled[j])**(nu + 1)
        # Échantillonnage de eta_t+1,j
        eta_sampled[j] = np.random.gamma(shape=(1 - nu) / 2, scale=1 / normalization) * weight
    return eta_sampled






**Fonction qui sample les zeta (ou xi dans l'article) :** 

In [None]:
# il faut faire une fonction calculate_likelihood ici 

def calculate_likelihood(y, zeta, eta):    # il faut que les béta_sampled et les sigma_sq_sampled existe pour le faire.
    """
    Calcule la vraisemblance de y sachant zeta_t+1 et eta_t+1.

    Arguments :
    y : Vecteur des observations.
    X : Matrice des prédicteurs.
    beta_t: Échantillons de beta.
    sigma_sq_samples : Échantillons de sigma_sq.
    zeta : Valeur de zeta_t+1.
    eta : Vecteur des valeurs eta_t+1.

    Returns :
    likelihood : La vraisemblance de y sachant zeta_t+1 et eta_t+1.
    """
    n_samples = len(beta_t)
    likelihood = 0

    for i in range(n_samples):
        beta = beta_t[i]
        sigma_sq = sigma_sq_samples[i]
        
        # Calcul de la densité conditionnelle de y sachant beta et sigma_sq
        data_density = np.exp(-0.5 * n * np.log(2 * np.pi * sigma_sq) - 0.5 * np.linalg.slogdet(Sigma)[1] \
                     - 0.5 * np.dot(np.dot((y - np.dot(X.T, beta)).T, np.linalg.inv(Sigma)), (y - np.dot(X.T, beta))))
        
        # Calcul de la densité jointe de beta et sigma_sq sachant zeta et eta
        joint_prior = np.exp(-0.5 * np.sum(beta**2) / (zeta * eta) - 0.5 * np.log(sigma_sq))
        
        # Ajout de la contribution de cet échantillon à la vraisemblance
        likelihood += data_density * joint_prior
    
    # Moyenne des contributions de tous les échantillons
    likelihood /= n_samples
    
    return likelihood



def sample_zeta(zeta_previous, eta_tplus1, beta, sigma_sq, sigma_mrth=0.8): 
# y, X, beta, sigma_sq ???
    
    """
    Échantillonne la valeur de zeta_t+1 conditionnellement à zeta_previous et eta_t+1.

    Arguments :
    zeta_previous : Valeur zeta_t.
    eta_tplus1 : Valeur échantillonnée de eta_t+1.
    y : Vecteur de données.
    X : Matrice de design.
    beta : Vecteur de coefficients beta_t.
    sigma_sq : Variance sigma_t^2.
    sigma_mrth : Écart-type de la proposition normale = 0.8 selon l'article.

    Returns :
    zeta_sampled : Valeur échantillonnée de zeta_t+1.
    """
    # Calcul de la matrice Sigma
    Sigma = np.dot(X, X.T) + zeta_proposed * np.diag(eta_tplus1)

    # Proposition d'un nouvel échantillon de log(zeta_t+1)
    log_zeta_proposed = np.random.normal(np.log(zeta_previous), sigma_mrth)

    # Calcul des termes de probabilité a priori
    prior_current = -0.5 * zeta_previous ** 2
    prior_proposed = -0.5 * np.exp(2 * log_zeta_proposed)

    # On calcule la vraisemblance conditionnelle des données
    log_likelihood = np.log(calculate_likelihood(y, zeta_proposed, eta_sampled)) # cf fonction calculate_likelihood au dessus

    
    # Log-probabilité du log-posterior pour les valeurs actuelles et proposées
    log_posterior_current = log_likelihood + prior_current
    log_posterior_proposed = log_likelihood + prior_proposed

    # Calcul du ratio de probabilité
    acceptance_ratio = np.exp(log_posterior_proposed - log_posterior_current)

    # Acceptation ou rejet de la proposition
    if np.random.uniform(0, 1) < acceptance_ratio:
        zeta_sampled = np.exp(log_zeta_proposed)
    else:
        zeta_sampled = zeta_previous

    return zeta_sampled

if __name__ == "__main__":
    # Exécution de l'algorithme pour échantillonner zeta
    sample_zeta = slice_sampling_eta(zeta_sampled, eta_sampled,beta_sampled , sigma_sampled, 0,8)
    # Affichage des résultats
    print("zéta_sampled:", samples_zeta )

**Fonction qui sample les sigma :** 

In [None]:
def sample_sigma_sq(zeta, eta):
    """
    Échantillonne la valeur de sigma_t+1^2 conditionnellement à zeta_t+1, et eta_t+1.

    Arguments :
    zeta : Valeur zeta_t+1.
    eta : Vecteur de valeurs eta_t+1.

    Returns :
    sigma_sq_sampled : Valeur échantillonnée de sigma_t+1^2.
    """
    
    # Calcul de M
    inv_eta = np.diag(1 / eta)
    M = np.eye(n) + zeta**(-1) * X @ inv_eta @ X.T
    
    # Calcul de y^T M^-1 y
    y_M_inv_y = y.T @ np.linalg.inv(M) @ y
    
    # Échantillonnage de sigma_t+1^2
    sigma_sq_sampled = 1 / np.random.gamma((1 + n) / 2, (y_M_inv_y + 1) / 2)
    
    return sigma_sq_sampled


**Fonction qui sample les béta :** 

In [None]:
def sample_beta(zeta, eta, sigma_sq):
    """
    Échantillonne la valeur de beta_t+1 conditionnellement à y, X, zeta_t+1, eta_t+1, et sigma_t+1^2.

    Arguments :
    y : Vecteur de réponse.
    X : Matrice de design.
    zeta : Valeur zeta_t+1.
    eta : Vecteur de valeurs eta_t+1.
    sigma_sq : Variance sigma_t+1^2.

    Returns :
    beta_sampled : Valeur échantillonnée de beta_t+1.
    """
    
    # Calcul de Sigma
    
    inv_eta = np.diag(1 / eta)
    Sigma = np.linalg.inv(X.T @ X + zeta * inv_eta)
    
    # Calcul de mu
    mu = Sigma @ X.T @ y
    
    # Échantillonnage de beta_t+1
    beta_sampled = np.random.multivariate_normal(mu, sigma_sq * Sigma)
    
    return beta_sampled


**On recompile tout maintenant : pour faire notre block gibbs sampling** 

In [None]:
def blocked_gibbs_sampler(X, y, n_iter, burn_in = 600):
    """
    Réalise un échantillonnage de Gibbs en blocs pour estimer les paramètres d'un modèle linéaire bayésien.
    
    Arguments :
    X : Matrice de design.
    y : Vecteur de réponse.
    n_iter : Nombre d'itérations de l'algorithme.
    burn_in : Nombre d'itérations à brûler = 600 pour nu = 1 selon l'article
    
    Returns :
    betas : Liste des valeurs échantillonnées des coefficients beta.
    sigmas_squared : Liste des valeurs échantillonnées des variances sigma^2.
    etas :  Liste des valeurs échantillonnées de eta.
    zetas : Liste des valeurs échantillonnées de zeta.
    """
    n, p = X.shape
    
    beta_current = np.zeros(p)
    sigma_squared_current = 1
    zeta_current = 1
    eta_current = np.ones(p)

    nu = 1   # on se cantonne au cas nu = 1 qui est la meilleure valeur selon l'article mais on pourrait changer la valeur

    betas = []
    sigmas_squared = []
    etas = []
    zetas = []
    
    for _ in range(n_iter):
        # Étape 1 : Échantillonnage de eta_t+1
        eta_current = sample_eta(beta_current, sigma_squared_current, zeta_current, nu)
      
        # Étape 2 : Échantillonnage de zeta_t+1
        zeta_current = sample_zeta(zeta_current, eta_current, beta_current sigma_squared_current, sigma_mrth=0.8)
        
        # Étape 3 : Échantillonnage de sigma_t+1^2
        sigma_squared_current = sample_sigma_sq(zeta_current, eta_current)
        
        # Étape 4 : Échantillonnage de beta_t+1
        beta_current = sample_beta(zeta_current, eta_current, sigma_squared_current)

        # Sauvegarde des échantillons après la période de brûlage
        if _ >= burn_in:
            betas.append(beta_current)
            sigmas_squared.append(sigma_squared_current)
            etas.append(eta_current)
            zetas.append(zeta_current)
    
    return betas, sigmas_squared, etas, zetas
