#### Parallel Cluster Initialization with MPI4py: This could only be run on a HPC cluster

In [None]:
# The is only relevant to running mpi4py in a Jupyter notebook.
import ipyparallel
cluster=ipyparallel.Client(profile='mpi_tutorial')
print("IDs:",cluster.ids)

In [None]:
%%px
from mpi4py import MPI

In [None]:
%%px 
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print ("I'm rank %d of %d on %s" %(rank,size,MPI.Get_processor_name()))

#### Packages Import

In [7]:
%%px
import numpy as np
from numpy import math
from scipy.stats import norm
from scipy import stats
import matplotlib.pyplot as plt
import progressbar
import time
import datetime

#### Model Specification: OU Process
 1. $dX_{t} = \theta_{1}(\theta_{2} - X_{t})dt + \sigma dW_{t}$, $Y_{t}|X_{t} \sim \mathcal{N}(X_{t}, \theta_{3}^2)$
 2. $\mathbb{E}[X_{t}] = x_{0} e^{-\theta_1t} + \theta_{2} (1-e^{-\theta_{1}t})$, $Var[X_{t}] = \frac{\sigma^{2}}{2\theta_{1}}(1-e^{-2t\theta_1})$
 3. $Y_{1},Y_{2},...$ mutually independent, $Y_{t} \sim_{i.i.d.} \mathcal{N}(\mathbb{E}[X_{t}], \theta_{3}^2 + Var[X_{t}])$, for $t \in \mathbb{N}_{0}$

In [8]:
%%px

initial_val = 1
sigma = 0.5
theta = np.array([1,0,np.sqrt(0.2)])


def diff_coef(x, dt, dw):
    return sigma*np.math.sqrt(dt)*dw

def drift_coef(x, dt):
    return theta[0]*(theta[1]-x)*dt

# Log-scaled unnormalized likelihood function p(y|x)
def likelihood_logscale(y, x):
    d = (y-x)
    gn = -1/2*(d**2/(theta[2]**2))
    return gn

def likelihood_update(y,un,unormal_weight):
    gamma = math.sqrt(0.2)
    d = (y-un)
    gn1 = -1/2*(d**2/(theta[2]**2)) + unormal_weight
    return gn1

def sig_mean(t,theta):
    return initial_val*np.exp(-theta[0]*t) + theta[1]*(1-np.exp(-theta[0]*t))

## Used only when theta[0] != 0
def sig_var(t,theta):
    return (sigma**2 / (2*theta[0])) * (1-np.exp(-2*theta[0]*t))

def gen_data(T):
    Y = np.zeros(T+1)
    for t in range(T+1):
        std = np.sqrt(sig_var(t,theta) + theta[2]**2)
        Y[t] = sig_mean(t,theta) + std * np.random.randn(1)
    return Y

def Kalmanfilter(T,Y):
    
    m = np.zeros((T+1))
    mhat = np.zeros((T+1))
    c = np.zeros((T+1))
    a = theta[0]
    s = sigma
    # observational noise variance is gam^2*I
    gam = theta[2]
    # dynamics noise variance is sig^2*I
    sig = np.sqrt(s**2/2/a*(1-np.exp(-2*a)))
    # dynamics determined by A
    A = np.exp(-a)
    # initial mean&covariance
    m[0] = initial_val
    c[0] = 0
    H = 1
    # solution & assimilate!
    for t in range(T):
        mhat[t] = A*m[t] + theta[1]*(1-A)
        chat = A*c[t]*A + sig**2
        ########################
        d = Y[t+1] - H*mhat[t]
        # Kalmab Gain
        K = (chat*H) / (H*chat*H + gam**2)
        # Mean Update
        m[t+1] = mhat[t] + K*d
        # Covariance update
        c[t+1] = (1-K*H)*chat
    tv = m[T]
    return tv

def Kalmanfilter_path(T,Y):
    
    m = np.zeros((T+1))
    mhat = np.zeros((T+1))
    c = np.zeros((T+1))
    a = theta[0]
    s = sigma
    # observational noise variance is gam^2*I
    gam = theta[2]
    # dynamics noise variance is sig^2*I
    sig = np.sqrt(s**2/2/a*(1-np.exp(-2*a)))
    # dynamics determined by A
    A = np.exp(-a)
    # initial mean&covariance
    m[0] = initial_val
    c[0] = 0
    H = 1
    # solution & assimilate!
    for t in range(T):
        mhat[t] = A*m[t] + theta[1]*(1-A)
        chat = A*c[t]*A + sig**2
        ########################
        d = Y[t+1] - H*mhat[t]
        # Kalmab Gain
        K = (chat*H) / (H*chat*H + gam**2)
        # Mean Update
        m[t+1] = mhat[t] + K*d
        # Covariance update
        c[t+1] = (1-K*H)*chat
    return m

#### Main Function

In [9]:
%%px

# Resampling - input one-dimensional particle x
def resampling(weight, gn, x, N):
    ess = 1/((weight**2).sum())
    if ess <= (N/2):
        ## Sample with uniform dice
        dice = np.random.random_sample(N)
        ## np.cumsum obtains CDF out of PMF
        bins = np.cumsum(weight)
        ## np.digitize gets the indice of the bins where the dice belongs to 
        x_hat = x[np.digitize(dice,bins)]
        ## after resampling we reset the accumulating weight
        gn = np.zeros(N)
    if ess > (N/2):
        x_hat = x
    
    return x_hat, gn

# Coupled Wasserstein Resampling
def coupled_wasserstein(fine_weight, coarse_weight, gn, gc, fine_par, coarse_par, N):
    ess = 1/((fine_weight**2).sum())
    fine_hat = fine_par
    coarse_hat = coarse_par
    if ess <= (N/2):
        # Sort in ascending order of particles
        ind = np.argsort(fine_par[:])
        inc = np.argsort(coarse_par[:])
        fine_par = fine_par[ind]
        fine_weight = fine_weight[ind]
        coarse_par = coarse_par[inc]
        coarse_weight = coarse_weight[inc]
        # Sample with uniform dice
        dice = np.random.random_sample(N)
        # CDF
        bins = np.cumsum(fine_weight)
        bins1 = np.cumsum(coarse_weight)
        # get the indices of the bins where the dice belongs to
        fine_hat = fine_par[np.digitize(dice, bins)]
        coarse_hat = coarse_par[np.digitize(dice, bins1)]
        # reset accumulating weight after resampling
        gn = np.zeros(N)
        gc = np.zeros(N)
    if ess > (N/2):
        fine_hat = fine_par
        coarse_hat = coarse_par

    return fine_hat, gn, coarse_hat, gc

# Maixmally Coupled Resampling
def coupled_maximal(fine_weight, coarse_weight, gn, gc, fine_par, coarse_par, N):
    ess = 1/((fine_weight**2).sum())
    if ess <= (N/2):
        # Maximal coupled resampling
        fine_hat, coarse_hat = maximal_resample(fine_weight, coarse_weight, fine_par, coarse_par, N)
        # reset accumulating weight after resampling
        gn = np.zeros(N)
        gc = np.zeros(N)
    if ess > (N/2):
        fine_hat = fine_par
        coarse_hat = coarse_par
    return fine_hat, gn, coarse_hat, gc

def maximal_resample(weight1,weight2,x1,x2,N):
    # Initialize
    x1_hat = np.zeros(N)
    x2_hat = np.zeros(N)

    # Calculating many weights
    unormal_min_weight = np.minimum(weight1, weight2)
    min_weight_sum = np.sum(unormal_min_weight)
    min_weight = unormal_min_weight / min_weight_sum
    unormal_reduce_weight1 = weight1 - unormal_min_weight
    unormal_reduce_weight2 = weight2 - unormal_min_weight

    ## Sample with uniform dice
    dice = np.random.random_sample(N)
    ## [0] takes out the numpy array which is suitable afterwards
    coupled = np.where(dice <= min_weight_sum)[0]
    independ = np.where(dice > min_weight_sum)[0]
    ncoupled = np.sum(dice <= min_weight_sum)
    nindepend = np.sum(dice > min_weight_sum)
    
    if ncoupled>=0:
        dice1 = np.random.random_sample(ncoupled)
        bins = np.cumsum(min_weight)
        x1_hat[coupled] = x1[np.digitize(dice1,bins)]
        x2_hat[coupled] = x2[np.digitize(dice1,bins)]
   
    ## nindepend>0 implies min_weight_sum>0 imples np.sum(unormal_reduce_weight*) is positive, thus the division won't report error
    if nindepend>0:
        reduce_weight1 = unormal_reduce_weight1 / np.sum(unormal_reduce_weight1)
        reduce_weight2 = unormal_reduce_weight2 / np.sum(unormal_reduce_weight2)
        dice2 = np.random.random_sample(nindepend)
        bins1 = np.cumsum(reduce_weight1)
        bins2 = np.cumsum(reduce_weight2)
        x1_hat[independ] = x1[np.digitize(dice2,bins1)]
        x2_hat[independ] = x2[np.digitize(dice2,bins2)]
        
    return x1_hat, x2_hat


def Particle_filter(l,T,N,Y):
    hl = 2**(-l)
    un = np.zeros(N)+initial_val
    un_hat = un
    gn = np.zeros(N)
    for t in range(T):
        un_hat = un
        for dt in range(2**l):
            dw = np.random.randn(N)
            un = un + drift_coef(un, hl) + diff_coef(un, hl, dw)
        # Cumulating weight function    
        gn = likelihood_logscale(Y[t+1], un) + gn
        what = np.exp(gn-np.max(gn))
        wn = what/np.sum(what)
        
        # Wasserstein resampling
        un_hat, gn = resampling(wn, gn, un, N)
    
    return(np.sum(un*wn))


def Coupled_particle_filter_wasserstein(l,T,N,Y):
    hl = 2**(-l)
    ## Initial value
    un1 = np.zeros(N) + initial_val
    cn1 = np.zeros(N) + initial_val
    gn = np.ones(N)
    gc = np.ones(N)
    for t in range(T):
        un = un1
        cn = cn1
        for dt in range(2**(l-1)):
            dw = np.random.randn(2,N)
            for s in range(2):
                un = un + drift_coef(un, hl) + diff_coef(un, hl, dw[s,:])
            cn = cn + drift_coef(cn, hl*2) + diff_coef(cn, hl, (dw[0,:] + dw[1,:]))
        
        ## Accumulating Weight Function
        gn = likelihood_update(Y[t+1], un, gn)
        what = np.exp(gn-np.max(gn))
        wn = what/np.sum(what)
        gc = likelihood_update(Y[t+1], cn, gc)
        wchat = np.exp(gc-np.max(gc))
        wc = wchat/np.sum(wchat)
        ## Wassersteing Resampling 
        un1, gn, cn1, gc = coupled_wasserstein(wn,wc,gn,gc,un,cn,N)

    return(np.sum(un*wn-cn*wc))


def Coupled_particle_filter_maximal(l,T,N,Y):
    hl = 2**(-l)
    ## Initial value
    un1 = np.zeros(N) + initial_val
    cn1 = np.zeros(N) + initial_val
    gn = np.ones(N)
    gc = np.ones(N)
    for t in range(T):
        un = un1
        cn = cn1
        for dt in range(2**(l-1)):
            dw = np.random.randn(2,N)
            for s in range(2):
                un = un + drift_coef(un, hl) + diff_coef(un, hl, dw[s,:])
            cn = cn + drift_coef(cn, hl*2) + diff_coef(cn, hl, (dw[0,:] + dw[1,:]))
        
        ## Accumulating Weight Function
        gn = likelihood_update(Y[t+1], un, gn)
        what = np.exp(gn-np.max(gn))
        wn = what/np.sum(what)
        gc = likelihood_update(Y[t+1], cn, gc)
        wchat = np.exp(gc-np.max(gc))
        wc = wchat/np.sum(wchat)
        ## Wassersteing Resampling 
        un1, gn, cn1, gc = coupled_maximal(wn,wc,gn,gc,un,cn,N)

    return(np.sum(un*wn-cn*wc))

def coef(x, y): 
    # number of observations/points 
    n = np.size(x) 
  
    # mean of x and y vector 
    m_x, m_y = np.mean(x), np.mean(y) 
  
    # calculating cross-deviation and deviation about x 
    SS_xy = np.sum(y*x) - n*m_y*m_x 
    SS_xx = np.sum(x*x) - n*m_x*m_x 
  
    # calculating regression coefficients 
    b_1 = SS_xy / SS_xx 
    b_0 = m_y - b_1*m_x 
  
    return(b_0, b_1) 

def num_coupled_par(p, p_max, const):
    return int(2**(p+2*p_max) * (p_max**2) * const * c3)

def num_par(p, p_max, const):
    return int(2**(p+2*p_max) * (p_max**2) * const * c2)

def prob_l_func(max_val):
    prob = np.zeros(max_val)
    for l in range(max_val):
        prob[l] = 2**(-l*beta)
    prob = prob / np.sum(prob)
    return prob

def prob_p_func(max_val):
    prob = np.zeros(max_val)
    for p in range(max_val):
        prob[p] = 2**(-p)
    prob = prob / np.sum(prob)
    return prob

def Xi_zero(T,p_prob,p_max,const,Y):
    # sample the variable P
    p = int(np.random.choice(p_max, 1, p=p_prob)[0])
    #print('p_val is',p)
    # construct the estimator
    Xi_zero = (Particle_filter(0,T,num_par(p, p_max, const),Y) - Particle_filter(0,T,num_par(p-1, p_max, const),Y)) / p_prob[p]
    return Xi_zero

def Xi_nonzero(l,T,p_prob,p_max,const,Y):
    # sample the variable P
    p = int(np.random.choice(p_max, 1, p=p_prob)[0])
    #print('p_val is',p)
    # construct the estimator
    Xi = (Coupled_particle_filter_maximal(l,T,num_coupled_par(p,p_max,const),Y) - Coupled_particle_filter_maximal(l,T,num_coupled_par(p-1,p_max,const),Y)) / p_prob[p]
    return Xi

def Xi(T,l_prob,l_max,p_prob,p_max,const,Y):
    l = int(np.random.choice(l_max, 1, p=l_prob)[0])
    #print('value of l is',l)
    if l==0:
        Xi = Xi_zero(T,p_prob,p_max,const,Y)
    if l!=0:
        Xi = Xi_nonzero(l,T,p_prob,p_max,const,Y)
    est = Xi / l_prob[l]
    return est
    
def parallel_particle_filter(M,T,max_val,const,Y):
    l_max = max_val
    p_max = max_val
    l_prob = prob_l_func(l_max)
    p_prob = prob_p_func(p_max)
    est_summand = np.zeros(M)
    for m in range(M):
        est_summand[m] = Xi(T,l_prob,l_max,p_prob,p_max,const,Y)
    return (np.mean(est_summand))

def parallel_particle_filter_record_progbar(M,T,max_val,const,Y):
    l_max = max_val
    p_max = max_val
    l_prob = prob_l_func(l_max)
    p_prob = prob_p_func(p_max)
    est_summand = np.zeros(M)
    pr = progressbar.ProgressBar(max_value=M).start()
    for m in range(M):
        est_summand[m] = Xi(T,l_prob,l_max,p_prob,p_max,const,Y)
        pr.update(m+1)
    pr.finish()
    return est_summand

def parallel_particle_filter_record(M,T,max_val,const,Y):
    l_max = max_val
    p_max = max_val
    l_prob = prob_l_func(l_max)
    p_prob = prob_p_func(p_max)
    est_summand = np.zeros(M)
    for m in range(M):
        est_summand[m] = Xi(T,l_prob,l_max,p_prob,p_max,const,Y)
    return est_summand

# For OU process, beta=2
def num_ml_coupled(l,lmax,const):
    return 2**(2*lmax-1.5*l) * const * c3

def num_ml_single(l,lmax,const):
    return 2**(2*lmax-1.5*l) * const * c2

def mlpf(T,max_val,const,Y):
    L = max_val
    level_est = np.zeros(L)
    level_est[0] = Particle_filter(0,T,int(num_ml_single(0,L,const)),Y)
    for l in range(1,L):
        level_est[l] = Coupled_particle_filter_maximal(l,T,int(num_ml_coupled(l,L,const)),Y)
    return np.sum(level_est)

#### Simulation Setup Example
1. At discretization level $l=2$, aim at variance level of 10^{-7} for ppf (parallel particle filter), this is so that the variance is banlanced with the square bias, which we have already obtained. This is done by using $C=10^6$ on a single processor, with $M=1$.

2. Note that the PPF estimator has variance $Var(\sum_{i=1}^{M}\Xi_{i}) = \mathcal{O}(C^{-1}M^{-1})$, this means we can achieve the same variance level by using $C=10^3$ and $M=10^3$. We use $10^3$ parallel cores to obtain $i.i.d.$ realizations of $\Xi$ at the same time, this will give us a giant speed up. The simulation is set out to find how much is the speed up, at the same time ensuring $Var(\sum_{i=1}^{M}\Xi_{i}) \approx Bias(\sum_{i=1}^{M}\Xi_{i}) \approx 10^{-7}$.

In [10]:
%%px

T = 100
data_path = np.load('ou_model_data_path.npy')
c2, c3, beta = np.load('ou_fit_values.npy')
max_val=2
M=1000
const=1000
true_val = Kalmanfilter(T,data_path)

#### Parallel Implementaion of PPF
1. We need to parallel compute the $M$ realizations. We record the time needed for such one parallel realization.
2. We check the MSE of such PPF with $M$ values, this can be done in any fashion.
3. We can then compare MLPF with PPF 's cost for similar MSE targets.

In [6]:
%%px

# Used to construct a parallel - PPF: evaluate the cost of it
# Use M cores to get M repe of it and record the time
def multi_xi(seed_val):
    l_max = max_val
    np.random.seed(seed_val)
    l = int(np.random.choice(l_max, 1, p=l_prob)[0])
    #print('value of l is',l)
    if l==0:
        Xi = Xi_zero(T,p_prob,p_max,const,Y)
    if l!=0:
        Xi = Xi_nonzero(l,T,p_prob,p_max,const,Y)
    est = Xi / l_prob[l]
    return est

# Used to obtain MSE of PPF with M. 
# Use Rep_num of cores to get repetition of it and compute the (sample) MSE.
def multi_ppf(seed_val):
    np.random.seed(seed_val)
    l_max = max_val
    p_max = max_val
    l_prob = prob_l_func(l_max)
    p_prob = prob_p_func(p_max)
    est_summand = np.zeros(M)
    for m in range(M):
        est_summand[m] = Xi(T,l_prob,l_max,p_prob,p_max,const,Y)
    return (np.mean(est_summand))

#### MPI4py HPC Implementation

In [None]:
%%px
iter_num = 0

rank = comm.Get_rank()
size = comm.Get_size()

## Every iteration should have different initial_seed values
initial_seed = iter_num*(size)

seed_val_rankwise = initial_seed + rank

#### (I) Cost record of M parallel implementations for PPF estimate

In [None]:
%%px

stime = time.time()
xi_reptition = np.zeros(1)
xi_reptition = multi_xi(seed_val_rankwise)

result = np.zeros(size)
comm.Gather(xi_reptition,result,root=0)
if rank == 0 :
    x = np.asarray(result)
    ppf_estimate = np.mean(x)
    print('HPC-PPF outputs:',ppf_estimate)

etime = time.time()
time_len = str(datetime.timedelta(seconds=etime-stime))
print("Time cost for HPC-PPF is:",time_len)

#### (II) MSE compuation for PPF estimate

In [None]:
%%px

ppf_reptition = np.zeros(1)
ppf_reptition = multi_ppf(seed_val_rankwise)

result = np.zeros(size)
comm.Gather(xi_reptition,result,root=0)
if rank == 0 :
    x = np.asarray(result)
    
    mse_ppf = np.mean((x-true_val)**2)
    var_ppf = np.var(x)
    square_bias_ppf = mse_ppf - var_ppf
    
    print('HPC-PPF has MSE:',mse_ppf, 'Variance:',var_ppf, 'Square Bias:',square_bias_ppf)