# Increasing Order of Solvers
Here we move from a first-order upwind, forward Euler scheme to a flux-limited third-order advection scheme with Crank–Nicolson diffusion.

In [3]:
import numpy as np
from numpy.random import normal, uniform

import scipy.stats as stats
from scipy.integrate import simps

import warnings

import matplotlib.pyplot as plt
import matplotlib.animation as animation

import seaborn as sns
sns.set()
sns.color_palette('colorblind')

from src import herding as herd
from src.plotting import hom_plot as homplt
from src.plotting import het_plot as hetplt

In [9]:
def run_particle_model(particles=100,
                   D=1,
                   initial_dist=None,
                   dt=0.01,
                   T_end=1,
                   G=herd.step_G):
    """ Space-Homogeneous Particle model

    Calculates the solution of the space-homogeneous particle model using an
    Euler-Maruyama scheme. Each step applies
    ``v <- v - v*dt + G(herd.M1_hom_part(v))*dt + sqrt(2*D*dt)*xi`` with
    ``xi`` a vector of independent standard normal draws.

    Args:
        particles: Number of particles to simulate, int.
        D: Diffusion coefficient denoted sigma in equation, float.
        initial_dist: Array containing initial velocities of particles. If
            None (the default), ``particles`` velocities are drawn uniformly
            from [0, 1) at call time, so the default always matches
            ``particles``.
        dt: Time step to be use in E-M scheme, float.
        T_end: Time point at which to end simulation, float.
        G: Interaction function - refer to herding.py.

    Returns:
        t: array of times at which velocities were calculated (only used for
           plotting).
        v: array containing velocities of each particle at every timestep,
           shape (len(t), particles).
        [M1, var]: sample mean and sample variance of the velocities at
           t[0], ..., t[-2] (one entry per step taken, so each has length
           len(t) - 1).

    """
    # A default evaluated in the signature would be sampled once, with a
    # fixed length, and would break for any other particle count.
    if initial_dist is None:
        initial_dist = uniform(size=particles)

    t = np.arange(0, T_end + dt, dt)
    N = len(t)-1

    v = np.zeros((N+1, particles), dtype=float)
    M1 = np.zeros(N)
    var = np.zeros(N)

    #TODO: take density function as argument for initial data using inverse transform
    v[0,] = initial_dist

    noise_scale = np.sqrt(2*D*dt)  # loop-invariant noise amplitude
    for n in range(N):
        # Statistics are recorded *before* the update, i.e. at time t[n].
        M1[n] = np.mean(v[n,])
        var[n] = np.var(v[n,])
        v[n+1,] = (v[n,] - v[n,]*dt + G(herd.M1_hom_part(v[n,]))*dt
                   + noise_scale * normal(size=particles))
    return t, v, [M1, var]


## Crank–Nicolson Diffusion
Finally, we change the diffusive term from forward Euler to Crank–Nicolson (CN).


In [82]:
def FVCN_solve_hom_PDE(D=1,
              initial_dist=(lambda x: np.array([int(i>=0 and i<=1) for i in x])),
              dt=0.01, T_end=1, L=5, dv=0.1, G=herd.smooth_G):
    """ Solves the kinetic model using a finite volume method

    Advection is explicit with first-order upwind fluxes; the interface
    speed is ``-(v_mid - G(herd.M1_hom(F, v)))``. Diffusion is treated with
    Crank-Nicolson: the explicit half enters the right-hand side and the
    implicit half is solved by tridiagonal forward elimination and back
    substitution. The density is held at zero in the first and last cells.
    Prints the percentage mass loss between the first and last time level.

    Args:
        D: Diffusion coefficient denoted sigma in equation, float.
        initial_dist: function describing initial density.
        dt: Time step to be used, float.
        dv: Velocity step.
        T_end: Time point at which to end simulation, float.
        L: Velocity domain of solution. Choose to be large enough so that |v|
            never exceeds L.
        G: Interaction function - refer to herding.py.
    Returns:
        v: Velocity mesh on which solution was calculated, array.
        F: Approximate solution. Array is (time, space).
        [M0, M1, M2]: values of ``herd.Mn(F[n], v, m)`` for m = 0, 1, 2 at
            time levels n = 0, ..., N-1 (presumably the m-th moments of the
            density - confirm against herding.py).

    Will warn if the initial density does not approximately integrate to 1,
    and rescales it so that it does.
    """

    v = np.arange(-L, L+dv, dv)
    t = np.arange(0, T_end+dt, dt)

    J = len(v)-1
    N = len(t)-1

    F = np.zeros((N+1, J+1), dtype=float) #array with rows in time, columns in space

    F_0 = initial_dist(v) #Initial Condition
    if not np.isclose(simps(F_0, dx=dv), 1, rtol=1e-05):
        warnings.warn('Normalising initial data... Try increasing L')
        F_0 = F_0/simps(F_0, dx=dv)
    F[0, :] = F_0

    mu = D * dt/(dv**2)
    M0, M1, M2 = [np.zeros(N) for _ in range(3)]

    # Implicit system per interior cell: -a*F[j-1] + b*F[j] - c*F[j+1] = d[j]
    a = 0.5*mu
    b = 1 + mu
    c = 0.5*mu

    d, e, f = [np.zeros(J+1) for _ in range(3)]

    def upwind_flux(row, j, shift):
        """First-order upwind flux through the interface of cells j, j+1."""
        speed = -(0.5*(v[j] + v[j+1]) - shift)
        return min(0, speed)*row[j+1] + max(0, speed)*row[j]

    for n in range(N):
        M0[n], M1[n], M2[n] = (herd.Mn(F[n,],v, m) for m in range(3))
        herd_coeff = G(herd.M1_hom(F[n,],v))

        # Zero boundary value at j = 0: F[0] = f[0] + e[0]*F[1] must vanish.
        # Set explicitly so nothing wraps around to index -1 or reuses the
        # previous time step's coefficients.
        e[0] = 0.0
        f[0] = 0.0

        adv_flux_left = upwind_flux(F[n,], 0, herd_coeff)
        for j in range(1, J):
            adv_flux_right = upwind_flux(F[n,], j, herd_coeff)
            # Explicit half of the Crank-Nicolson diffusion term.
            diff = a * (F[n, j+1] - 2*F[n, j] + F[n, j-1])

            d[j] = F[n, j] + diff + dt*(adv_flux_left-adv_flux_right)/dv
            pivot = b - a*e[j-1]
            e[j] = c/pivot
            f[j] = (d[j] + a*f[j-1]) / pivot

            adv_flux_left = adv_flux_right

        # Back substitution with zero density imposed in both end cells.
        F[n+1, 0] = 0
        F[n+1, J] = 0
        for j in range(J-1, 0, -1):
            F[n+1, j] = f[j] + e[j]*F[n+1, j+1]

    mass_loss =  (1 - sum(F[-1,:])/sum(F[0,:]))*100
    print('Finite volume mass loss was {:.2f}%'.format(mass_loss))
    return v, F, [M0, M1, M2]


In [145]:
%matplotlib qt
diffusion = 1
particle_count = 1000
timestep = 0.01
T_final = 15
domain_size = 8
spacestep = 0.1

herding_function = herd.step_G
#Set initial data for Gaussian
mu_init = -0.5
sd_init = 0.8

#Set max/min for indicator
max_init = 0
min_init = -1

gaussian = {'particle': normal(loc=mu_init, scale=sd_init ,size=particle_count),
            'pde': lambda x: stats.norm.pdf(x, loc=mu_init, scale=sd_init)}

indicator = {'particle': uniform(low=min_init, high=max_init, size=particle_count),
            'pde': lambda x: np.array([int(i>=min_init and i<=max_init) for i in x])} 


initial_data = indicator #Choose indicator or Gaussian

framestep = 50
animate = False

###############################################################################
FV_v, FV_F, FV_moments = FVCN_solve_hom_PDE(D=diffusion,
                                      initial_dist=initial_data['pde'],
                                      dt=timestep, T_end=T_final, L=domain_size,
                                      dv=spacestep, G=herding_function)

t, traj, [M1, var] = run_particle_model(particles=particle_count,
                   D=diffusion,
                   initial_dist=initial_data['particle'],
                   dt=timestep,
                   T_end=T_final,
                   G=herding_function)
if animate:
    ani = homplt.animate_PDE_hist(t, FV_v, traj , FV_F)
    plt.show()

fig,ax=plt.subplots()
#ax.plot(t[1:], M1, label='Particle Mean')
#ax.plot(t[1:], var, label='Particle Variance')
stat_mu = np.sign(FV_F[0,].mean())
mu_0 =  FV_moments[1][0] 
mean_ode = (mu_0+1)*np.exp(-0.5*t)-1 #np.sign(mu_0) - np.sign(mu_0)*(-abs(mu_0) + 1)*np.exp(-0.5 * t)
var_0 = FV_moments[2][0] - FV_moments[1][0]**2
var_ode = diffusion + (var_0-diffusion)*np.exp(-2*t)
ax.plot(t, mean_ode, 'r:')
ax.plot(t, var_ode, 'b:')
ax.plot(t[1:], FV_moments[1], label='PDE Mean')
ax.plot(t[1:], FV_moments[2]-FV_moments[1]**2, label=' PDE Variance')

ax.plot(t[1:], FV_moments[1], label='Approx. Mean')
ax.plot(t[1:], FV_moments[2]-FV_moments[1]**2, label='Approx. Variance')

ax.set_xlabel('Time')
ax.legend(loc='best')
ax.set_title('CN Upwind Solution')
fig.show()
savepath = './'
plt.savefig(savepath+'{}.jpg'.format('upwindspacehom'), format='jpg', dpi=1000)
plt.show()

Finite volume mass loss was 0.00%


In [148]:
# Running sum, across the velocity mesh, of the absolute difference between
# a N(-1, 1) reference density and the PDE solution at the final time level.
err_fig, err_ax = plt.subplots()
true_sol = stats.norm.pdf(FV_v, loc=-1, scale=1)
err_ax.plot(
    FV_v,
    np.cumsum(np.abs(true_sol - FV_F[-1])),
    label='Upwind',
)
err_ax.legend()
err_fig.show()

## Higher Order Upwind


In [4]:
def HO_solve_hom_PDE(D=1,
              initial_dist=(lambda x: np.array([int(i>=0 and i<=1) for i in x])),
              dt=0.01, T_end=1, L=5, dv=0.1, G=herd.smooth_G):
    """ Solves the kinetic model using a flux-limited finite volume method

    Advection is explicit. Interfaces away from the boundary use an upwind
    flux corrected by the limiter ``psi(theta) = max(0, min(1, 1/3 +
    theta/6, theta))``; the interfaces next to the boundary cells fall back
    to plain first-order upwind, and no advective flux is exchanged with
    cell 0. The interface speed is ``-(v_mid - G(herd.M1_hom(F, v)))``.
    Diffusion is treated with Crank-Nicolson: the explicit half enters the
    right-hand side and the implicit half is solved by tridiagonal forward
    elimination and back substitution. The density is held at zero in the
    first and last cells. Prints the percentage mass loss between the first
    and last time level.

    Args:
        D: Diffusion coefficient denoted sigma in equation, float.
        initial_dist: function describing initial density.
        dt: Time step to be used, float.
        dv: Velocity step.
        T_end: Time point at which to end simulation, float.
        L: Velocity domain of solution. Choose to be large enough so that |v|
            never exceeds L.
        G: Interaction function - refer to herding.py.
    Returns:
        v: Velocity mesh on which solution was calculated, array.
        F: Approximate solution. Array is (time, space).
        [M0, M1, M2]: values of ``herd.Mn(F[n], v, m)`` for m = 0, 1, 2 at
            time levels n = 0, ..., N-1 (presumably the m-th moments of the
            density - confirm against herding.py).

    Will warn if the initial density does not approximately integrate to 1,
    and rescales it so that it does.
    """

    v = np.arange(-L, L+dv, dv)
    t = np.arange(0, T_end+dt, dt)

    J = len(v)-1
    N = len(t)-1

    F = np.zeros((N+1, J+1), dtype=float) #array with rows in time, columns in space

    F_0 = initial_dist(v) #Initial Condition
    if not np.isclose(simps(F_0, dx=dv), 1, rtol=1e-05):
        warnings.warn('Normalising initial data... Try increasing L')
        F_0 = F_0/simps(F_0, dx=dv)
    F[0, :] = F_0

    mu = D * dt/(dv**2)
    M0, M1, M2 = [np.zeros(N) for _ in range(3)]

    # Implicit system per interior cell: -a*F[j-1] + b*F[j] - c*F[j+1] = d[j]
    a = 0.5*mu
    b = 1 + mu
    c = 0.5*mu

    d, e, f = [np.zeros(J+1) for _ in range(3)]

    eps = 10**-8  # guards the slope ratio against division by zero
    def psi(theta): return max(0,min(1, 1/3 + (1/6)*theta, theta))
    for n in range(N):
        M0[n], M1[n], M2[n] = (herd.Mn(F[n,],v, m) for m in range(3))
        herd_coeff = G(herd.M1_hom(F[n,],v))

        # Zero boundary value at j = 0: F[0] = f[0] + e[0]*F[1] must vanish.
        # Set explicitly so nothing wraps around to index -1 or reuses the
        # previous time step's coefficients.
        e[0] = 0.0
        f[0] = 0.0

        # No advective flux is exchanged between cell 0 and cell 1.
        adv_flux_left = 0
        for j in range(1, J):
            v_m = 0.5*(v[j]+v[j+1])
            speed = -(v_m-herd_coeff)

            if 1 < j < J-1:
                theta = (F[n,j] - F[n,j-1]) / (F[n,j+1] - F[n,j] + eps)
                # NOTE(review): for negative speed the mirrored scheme would
                # use the slope ratio of cell j+1 rather than cell j -
                # confirm against the reference for this limiter. Left as
                # written here so results are unchanged.
                adv_flux_right = (min(speed,0)*(F[n, j+1] + psi(1/(theta+eps))*(F[n, j] - F[n, j+1]))
                                  + max(speed,0)*(F[n, j] +  psi(theta)*(F[n, j+1] - F[n, j])))
            else:
                # j == 1 or j == J-1: first-order upwind next to the
                # boundary. This must update adv_flux_right itself so that
                # a stale flux from the previous cell is not reused.
                adv_flux_right = (min(speed,0)*F[n, j+1] + max(speed,0)*F[n, j])

            # Explicit half of the Crank-Nicolson diffusion term.
            diff = a * (F[n,j+1] -2*F[n,j] + F[n,j-1])

            d[j] = F[n, j] + diff + dt*(adv_flux_left-adv_flux_right)/dv
            pivot = b - a*e[j-1]
            e[j] = c/pivot
            f[j] = (d[j] + a*f[j-1]) / pivot

            adv_flux_left = adv_flux_right

        # Back substitution with zero density imposed in both end cells.
        F[n+1, 0] = 0
        F[n+1, J] = 0
        for j in range(J-1, 0, -1):
            F[n+1, j] = f[j] + e[j]*F[n+1, j+1]

    mass_loss =  (1 - sum(F[-1,:])/sum(F[0,:]))*100
    print('Finite volume mass loss was {:.2f}%'.format(mass_loss))
    return v, F, [M0, M1, M2]


In [12]:
%matplotlib qt
diffusion = 1
particle_count = 1000
timestep = 0.01
T_final = 30
domain_size = 8
spacestep = 0.1

herding_function = herd.step_G
#Set initial data for Gaussian
mu_init = -2
sd_init = 0.5

#Set max/min for indicator
max_init = 0
min_init = -1

gaussian = {'particle': normal(loc=mu_init, scale=sd_init ,size=particle_count),
            'pde': lambda x: stats.norm.pdf(x, loc=mu_init, scale=sd_init)}

indicator = {'particle': uniform(low=min_init, high=max_init, size=particle_count),
            'pde': lambda x: np.array([int(i>=min_init and i<=max_init) for i in x])} 


initial_data = gaussian #Choose indicator or Gaussian

framestep = 50
animate = False


###############################################################################
FV_v, FV_F, FV_moments = HO_solve_hom_PDE(D=diffusion,
                                      initial_dist=initial_data['pde'],
                                      dt=timestep, T_end=T_final, L=domain_size,
                                      dv=spacestep, G=herding_function)

t, traj, [M1, var] = run_particle_model(particles=particle_count,
                   D=diffusion,
                   initial_dist=initial_data['particle'],
                   dt=timestep,
                   T_end=T_final,
                   G=herding_function)


Finite volume mass loss was 0.00%


In [14]:
# Animate the PDE solution against the particle histogram, then compare the
# PDE moments with the moment ODEs.
animate = True
if animate:
    ani = homplt.animate_PDE_hist(t, FV_v, traj , FV_F)
    plt.show()
    writer = animation.FFMpegWriter(fps = 20, extra_args=['-vcodec', 'libx264'])
    ani.save('conv_to_neg.mp4', writer = writer)

fig,ax=plt.subplots()
# Both solvers record their moments before each update, i.e. at
# t[0], ..., t[-2], so the matching time axis is t[:-1].
#ax.plot(t[:-1], M1, label='Particle Mean')
#ax.plot(t[:-1], var, label='Particle Variance')
stat_mu = np.sign(FV_F[0,].mean())  # not used below; kept for later cells
mu_0 =  FV_moments[1][0] 
mean_ode = np.sign(mu_0) - np.sign(mu_0)*(-abs(mu_0) + 1)*np.exp(-0.5 * t)
var_0 = FV_moments[2][0] - FV_moments[1][0]**2
var_ode = diffusion + (var_0-diffusion)*np.exp(-2*t)
ax.plot(t, mean_ode, 'r:')
ax.plot(t, var_ode, 'b:')
# Each curve is drawn once; a second copy of the same data under an
# 'Approx.' label only cluttered the legend.
ax.plot(t[:-1], FV_moments[1], label='PDE Mean')
ax.plot(t[:-1], FV_moments[2]-FV_moments[1]**2, label=' PDE Variance')

ax.set_xlabel('Time')
ax.legend(loc='best')
ax.set_title('Flux Limiting Solution')
fig.show()
savepath = './'
# Save through the figure handle so the moment plot is written even if the
# animation figure is the current one.
fig.savefig(savepath+'{}.jpg'.format('fluxspacehom'), format='jpg', dpi=1000)
plt.show()

In [159]:
# Running sum, across the velocity mesh, of the absolute difference between
# a N(-1, 1) reference density and the PDE solution at the final time level.
err_fig, err_ax = plt.subplots()
true_sol = stats.norm.pdf(FV_v, loc=-1, scale=1)
err_ax.plot(
    FV_v,
    np.cumsum(np.abs(true_sol - FV_F[-1])),
    label='Flux Lim',
)
err_ax.legend()
err_fig.show()

## 2D Diffusion
