In [None]:
%pip install numpy matplotlib numba tqdm

In [None]:
import numpy as np
import scipy
import numba
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Circle
import os

In [None]:
def compute_u_r(r, theta, velocity, radius):
    return r * (radius * radius - r * r) * velocity *\
        np.cos(theta) * np.sin(theta)


def compute_u_theta(r, theta, velocity, radius):
    return r * (2 * r * r - radius * radius) * velocity *\
        np.sin(theta) * np.sin(theta)


@numba.njit
def fpe_euler_full(endTime, p, u_r, u_theta, r, theta,
                   delta_r, delta_theta, N_r, N_theta, velocity,
                   noiseCoeff, radius, dt=0.01, saveInterval=1000):
    solnTimes = np.linspace(0, endTime, int(endTime/dt) + 1)
    pAll = np.zeros((solnTimes.shape[0]//saveInterval + 1, N_r, N_theta),
                    dtype=np.float64)
    pAll[0, :, :] = p
    snap = 1
    p_new = np.copy(p)
    dpdt = np.zeros((N_r, N_theta))
    for t in range(1, solnTimes.shape[0]):
        pSum = 0
        for j in range(N_theta):
            pSum += p[1, j]
        for j in range(N_theta):
            p[0, j] = pSum / N_theta
        for i in range(1, N_r - 1):
            for j in range(N_theta):
                # j_across = (j + N_theta // 2) % N_theta
                j_plus = (j + 1 + N_theta) % N_theta
                j_minus = (j - 1 + N_theta) % N_theta
                ## drift at r direction
                u_r_plus = np.max(np.array([u_r[i, j], np.float64(0)]))
                u_r_minus = np.min(np.array([u_r[i, j], np.float64(0)]))
                p_r_minus = (p[i, j] - p[i - 1, j]) / delta_r
                p_r_plus = (p[i + 1, j] - p[i, j]) / delta_r
                drift_r = u_r_plus * p_r_minus + u_r_minus * p_r_plus
                ## drift at theta direction
                u_theta_plus = np.max(np.array([u_theta[i, j], np.float64(0)]))
                u_theta_minus = np.min(np.array([u_theta[i, j], np.float64(0)]))
                p_theta_minus = (p[i, j] - p[i, j_minus]) / (r[i] * delta_theta)
                p_theta_plus = (p[i, j_plus] - p[i, j]) / (r[i] * delta_theta)
                drift_theta = u_theta_plus * p_theta_minus +\
                    u_theta_minus * p_theta_plus
                ## laplacian terms
                d2pdr2 = (p[i + 1, j] + p[i - 1, j] - 2 * p[i, j]) /\
                    (delta_r * delta_r)
                d2pdtheta2 = (p[i, j_plus] + p[i, j_minus] - 2 * p[i, j]) /\
                    (delta_theta * delta_theta * r[i] * r[i])
                dpdr = (p[i + 1, j] - p[i - 1, j]) / (2 * r[i] * delta_r)
                pByr = p[i, j] / (r[i] * r[i])
                dpdt[i, j] = - drift_r - drift_theta + noiseCoeff * noiseCoeff *\
                    0.5 * (d2pdr2 + d2pdtheta2 - dpdr + pByr)
        for i in range(1, N_r - 1):
            for j in range(N_theta):
                p_new[i, j] = p[i, j] + dpdt[i, j] * dt
                p[i, j] = p_new[i, j]
        if t % saveInterval == 0:
            pAll[snap, :, :] = p
            snap += 1
    return pAll

## Full circle Fokker-Planck

In [None]:
velocity = 1
radius = 0.3
noiseCoeff = 1e-3
N_r, N_theta = 11, 20
r = np.linspace(0, radius, N_r)
theta = np.linspace(10, 370, N_theta + 1) * np.pi / 180
theta = theta[:-1]
mesh_r, mesh_theta = np.meshgrid(r, theta)
delta_r = r[1] - r[0]
delta_theta = theta[1] - theta[0]
print(delta_r, delta_theta)

In [None]:
u_r = np.zeros((N_r, N_theta), dtype=np.float64)
u_theta = np.zeros((N_r, N_theta), dtype=np.float64)
for i in range(N_r):
    for j in range(N_theta):
        u_r[i, j] = compute_u_r(mesh_r.T[i, j], mesh_theta.T[i, j], velocity, radius)
        u_theta[i, j] = compute_u_theta(mesh_r.T[i, j], mesh_theta.T[i, j], velocity, radius)
        if theta[j] > np.pi:
            u_r[i, j] = - u_r[i, j] 
            u_theta[i, j] = - u_theta[i, j]

In [None]:
u_mag = np.sqrt(np.power(u_r, 2) + np.power(u_theta, 2))
u_x = np.multiply(u_r, np.cos(mesh_theta.T)) -\
    np.multiply(u_theta, np.sin(mesh_theta.T))
u_y = np.multiply(u_r, np.sin(mesh_theta.T)) +\
    np.multiply(u_theta, np.cos(mesh_theta.T))
fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
cmap = mpl.colormaps.get_cmap("jet")
scat = ax.scatter(mesh_theta.T, mesh_r.T, marker='o', s=20, c=u_mag, cmap=cmap)
plt.colorbar(scat)
plt.show()

In [None]:
area = np.pi * radius * radius
initialPDFValue = 1 / area
p_0 = np.zeros_like(u_r)
for i in range(0, N_r - 1):
    for j in range(N_theta):
        p_0[i, j] = initialPDFValue

In [None]:
fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
cmap = mpl.colormaps.get_cmap("jet")
scat = ax.scatter(mesh_theta.T, mesh_r.T, marker='o', s=20, c=p_0, cmap=cmap)
plt.colorbar(scat)
plt.show()

In [None]:
p = np.copy(p_0)
endTime = 900
saveInterval = 0.2
noOfSteps = int(endTime/saveInterval) + 1
times = np.linspace(0, endTime, noOfSteps)
pFinal = fpe_euler_full(endTime, p, u_r, u_theta, r, theta,
                        delta_r, delta_theta, N_r, N_theta, velocity,
                        noiseCoeff, radius, dt=0.01, saveInterval=1000)
print(pFinal.shape)

In [None]:
fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
cmap = mpl.colormaps.get_cmap("jet")
scat = ax.scatter(mesh_theta.T, mesh_r.T, marker='o', s=20,
                  c=pFinal[-1, :, :], cmap=cmap,
                  vmin=0, vmax=0.05)
plt.colorbar(scat)
plt.show()

In [None]:
if not os.path.isdir("figures"):
    os.makedirs("figures")
center_x, center_y = 0, 0
mesh_x = np.multiply(mesh_r, np.cos(mesh_theta))
mesh_y = np.multiply(mesh_r, np.sin(mesh_theta))
snaps = [0, 45, 75, 90]
v_min = [0, 0, 0, 0]
v_max = [3.5, 0.05, 0.05, 0.05]
for itr, snap in enumerate(snaps):
    fig = plt.figure(itr, figsize=(10, 8))
    ax = plt.gca()
    circle = Circle((center_x, center_y), radius, fill=False)
    ax.add_patch(circle)
    cmap = mpl.colormaps.get_cmap("jet")
    scat = ax.scatter(mesh_x.T, mesh_y.T, marker='o', s=20,
                    c=pFinal[snap, :, :], cmap=cmap,
                    vmin=v_min[itr], vmax=v_max[itr])
    cbar = plt.colorbar(scat)
    cbar.set_label(r"$\tilde{p}(r, \theta, t)$", size=20, labelpad=10)
    cbar.ax.tick_params(labelsize=15)
    ax.set_xlabel(r"x", fontsize=18)
    ax.set_ylabel(r"y", fontsize=18)
    ax.text(-0.35, 0.35, r"$t = " + str(snap * 1000 * 0.01) + r"$",
            fontsize=20)
    ax.set_xticks([-0.4, -0.2, 0, 0.2, 0.4])
    ax.set_xticklabels([-0.4, -0.2, 0, 0.2, 0.4], fontsize=15)
    ax.set_yticks([-0.4, -0.2, 0, 0.2, 0.4])
    ax.set_yticklabels([-0.4, -0.2, 0, 0.2, 0.4], fontsize=15)
    plt.savefig("figures/fpe_t_" + str(snap) + ".png")
    plt.close()

In [None]:
fig = plt.figure(itr, figsize=(7, 7))
ax = fig.add_axes([0.13, 0.15, 0.82, 0.80])
circle = Circle((center_x, center_y), radius, fill=False)
ax.add_patch(circle)
scat = ax.scatter(mesh_x.T, mesh_y.T, marker='o', s=20,
                  edgecolors="black", facecolors="black")
ax.set_xlabel(r"x", fontsize=18)
ax.set_ylabel(r"y", fontsize=18)
ax.set_xticks([-0.4, -0.2, 0, 0.2, 0.4])
ax.set_xticklabels([-0.4, -0.2, 0, 0.2, 0.4], fontsize=15)
ax.set_yticks([-0.4, -0.2, 0, 0.2, 0.4])
ax.set_yticklabels([-0.4, -0.2, 0, 0.2, 0.4], fontsize=15)
plt.savefig("figures/fpe_2d_grid_polar.png")
plt.close()

In [None]:
fig = plt.figure(itr, figsize=(7, 7))
ax = fig.add_axes([0.13, 0.15, 0.82, 0.80])
scat = ax.scatter(mesh_theta.T * 180 / np.pi, mesh_r.T, marker='o', s=20,
                  edgecolors="black", facecolors="black")
ax.set_xlabel(r"$\theta\,(\degree)$", fontsize=20)
ax.set_ylabel(r"$r$", fontsize=20)
ax.set_xticks([0, 60, 120, 180, 240, 360])
ax.set_xticklabels([0, 60, 120, 180, 240, 360], fontsize=15)
ax.set_yticks([0.05, 0.1, 0.15, 0.2, 0.25, 0.3])
ax.set_yticklabels([0.05, 0.1, 0.15, 0.2, 0.25, 0.3], fontsize=15)
plt.savefig("figures/fpe_2d_map.png")
plt.close()