In [1]:
%matplotlib qt
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import math
from numpy.linalg import inv

In [2]:
def weighted_avg_and_std(values, weights):
    """
    Return the weighted average and standard deviation.

    values, weights -- Numpy ndarrays with the same shape.
    """
    average = np.average(values, weights=weights)
    # Fast and numerically precise:
    variance = np.average((values-average)**2, weights=weights)
    return (average, math.sqrt(variance))

def Kalman_filter(y, m0, P0, F, H, U, V, n, xdim, delta):
    # Updated mean and variance
    mu = np.zeros((xdim, n))
    Pu = np.zeros((xdim, xdim, n))
    # Predicted mean and variance
    mp = np.zeros((xdim, n))
    Pp = np.zeros((xdim, xdim, n))
    mp[:, 0] = m0
    Pp[:, :, 0] = P0
    for k in range(n):
        if k < n - delta:
            # Update
            z = y[:, k] - np.matmul(H, mp[:, k])
            S = np.matmul(np.matmul(H, Pp[:, :, k]), H.transpose()) + V
            K = np.matmul(np.matmul(Pp[:, :, k], H.transpose()), inv(S))
            mu[:, k] = mp[:, k] + np.matmul(K, z)
            Pu[:, :, k] = np.matmul(np.eye(xdim) - np.matmul(K, H), Pp[:, :, k])
        else:
            mu[:, k] = mp[:, k]
            Pu[:, :, k] = Pp[:, :, k]
        
        if k < n-1:
            # Prediction
            mp[:, k+1] = np.matmul(F, mu[:, k])
            Pp[:, :, k+1] = np.matmul(np.matmul(F, Pu[:, :, k]), F.transpose()) + U
    return mp, Pp, mu, Pu

def prediction(x, F, sqrtU):
    return np.matmul(F, x) + np.matmul(sqrtU, np.random.randn(sqrtU.shape[1]))

def observation(x, H, sqrtV):
    return np.matmul(H, x) + np.matmul(sqrtV, np.random.randn(sqrtV.shape[1]))


def generate_data(F, H, sqrtU, sqrtV, n, xdim, ydim):
    x = np.zeros((xdim, n))
    y = np.zeros((ydim, n))
    x[:, 0] = x0
    y[:, 0] = observation(x0, H, sqrtV)
    for k in range(1,n):
        x[:, k] = prediction(x[:, k-1], F, sqrtU)
        y[:, k] = observation(x[:, k], H, sqrtV)
    return x, y

In [3]:
# Dimensions of the state and observation
xdim = 1
ydim = 1

# Initial state
x0 = np.array([[0]])

# Observation matrix = N(yn;xn,1)
H = np.array([[1]])

# Observation noise
sqrtV = np.array([[1]])
V = np.matmul(sqrtV, sqrtV.transpose())

# Prior = N(x0;0,1)
m0 = np.array([0.0])
P0 = np.array([1.0]) 

# Evolution = N(xn+1;xn,1)
dt = 1.0
F = np.array([[1]])
sqrtU = np.array([[1]])
U = np.matmul(sqrtU, sqrtU.transpose())
# Time steps
n = 100
delta = 0

# Data generation
(x, y) = generate_data(F, H, sqrtU, sqrtV, n, xdim, ydim)

# Kalman filter
(mp, Pp, mu, Pu) = Kalman_filter(y, m0, P0, F, H, U, V, n, xdim, delta)

Xsys = x[0]
Ysys = y[0]

In [4]:
#plot of true vale and observation
plt.figure(figsize=(15,15))
plt.plot(x[0,:],'-', label='true value')
plt.plot(y[0,:], '.', label='observations')
plt.plot(mu[0,:], '-', label='Kalman filter mean')
plt.xlabel('time', fontsize = 20)
plt.ylabel('position', fontsize = 20)
plt.title('Noisy observation of 1D random walk', fontsize = 20)
plt.legend(loc='best',fontsize=15);

In [7]:
def SIS(T, N,):
    Xsis = np.zeros((T,N)) #matrix of particles for 100 times
    Wsis = np.zeros((T,N)) #matrix of weight of particles for 100 times
    Meansis = np.zeros(T) 
    SDsis = np.zeros(T)

    xsis0 = np.random.normal(0,1,N) #q(x1|y1)= u(x1) sample particle for first time
    alsis0 = norm.pdf(Ysys[0], xsis0,1)
    wsis0 = alsis0/np.sum(alsis0)  #w1=g(y1|x1)
    Xsis[0] = xsis0
    Wsis[0] = wsis0

    for t in range(0,T-1): 
        xsis = np.random.normal(Xsis[t],1,N)  #q(xn|yn,xn-1) = f(xn|xn-1)
        alsis = norm.pdf(Ysys[t+1], xsis, 1) #calculate incremental weight \alpha_n = g(yn|xn)
        wsis = alsis*Wsis[t]/np.sum(alsis*Wsis[t]) #weight of each particl at time n #wn=\alpha_n * wn-1
        Xsis[t+1] = xsis
        Wsis[t+1] = wsis
    
    for k in range(0,T):
        weighted_stats = weighted_avg_and_std(Xsis[k], Wsis[k])
        Meansis[k] = weighted_stats[0] #filtering mean time n
        SDsis[k] = weighted_stats[1] #filtering sd time n
    return Meansis, SDsis, Wsis


def SMC(T, N):
    Xsmc = np.zeros((T,N)) #matrix of particles for 100 times
    Wsmc = np.zeros((T,N)) #matrix of weight of particles for 100 times
    Meansmc = np.zeros(T)
    SDsmc = np.zeros(T)
    Xb = np.zeros((T,N)) #matrix of resampling particles

    xsmc0 = np.random.normal(0,1,N) #q(x1|y1)= u(x1) sample particle for first time
    alsmc0 = norm.pdf(Ysys[0], xsmc0,1)
    wsmc0 = alsmc0/np.sum(alsmc0)  #w1=g(y1|x1)
    Xsmc[0] = xsmc0
    Wsmc[0] = wsmc0
    Xb[0] = np.random.choice(Xsmc[0], N, p= Wsmc[0])

    for t in range(0,T-1): 
        xsmc = np.random.normal(Xb[t],1,N)  #q(xn|yn,xn-1) = f(xn|xn-1)
        alsmc = norm.pdf(Ysys[t+1], xsmc, 1) #calculate incremental weight \alpha_n = g(yn|xn)
        wsmc = alsmc/np.sum(alsmc) #weight of each particl at time n #wn=\alpha_n * wn-1
        Xsmc[t+1] = xsmc
        Wsmc[t+1] = wsmc
        Xb[t+1] = np.random.choice(Xsmc[t+1], N, p= Wsmc[t+1])
    
    for k in range(0,T):
        weighted_stats = weighted_avg_and_std(Xsmc[k], Wsmc[k])
        Meansmc[k] = weighted_stats[0] #filtering mean time n
        SDsmc[k] = weighted_stats[1] #filtering sd time n
    return Meansmc, SDsmc, Wsmc


def SMO(T, N):
    Xsmo = np.zeros((T,N)) #matrix of particles for 100 times
    Wsmo = np.zeros((T,N)) #matrix of weight of particles for 100 times
    Meansmo = np.zeros(T)
    SDsmo = np.zeros(T)
    Xb = np.zeros((T,N)) #matrix of resampling particles

    xsmo0 = np.random.normal(0,1,N) #q(x1|y1)= u(x1) sample particle for first time
    alsmo0 = norm.pdf(Ysys[0],xsmo0,1)
    wsmo0 = alsmo0/np.sum(alsmo0)  #w1=g(y1|x1)
    Xsmo[0] = xsmo0
    Wsmo[0] = wsmo0
    Xb[0] = np.random.choice(Xsmo[0], N, p= Wsmo[0])

    for t in range(0,T-1): 
        xsmo = np.random.normal(Xb[t],1,N)  #q(xn|yn,xn-1) = f(xn|xn-1)
        alsmo = norm.pdf(Ysys[t+1], xsmo, 1) #calculate incremental weight \alpha_n = g(yn|xn)
        wsmo = alsmo/np.sum(alsmo) #weight of each particl at time n #wn=\alpha_n * wn-1
        Xsmo[t+1] = xsmo
        Wsmo[t+1] = wsmo
        Xb[t+1] = xsmo
        #resampling step
        w = np.random.choice(np.arange(0,N), N, p= Wsmo[t+1]) #to find which particle to resampling
        for k in np.arange(0,N):
            for j in np.arange(0,t+2):
                Xb[j][k] = Xb[j][w[k]]
    
    for k in range(0,T):
        weighted_stats = weighted_avg_and_std(Xb[k], Wsmo[T-1])
        Meansmo[k] = weighted_stats[0] #filtering mean time n
        SDsmo[k] = weighted_stats[1] #filtering sd time n
    return Meansmo, SDsmo, Wsmo

T=100

In [8]:
#plot SIS
Meansis10 = SIS(100,10)[0] #10 particles
Meansis100 = SIS(100,100)[0] #100 particles
Meansis1000 = SIS(100,1000)[0] #1000 particles

fig, ax = plt.subplots(3,1, figsize=(15,15))
fig.suptitle('SIS filtering')

ax[0].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[0].plot(np.arange(start=1, stop=T+1, step=1),Meansis10,linestyle='-',linewidth=1,color='red',label='SIS Filter Mean')
ax[0].set_title("N=10", fontsize = 12)
ax[0].legend(loc='best',fontsize=15)

ax[1].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[1].plot(np.arange(start=1, stop=T+1, step=1),Meansis100,linestyle='-',linewidth=1,color='red',label='SIS Filter Mean')
ax[1].set_title("N=100", fontsize = 12)

ax[2].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[2].plot(np.arange(start=1, stop=T+1, step=1),Meansis1000,linestyle='-',linewidth=1,color='red',label='SIS Filter Mean')
ax[2].set_title("N=1000", fontsize = 12)
ax[2].set_xlabel('time', fontsize=12)

plt.show()

In [9]:
#plot SMC
Meansmc10 = SMC(100,10)[0] #10 particles
Meansmc100 = SMC(100,100)[0] #100 particles
Meansmc1000 = SMC(100,1000)[0] #1000 particles

fig, ax = plt.subplots(3,1, figsize=(15,15))
fig.suptitle('SMC filtering')

ax[0].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[0].plot(np.arange(start=1, stop=T+1, step=1),Meansmc10,linestyle='-',linewidth=1,color='red',label='SMC Filter Mean')
ax[0].set_title("N=10", fontsize = 12)
ax[0].legend(loc='best',fontsize=15)

ax[1].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[1].plot(np.arange(start=1, stop=T+1, step=1),Meansmc100,linestyle='-',linewidth=1,color='red',label='SMC Filter Mean')
ax[1].set_title("N=100", fontsize = 12)

ax[2].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[2].plot(np.arange(start=1, stop=T+1, step=1),Meansmc1000,linestyle='-',linewidth=1,color='red',label='SMC Filter Mean')
ax[2].set_title("N=1000", fontsize = 12)
ax[2].set_xlabel('time', fontsize=12)

plt.show()

In [10]:
#plot SMC with smoothing
Meansmo10 = SMO(100,10)[0] #10 particles
Meansmo100 = SMO(100,100)[0] #100 particles
Meansmo1000 = SMO(100,1000)[0] #1000 particles

fig, ax = plt.subplots(3,1, figsize=(15,15))
fig.suptitle('SMC smoothing')

ax[0].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[0].plot(np.arange(start=1, stop=T+1, step=1),Meansmo10,linestyle='-',linewidth=1,color='red',label='SMC Smoother Mean')
ax[0].set_title("N=10", fontsize = 12)
ax[0].legend(loc='best',fontsize=15)

ax[1].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[1].plot(np.arange(start=1, stop=T+1, step=1),Meansmo100,linestyle='-',linewidth=1,color='red',label='SMC Smoother Mean')
ax[1].set_title("N=100", fontsize = 12)

ax[2].plot(np.arange(start=1, stop=T+1, step=1),mu[0],marker='+',linestyle='-',color='blue',label='Kalman Filter mean ')
ax[2].plot(np.arange(start=1, stop=T+1, step=1),Meansmo1000,linestyle='-',linewidth=1,color='red',label='SMC Smoother Mean')
ax[2].set_title("N=1000", fontsize = 12)
ax[2].set_xlabel('time', fontsize=12)

plt.show()