## Density estimation using the Kotlarski theorem

### This notebook describes the procedure for density estimation using the Kotlarski theorem. To understand the steps properly, please consult either the thesis or Li & Vuong (1998).

In [11]:
%reload_ext cython

### First, some packages are imported

In [17]:
%%cython

import numpy as np
from numpy.fft import fft
import scipy 
from scipy import fftpack
from scipy import integrate
from scipy import misc
from scipy import stats
import math
import matplotlib.pyplot as plt

n = 800
np.random.seed(314711239)
# Simulate two measurement pairs that share a latent component:
# x = theta + epsilon_1 and y = theta + epsilon_2.
# The order of the four draws fixes the random stream, so it must not change.
theta_1 = 1.2*np.random.randn(n)
theta_2 = 0.7*np.random.randn(n)
epsilon_1 = 1.1*np.random.randn(n)
epsilon_2 = 1.1*np.random.randn(n)
x_rand_1 = theta_1 + epsilon_1
y_rand_1 = theta_1 + epsilon_2
x_rand_2 = theta_2 + epsilon_1
y_rand_2 = theta_2 + epsilon_2
# Rule-of-thumb bandwidth (4*sigma**5 / (3*n))**(1/5) for the difference x - y.
# The denominator needs its own parentheses: "/3*n" divides by 3 and then
# multiplies by n. The result is bound to a name so it can be inspected;
# nothing below reads it.
silverman_bandwidth = ((4*(np.std(x_rand_1 - y_rand_1)**5))/(3*n))**(1/5)
#define functions needed for Kotlarski estimate
def emp_charak_dif (df1,df2,t):
    """Square root of the empirical characteristic function of ``df1 - df2`` at ``t``.

    Parameters
    ----------
    df1, df2 : arrays of equal length that support elementwise subtraction.
    t : point at which the characteristic function is evaluated.

    Returns
    -------
    ``np.sqrt`` of the sample average of ``exp(1j*t*(df1 - df2))``; in general
    a complex number.

    Raises
    ------
    ZeroDivisionError
        If ``df1`` is empty.
    """
    # np.sum reduces the array in compiled code; the builtin sum() walks it
    # element by element in Python, and this runs once per integrand evaluation.
    value = np.sqrt(1/len(df1)*np.sum(np.exp(1j*t*(df1 - df2)), axis=0))
    return value
def emp_charac_fkt(df1,df2,u1,u2):
    """Empirical joint characteristic function of ``(df1, df2)`` at ``(u1, u2)``.

    Parameters
    ----------
    df1, df2 : arrays of equal length.
    u1, u2 : evaluation points paired with ``df1`` and ``df2`` respectively.

    Returns
    -------
    The sample average of ``exp(1j*(u1*df1 + u2*df2))``; a complex number.

    Raises
    ------
    ZeroDivisionError
        If ``df1`` is empty.
    """
    # np.sum instead of the builtin sum(): same reduction, done in compiled code.
    value = 1/len(df1)*np.sum(np.exp(1j*u1*df1 + 1j*u2*df2), axis=0)
    return value
# Kernels
def kernel_tri(z,h):
    """Triangular weight ``1 - |z/h|``, floored at zero, returned as a complex number.

    The weight is sent through ``fft`` as a one-element sequence; a one-point
    transform has a single entry equal to its input, so the result is the
    weight itself in complex form.
    """
    height = 1 - (np.abs(z/h))
    sample = height if height > 0 else 0
    spectrum = fft([sample])
    return spectrum.tolist()[0]
def kernel_sin(z,h):
    """Indicator window: 1 when ``|z*h|`` is strictly below 1, otherwise 0."""
    inside = np.abs(z*h) < 1
    return 1 if inside else 0
    
def integrand_2(t,epsilon,h,df1,df2):
    """Integrand for the epsilon density at the point ``epsilon``.

    Product of the phase ``exp(-1j*t*epsilon)``, the value of
    ``emp_charak_dif(df1, df2, t)`` and the window ``kernel_sin(t, h)``.
    """
    phase = np.exp(-(0+1j)*t*epsilon)
    root_cf = emp_charak_dif(df1,df2,t)
    window = kernel_sin(t,h)
    return phase*root_cf*window
def expint2(T,epsilon,h,df1,df2):
    """Density value of epsilon at the point ``epsilon``.

    Integrates ``integrand_2`` over ``[-T, T]``, scales by ``1/(2*pi)`` and
    replaces a negative result by 0.

    Parameters
    ----------
    T : half-width of the integration interval.
    epsilon : point at which the density is evaluated.
    h : bandwidth forwarded to the window inside ``integrand_2``.
    df1, df2 : the two measurement samples.

    Returns
    -------
    A non-negative number (NaN passes through unchanged).
    """
    # quad integrates real-valued functions, so take the real part explicitly
    # instead of leaving the complex-to-float conversion to the integrator.
    integral = scipy.integrate.quad(
        lambda t: np.real(integrand_2(t,epsilon,h,df1,df2)), -T, T)[0]
    # The inversion constant is 1/(2*pi); "1/2*math.pi" evaluates to pi/2.
    a = integral/(2*math.pi)
    if a < 0:
        a = 0
    return a
    
def x_hat(df1,df2,t):
    """Ratio ``emp_charac_fkt(df1, df2, t, 0) / emp_charak_dif(df1, df2, t)``."""
    numerator = emp_charac_fkt(df1,df2,t,0)
    denominator = emp_charak_dif(df1,df2,t)
    return numerator/denominator
def integrand_X(t,epsilon,h,df1,df2):
    """Integrand for the X density at the point ``epsilon``.

    Product of the phase ``exp(-1j*t*epsilon)``, ``x_hat(df1, df2, t)`` and
    the window ``kernel_sin(t, h)``.
    """
    phase = np.exp(-(0+1j)*t*epsilon)
    ratio = x_hat(df1,df2,t)
    window = kernel_sin(t,h)
    return phase*ratio*window
def expint_X(T,epsilon,h,df1,df2):
    """Density value of X at the point ``epsilon``.

    Integrates ``integrand_X`` over ``[-T, T]``, scales by ``1/(2*pi)`` and
    replaces a negative result by 0.

    Parameters
    ----------
    T : half-width of the integration interval.
    epsilon : point at which the density is evaluated.
    h : bandwidth forwarded to the window inside ``integrand_X``.
    df1, df2 : the two measurement samples.

    Returns
    -------
    A non-negative number (NaN passes through unchanged).
    """
    # quad integrates real-valued functions, so take the real part explicitly
    # instead of leaving the complex-to-float conversion to the integrator.
    integral = scipy.integrate.quad(
        lambda t: np.real(integrand_X(t,epsilon,h,df1,df2)), -T, T)[0]
    # The inversion constant is 1/(2*pi); "1/2*math.pi" evaluates to pi/2.
    a = integral/(2*math.pi)
    if a < 0:
        a = 0
    return a
# function to choose the optimal bandwidth in line with Glad et al. (2007)
from scipy.optimize import fsolve
def function(t):
    """Absolute gap between ``x_hat(x_rand_1, y_rand_1, t)`` and ``1/sqrt(n+1)``.

    Reads the module-level sample ``x_rand_1``/``y_rand_1`` and sample size ``n``.
    """
    threshold = 1/np.sqrt(n+1)
    return np.abs(x_hat(x_rand_1,y_rand_1,t) - threshold)
# Launch the root search from a fine grid of starting values and store the
# reciprocal of every solution that fsolve returns.
array_solve = np.arange(0, 50, 0.01)
sol_ele = []
for x in array_solve:
    a = 1/fsolve(function, x)
    sol_ele.append(a)
# Merge solutions that agree to three decimals, then keep the candidates
# lying strictly between 0.2 and 0.8.
r = np.unique(np.round(sol_ele, 3))
r = r[(r > 0.2) & (r < 0.8)]
print(r)

[0.264 0.265 0.266 0.267 0.682 0.683]


In [None]:
#define variables we want to estimate
n = 800
np.random.seed(314711239)
# Latent components and measurement errors. The four draws happen in this
# fixed order so the seeded random stream is consumed identically on every run.
theta_1 = np.random.standard_normal(n)*1.2
theta_2 = np.random.standard_normal(n)*0.7
epsilon_1 = np.random.standard_normal(n)*1.1
epsilon_2 = np.random.standard_normal(n)*1.1
# Each pair (x, y) shares its theta and differs only in the error term.
x_rand_1, y_rand_1 = theta_1 + epsilon_1, theta_1 + epsilon_2
x_rand_2, y_rand_2 = theta_2 + epsilon_1, theta_2 + epsilon_2

In [None]:
#define functions needed for Kotlarski estimate
#empirical characteristic functions
def emp_charak_dif (df1,df2,t):
    """Square root of the empirical characteristic function of ``df1 - df2`` at ``t``.

    Parameters
    ----------
    df1, df2 : arrays of equal length that support elementwise subtraction.
    t : point at which the characteristic function is evaluated.

    Returns
    -------
    ``np.sqrt`` of the sample average of ``exp(1j*t*(df1 - df2))``; in general
    a complex number.

    Raises
    ------
    ZeroDivisionError
        If ``df1`` is empty.
    """
    # np.sum reduces the array in compiled code; the builtin sum() walks it
    # element by element in Python, and this runs once per integrand evaluation.
    value = np.sqrt(1/len(df1)*np.sum(np.exp(1j*t*(df1 - df2)), axis=0))
    return value
def emp_charac_fkt(df1,df2,u1,u2):
    """Empirical joint characteristic function of ``(df1, df2)`` at ``(u1, u2)``.

    Parameters
    ----------
    df1, df2 : arrays of equal length.
    u1, u2 : evaluation points paired with ``df1`` and ``df2`` respectively.

    Returns
    -------
    The sample average of ``exp(1j*(u1*df1 + u2*df2))``; a complex number.

    Raises
    ------
    ZeroDivisionError
        If ``df1`` is empty.
    """
    # np.sum instead of the builtin sum(): same reduction, done in compiled code.
    value = 1/len(df1)*np.sum(np.exp(1j*u1*df1 + 1j*u2*df2), axis=0)
    return value
# Kernels
def kernel_tri(z,h):
    """Triangular weight ``1 - |z/h|``, floored at zero, returned as a complex number.

    The weight is sent through ``fft`` as a one-element sequence; a one-point
    transform has a single entry equal to its input, so the result is the
    weight itself in complex form.
    """
    height = 1 - (np.abs(z/h))
    sample = height if height > 0 else 0
    spectrum = fft([sample])
    return spectrum.tolist()[0]
def kernel_sin(z,h):
    """Indicator window: 1 when ``|z*h|`` is strictly below 1, otherwise 0."""
    inside = np.abs(z*h) < 1
    return 1 if inside else 0
#calculate integrands and function for x and epsilon    
def integrand_2(t,epsilon,h,df1,df2):
    """Integrand for the epsilon density at the point ``epsilon``.

    Product of the phase ``exp(-1j*t*epsilon)``, the value of
    ``emp_charak_dif(df1, df2, t)`` and the window ``kernel_sin(t, h)``.
    """
    phase = np.exp(-(0+1j)*t*epsilon)
    root_cf = emp_charak_dif(df1,df2,t)
    window = kernel_sin(t,h)
    return phase*root_cf*window
#individual density values of epsilon at x
def expint2(T,epsilon,h,df1,df2):
    """Density value of epsilon at the point ``epsilon``.

    Integrates ``integrand_2`` over ``[-T, T]``, scales by ``1/(2*pi)`` and
    replaces a negative result by 0.

    Parameters
    ----------
    T : half-width of the integration interval.
    epsilon : point at which the density is evaluated.
    h : bandwidth forwarded to the window inside ``integrand_2``.
    df1, df2 : the two measurement samples.

    Returns
    -------
    A non-negative number (NaN passes through unchanged).
    """
    # quad integrates real-valued functions, so take the real part explicitly
    # instead of leaving the complex-to-float conversion to the integrator.
    integral = scipy.integrate.quad(
        lambda t: np.real(integrand_2(t,epsilon,h,df1,df2)), -T, T)[0]
    # The inversion constant is 1/(2*pi); "1/2*math.pi" evaluates to pi/2.
    a = integral/(2*math.pi)
    if a < 0:
        a = 0
    return a
    
def x_hat(df1,df2,t):
    """Ratio ``emp_charac_fkt(df1, df2, t, 0) / emp_charak_dif(df1, df2, t)``."""
    numerator = emp_charac_fkt(df1,df2,t,0)
    denominator = emp_charak_dif(df1,df2,t)
    return numerator/denominator
def integrand_X(t,epsilon,h,df1,df2):
    """Integrand for the X density at the point ``epsilon``.

    Product of the phase ``exp(-1j*t*epsilon)``, ``x_hat(df1, df2, t)`` and
    the window ``kernel_sin(t, h)``.
    """
    phase = np.exp(-(0+1j)*t*epsilon)
    ratio = x_hat(df1,df2,t)
    window = kernel_sin(t,h)
    return phase*ratio*window
def expint_X(T,epsilon,h,df1,df2):
    """Density value of X at the point ``epsilon``.

    Integrates ``integrand_X`` over ``[-T, T]``, scales by ``1/(2*pi)`` and
    replaces a negative result by 0.

    Parameters
    ----------
    T : half-width of the integration interval.
    epsilon : point at which the density is evaluated.
    h : bandwidth forwarded to the window inside ``integrand_X``.
    df1, df2 : the two measurement samples.

    Returns
    -------
    A non-negative number (NaN passes through unchanged).
    """
    # quad integrates real-valued functions, so take the real part explicitly
    # instead of leaving the complex-to-float conversion to the integrator.
    integral = scipy.integrate.quad(
        lambda t: np.real(integrand_X(t,epsilon,h,df1,df2)), -T, T)[0]
    # The inversion constant is 1/(2*pi); "1/2*math.pi" evaluates to pi/2.
    a = integral/(2*math.pi)
    if a < 0:
        a = 0
    return a

In [None]:
# function to choose the optimal bandwidth in line with Glad et al. (2007)
from scipy.optimize import fsolve
def function(t):
    """Absolute gap between ``x_hat(x_rand_1, y_rand_1, t)`` and ``1/sqrt(n+1)``.

    Reads the module-level sample ``x_rand_1``/``y_rand_1`` and sample size ``n``.
    """
    threshold = 1/np.sqrt(n+1)
    return np.abs(x_hat(x_rand_1,y_rand_1,t) - threshold)
# Launch the root search from a fine grid of starting values and store the
# reciprocal of every solution that fsolve returns.
array_solve = np.arange(0, 50, 0.01)
sol_ele = []
for x in array_solve:
    a = 1/fsolve(function, x)
    sol_ele.append(a)
# Merge solutions that agree to three decimals, then keep the candidates
# lying strictly between 0.2 and 0.8.
r = np.unique(np.round(sol_ele, 3))
r = r[(r > 0.2) & (r < 0.8)]
print(r)
# now choose the optimal bandwidth from the printed candidates and use it in the next step

In [None]:
# calculate the pdf of epsilon with the MISE-minimizing bandwidth
def pdf_epsi(T,h,df1,df2,grid=None,bounds=(-5,5)):
    """Normalised epsilon density evaluated on a grid.

    Parameters
    ----------
    T : half-width of the integration interval passed to ``expint2``.
    h : bandwidth passed to ``expint2``.
    df1, df2 : the two measurement samples.
    grid : points at which the density is evaluated; defaults to
        ``np.arange(-5, 5, 0.05)``.
    bounds : ``(lower, upper)`` interval over which ``expint2`` is integrated
        to obtain the normalising constant; defaults to ``(-5, 5)``.

    Returns
    -------
    Array with one entry per grid point: the ``expint2`` values divided by
    the numerical integral of ``expint2`` over ``bounds``.
    """
    # The grid used to be hard-coded here, so callers had to rebuild the same
    # arange for plotting; passing it in keeps both in sync.
    if grid is None:
        grid = np.arange(-5,5,0.05)
    y_1_array = np.asarray([expint2(T,x,h,df1,df2) for x in grid])
    lower, upper = bounds
    result_eps = scipy.integrate.quad(lambda epsilon: expint2(T, epsilon,h, df1,df2),lower,upper)
    # quad returns (value, error estimate); only the value is used.
    return y_1_array / result_eps[0]
    

# Epsilon density for the first sample pair; h = 0.265 is one of the
# bandwidth candidates printed by the root search.
c = pdf_epsi(T=100, h=0.265, df1=x_rand_1, df2=y_rand_1)
c

In [None]:
# Evaluation grid; the same arange that pdf_epsi evaluates on, so it lines up with c.
array = np.arange(-5,5,0.05)
# Estimated epsilon density, one point per grid value.
plt.scatter(array,c)
# Normalised histograms of the two simulated error samples, overlaid on the
# estimate for visual comparison.
plt.hist(epsilon_1, density=True,alpha=0.5,color='red',bins=20)
plt.hist(epsilon_2, density=True,alpha=0.5,color='blue',bins=20)

In [None]:
#defining density of X
def pdf_X(T,h,df1,df2,grid=None,bounds=(-5,5)):
    """Normalised X density evaluated on a grid.

    Parameters
    ----------
    T : half-width of the integration interval passed to ``expint_X``.
    h : bandwidth passed to ``expint_X``.
    df1, df2 : the two measurement samples.
    grid : points at which the density is evaluated; defaults to
        ``np.arange(-5, 5, 0.05)``.
    bounds : ``(lower, upper)`` interval over which ``expint_X`` is integrated
        to obtain the normalising constant; defaults to ``(-5, 5)``.

    Returns
    -------
    Array with one entry per grid point: the ``expint_X`` values divided by
    the numerical integral of ``expint_X`` over ``bounds``.
    """
    # The grid used to be hard-coded here, so callers had to rebuild the same
    # arange for fitting and plotting; passing it in keeps them in sync.
    if grid is None:
        grid = np.arange(-5,5,0.05)
    y_1_array = np.asarray([expint_X(T,x,h,df1,df2) for x in grid])
    lower, upper = bounds
    # limit=25 caps the number of adaptive subintervals quad may use.
    result_eps = scipy.integrate.quad(lambda epsilon: expint_X(T, epsilon,h, df1,df2),lower,upper, limit=25)
    # quad returns (value, error estimate); only the value is used.
    return y_1_array / result_eps[0]
    

# X density for the first sample pair, integrating over the whole real line.
# NOTE(review): h = 0.366 is not among the candidates printed by the
# bandwidth search - confirm where this value comes from.
d = pdf_X(T=np.inf, h=0.366, df1=x_rand_1, df2=y_rand_1)
# approximating x by a mixture of normals
from scipy import stats
def mixture(x,mu1,sigma1,mu2,sigma2,w):
    """Density of a two-component normal mixture at ``x``.

    The first component ``N(mu1, sigma1)`` has weight ``w``; the second
    component ``N(mu2, sigma2)`` has weight ``1 - w``.
    """
    first = scipy.stats.norm.pdf(x,mu1,sigma1)
    second = scipy.stats.norm.pdf(x,mu2,sigma2)
    return w*first + (1-w)*second
from scipy.optimize import curve_fit
# Grid on which d was evaluated (the default grid of pdf_X). Defined here so
# this cell does not depend on a plotting cell having been run first.
array = np.arange(-5,5,0.05)
# Parameter order: mu1, sigma1, mu2, sigma2, w.
# The scale parameters get a small positive lower bound: norm.pdf is only
# defined for positive scales, so a lower bound of -inf let the optimiser step
# into a region where the model returns NaN. The weight w stays in [0, 1].
popt,pcov = curve_fit(
    mixture, array, d,
    bounds=([-np.inf, 1e-8, -np.inf, 1e-8, 0],
            [np.inf, np.inf, np.inf, np.inf, 1]),
    maxfev = 10000)
popt

In [None]:
#plot normal mixture against estimate
# Evaluation grid; the same arange that pdf_X evaluates on, so it lines up with d.
array = np.arange(-5,5,0.05)
# Estimated X density, one point per grid value.
plt.scatter(array,d)
# Normalised histogram of the simulated theta_1 sample for visual comparison.
plt.hist(theta_1, density=True,alpha=0.5,color='red',bins=10)
# Fitted mixture curve; popt holds (mu1, sigma1, mu2, sigma2, w) in the
# argument order of mixture().
plt.plot(array,mixture(array,popt[0],popt[1],popt[2],popt[3],popt[4]))