In [1]:
import sys
sys.path.append('../../..')
from Library.fourierABS import AnalyticalLossFunctionAbs
from scipy.optimize import minimize
import mpmath
import math
from scipy.integrate import quad, dblquad, IntegrationWarning
import warnings
import timeit
import numpy as np

import scipy
import scipy.stats as st

mpmath.pretty = True
mpmath.dps = 10
warnings.filterwarnings('error')

Here, we use Fourier Transform to approximate different expectations in the constraints. More generally, if a function $f$ verifies certain conditions (see Analysis of Fourier transform valuation formulas and applications), we have the following:
$$ \mathbb{E}\left[f(X - m)\right] = \frac{1}{2\pi} \int_{-\infty}^{\infty} \exp((iu - R)m) \mathbf{M}_X(R-iu)\hat{f}(u+iR)$$
where $\mathbf{M}_X$ is the generating function of $X$. 

For $f(x) = (x^+)^2$, $\hat{f}(u + iR) = \frac{2}{i(u + iR)^3}$. For the cross terms, namely, $f(x,y) = x^+ y^+$, 
$\hat{f}(u + iR) = \frac{1}{u_1 + i R_1}\frac{1}{u_2 + i R_2}$.\\
Moreover, the moment generating function of two correlated Compound poisson processes with intensities $\lambda_k$ and $\lambda_l$ and correlation coefficient $\rho_{kj}^*$ is given by the following formula:

$$\phi(u_k, u_l) = \sum_{m=0}^\infty \sum_{n = 0}^\infty \phi_{\mu, \sigma}(u_k)^m \phi_{\mu, \sigma}(u_l)^n Z_{m, n}( \lambda_k, \lambda_l, \rho_{kl})$$
where $\phi_{\mu, \sigma}$ is the generating moment function of a gaussian distribution with mean $\mu$ and standard deviation $\sigma$ and $Z_{m, n}(\lambda_k, \lambda_l, \rho_{kl})$ is given by the following formula:
\begin{align*}
    Z_{m, n}(\lambda_k, \lambda_l, \rho) &= \mathbb{P}(N_T^k = m, N_T^l = n)\\
    &= \mathbb{P}(P^{-1}_{\lambda_k T}(\varphi(\eta_k)) = m, P^{-1}_{\lambda_l T}(\varphi(\eta_j)) = n)\\
    &= \mathbb{P}( P_{\lambda_k T}(m - 1) < \varphi(\eta_k) \le  P_{\lambda_k T}(m), P_{\lambda_l T}(n - 1) < \varphi(\eta_l) \le  P_{\lambda_l T}(n))\\
    &= \mathbb{P}( u_{m - 1}^k < \varphi(\eta_k) \le  u_{m}^k , u_{n - 1}^l < \varphi(\eta_l) \le  u_{n}^l)\\
    &= \varphi_2(u_m^k, u_n^l, \rho_{kl}) - \varphi_2(u_m^k, u_{n - 1}^l, \rho_{kl}) - \varphi_2(u_{m - 1}^k, u_n^l, \rho_{kl}) + \varphi_2(u_{m - 1}^k, u_{n - 1}^l, \rho_{kl}) 
\end{align*}
where $\varphi_2(.,.\rho)$ is the bivariate normal distribution function with correlation $\rho$.


In [2]:
def fourier_integral1d(cplx_integrand, j, x, eta):
    def real_integrand(u):
        return np.real(cplx_integrand(u, j, x, eta))

    real_quad = quad(real_integrand, -np.inf, np.inf, epsrel=1e-4)[0]
    return real_quad

def fourier_integral2d(cplx_integrand, j, k, x, y, eta):
    def real_integrand(u, v):
        return np.real(cplx_integrand(u, v, j, k, x, y, eta))

    real_quad = dblquad(real_integrand, 
                        -np.inf, np.inf,
                        lambda x: -np.inf, lambda x: np.inf, 
                        epsrel=1e-4)[0]
    return real_quad

def m_n(lam, i, j):
    m, n = 0, 0
    lam_i, lam_j = lam[i], lam[j]
    F_0, G_0 = st.poisson.cdf(0, lam_i), st.poisson.cdf(0, lam_j)
    target_m = F_0 + G_0 - 1
    target_n = G_0 + F_0 - 1
    while target_m < 0:
        m += 1
        target_m = st.poisson.cdf(m, lam_i) + G_0 - 1
    while target_n < 0:
        n += 1
        target_n = st.poisson.cdf(n, lam_j) + F_0 - 1
    return m, n
        

def rho_min(lam, i, j):
    lam_i, lam_j = lam[i], lam[j]
    m, n = m_n(lam, i, j)
    rho_min = -lam_i * lam_j
    for k in range(m):
        for l in range(n):
            rho_min -= min(st.poisson.cdf(k, lam_i) + st.poisson.cdf(l, lam_j) - 1, 0)
    rho_min /= math.sqrt(lam_i * lam_j)
    return rho_min

def rho_max(lam, i, j):
    def min_sf(m, n):
        return min(st.poisson.sf(int(m), lam[i]), st.poisson.sf(int(n), lam[j]))
    double_sum = mpmath.nsum(min_sf, [0, math.inf], [0, math.inf])
    return (double_sum - lam[i] * lam[j]) / math.sqrt(lam[i] * lam[j])

In [5]:
class FourierCompPoisLossFunction(AnalyticalLossFunctionAbs):
    def __init__(self, T = None, lam = None, rho = None,  N = 10, mu = 0, sigma = 1, c = None, alpha = None):
        self.__T = T
        self.__N = N
        self.__lam = lam
        self.__mu = mu
        self.__sigma = sigma
        self.__alpha = alpha
        self.__dim = np.size(self.__lam)
        self.__rho = rho
        self.__rho_norm = np.ones((len(lam), len(lam)))
        for k in range(len(lam)):
            for l in range(k + 1, len(lam)):
                self.__rho_norm[k][l] = self.rho_k_l(k, l)
                self.__rho_norm[l][k] = self.__rho_norm[k][l]
        super(FourierCompPoisLossFunction, self).__init__(len(lam), c)

    def moment_generating(self, t, i):
        lam_i = self.__lam[i] * self.__T
        mu, sigma = self.__mu, self.__sigma
        normal_moment = np.exp(t * mu + (sigma * t) ** 2 / 2)
        sum_term = mpmath.nsum(lambda n : (normal_moment) ** n * st.poisson.pmf(int(n), lam_i), [0, math.inf])
        return sum_term

    def Z_m_n(self, m, n, k, l, rho_k_l):
        lam_k = self.__lam[k] * self.__T
        lam_l = self.__lam[l] * self.__T
        rho = [[1, rho_k_l], [rho_k_l, 1]]
        A_m = st.norm.ppf(st.poisson.cdf(int(m - 1), lam_k))
        B_m = st.norm.ppf(st.poisson.cdf(int(m), lam_k))
        C_n = st.norm.ppf(st.poisson.cdf(int(n - 1), lam_l))
        D_n = st.norm.ppf(st.poisson.cdf(int(n), lam_l))
        A = st.multivariate_normal.cdf([B_m, D_n],mean = [0, 0], cov = rho)
        B = st.multivariate_normal.cdf([A_m, D_n],mean = [0, 0], cov = rho)
        C = st.multivariate_normal.cdf([B_m, C_n],mean = [0, 0], cov = rho)
        D =  st.multivariate_normal.cdf([A_m, C_n],mean = [0, 0], cov = rho)
        return A - B - C + D
    
    def rho_k_l(self, k, l):
        def helper(rho_k_l):
            lam_k = self.__lam[k] * self.__T
            lam_l = self.__lam[l] * self.__T
            rho_k_l_pois = self.__rho[k][l]
            sum_term = mpmath.nsum(lambda m, n : m * n * self.Z_m_n(m, n, k, l, rho_k_l), [1, math.inf], [1, math.inf])
            return sum_term - lam_k * lam_l - rho_k_l_pois * math.sqrt(lam_k * lam_l)
        return scipy.optimize.fsolve(helper, 0)[0]

    
    def moment_generating2D(self, vec_t, k, l):
        mu, sigma = self.__mu, self.__sigma
        t_k, t_l = vec_t[0], vec_t[1]
        gen_norm_n = np.exp(t_k * mu + (sigma * t_k) ** 2 / 2)
        gen_norm_m = np.exp(t_l * mu + (sigma * t_l) ** 2 / 2)
        rho_k_l = self.__rho_norm[k][l]
        sum_term = mpmath.nsum(lambda m, n : (gen_norm_n) ** n * (gen_norm_m) ** m * self.Z_m_n(m, n, k, l, rho_k_l), [0, math.inf], [0, math.inf])
        return sum_term
    
    def e(self, m):
        return (self.__mu - m).sum()
    
    #this corresponds to the integrand when transforming the expectation corresponding to (x^+)**2
    def g_fourier_integrand(self, u, j, m_j, eta):
        i = 1j
        eta_m_iu = eta - i * u
        res = np.exp(-eta_m_iu * m_j)
        res *= self.moment_generating(eta_m_iu, j)
        res /= i * (u + i * eta) ** 3
        return res
 


    def g(self, i, m_i):
        continue_bool = True
        eta = 1.5 * np.random.rand()
        while continue_bool:
            try:
                integral = fourier_integral1d(self.g_fourier_integrand, i, m_i, eta)
                continue_bool = False
                return 1. / np.pi * integral
            except scipy.integrate.IntegrationWarning:
                #print "g not converging for x = %s, eta = %s" % (m_i, eta)
                eta = 1.5 * np.random.rand()

    
    #This corresponds to the integrand of the cross terms
    def h_fourier_integrand(self, u, v, j, k, x, y, eta):
        i = 1j
        eta_m_iu = eta - i * np.array([u, v])
        res = np.exp(np.dot(-eta_m_iu, [x, y]))
        res *= self.moment_generating2D(eta_m_iu, j, k)
        res /= (u + i*eta[0])**2 * (v + i*eta[1])**2
        return res
    

    
    def h(self, i, j, m_i, m_j):
        continue_bool = True
        eta = 1.5 * np.random.rand(2)
        while continue_bool:
            try:            
                integral = fourier_integral2d(self.h_fourier_integrand, i, j, m_i, m_j, eta)
                continue_bool = False
                return (1 / (2 * np.pi)**2) * integral
            except scipy.integrate.IntegrationWarning:
                eta = 1.5 * np.random.rand(2)
                

    def jac_g_fourier_integrand(self, u, j, m_j, eta):
        i = 1j
        eta_m_iu = eta - i * u
        res = np.exp(-eta_m_iu * m_j)
        res *= self.moment_generating(eta_m_iu, j)
        res /= (u + i * eta)**2
        return res
    
    

    def jac_g(self, i, m_i):
        continue_bool = True
        eta = 1.5 * np.random.rand()
        while continue_bool:
            try:
                integral = fourier_integral1d(self.jac_g_fourier_integrand, i, m_i, eta)
                continue_bool = False
                return (-0.5 / np.pi) * integral
            except scipy.integrate.IntegrationWarning:
                #print "f not converging for x = %s, alpha = %s" % (m_i, eta)
                eta = 1.5 * np.random.rand()
                

    def jac_h_fourier_integrand(self, u, v, j, k, x, y, eta):
        i = 1j
        eta_m_iu = eta - i * np.array([u, v])
        res = np.exp(np.dot(-eta_m_iu, [x, y]))
        res *= self.moment_generating2D(eta_m_iu, j, k)
        res /= (u + i * eta[0]) ** 2 * (-eta_m_iu[1])
        return res

    def jac_h(self, i, j, m_i, m_j):
        continue_bool = True
        eta = 1.5 * np.random.rand(2)
        while continue_bool:
            try:            
                integral = fourier_integral2d(self.jac_h_fourier_integrand, i, j, m_i, m_j, eta)
                continue_bool = False
                return (1 / (2 * np.pi)**2) * integral
            except IntegrationWarning:
                eta = 1.5 * np.random.rand(2)
                
                
    def shortfall_risk(self, m=None):
        m = self._check_argument(m)
        sum_e = self.e(m)
        sum_g, sum_h = 0., 0.
        for i, m_i in enumerate(m):
            sum_g += self.g(i, m_i)
            if self.__alpha != 0:
                for j, m_j in enumerate(m):
                    if j > i:
                        sum_h += self.h(i, j, m_i, m_j)  
        return sum_e + 0.5 * sum_g + self.__alpha * sum_h
    

    
    def shortfall_risk_jac(self, m):
        m = self._check_argument(m)
        res = []        
        for i, m_i in enumerate(m):
            partial_der = 1 + self.jac_g(i, m_i)
            if self.__alpha != 0:
                for j, m_j in enumerate(m):
                    if i != j:
                        partial_der += self.__alpha * self.jac_h(j, i, m_j, m_i)
            res.append(partial_der)
        return np.array(res)

In [6]:
#case rho = -0.9, alpha = 1
lam = [3, 3]
rho = [[1, -0.9], [-0.9, 1]]
T = 1
mu, sigma = 0, 1
c, alpha = 1, 1

maxiter = 3500
init = np.ones(len(lam))
loss = FourierCompPoisLossFunction(T, lam, rho, mu, sigma, c, alpha)
cons = ({'type': 'ineq',
         'fun' : lambda x: loss.ineq_constraint(x),
         'jac' : lambda x: loss.ineq_constraint_jac(x)})
res = minimize(loss.objective, init, jac=loss.objective_jac, constraints=cons, method='SLSQP',options={'maxiter': maxiter})

KeyboardInterrupt: 