In [None]:
import numpy as np
import matplotlib.pyplot as plt
import csv
import time
import scipy.stats
from jax.scipy.stats import norm
import scipy.optimize
import symnum.numpy as snp
import jax.numpy as jnp
from jax.scipy.linalg import cho_solve
from jax import jit, vmap, grad, value_and_grad
from jax.lax import scan
import matplotlib.pyplot as plt
from jax.config import config
config.update('jax_enable_x64', True)
config.update('jax_platform_name', 'cpu')

In [None]:
class hypo_toy_second:
    def __init__(self, param, initial_value, step_size_data, step_size_sim, num_data, num_simulation):
        self.param = param # θ = (β, σ)  
        self.initial_value = initial_value
        self.step_size_data = step_size_data
        self.step_size_sim = step_size_sim
        self.num_data = num_data
        self.num_simulation = num_simulation

    def calc_A_q(self, θ): #θ = (β, σ)   
        dt = self.step_size_data
        return jnp.array(
            [dt, dt**2 / 2- θ[0]*dt**3 /6]
            )

    def calc_A_h(self, θ): 
        dt = self.step_size_data
        return np.array([
            [1, dt - θ[0]*dt**2 /2], 
            [0, 1 - θ[0]*dt]
            ])
    
    def calc_mu_q(self, q):
        return q

    def matrix_A(self, θ):
        dt = self.step_size_data
        return jnp.array([
            [dt, dt**2 / 2- θ[0]*dt**3 /6],
            [1, dt - θ[0]*dt**2 /2], 
            [0, 1 - θ[0]*dt]
            ]) 
    
    # This is used to generate the true trajectories of sample paths  
    def mean_one_step_sim(self, x, θ): 
        dt = self.step_size_sim
        q = x[0]
        h = x[1:]
        matrix_A = jnp.array([
            [dt, dt**2 / 2- θ[0]*dt**3 /6],
            [1, dt - θ[0]*dt**2 /2], 
            [0, 1 - θ[0]*dt]
            ]) 
        return jnp.array([q, 0, 0]) + jnp.dot(matrix_A, h) 
    
    def mean_one_step(self, current_q, current_h, θ):
        dt = self.step_size_data
        matrix_A = jnp.array([
            [dt, dt**2 / 2- θ[0]*dt**3 /6],
            [1, dt - θ[0]*dt**2 /2], 
            [0, 1 - θ[0]*dt]
            ]) 
        return jnp.array([current_q, 0, 0], dtype=float) + jnp.dot(matrix_A, current_h) 
    
    def covariance_one_step_sim(self, θ):
        dt = self.step_size_sim
        σ = θ[1]
        return σ**2*jnp.array([
            [(dt**5)/20, (dt**4)/8, (dt**3)/6], 
            [(dt**4)/8, (dt**3)/3, (dt**2)/2], 
            [(dt**3)/6, (dt**2)/2, dt]
            ])
    
    def covariance_one_step(self, θ):
        dt = self.step_size_data
        σ = θ[1]
        return σ**2*jnp.array([
            [(dt**5)/20, (dt**4)/8, (dt**3)/6], 
            [(dt**4)/8, (dt**3)/3, (dt**2)/2], 
            [(dt**3)/6, (dt**2)/2, dt]
            ])

    def generate_sample_paths(self, θ, seed=20230606):
        np.random.seed(seed)
        seq_rvs = np.random.multivariate_normal(np.zeros(3), self.covariance_one_step_sim(θ), size=self.num_simulation)
        x_0 = self.initial_value
        
        @jit
        def step_func(x, noise):
            x_next = self.mean_one_step_sim(x, θ) + noise
            return x_next, x_next 
        
        _, x_seq = scan(step_func, x_0, seq_rvs) 

        return jnp.concatenate((x_0[None], x_seq))
    
    def generate_sample_paths_scan(self, θ, seed=20230606):
        np.random.seed(seed)
        seq_rvs = np.random.multivariate_normal(np.zeros(3), self.covariance_one_step_sim(θ), size=self.num_simulation)

        x = self.initial_value 
        paths = x 
        for k in range(self.num_simulation):
            x = self.mean_one_step_sim(x, θ) + seq_rvs[k,:]
            paths = np.vstack((paths, x))
        
        return paths

    """ Implementation for forward filtering & backward smoothing, start """
    
    def prediction_covariance(self, forward_filter_covariance, θ):
        Σ =self.covariance_one_step(θ)
        A = self.matrix_A(θ)
        pred_cov =  Σ + A @ forward_filter_covariance @ A.T 
        pred_cov_qq = pred_cov[0,0]
        pred_cov_hq = pred_cov[1:,0]
        pred_cov_hh = pred_cov[1:, 1:]
        return pred_cov_qq, pred_cov_hq, pred_cov_hh
    
    def prediction_mean(self, q, forward_filter_mean, θ):
        pred_mean = self.mean_one_step(q, forward_filter_mean, θ) 
        pred_mean_q = pred_mean[0]
        pred_mean_h = pred_mean[1:]
        return pred_mean_q, pred_mean_h
    
    def forward_filter_mean_cov_one_step(self, current_q, next_q, forward_filter_mean, forward_filter_covariance, θ):
        μ_q, μ_h = self.prediction_mean(current_q, forward_filter_mean, θ)
        Λ_qq, Λ_hq, Λ_hh = self.prediction_covariance(forward_filter_covariance, θ) 
        next_filter_mean = μ_h + ((next_q - μ_q)/Λ_qq)*Λ_hq
        mat = jnp.array([[Λ_hq[0]**2, Λ_hq[0]*Λ_hq[1]], [Λ_hq[0]*Λ_hq[1], Λ_hq[1]**2]])
        next_filter_cov = Λ_hh - mat / Λ_qq
        return next_filter_mean, next_filter_cov
    
    def forward_filter_mean_cov_paths(self, q_paths, initial_mean, initial_cov, θ):
        forward_filter_mean, forward_filter_cov = initial_mean, initial_cov
        forward_filter_mean_paths, forward_filter_cov_paths = np.array([forward_filter_mean]), np.array([forward_filter_cov])

        for k in range(self.num_data):
            forward_filter_mean, forward_filter_cov = self.forward_filter_mean_cov_one_step(q_paths[k], q_paths[k+1], forward_filter_mean, forward_filter_cov, θ)
            forward_filter_mean_paths = np.vstack((forward_filter_mean_paths, [forward_filter_mean]))
            forward_filter_cov_paths = np.vstack((forward_filter_cov_paths, [forward_filter_cov]))

        return forward_filter_mean_paths, forward_filter_cov_paths

    
    def forward_filter_mean_cov_paths_scan(self, q_paths, initial_mean, initial_cov, θ):

        def step_func(filter_mean_cov, q_paths_current_next):
            filter_mean, filter_cov = filter_mean_cov
            q_current, q_next = q_paths_current_next
            filter_next = self.forward_filter_mean_cov_one_step(q_current, q_next, filter_mean, filter_cov, θ)
            return filter_next, filter_next 
        
        _, filter_mean_cov = scan(step_func, (initial_mean, initial_cov), (q_paths[:-1], q_paths[1:]))
        filter_mean, filter_cov = filter_mean_cov

        return jnp.concatenate((initial_mean[None], filter_mean)), jnp.concatenate((initial_cov[None], filter_cov)) 
    

    def contrast_function(self, θ, q_paths, initial_filter_mean, initial_filter_cov):
        log_likelihood = norm.logpdf(q_paths[0], loc = q_paths[0], scale = 1.0)
        A_q = self.calc_A_q(θ)
        Σ = self.covariance_one_step(θ)
        filter_mean = initial_filter_mean
        filter_cov = initial_filter_cov

        for k in range(self.num_data):
            vec = A_q @ filter_cov
            vec = np.dot(vec, A_q)
            std_dev = np.sqrt(vec + Σ[0,0])
            mean = self.calc_mu_q(q_paths[k]) + np.dot(A_q, filter_mean)
            log_likelihood += norm.logpdf(q_paths[k+1], loc=mean, scale=std_dev)
            filter_mean, filter_cov = self.forward_filter_mean_cov_one_step(q_paths[k], q_paths[k+1], filter_mean, filter_cov, θ)
        

        return -2*log_likelihood

    def get_contrast_function_scan(self, θ, q_paths, initial_mean, initial_cov):
        filter_mean_paths, filter_cov_paths = self.forward_filter_mean_cov_paths_scan(q_paths, initial_mean, initial_cov, θ)

        initial_log_likelihood = norm.logpdf(q_paths[0], loc = q_paths[0], scale = 1.0)
        A_q = self.calc_A_q(θ)
        Σ = self.covariance_one_step(θ)

        def step_func(loglikelihood, qset_and_filtermeancov):
            q_current, q_next, filter_mean, filter_cov = qset_and_filtermeancov
            q_mean = self.calc_mu_q(q_current) + jnp.dot(A_q, filter_mean)
            vec = A_q @ filter_cov
            scalar = jnp.dot(vec, A_q)
            q_scale = jnp.sqrt(scalar + Σ[0,0])
            loglikelihood_next = loglikelihood + norm.logpdf(q_next, q_mean, q_scale)
            return loglikelihood_next, loglikelihood_next
        
        _, log_likelihood_seq = scan(step_func, initial_log_likelihood, (q_paths[:-1], q_paths[1:], filter_mean_paths[:-1,:], filter_cov_paths[:-1,:,:]))

        return -2*log_likelihood_seq[-1]*1e-8

class hypo_toy_second_false:
    def __init__(self, param, initial_value, step_size_data, step_size_sim, num_data, num_simulation):
        self.param = param # θ = (β, σ)  
        self.initial_value = initial_value
        self.step_size_data = step_size_data
        self.step_size_sim = step_size_sim
        self.num_data = num_data
        self.num_simulation = num_simulation

    def calc_A_q(self, θ): #θ = (β, σ)   
        dt = self.step_size_data
        return jnp.array(
            [dt, 0]
            )

    def calc_A_h(self, θ): 
        dt = self.step_size_data
        return np.array([
            [1, dt], 
            [0, 1 - θ[0]*dt]
            ])
    
    def calc_mu_q(self, q):
        return q

    def matrix_A(self, θ):
        dt = self.step_size_data
        return jnp.array([
            [dt, 0],
            [1, dt], 
            [0, 1 - θ[0]*dt]
            ])  
    
    def mean_one_step(self, current_q, current_h, θ):
        dt = self.step_size_data
        matrix_A = jnp.array([
            [dt, 0],
            [1, dt], 
            [0, 1 - θ[0]*dt]
            ]) 
        return jnp.array([current_q, 0, 0]) + jnp.dot(matrix_A, current_h) 
    
    def covariance_one_step(self, θ):
        dt = self.step_size_data
        σ = θ[1]
        return σ**2*jnp.array([
            [(dt**5)/20, (dt**4)/8, (dt**3)/6], 
            [(dt**4)/8, (dt**3)/3, (dt**2)/2], 
            [(dt**3)/6, (dt**2)/2, dt]
            ])
    
    def prediction_covariance(self, forward_filter_covariance, θ):
        Σ =self.covariance_one_step(θ)
        A = self.matrix_A(θ)
        pred_cov =  Σ + A @ forward_filter_covariance @ A.T 
        pred_cov_qq = pred_cov[0,0]
        pred_cov_hq = pred_cov[1:,0]
        pred_cov_hh = pred_cov[1:, 1:]
        return pred_cov_qq, pred_cov_hq, pred_cov_hh
    
    def prediction_mean(self, q, forward_filter_mean, θ):
        pred_mean = self.mean_one_step(q, forward_filter_mean, θ) 
        pred_mean_q = pred_mean[0]
        pred_mean_h = pred_mean[1:]
        return pred_mean_q, pred_mean_h
    
    def forward_filter_mean_cov_one_step(self, current_q, next_q, forward_filter_mean, forward_filter_covariance, θ):
        μ_q, μ_h = self.prediction_mean(current_q, forward_filter_mean, θ)
        Λ_qq, Λ_hq, Λ_hh = self.prediction_covariance(forward_filter_covariance, θ) 
        next_filter_mean = μ_h + ((next_q - μ_q)/Λ_qq)*Λ_hq
        mat = jnp.array([[Λ_hq[0]**2, Λ_hq[0]*Λ_hq[1]], [Λ_hq[0]*Λ_hq[1], Λ_hq[1]**2]])
        next_filter_cov = Λ_hh - mat / Λ_qq
        return next_filter_mean, next_filter_cov
    
    def forward_filter_mean_cov_paths(self, q_paths, initial_mean, initial_cov, θ):
        forward_filter_mean, forward_filter_cov = initial_mean, initial_cov
        forward_filter_mean_paths, forward_filter_cov_paths = np.array([forward_filter_mean]), np.array([forward_filter_cov])

        for k in range(self.num_data):
            forward_filter_mean, forward_filter_cov = self.forward_filter_mean_cov_one_step(q_paths[k], q_paths[k+1], forward_filter_mean, forward_filter_cov, θ)
            forward_filter_mean_paths = np.vstack((forward_filter_mean_paths, [forward_filter_mean]))
            forward_filter_cov_paths = np.vstack((forward_filter_cov_paths, [forward_filter_cov]))

        return forward_filter_mean_paths, forward_filter_cov_paths

    
    def forward_filter_mean_cov_paths_scan(self, q_paths, initial_mean, initial_cov, θ):
        
        def step_func(filter_mean_cov, q_paths_current_next):
            filter_mean, filter_cov = filter_mean_cov
            q_current, q_next = q_paths_current_next
            filter_next = self.forward_filter_mean_cov_one_step(q_current, q_next, filter_mean, filter_cov, θ)
            return filter_next, filter_next 
        
        _, filter_mean_cov = scan(step_func, (initial_mean, initial_cov), (q_paths[:-1], q_paths[1:]))
        filter_mean, filter_cov = filter_mean_cov

        return jnp.concatenate((initial_mean[None], filter_mean)), jnp.concatenate((initial_cov[None], filter_cov)) 
    

    def contrast_function(self, θ, q_paths, initial_filter_mean, initial_filter_cov):
        log_likelihood = norm.logpdf(q_paths[0], loc = q_paths[0], scale = 1.0)
        A_q = self.calc_A_q(θ)
        Σ = self.covariance_one_step(θ)
        filter_mean = initial_filter_mean
        filter_cov = initial_filter_cov

        for k in range(self.num_data):
            vec = A_q @ filter_cov
            vec = np.dot(vec, A_q)
            std_dev = np.sqrt(vec + Σ[0,0])
            mean = self.calc_mu_q(q_paths[k]) + np.dot(A_q, filter_mean)
            log_likelihood += norm.logpdf(q_paths[k+1], loc=mean, scale=std_dev)
            filter_mean, filter_cov = self.forward_filter_mean_cov_one_step(q_paths[k], q_paths[k+1], filter_mean, filter_cov, θ)
        

        return -2*log_likelihood

    def get_contrast_function_scan(self, θ, q_paths, initial_mean, initial_cov):
        filter_mean_paths, filter_cov_paths = self.forward_filter_mean_cov_paths_scan(q_paths, initial_mean, initial_cov, θ)

        initial_log_likelihood = norm.logpdf(q_paths[0], loc = q_paths[0], scale = 1.0)
        A_q = self.calc_A_q(θ)
        Σ = self.covariance_one_step(θ)

        def step_func(loglikelihood, qset_and_filtermeancov):
            q_current, q_next, filter_mean, filter_cov = qset_and_filtermeancov
            q_mean = self.calc_mu_q(q_current) + jnp.dot(A_q, filter_mean)
            vec = A_q @ filter_cov
            scalar = jnp.dot(vec, A_q)
            q_scale = jnp.sqrt(scalar + Σ[0,0])
            loglikelihood_next = loglikelihood + norm.logpdf(q_next, q_mean, q_scale)
            return loglikelihood_next, loglikelihood_next
        
        _, log_likelihood_seq = scan(step_func, initial_log_likelihood, (q_paths[:-1], q_paths[1:], filter_mean_paths[:-1,:], filter_cov_paths[:-1,:,:]))

        return -2*log_likelihood_seq[-1]*1e-8

Computation of maximum likelihood estimates

In [None]:
# setting 
dt_simulation = 1e-4 # step size for synthetic data 
dt_obs = 5*1e-4
 # step size for the observation 
T = 1000 # Time length of data step
n_simulation = int(T / dt_simulation)
sub_interval = int(dt_obs/dt_simulation)
n_data = int(T / dt_obs) # number of data 
θ = jnp.array([2.0, 4.0]) # param θ = σ
x_0 = jnp.array([0.0, 0.0, 0.0]) # initial value  
model_true = hypo_toy_second(θ, x_0, dt_obs, dt_simulation, n_data, n_simulation)
model_false = hypo_toy_second_false(θ, x_0, dt_obs, dt_simulation, n_data, n_simulation)
initial_mean = jnp.array([0.0, 0.0])
initial_cov = jnp.array([[1.0, 0.0], [0.0, 1.0]])

In [None]:
# Optimisation
num_sampling = 100
β_sample_true = np.empty((num_sampling))
σ_sample_true = np.empty((num_sampling))
β_sample_false = np.empty((num_sampling))
σ_sample_false = np.empty((num_sampling)) 
seed = 20230615

for k in range(num_sampling):
    print("Compute the observations -- Start")
    paths = model_true.generate_sample_paths(θ, seed)
    print("Compute the observations -- End")
    q_paths_sim = paths[:, 0]
    q_paths_obs = q_paths_sim[::sub_interval] 
    arg = (q_paths_obs, initial_mean, initial_cov)
    count_true = 0
    count_false = 0
    
    def cbf_true(X):
        global count_true
        count_true += 1
        f = model_true.get_contrast_function_scan(X, q_paths_obs, initial_mean, initial_cov)
        print('%d\t%f\t%f\t%f' % (count_true, X[0], X[1], f))
    
    def cbf_false(X):
        global count_false
        count_false += 1
        f = model_false.get_contrast_function_scan(X, q_paths_obs, initial_mean, initial_cov)
        print('%d\t%f\t%f\t%f' % (count_false, X[0], X[1], f))
        
    # θ_0 = jnp.array([2.0, 4.0])
    print("Optimisation for True -- Start")
    res_true = scipy.optimize.minimize(model_true.get_contrast_function_scan, θ, args=arg, method='Nelder-Mead', callback=cbf_true, options={"maxiter":100})
    print("Optimisation for True --End")
    print(res_true)
    print(k)
    print("Optimisation for False -- Start")
    res_false = scipy.optimize.minimize(model_false.get_contrast_function_scan, θ, args=arg, method='Nelder-Mead', callback=cbf_false, options={"maxiter":100})
    print("Optimisation for False --End")
    print(res_false)
    print(k)
    β_sample_true[k] = res_true.x[0]
    σ_sample_true[k] = res_true.x[1]
    β_sample_false[k] = res_false.x[0]
    σ_sample_false[k] = res_false.x[1]
    seed += 1

In [None]:
f = open(f'MLE_first_example_true_drift_T={T}_dt_obs_{dt_obs}_dt_sim_{dt_simulation}.csv', 'w')
writer = csv.writer(f, delimiter='\t')
writer.writerow(β_sample_true)
writer.writerow(σ_sample_true)
f.close()

f = open(f'MLE_first_example_incorrect_drift_T={T}_dt_obs_{dt_obs}_dt_sim_{dt_simulation}.csv', 'w')
writer = csv.writer(f, delimiter='\t')
writer.writerow(β_sample_false)
writer.writerow(σ_sample_false)
f.close()