# QUASI MONTE CARLO CDO PRICING 

In [12]:
import math
import numpy as np
from numpy.random import default_rng
rng = default_rng()
from scipy.stats import norm
from scipy.stats import multivariate_normal
from scipy.stats import t
from scipy.stats import norm, norminvgauss
from scipy.stats.qmc import Sobol
from scipy.integrate import quad
from primePy import primes
import pandas as pd
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy.core.fromnumeric")
warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy.core._methods")

### Les méthodes de Quasi Monte Carlo : 

Les méthodes de quasi monte-carlo sont des méthodes qui supposent de remplacer les nombres pseudo-aléatoires par des séquences déterministes. Autrement, les nombres pseudo-aléatoires sont remplacés par des séquences déterministes de nombres qui une fois remplacés stricto-sensus en lieu et place des nombres pseudo-alétoires, permettent de réduire les temps de simulation, d'avoir une meilleure approximation et surtout d'avoir un control sur nos simulations. 


Les séquences déterministes de nombres qui remplacent les nombres pseudo-aléatoires sont appelés des **suites a discrépance faible**, ie des séquences dont la propriété est de bien occuper l'espace \[0,1\]. Parmi ces suites, nous avons : 


**Suite de Van der Corput**
La suite de Van der Corput repose sur la manipulation des coefficients de la décomposition $p-$adique de $n \ge 1$:
- Soit $p$ un nombre premier qui sert de base à la décomposition $p-$adique
- Décomposition de $n$: 
\begin{equation*}
    n = a_0 + a_1 p + \dots a_r p^r, \quad a_k \in \{0, \dots, p-1\}, \quad a_r \neq 0,
\end{equation*}
- Construction de $\xi_n$: 
\begin{equation*}
    \xi^{(p)}_n = \frac{a_0}{p} + \frac{a_1}{p^2} + \dots + \frac{a_r}{p^{r+1}} \in [0,1].
\end{equation*}

\begin{equation*}
    D_n^*(\xi^{(p)}) \le \frac{1}{n} \left(1 + (p-1) \frac{\log(p n)}{\log(p)}\right)
\end{equation*}


**Suite de Halton**

C'est une Van der Corput multidimensionnelle.
Soit $d$ la dimension, et $p_1,\dots, p_d$ les $d$ premiers nombres premiers.

La suite d'Halton est définie par  
\begin{equation*}
    \Xi^{(d)}_n = (\xi^{(p_1)}_n, \dots, \xi^{(p_d)}_n)
\end{equation*}

**Discrépance:** 
\begin{equation*}
    D_n^*(\xi^{(p)}) \le \frac{1}{n} \left(1 + \prod_{i=1}^d(p_i-1) \frac{\log(p_i n)}{\log(p_i)}\right)
\end{equation*}



**Suite de Sobol**

L'algorithme le plus populaire pour les suites à discrépances faible. La génération d'une suite de Sobol est très rapide car toute l'arithmétique se fait en base `2` (à la différence de la suite de Halton).
Il est recommandé d'utiliser uniquement des algorithmes déjà codés, testés et éprouvés! Il ne faut pas recoder son prore algorithme qui sera peut-être invalide.

**Halton & Copule Gaussienne a 1 facteur et Approximation LHP**


### Classe Quasi\_Monte\_Carlo\_Gaussian\_CDO:


$\_\_init\_\_$ initialise les paramètres de la classe avec les arguments donnés:

 - attachment : Point d'attachement
 - detachment : Point de détachement
 - rho : Coefficient de corrélation
 - intensity : Intensité de défaut
 - T : Horizon de temps
 - d : Dimension de l'espace des variables aléatoires
 - r : Taux d'intérêt sans risque
 - notionel : Montant notionnel du CDO
 - n\_samples : Nombre d'échantillons pour la simulation

  
- Génération des échantillons Quasi-Monte Carlo:

  La méthode **quasi_monte_carlo_stopping_times** génère des temps d'arrêt corrélés en utilisant la méthode Quasi-Monte Carlo. Les étapes sont les suivantes :

    - Génération de la suite de van der Corput avec la fonction van_der_corput_r.
    - Génération de la suite de van der Corput pour n échantillons et une base p avec la fonction van_der_corput.
    -  Génération de la suite de Halton pour n échantillons et d dimensions avec la fonction halton.
    -  Transformation Box-Muller pour générer des échantillons multivariés à partir des échantillons quasi-aléatoires avec la fonction box_muller_multivariate_sample.
    -  Génération des échantillons multivariés corrélés en utilisant la décomposition de Cholesky avec la fonction correlated_brownian_multivariate_sample.
    -  Filtrage des temps d'arrêt pour ne garder que ceux inférieurs à l'horizon de temps T.


- Probabilités neutres au risque:

  La méthode **risk_neutral_probabilities** calcule la probabilité neutre au risque à un instant t donné.


-  Perte attendue:

   La méthode expected_loss calcule la perte attendue à un instant t en utilisant une distribution bivariée normale. Nous avons utilisé la meme expected loss que celle dans le cas de la copule gaussienne avec une copule gaussienne.



-  Calcul du spread du CDO synthétique:

     La méthode **synthetic_cdo_spread_computation_gaussian_normal** calcule le spread du CDO synthétique en utilisant l'approche Quasi-Monte Carlo. Les étapes sont les suivantes :
     
   - Génération des temps d'arrêt avec la méthode quasi_monte_carlo_stopping_times.

   -  Pour chaque échantillon de temps d'arrêt, calcul des flux de prime et des flux de protection en utilisant la perte attendue et les probabilités neutres au risque et en considérant le taux de recouvrement.

   -  Calcul du spread pour chaque échantillon en divisant la somme des flux de prime par la somme des flux de protection. Ajout d'un petit nombre epsilon pour éviter les divisions par zéro.

   -  Calcul de la moyenne, de la variance et de l'intervalle de confiance des spreads obtenus à partir des échantillons.

   -  Retour des résultats sous forme d'un DataFrame pandas.





In [17]:
class Quasi_Monte_Carlo_Gaussian_CDO:
    
    def __init__(self, attachment, detachment, rho, intensity, T, d, r, notionel, n_samples):
        
        self.attachment = attachment
        self.detachment = detachment
        self.rho = rho
        self.intensity = intensity
        self.T = T
        self.d = d
        self.r = r
        self.notionel = notionel
        self.n_samples = n_samples
        
    def quasi_monte_carlo_stopping_times(self):
        
        def van_der_corput_r(r: int = 1, p: int = 2, scramble: bool = True):
            rng = np.random.default_rng() 
            coeffs = np.arange(p)[np.newaxis]
            if scramble:
                rng.shuffle(coeffs, axis=1)
            seq = coeffs / p
            for k in range(2, r+1):
                if scramble:
                    rng.shuffle(coeffs, axis=1)
                seq = (seq + coeffs.T/p**k).reshape((1, -1))
            seq = seq.flatten()
            return seq

        def van_der_corput(n=self.n_samples, p=self.d):
            r = np.ceil(np.log(n) / np.log(p))
            seq = van_der_corput_r(int(r), p, scramble=True)
            return seq[:n]

        def halton(n=self.n_samples, d=self.d):
            n_primes = primes.first(self.d + 4) 
            result = [van_der_corput(n, n_primes[k]) for k in range(d)]
            return np.array(result).T

        def box_muller_multivariate_sample():
            quasi_random_samples = halton()
            epsilon = 1e-10  
            array = np.empty((self.n_samples,self.d))

            for i in range(0,int(quasi_random_samples.shape[1]/2)):
              
                array[:, 2 * i] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.cos(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

                array[:, 2 * i + 1] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.sin(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

            return array

        
        def correlated_brownian_multivariate_sample():
            
            multivariate_samples = box_muller_multivariate_sample()
            
            covariance_matrix = np.full((self.d,self.d),self.rho) + (1 - self.rho) * np.eye(self.d)
            
            if np.allclose(covariance_matrix,covariance_matrix.T):
                try : 
                    L = np.linalg.cholesky(covariance_matrix)


                except np.linalg.LinAlgError : 
                    print("La matrice n'est pas sémi définie positive")
                
            # Une fois obtenue L , nous pouvons trouver l'échantillon final correspondant 
            multivariate_samples_correlated = L @ multivariate_samples.T
        
            multivariate_samples_correlated = multivariate_samples_correlated.T
            
            
            # simulons les temps d'arret
        
            stopping_times = (-1/self.intensity) * np.log(norm.cdf(multivariate_samples_correlated))
        
        
            for i in range(stopping_times.shape[0]):
            
                stopping_times[i] = np.sort(stopping_times[i])
            
            filtered_stopping_times = []
        
            for i in range(stopping_times.shape[0]):
                filtered_times = stopping_times[i][stopping_times[i] < self.T]
            
                filtered_stopping_times.append(filtered_times)
            
            filtered_stopping_times = np.array(filtered_stopping_times,dtype=object)
            
        
        
            return filtered_stopping_times
        
        
        return correlated_brownian_multivariate_sample()
    
    
    
    def risk_neutral_probabilities(self,t):
        
        return 1 - np.exp(-self.intensity * t)
    
    
    
    def expected_loss(self, t, recovery):
        
        covariance_matrix = np.array([[1, -np.sqrt(1-self.rho**2)],
                                      [-np.sqrt(1-self.rho**2), 1]])
        
        mean = np.array([0, 0])
        
        bivariate_distribution = multivariate_normal(mean, covariance_matrix)
        
        inverse_K_1 = - norm.ppf(self.attachment/(1 - recovery))
        
        inverse_K_2 = - norm.ppf(self.detachment/(1 - recovery))
        
        x = np.array([inverse_K_1, norm.ppf(self.risk_neutral_probabilities(t))])
        
        y = np.array([inverse_K_2, norm.ppf(self.risk_neutral_probabilities(t))])

        # Calcul de la perte attendue en fonction de la distribution bivariée
        return (bivariate_distribution.cdf(x) - 
                 bivariate_distribution.cdf(y)) / ((self.detachment - self.attachment)/(1 - recovery))

    # Calcul du spread synthétique du CDO en utilisant l'approximation gaussienne
    def synthetic_cdo_spread_computation_gaussian_normal(self, proba,recovery):
        
        spread_list = []
    
        stopping_times_sim = self.quasi_monte_carlo_stopping_times()
        
        for i in range(len(stopping_times_sim)):
            premium_list = []
            protection_list = []

            stopping_times_sample = stopping_times_sim[i]

            for j in range(len(stopping_times_sample) - 1):
                current_time = stopping_times_sample[j]
                next_time = stopping_times_sample[j + 1]
                current_expected_loss = self.expected_loss(current_time,recovery)
                next_expected_loss = self.expected_loss(next_time,recovery)
                time_diff = next_time - current_time
                discount_factor = np.exp(-self.r * current_time)

                # Calcul de la différence des flux de primes entre les deux temps d'arrêt
                premium_flow_difference = (next_expected_loss - current_expected_loss) * discount_factor
                premium_list.append(premium_flow_difference)

                # Calcul de la différence des flux de protection entre les deux temps d'arrêt
                protection_flow_difference = time_diff * (1 - current_expected_loss) * discount_factor
                protection_list.append(protection_flow_difference)
            
            epsilon = 1e-10
            premium_leg = np.sum(premium_list)
            protection_leg = np.sum(protection_list)
            
            
            if protection_leg != 0 and not math.isnan(premium_leg) and not math.isnan(protection_leg):
                spread_list.append(premium_leg / protection_leg + epsilon)
            
            # Calcul de la moyenne, de la variance et de l'intervalle de confiance pour le spread synthétique
            mean = np.mean(spread_list)
            var = np.var(spread_list, ddof=1)
            alpha = 1 - proba 
            quantile = norm.ppf(1 - alpha / 2)  # fonction quantile 
            ci_size = quantile * np.sqrt(var / np.array(spread_list).size)
        
        return pd.DataFrame({'Monte-Carlo Spread': mean,
                             'Variance': var,
                             'lower_confidence': mean - ci_size,
                             'upper_confidence': mean + ci_size}, index=['Halton_Gaussienne']).T

In [18]:
quasi_gaussian = Quasi_Monte_Carlo_Gaussian_CDO(attachment=0.3,detachment=0.7,rho=0.3,intensity=0.2,
                              T = 5, d=40,r=0.02,notionel=1,n_samples=30)

In [19]:
%%time
df_quasi_gaussien = quasi_gaussian.synthetic_cdo_spread_computation_gaussian_normal(0.95,recovery=0.2)

CPU times: user 705 ms, sys: 10.9 ms, total: 716 ms
Wall time: 709 ms


In [20]:
df_quasi_gaussien

Unnamed: 0,Halton_Gaussienne
Monte-Carlo Spread,0.122565
Variance,0.000495
lower_confidence,0.114469
upper_confidence,0.13066


**Halton Copule de Student a 1 facteur et Approximation LHP**

In [27]:
class Quasi_Monte_Carlo_Student_t_copula:
    
    def __init__(self, attachment, detachment, rho, intensity, T, d, r, notionel, n_samples, nu):
        self.attachment = attachment
        self.detachment = detachment
        self.rho = rho
        self.intensity = intensity
        self.T = T
        self.d = d
        self.r = r
        self.notionel = notionel
        self.n_samples = n_samples
        self.nu = nu
    

    def quasi_monte_carlo_stopping_times(self):
        
        def van_der_corput_r(r: int = 1, p: int = 2, scramble: bool = True):
            rng = np.random.default_rng() 
            coeffs = np.arange(p)[np.newaxis]
            if scramble:
                rng.shuffle(coeffs, axis=1)
            seq = coeffs / p
            for k in range(2, r+1):
                if scramble:
                    rng.shuffle(coeffs, axis=1)
                seq = (seq + coeffs.T/p**k).reshape((1, -1))
            seq = seq.flatten()
            return seq

        def van_der_corput(n=self.n_samples, p=self.d):
            r = np.ceil(np.log(n) / np.log(p))
            seq = van_der_corput_r(int(r), p, scramble=True)
            return seq[:n]

        def halton(n=self.n_samples, d=self.d):
            n_primes = primes.first(self.d + 4) 
            result = [van_der_corput(n, n_primes[k]) for k in range(d)]
            return np.array(result).T

        def box_muller_multivariate_sample():
            quasi_random_samples = halton()
            epsilon = 1e-10  
            array = np.empty((self.n_samples,self.d))

            for i in range(0,int(quasi_random_samples.shape[1]/2)):
              
                array[:, 2 * i] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.cos(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

                array[:, 2 * i + 1] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.sin(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

            return array

        
        def correlated_brownian_multivariate_sample():
            
            multivariate_samples = box_muller_multivariate_sample()
            
            covariance_matrix = np.full((self.d,self.d),self.rho) + (1 - self.rho) * np.eye(self.d)
            
            if np.allclose(covariance_matrix,covariance_matrix.T):
                try : 
                    L = np.linalg.cholesky(covariance_matrix)


                except np.linalg.LinAlgError : 
                    print("La matrice n'est pas sémi définie positive")
                
            # Une fois obtenue L , nous pouvons trouver l'échantillon final correspondant 
            multivariate_samples_correlated = L @ multivariate_samples.T
        
            multivariate_samples_correlated = multivariate_samples_correlated.T
            
            
            # simulons les temps d'arret
        
            stopping_times = (-1/self.intensity) * np.log(norm.cdf(multivariate_samples_correlated))
        
        
            for i in range(stopping_times.shape[0]):
            
                stopping_times[i] = np.sort(stopping_times[i])
            
            filtered_stopping_times = []
        
            for i in range(stopping_times.shape[0]):
                filtered_times = stopping_times[i][stopping_times[i] < self.T]
            
                filtered_stopping_times.append(filtered_times)
            
            filtered_stopping_times = np.array(filtered_stopping_times,dtype=object)
            
        
        
            return filtered_stopping_times
        
        
        return correlated_brownian_multivariate_sample()
    
    
    def risk_neutral_probabilities(self,t):
        
        return 1 - np.exp(- self.intensity * t)
    

    
    
    def student_copula_percentile(self,time,index):
        
        proba = self.risk_neutral_probabilities(time)
        
        quantile = t.ppf(proba,self.nu)
        
        return quantile
    
    
    def expected_loss(self,time,index,recovery=0):
        
        def f(x):
            
            return (min(x,(self.detachment/(1 - recovery))) - (self.attachment/(1 - recovery)))
        
        def g(x):
            
            c_t = self.student_copula_percentile(time,index)
            
            return (np.sqrt(1-self.rho**2)/self.rho) * (1/t.pdf(t.ppf(x,self.nu),self.nu)) * t.pdf((np.sqrt(1-self.rho**2) * t.ppf(x,self.nu) - c_t)/self.rho,self.nu)
        
        def integrand(x):
            
            return (f(x) * g(x))/((self.detachment - self.attachment)/(1 - recovery))
        
        
        a = (self.attachment/(1 - recovery))
        b = 1
        
        return quad(integrand,a,b)[0]
    
    
    def synthetic_cdo_spread_computation_student(self,proba,recovery=0):
        
         spread_list = []
    
         stopping_times_sim = self.quasi_monte_carlo_stopping_times()
        
         for i in range(len(stopping_times_sim)):
            premium_list = []
            protection_list = []

            stopping_times_sample = stopping_times_sim[i]

            for j in range(len(stopping_times_sample) - 1):
                current_time = stopping_times_sample[j]
                next_time = stopping_times_sample[j + 1]
                current_expected_loss = self.expected_loss(current_time,i,recovery)
                next_expected_loss = self.expected_loss(next_time,i,recovery)
                time_diff = next_time - current_time
                discount_factor = np.exp(-self.r * current_time)

                premium_flow_difference = (next_expected_loss - current_expected_loss) * discount_factor
                premium_list.append(premium_flow_difference)

                protection_flow_difference = time_diff * (1 - current_expected_loss) * discount_factor
                protection_list.append(protection_flow_difference)
            
            epsilon = 1e-10
            premium_leg = np.sum(premium_list)
            protection_leg = np.sum(protection_list)
           
            if protection_leg != 0 and not math.isnan(premium_leg) and not math.isnan(protection_leg):
                spread_list.append(premium_leg / protection_leg + epsilon)
            
            
            mean = np.mean(spread_list)
            var = np.var(spread_list, ddof=1)
            alpha = 1 - proba 
            quantile = norm.ppf(1 - alpha/2)  # fonction quantile 
            ci_size = quantile * np.sqrt(var / np.array(spread_list).size)
        
         return pd.DataFrame({'Monte-Carlo Spread':mean,
                             'Variance':var,
                             'lower_confidence':mean - ci_size,
                             'upper_confidence':mean + ci_size},index=['Halton_Student']).T
        

In [28]:
quasi_student = Quasi_Monte_Carlo_Student_t_copula(attachment=0.3,detachment=0.7,rho=0.3,intensity=0.2,
                              T = 5, d=40,r=0.02,notionel=1,n_samples=10,nu=10)

In [29]:
%%time
df_halton_student = quasi_student.synthetic_cdo_spread_computation_student(0.95,recovery=0.2)

CPU times: user 1min, sys: 179 ms, total: 1min
Wall time: 1min


In [30]:
df_halton_student

Unnamed: 0,Halton_Student
Monte-Carlo Spread,0.117727
Variance,0.00018
lower_confidence,0.10942
upper_confidence,0.126033


**Halton Copule Normale Inverse Gaussienne a 1 facteur et Approximation LHP**


Ce code définit une classe Quasi_Monte_Carlo_Normal_Inverse_Gaussian qui permet de simuler un modèle de CDO synthétique basé sur la méthode de Quasi-Monte Carlo et la distribution normale inverse gaussienne (NIG). Voici une description détaillée étape par étape du code :

### Quasi_Monte_Carlo_Normal_Inverse_Gaussian :
- attachment : point d'attachement du CDO
- detachment : point de détachement du CDO
- rho : corrélation entre les différents actifs sous-jacents du CDO
- intensity : intensité du processus de Poisson pour les temps d'arrêt
- T : horizon de temps
- d : dimension de l'espace des actifs sous-jacents
- r : taux d'intérêt sans risque
- notionel : montant notionnel du CDO
- n_samples : nombre d'échantillons à utiliser pour les simulations Quasi-Monte Carlo
- alpha et beta : paramètres de la distribution normale inverse gaussienne (NIG)


### quasi_monte_carlo_stopping_times() 

est définie pour simuler les temps d'arrêt pour chaque actif sous-jacent en utilisant la méthode Quasi-Monte Carlo. Cette méthode est composée des sous-fonctions suivantes :
- van_der_corput_r(): génère une séquence de Van der Corput avec un nombre de termes r, une base p, et une option pour mélanger la séquence (scramble)
- van_der_corput(): génère une séquence de Van der Corput avec un nombre de termes n et une base p
- halton(): génère une séquence de Halton avec un nombre de termes n et une dimension d
- box_muller_multivariate_sample(): génère des échantillons multivariés suivant une distribution normale standard en utilisant la méthode de Box-Muller et la séquence de Halton
- correlated_brownian_multivariate_sample(): génère des échantillons multivariés avec une corrélation rho en utilisant la décomposition de Cholesky
- La méthode risk_neutral_probabilities() est définie pour calculer les probabilités neutres au risque pour un temps t.


### nig_cumulative_distribution(), nig_density_distribution() et inverse_nig_distribution() 

sont définies pour calculer respectivement la fonction de répartition, la fonction de densité et la fonction de répartition inverse de la distribution normale inverse gaussienne (NIG) pour un argument x et un paramètre de taille s.


### nig_expected_loss() 

est définie pour calculer la perte attendue pour un paramètre a et un temps time.


### synthetic_cdo_spread_normal_inverse_gaussian() 

   - Génération des temps d'arrêt avec la méthode quasi_monte_carlo_stopping_times.

   -  Pour chaque échantillon de temps d'arrêt, calcul des flux de prime et des flux de protection en utilisant la perte attendue et les probabilités neutres au risque.

   -  Calcul du spread pour chaque échantillon en divisant la somme des flux de prime par la somme des flux de protection. Ajout d'un petit nombre epsilon pour éviter les divisions par zéro.

   -  Calcul de la moyenne, de la variance et de l'intervalle de confiance des spreads obtenus à partir des échantillons.

   -  Retour des résultats sous forme d'un DataFrame pandas.





In [31]:
class Quasi_Monte_Carlo_Normal_Inverse_Gaussian:
    
    def __init__(self, attachment, detachment, rho, intensity, T, d, r, notionel, n_samples, alpha, beta):
        
        self.attachment = attachment
        self.detachment = detachment
        self.rho = rho
        self.intensity = intensity
        self.T = T
        self.d = d
        self.r = r
        self.notionel = notionel
        self.n_samples = n_samples
        self.alpha = alpha
        self.beta = beta
        
    def quasi_monte_carlo_stopping_times(self):
        
        def van_der_corput_r(r: int = 1, p: int = 2, scramble: bool = True):
            rng = np.random.default_rng()  # Initialize random number generator
            coeffs = np.arange(p)[np.newaxis]
            if scramble:
                rng.shuffle(coeffs, axis=1)
            seq = coeffs / p
            for k in range(2, r+1):
                if scramble:
                    rng.shuffle(coeffs, axis=1)
                seq = (seq + coeffs.T/p**k).reshape((1, -1))
            seq = seq.flatten()
            return seq

        def van_der_corput(n=self.n_samples, p=self.d):
            r = np.ceil(np.log(n) / np.log(p))
            seq = van_der_corput_r(int(r), p, scramble=True)
            return seq[:n]

        def halton(n=self.n_samples, d=self.d):
            n_primes = primes.first(self.d + 4)  # Prime numbers
            result = [van_der_corput(n, n_primes[k]) for k in range(d)]
            return np.array(result).T

        def box_muller_multivariate_sample():
            quasi_random_samples = halton()
            epsilon = 1e-10  # Small offset to avoid division by zero and invalid values
            array = np.empty((self.n_samples,self.d))

            for i in range(0,int(quasi_random_samples.shape[1]/2)):
              
                array[:, 2 * i] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.cos(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

                array[:, 2 * i + 1] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.sin(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

            return array

        
        def correlated_brownian_multivariate_sample():
            
            multivariate_samples = box_muller_multivariate_sample()
            
            covariance_matrix = np.full((self.d,self.d),self.rho) + (1 - self.rho) * np.eye(self.d)
            
            if np.allclose(covariance_matrix,covariance_matrix.T):
                try : 
                    L = np.linalg.cholesky(covariance_matrix)


                except np.linalg.LinAlgError : 
                    print("La matrice n'est pas sémi définie positive")
                
            # Une fois obtenue L , nous pouvons trouver l'échantillon final correspondant 
            multivariate_samples_correlated = L @ multivariate_samples.T
        
            multivariate_samples_correlated = multivariate_samples_correlated.T
            
            
            # simulons les temps d'arret
        
            stopping_times = (-1/self.intensity) * np.log(norm.cdf(multivariate_samples_correlated))
        
        
            for i in range(stopping_times.shape[0]):
            
                stopping_times[i] = np.sort(stopping_times[i])
            
            filtered_stopping_times = []
        
            for i in range(stopping_times.shape[0]):
                filtered_times = stopping_times[i][stopping_times[i] < self.T]
            
                filtered_stopping_times.append(filtered_times)
            
            filtered_stopping_times = np.array(filtered_stopping_times,dtype=object)
            
        
        
            return filtered_stopping_times
        
        
        return correlated_brownian_multivariate_sample()
    
    
    def risk_neutral_probabilities(self, t):
        
        return  1 - np.exp(-self.intensity * t)
    
    def nig_cumulative_distribution(self, x, s):
        
        gamma = np.sqrt(self.alpha ** 2 - self.beta ** 2)
        alpha, beta = s * self.alpha, s * self.beta
        loc = -s * ((-self.beta * gamma ** 2) / (self.alpha ** 2))
        scale = s * (gamma ** 3 / (self.alpha ** 2))
        
        return norminvgauss.cdf(x, alpha * scale, beta * scale, loc, scale)
    
    def nig_density_distribution(self, x, s):
        
        gamma = np.sqrt(self.alpha ** 2 - self.beta ** 2)
        alpha, beta = s * self.alpha, s * self.beta
        loc = -s * ((-self.beta * gamma ** 2) / (self.alpha ** 2))
        scale = s * (gamma ** 3 / (self.alpha ** 2))
        
        return norminvgauss.pdf(x, alpha * scale, beta * scale, loc, scale)
    
    def inverse_nig_distribution(self, x, s):
        gamma = np.sqrt(self.alpha ** 2 - self.beta ** 2)
        alpha, beta = s * self.alpha, s * self.beta
        loc = -s * ((-self.beta * gamma ** 2) / (self.alpha ** 2))
        scale = s * (gamma ** 3 / (self.alpha ** 2))
        
        return norminvgauss.ppf(x, alpha * scale, beta * scale, loc, scale)
    
    def nig_expected_loss(self,time, recovery=0):
        
        f = lambda x: self.nig_cumulative_distribution(x, (np.sqrt(1 - self.rho**2) / self.rho)) - (self.attachment/(1 - recovery))
        g = lambda x: self.nig_density_distribution((self.inverse_nig_distribution(self.risk_neutral_probabilities(time), 1 / self.rho) - np.sqrt(1 - self.rho**2) * x) / self.rho, 1) * (np.sqrt(1 - self.rho**2) / self.rho)
        f_infinity = lambda x, time: 1 - self.nig_cumulative_distribution(((self.inverse_nig_distribution(self.risk_neutral_probabilities(time), 1 / self.rho) - np.sqrt(1 - self.rho**2) * self.inverse_nig_distribution(x, (np.sqrt(1 - self.rho**2) / self.rho))) / self.rho), 1)
        
        b = self.inverse_nig_distribution((self.detachment/(1 - recovery)), (np.sqrt(1 - self.rho**2) / self.rho))
        z = self.inverse_nig_distribution((self.attachment/(1 - recovery)), (np.sqrt(1 - self.rho**2) / self.rho))
        
        integrand = lambda x: (f(x) * g(x)) / ((self.detachment - self.attachment)/(1 - recovery))
        infinity = f_infinity((self.detachment/(1 - recovery)), time)
        
        return quad(integrand, z, b)[0] + (1 - infinity)
    
    def synthetic_cdo_spread_normal_inverse_gaussian(self,proba,recovery=0):
    
        spread_list = []
        stopping_times_sim = self.quasi_monte_carlo_stopping_times()
        
        for i in range(len(stopping_times_sim)):
            premium_list = []
            protection_list = []

            stopping_times_sample = stopping_times_sim[i]

            for j in range(len(stopping_times_sample) - 1):
                current_time = stopping_times_sample[j]
                next_time = stopping_times_sample[j + 1]
                current_expected_loss = self.nig_expected_loss(current_time,recovery)
                next_expected_loss = self.nig_expected_loss(next_time,recovery)
                time_diff = next_time - current_time
                discount_factor = np.exp(-self.r * current_time)

                premium_flow_difference = (next_expected_loss - current_expected_loss) * discount_factor
                premium_list.append(premium_flow_difference)

                protection_flow_difference = time_diff * (1 - current_expected_loss) * discount_factor
                protection_list.append(protection_flow_difference)

            epsilon = 1e-10    
            premium_leg = np.sum(premium_list)
            protection_leg = np.sum(protection_list)
            
            if protection_leg != 0 and not math.isnan(premium_leg) and not math.isnan(protection_leg):
                spread_list.append(premium_leg / protection_leg + epsilon)
        
        mean = np.mean(spread_list)
        var = np.var(spread_list, ddof=1)
        alpha = 1 - proba
        quantile = norm.ppf(1 - alpha / 2)
        ci_size = quantile * np.sqrt(var / len(spread_list))
    
        return pd.DataFrame({'Monte-Carlo Spread': mean,
                             'Variance': var,
                             'lower_confidence': mean - ci_size,
                             'upper_confidence': mean + ci_size}, index=['Halton_Norm.Inv.Gauss.']).T

In [32]:
quasi_nig = Quasi_Monte_Carlo_Normal_Inverse_Gaussian(attachment=0.3,detachment=0.7,rho=0.3, intensity=0.2,T=5,
                                     d =40, r=0.02,notionel=1, n_samples=10,alpha=1,beta=1/2)

In [33]:
%%time
df_quasi_nig = quasi_nig.synthetic_cdo_spread_normal_inverse_gaussian(0.95,recovery=0.2)

CPU times: user 8min 4s, sys: 6.34 s, total: 8min 10s
Wall time: 11min 27s


In [34]:
df_quasi_nig

Unnamed: 0,Halton_Norm.Inv.Gauss.
Monte-Carlo Spread,0.1181
Variance,7.5e-05
lower_confidence,0.112722
upper_confidence,0.123478


In [35]:
pd.concat([df_quasi_gaussien,df_quasi_nig],axis=1)

Unnamed: 0,Halton_Gaussienne,Halton_Norm.Inv.Gauss.
Monte-Carlo Spread,0.122565,0.1181
Variance,0.000495,7.5e-05
lower_confidence,0.114469,0.112722
upper_confidence,0.13066,0.123478


**Sobol Gaussian Copula**

In [36]:
class Quasi_Monte_Carlo_Gaussian_CDO:
    
    def __init__(self, attachment, detachment, rho, intensity, T, d, r, notionel, n_samples):
        
        self.attachment = attachment
        self.detachment = detachment
        self.rho = rho
        self.intensity = intensity
        self.T = T
        self.d = d
        self.r = r
        self.notionel = notionel
        self.n_samples = n_samples
        
    def quasi_monte_carlo_stopping_times(self):
        
        
        
        def Sobol_sequence():
            
            return Sobol(self.d,scramble=True,seed=42).random(self.n_samples)
        

        def box_muller_multivariate_sample():
            quasi_random_samples = Sobol_sequence()
            epsilon = 1e-10  # Small offset to avoid division by zero and invalid values
            array = np.empty((self.n_samples,self.d))

            for i in range(0,int(quasi_random_samples.shape[1]/2)):
              
                array[:, 2 * i] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.cos(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

                array[:, 2 * i + 1] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.sin(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

            return array

        
        def correlated_brownian_multivariate_sample():
            
            multivariate_samples = box_muller_multivariate_sample()
            
            covariance_matrix = np.full((self.d,self.d),self.rho) + (1 - self.rho) * np.eye(self.d)
            
            if np.allclose(covariance_matrix,covariance_matrix.T):
                try : 
                    L = np.linalg.cholesky(covariance_matrix)


                except np.linalg.LinAlgError : 
                    print("La matrice n'est pas sémi définie positive")
                
            # Une fois obtenue L , nous pouvons trouver l'échantillon final correspondant 
            multivariate_samples_correlated = L @ multivariate_samples.T
        
            multivariate_samples_correlated = multivariate_samples_correlated.T
            
            
            # simulons les temps d'arret
        
            stopping_times = (-1/self.intensity) * np.log(norm.cdf(multivariate_samples_correlated))
        
        
            for i in range(stopping_times.shape[0]):
            
                stopping_times[i] = np.sort(stopping_times[i])
            
            filtered_stopping_times = []
        
            for i in range(stopping_times.shape[0]):
                filtered_times = stopping_times[i][stopping_times[i] < self.T]
            
                filtered_stopping_times.append(filtered_times)
            
            filtered_stopping_times = np.array(filtered_stopping_times,dtype=object)
            
        
        
            return filtered_stopping_times
        
        
        return correlated_brownian_multivariate_sample()
    
    
    
    def risk_neutral_probabilities(self,t):
        
        return 1 - np.exp(-self.intensity * t)
    
    
    
    def expected_loss(self, t, recovery):
        
        covariance_matrix = np.array([[1, -np.sqrt(1-self.rho**2)],
                                      [-np.sqrt(1-self.rho**2), 1]])
        
        mean = np.array([0, 0])
        
        bivariate_distribution = multivariate_normal(mean, covariance_matrix)
        
        inverse_K_1 = - norm.ppf(self.attachment/(1 - recovery))
        
        inverse_K_2 = - norm.ppf(self.detachment/(1 - recovery))
        
        x = np.array([inverse_K_1, norm.ppf(self.risk_neutral_probabilities(t))])
        
        y = np.array([inverse_K_2, norm.ppf(self.risk_neutral_probabilities(t))])

        # Calcul de la perte attendue en fonction de la distribution bivariée
        return (bivariate_distribution.cdf(x) - 
                 bivariate_distribution.cdf(y)) / ((self.detachment - self.attachment)/(1 - recovery))

    # Calcul du spread synthétique du CDO en utilisant l'approximation gaussienne
    def synthetic_cdo_spread_computation_gaussian_normal(self, proba,recovery):
        
        spread_list = []
    
        stopping_times_sim = self.quasi_monte_carlo_stopping_times()
        
        for i in range(len(stopping_times_sim)):
            premium_list = []
            protection_list = []

            stopping_times_sample = stopping_times_sim[i]

            for j in range(len(stopping_times_sample) - 1):
                current_time = stopping_times_sample[j]
                next_time = stopping_times_sample[j + 1]
                current_expected_loss = self.expected_loss(current_time,recovery)
                next_expected_loss = self.expected_loss(next_time,recovery)
                time_diff = next_time - current_time
                discount_factor = np.exp(-self.r * current_time)

                # Calcul de la différence des flux de primes entre les deux temps d'arrêt
                premium_flow_difference = (next_expected_loss - current_expected_loss) * discount_factor
                premium_list.append(premium_flow_difference)

                # Calcul de la différence des flux de protection entre les deux temps d'arrêt
                protection_flow_difference = time_diff * (1 - current_expected_loss) * discount_factor
                protection_list.append(protection_flow_difference)
            
            epsilon = 1e-10
            premium_leg = np.sum(premium_list)
            protection_leg = np.sum(protection_list)
            
            
            if protection_leg != 0 and not math.isnan(premium_leg) and not math.isnan(protection_leg):
                spread_list.append(premium_leg / protection_leg + epsilon)
            
            # Calcul de la moyenne, de la variance et de l'intervalle de confiance pour le spread synthétique
            mean = np.mean(spread_list)
            var = np.var(spread_list, ddof=1)
            alpha = 1 - proba 
            quantile = norm.ppf(1 - alpha / 2)  # fonction quantile 
            ci_size = quantile * np.sqrt(var / np.array(spread_list).size)
        
        return pd.DataFrame({'Monte-Carlo Spread': mean,
                             'Variance': var,
                             'lower_confidence': mean - ci_size,
                             'upper_confidence': mean + ci_size}, index=['Sobol_Gaussienne']).T

In [37]:
quasi_gaussian = Quasi_Monte_Carlo_Gaussian_CDO(attachment=0.3,detachment=0.7,rho=0.3,intensity=0.2,
                              T = 5, d=40,r=0.02,notionel=1,n_samples=200)

In [38]:
%%time
df_sobol_gaussian = quasi_gaussian.synthetic_cdo_spread_computation_gaussian_normal(0.95,0.2)



CPU times: user 5.47 s, sys: 952 ms, total: 6.42 s
Wall time: 4.71 s


In [39]:
df_sobol_gaussian

Unnamed: 0,Sobol_Gaussienne
Monte-Carlo Spread,0.121976
Variance,0.000323
lower_confidence,0.119481
upper_confidence,0.124472


In [40]:
pd.concat([df_quasi_gaussien,df_quasi_nig,df_halton_student,df_sobol_gaussian],axis=1)

Unnamed: 0,Halton_Gaussienne,Halton_Norm.Inv.Gauss.,Halton_Student,Sobol_Gaussienne
Monte-Carlo Spread,0.122565,0.1181,0.117727,0.121976
Variance,0.000495,7.5e-05,0.00018,0.000323
lower_confidence,0.114469,0.112722,0.10942,0.119481
upper_confidence,0.13066,0.123478,0.126033,0.124472


**Sobol Student Copula**

In [41]:
class Quasi_Monte_Carlo_Student_t_copula:
    
    def __init__(self, attachment, detachment, rho, intensity, T, d, r, notionel, n_samples, nu):
        self.attachment = attachment
        self.detachment = detachment
        self.rho = rho
        self.intensity = intensity
        self.T = T
        self.d = d
        self.r = r
        self.notionel = notionel
        self.n_samples = n_samples
        self.nu = nu
    

    def quasi_monte_carlo_stopping_times(self):
        
        
        def Sobol_sequence():
            
            return Sobol(self.d,scramble=True,seed=42).random(self.n_samples)
        

        def box_muller_multivariate_sample():
            quasi_random_samples = Sobol_sequence()
            epsilon = 1e-10  # Small offset to avoid division by zero and invalid values
            array = np.empty((self.n_samples,self.d))

            for i in range(0,int(quasi_random_samples.shape[1]/2)):
              
                array[:, 2 * i] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.cos(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

                array[:, 2 * i + 1] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.sin(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

            return array

        
        def correlated_brownian_multivariate_sample():
            
            multivariate_samples = box_muller_multivariate_sample()
            
            covariance_matrix = np.full((self.d,self.d),self.rho) + (1 - self.rho) * np.eye(self.d)
            
            if np.allclose(covariance_matrix,covariance_matrix.T):
                try : 
                    L = np.linalg.cholesky(covariance_matrix)


                except np.linalg.LinAlgError : 
                    print("La matrice n'est pas sémi définie positive")
                
            # Une fois obtenue L , nous pouvons trouver l'échantillon final correspondant 
            multivariate_samples_correlated = L @ multivariate_samples.T
        
            multivariate_samples_correlated = multivariate_samples_correlated.T
            
            
            # simulons les temps d'arret
        
            stopping_times = (-1/self.intensity) * np.log(norm.cdf(multivariate_samples_correlated))
        
        
            for i in range(stopping_times.shape[0]):
            
                stopping_times[i] = np.sort(stopping_times[i])
            
            filtered_stopping_times = []
        
            for i in range(stopping_times.shape[0]):
                filtered_times = stopping_times[i][stopping_times[i] < self.T]
            
                filtered_stopping_times.append(filtered_times)
            
            filtered_stopping_times = np.array(filtered_stopping_times,dtype=object)
            
        
        
            return filtered_stopping_times
        
        
        return correlated_brownian_multivariate_sample()
    
    
    def risk_neutral_probabilities(self,t):
        
        return 1 - np.exp(- self.intensity * t)
    

    
    
    def student_copula_percentile(self,time,index):
        
        proba = self.risk_neutral_probabilities(time)
        
        quantile = t.ppf(proba,self.nu)
        
        return quantile
    
    
    def expected_loss(self,time,index,recovery=0):
        
        def f(x):
            
            return (min(x,(self.detachment/(1 - recovery))) - (self.attachment/(1 - recovery)))
        
        def g(x):
            
            c_t = self.student_copula_percentile(time,index)
            
            return (np.sqrt(1-self.rho**2)/self.rho) * (1/t.pdf(t.ppf(x,self.nu),self.nu)) * t.pdf((np.sqrt(1-self.rho**2) * t.ppf(x,self.nu) - c_t)/self.rho,self.nu)
        
        def integrand(x):
            
            return (f(x) * g(x))/((self.detachment - self.attachment)/(1 - recovery))
        
        
        a = (self.attachment/(1 - recovery))
        b = 1
        
        return quad(integrand,a,b)[0]
    
    
    def synthetic_cdo_spread_computation_student(self,proba,recovery=0):
        
         spread_list = []
    
         stopping_times_sim = self.quasi_monte_carlo_stopping_times()
        
         for i in range(len(stopping_times_sim)):
            premium_list = []
            protection_list = []

            stopping_times_sample = stopping_times_sim[i]

            for j in range(len(stopping_times_sample) - 1):
                current_time = stopping_times_sample[j]
                next_time = stopping_times_sample[j + 1]
                current_expected_loss = self.expected_loss(current_time,i,recovery)
                next_expected_loss = self.expected_loss(next_time,i,recovery)
                time_diff = next_time - current_time
                discount_factor = np.exp(-self.r * current_time)

                premium_flow_difference = (next_expected_loss - current_expected_loss) * discount_factor
                premium_list.append(premium_flow_difference)

                protection_flow_difference = time_diff * (1 - current_expected_loss) * discount_factor
                protection_list.append(protection_flow_difference)
            
            epsilon = 1e-10
            premium_leg = np.sum(premium_list)
            protection_leg = np.sum(protection_list)
           
            if protection_leg != 0 and not math.isnan(premium_leg) and not math.isnan(protection_leg):
                spread_list.append(premium_leg / protection_leg + epsilon)
            
            
            mean = np.mean(spread_list)
            var = np.var(spread_list, ddof=1)
            alpha = 1 - proba 
            quantile = norm.ppf(1 - alpha/2)  # fonction quantile 
            ci_size = quantile * np.sqrt(var / np.array(spread_list).size)
        
         return pd.DataFrame({'Monte-Carlo Spread':mean,
                             'Variance':var,
                             'lower_confidence':mean - ci_size,
                             'upper_confidence':mean + ci_size},index=['Sobol_Student']).T
        

In [42]:
quasi_gaussian = Quasi_Monte_Carlo_Student_t_copula(attachment=0.3,detachment=0.7,rho=0.3,intensity=0.2,
                              T = 5, d=40,r=0.02,notionel=1,n_samples=10,nu=10)

In [43]:
%%time
df_nig_student = quasi_gaussian.synthetic_cdo_spread_computation_student(0.95,0.2)



CPU times: user 52.8 s, sys: 240 ms, total: 53 s
Wall time: 52.9 s


In [44]:
df_nig_student

Unnamed: 0,Sobol_Student
Monte-Carlo Spread,0.127716
Variance,0.000328
lower_confidence,0.116491
upper_confidence,0.138942


**Sobol Normal Inverse Gaussian Copula**

In [45]:
class Quasi_Monte_Carlo_Normal_Inverse_Gaussian:
    
    def __init__(self, attachment, detachment, rho, intensity, T, d, r, notionel, n_samples, alpha, beta):
        
        self.attachment = attachment
        self.detachment = detachment
        self.rho = rho
        self.intensity = intensity
        self.T = T
        self.d = d
        self.r = r
        self.notionel = notionel
        self.n_samples = n_samples
        self.alpha = alpha
        self.beta = beta
        
    def quasi_monte_carlo_stopping_times(self):
        
        
        def Sobol_sequence():
            
            return Sobol(self.d,scramble=True,seed=42).random(self.n_samples)
        

        def box_muller_multivariate_sample():
            quasi_random_samples = Sobol_sequence()
            epsilon = 1e-10  # Small offset to avoid division by zero and invalid values
            array = np.empty((self.n_samples,self.d))

            for i in range(0,int(quasi_random_samples.shape[1]/2)):
              
                array[:, 2 * i] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.cos(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

                array[:, 2 * i + 1] = np.sqrt(-2 * np.log(quasi_random_samples[:, 2 * i] + epsilon)) * np.sin(2 * np.pi * quasi_random_samples[:, 2 * i + 1])

            return array

        
        def correlated_brownian_multivariate_sample():
            
            multivariate_samples = box_muller_multivariate_sample()
            
            covariance_matrix = np.full((self.d,self.d),self.rho) + (1 - self.rho) * np.eye(self.d)
            
            if np.allclose(covariance_matrix,covariance_matrix.T):
                try : 
                    L = np.linalg.cholesky(covariance_matrix)


                except np.linalg.LinAlgError : 
                    print("La matrice n'est pas sémi définie positive")
                
            # Une fois obtenue L , nous pouvons trouver l'échantillon final correspondant 
            multivariate_samples_correlated = L @ multivariate_samples.T
        
            multivariate_samples_correlated = multivariate_samples_correlated.T
            
            
            # simulons les temps d'arret
        
            stopping_times = (-1/self.intensity) * np.log(norm.cdf(multivariate_samples_correlated))
        
        
            for i in range(stopping_times.shape[0]):
            
                stopping_times[i] = np.sort(stopping_times[i])
            
            filtered_stopping_times = []
        
            for i in range(stopping_times.shape[0]):
                filtered_times = stopping_times[i][stopping_times[i] < self.T]
            
                filtered_stopping_times.append(filtered_times)
            
            filtered_stopping_times = np.array(filtered_stopping_times,dtype=object)
            
        
        
            return filtered_stopping_times
        
        
        return correlated_brownian_multivariate_sample()
    
    
    def risk_neutral_probabilities(self, t):
        
        return  1 - np.exp(-self.intensity * t)
    
    def nig_cumulative_distribution(self, x, s):
        
        gamma = np.sqrt(self.alpha ** 2 - self.beta ** 2)
        alpha, beta = s * self.alpha, s * self.beta
        loc = -s * ((-self.beta * gamma ** 2) / (self.alpha ** 2))
        scale = s * (gamma ** 3 / (self.alpha ** 2))
        
        return norminvgauss.cdf(x, alpha * scale, beta * scale, loc, scale)
    
    def nig_density_distribution(self, x, s):
        
        gamma = np.sqrt(self.alpha ** 2 - self.beta ** 2)
        alpha, beta = s * self.alpha, s * self.beta
        loc = -s * ((-self.beta * gamma ** 2) / (self.alpha ** 2))
        scale = s * (gamma ** 3 / (self.alpha ** 2))
        
        return norminvgauss.pdf(x, alpha * scale, beta * scale, loc, scale)
    
    def inverse_nig_distribution(self, x, s):
        gamma = np.sqrt(self.alpha ** 2 - self.beta ** 2)
        alpha, beta = s * self.alpha, s * self.beta
        loc = -s * ((-self.beta * gamma ** 2) / (self.alpha ** 2))
        scale = s * (gamma ** 3 / (self.alpha ** 2))
        
        return norminvgauss.ppf(x, alpha * scale, beta * scale, loc, scale)
    
    def nig_expected_loss(self,time, recovery=0):
        
        f = lambda x: self.nig_cumulative_distribution(x, (np.sqrt(1 - self.rho**2) / self.rho)) - (self.attachment/(1 - recovery))
        g = lambda x: self.nig_density_distribution((self.inverse_nig_distribution(self.risk_neutral_probabilities(time), 1 / self.rho) - np.sqrt(1 - self.rho**2) * x) / self.rho, 1) * (np.sqrt(1 - self.rho**2) / self.rho)
        f_infinity = lambda x, time: 1 - self.nig_cumulative_distribution(((self.inverse_nig_distribution(self.risk_neutral_probabilities(time), 1 / self.rho) - np.sqrt(1 - self.rho**2) * self.inverse_nig_distribution(x, (np.sqrt(1 - self.rho**2) / self.rho))) / self.rho), 1)
        
        b = self.inverse_nig_distribution((self.detachment/(1 - recovery)), (np.sqrt(1 - self.rho**2) / self.rho))
        z = self.inverse_nig_distribution((self.attachment/(1 - recovery)), (np.sqrt(1 - self.rho**2) / self.rho))
        
        integrand = lambda x: (f(x) * g(x)) / ((self.detachment - self.attachment)/(1 - recovery))
        infinity = f_infinity((self.detachment/(1 - recovery)), time)
        
        return quad(integrand, z, b)[0] + (1 - infinity)
    
    def synthetic_cdo_spread_normal_inverse_gaussian(self,proba,recovery=0):
    
        spread_list = []
        stopping_times_sim = self.quasi_monte_carlo_stopping_times()
        
        for i in range(len(stopping_times_sim)):
            premium_list = []
            protection_list = []

            stopping_times_sample = stopping_times_sim[i]

            for j in range(len(stopping_times_sample) - 1):
                current_time = stopping_times_sample[j]
                next_time = stopping_times_sample[j + 1]
                current_expected_loss = self.nig_expected_loss(current_time,recovery)
                next_expected_loss = self.nig_expected_loss(next_time,recovery)
                time_diff = next_time - current_time
                discount_factor = np.exp(-self.r * current_time)

                premium_flow_difference = (next_expected_loss - current_expected_loss) * discount_factor
                premium_list.append(premium_flow_difference)

                protection_flow_difference = time_diff * (1 - current_expected_loss) * discount_factor
                protection_list.append(protection_flow_difference)

            epsilon = 1e-10    
            premium_leg = np.sum(premium_list)
            protection_leg = np.sum(protection_list)
            
            if protection_leg != 0 and not math.isnan(premium_leg) and not math.isnan(protection_leg):
                spread_list.append(premium_leg / protection_leg + epsilon)
        
        mean = np.mean(spread_list)
        var = np.var(spread_list, ddof=1)
        alpha = 1 - proba
        quantile = norm.ppf(1 - alpha / 2)
        ci_size = quantile * np.sqrt(var / len(spread_list))
    
        return pd.DataFrame({'Monte-Carlo Spread': mean,
                             'Variance': var,
                             'lower_confidence': mean - ci_size,
                             'upper_confidence': mean + ci_size}, index=['Sobol_Norm.Inv.Gauss.']).T

In [46]:
quasi_nig = Quasi_Monte_Carlo_Normal_Inverse_Gaussian(attachment=0.3,detachment=0.7,rho=0.3, intensity=0.2,T=5,
                                     d = 40, r=0.02,notionel=1, n_samples=5, alpha=1,beta=0.625)

In [47]:
%%time
df_sobol_nig = quasi_nig.synthetic_cdo_spread_normal_inverse_gaussian(0.95,0.2)



CPU times: user 2min 59s, sys: 2.31 s, total: 3min 1s
Wall time: 3min


In [48]:
df_sobol_nig

Unnamed: 0,Sobol_Norm.Inv.Gauss.
Monte-Carlo Spread,0.130198
Variance,0.000351
lower_confidence,0.113769
upper_confidence,0.146627


In [49]:
pd.concat([df_quasi_gaussien,df_quasi_nig,df_halton_student,df_sobol_gaussian,df_sobol_nig,df_nig_student],axis=1)

Unnamed: 0,Halton_Gaussienne,Halton_Norm.Inv.Gauss.,Halton_Student,Sobol_Gaussienne,Sobol_Norm.Inv.Gauss.,Sobol_Student
Monte-Carlo Spread,0.122565,0.1181,0.117727,0.121976,0.130198,0.127716
Variance,0.000495,7.5e-05,0.00018,0.000323,0.000351,0.000328
lower_confidence,0.114469,0.112722,0.10942,0.119481,0.113769,0.116491
upper_confidence,0.13066,0.123478,0.126033,0.124472,0.146627,0.138942


**Séquence de Kakutani**

Soit $p_1, p_2, \dots, p_d$ des nombres premiers distincts, et $y_1, \dots, y_d \in [0, 1[$ avec $y_i > \frac{1}{p_i}$. La suite de Kakutani $d$-dimensionnelle $(\xi_n)_{n\geq 1}$ est définie par
$$
\xi_{n+1} = \xi_n \oplus_p y \quad \text{en vectoriel},
$$
avec $\oplus_p$ une addition avec propagation de la retenue de gauche à droite dans l'écriture en base $p$. Par exemple, $0.120482 \oplus_{10} 0.340851 = 0.460243$.

La famille de séquences a été obtenue pour la première fois en tant que sous-produit lors de la tentative de génération de la séquence de Halton en tant qu'orbite d'une transformation ergodique. Cette extension est basée sur l'addition p-adique sur $[0, 1]$, avec $p$ entier, $p \geq 2$. Elle est également connue sous le nom de machine à addition de Kakutani. Elle est définie sur les expansions p-adiques régulières des nombres réels de $[0, 1]$ comme l'addition de gauche à droite des expansions p-adiques régulières avec report. L'expansion p-adique régulière de 1 est conventionnellement fixée à $1 = 0.(p − 1)(p − 1)(p − 1)...p$ et $1$ n'est pas considéré comme un nombre rationnel p-adique dans le reste de cette section.
Soit $\oplus_p$ cette addition (ou machine à addition de Kakutani). Ainsi, par exemple, $0.123333... \oplus_{10} 0.412777... = 0.535011...$


De manière plus formelle, si $x, y \in [0, 1]$ avec leurs expansions p-adiques régulières respectives
$0,x_1x_2\cdots x_k\cdots$ et $0,y_1y_2\cdots y_k\cdots$, alors $x \oplus_p y$ est défini par $x \oplus_p y = \sum_{k \geq 1} z_k p^{-k}$,
où la séquence $\{0, \ldots, p - 1\}$-valuée $(z_k)_{k \geq 1}$ est donnée par $z_k = (x_k + y_k + \varepsilon_{k - 1}) \mod p$ et $\varepsilon_k = 1_{\{x_k + y_k + \varepsilon_{k - 1} \geq p\}}$, $k \geq 1$,
avec $\varepsilon_0 = 0$.

   - Si $x$ ou $y$ est un nombre rationnel p-adique et $x, y \neq 1$, alors on vérifie facilement que $z_k = y_k$ ou $z_k = y_k$ pour chaque $k$ suffisamment grand, de sorte que cela définit une expansion régulière, c'est-à-dire les chiffres $(x \oplus_p y)_k$ de $x \oplus_p y$ sont $(x \oplus_p y)_k = z_k, k \geq 1$.
   - Si $x$ et $y$ ne sont pas des nombres rationnels p-adiques, il se peut que $z_k = p - 1$ pour chaque $k$ suffisamment grand, de sorte que $\sum_{k \geq 1} z_k p^{-k}$ n'est pas l'expansion p-adique régulière de $x \oplus_p y$. C'est le cas, par exemple, dans la pseudo-somme suivante :
    $0.123333... \oplus_{10} 0.412666... = 0.535999... = 0.536$
    où $(x \oplus_p y)_1 = 5$, $(x \oplus_p y)_2 = 3$, $(x \oplus_p y)_3 = 5$ et $(x \oplus_p y)_k = 9$, $k \geq 4$. Alors, pour chaque $y \in [0, 1]$, on définit la rotation p-adique associée avec angle $y$ par
    $T_{p, y}(x) := x \oplus_p y$.

In [50]:
def kakutani_addition(x, y, p):
    carry, z_digits = 0, []
    for x_k, y_k in zip(x[2:], y[2:]):
        x_k, y_k = int(x_k), int(y_k)
        z_k = (x_k + y_k + carry) % p
        carry = 1 if x_k + y_k + carry >= p else 0
        z_digits.append(z_k)
    return '0.' + ''.join(map(str, z_digits))

def generate_kakutani_sequence(p, y, n):
    sequence = []
    x = '0.' + '0' * (n - 2)
    for _ in range(n):
        x = kakutani_addition(x, y, p)
        sequence.append(float(x))
    return sequence

In [51]:
sequence = generate_kakutani_sequence(10, "0.412777", 5)
print(sequence)

[0.412, 0.824, 0.246, 0.658, 0.07]
