In [1]:
# Non-linear Komptoneets equation as per Nagirner, Loskutov, Grachev, 1997

In [1]:
import myplotlib
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
from typing import Dict, Any, Callable
from tqdm.notebook import tqdm

myplotlib.load("hershey")

In [2]:
import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import tensorflow as tf


print("Num GPUs Available: ", len(tf.config.list_physical_devices("GPU")))

Num GPUs Available:  1


In [None]:
def divide_with_zero(a, b):
    return np.divide(a, b, out=np.zeros_like(a), where=b != 0)


class Simulation:
    def __init__(
        self,
        grid: Dict[str, Any],
        initial_conditions: Callable[[float], float],
        accuracy: Dict[str, float],
    ) -> None:
        self.grid = grid
        self.initial_conditions = initial_conditions
        self.accuracy = accuracy

        self.K = self.grid["K0"]
        self.dt = self.grid["dt0"]
        self.t = 0
        self.x = np.linspace(self.grid["x_min"], self.grid["x_max"], self.grid["K0"])
        self.f_i = self.initial_conditions(self.x)
        self.f_0 = self.f_i.copy()

    def coeff_dx(self, k: int) -> float:
        return self.x[k + 1] - self.x[k]

    def coeff_y(self, k: int) -> float:
        return self.x[k] ** 2 / self.coeff_dx(k)

    def coeff_yMHalf(self, k: int) -> float:
        xk_Mhalf = (self.x[k] + self.x[k - 1]) / 2
        return xk_Mhalf**2 / self.coeff_dx(k - 1)

    def coeff_yPHalf(self, k: int) -> float:
        xk_Phalf = (self.x[k] + self.x[k + 1]) / 2
        return xk_Phalf**2 / self.coeff_dx(k)

    def coeff_z(self, k: int) -> float:
        return self.x[k] * (self.x[k] - 2) / 2

    def coeff_zMHalf(self, k: int) -> float:
        xk_Mhalf = (self.x[k] + self.x[k - 1]) / 2
        return xk_Mhalf * (xk_Mhalf - 2) / 2

    def coeff_zPHalf(self, k: int) -> float:
        xk_Phalf = (self.x[k] + self.x[k + 1]) / 2
        return xk_Phalf * (xk_Phalf - 2) / 2

    def coeff_kappa(self, k: int) -> float:
        if k == 0:
            return 4 * self.dt / (3 * self.coeff_dx(k))
        else:
            return self.dt / (self.coeff_dx(k) + self.coeff_dx(k - 1))

    def coeff_w(self, k: int) -> float:
        return -self.coeff_dx(k) / (
            self.coeff_dx(k - 1) * (self.coeff_dx(k) + self.coeff_dx(k - 1))
        )

    def coeff_v(self, k: int) -> float:
        return (self.coeff_dx(k) - self.coeff_dx(k - 1)) / (
            self.coeff_dx(k) * self.coeff_dx(k - 1)
        )

    def coeff_u(self, k: int) -> float:
        return self.coeff_dx(k - 1) / (
            self.coeff_dx(k) * (self.coeff_dx(k) + self.coeff_dx(k - 1))
        )

    def coeff_a(self, k: int, f: np.ndarray) -> float:
        if k == 0:
            return 0
        elif k == self.K - 1:
            return 0
        else:
            return self.coeff_kappa(k) * (
                -self.coeff_yMHalf(k)
                + self.coeff_zMHalf(k)
                + 0.5 * f[k]
                + 0.25 * f[k - 1]
            ) + 0.25 * (self.coeff_dx(k) - self.coeff_dx(k - 1)) * self.coeff_w(k)

    def coeff_b(self, k: int, f: np.ndarray) -> float:
        if k == 0:
            return 1 + 0.125 * self.coeff_kappa(k) * (
                self.coeff_dx(k) * (6 - self.coeff_dx(k)) + 6 * f[k]
            )
        elif k == self.K - 1:
            return 1
        else:
            return (
                1
                + self.coeff_kappa(k)
                * (
                    self.coeff_yPHalf(k)
                    + self.coeff_yMHalf(k)
                    - self.coeff_zPHalf(k)
                    + self.coeff_zMHalf(k)
                )
                + 0.25 * (self.coeff_dx(k) - self.coeff_dx(k - 1)) * self.coeff_v(k)
            )

    def coeff_c(self, k: int, f: np.ndarray) -> float:
        if k == 0:
            return (1 / 3) - 0.125 * self.coeff_kappa(k) * (
                self.coeff_dx(k) * (self.coeff_dx(k) - 2) + 4 * f[k] + 2 * f[k + 1]
            )
        elif k == self.K - 1:
            return 0
        else:
            return -self.coeff_kappa(k) * (
                self.coeff_yPHalf(k)
                + self.coeff_zPHalf(k)
                + 0.5 * f[k]
                + 0.25 * f[k + 1]
            ) + 0.25 * (self.coeff_dx(k) - self.coeff_dx(k - 1)) * self.coeff_u(k)

    def coeff_d(self, k: int, f: np.ndarray) -> float:
        if k == 0:
            return f[k] * (
                1
                - 0.125
                * self.coeff_kappa(k)
                * (self.coeff_dx(k) * (6 - self.coeff_dx(k)) + 6 * f[k])
            ) + f[k + 1] * (
                (1 / 3)
                + 0.125
                * self.coeff_kappa(k)
                * (self.coeff_dx(k) * (self.coeff_dx(k) - 2) + 4 * f[k] + 2 * f[k + 1])
            )
        elif k == self.K - 1:
            return self.f_i[k]
        else:
            return (
                -self.coeff_kappa(k)
                * (
                    -self.coeff_yMHalf(k)
                    + self.coeff_zMHalf(k)
                    + 0.5 * f[k]
                    + 0.25 * f[k - 1]
                )
                * f[k - 1]
                + f[k]
                * (
                    1
                    - self.coeff_kappa(k)
                    * (
                        self.coeff_yPHalf(k)
                        + self.coeff_yMHalf(k)
                        - self.coeff_zPHalf(k)
                        + self.coeff_zMHalf(k)
                    )
                )
                + self.coeff_kappa(k)
                * (
                    self.coeff_yPHalf(k)
                    + self.coeff_zPHalf(k)
                    + 0.5 * f[k]
                    + 0.25 * f[k + 1]
                )
                * f[k + 1]
                + 0.25
                * (self.coeff_dx(k) - self.coeff_dx(k - 1))
                * (
                    self.coeff_w(k) * f[k - 1]
                    + self.coeff_v(k) * f[k]
                    + self.coeff_u(k) * f[k + 1]
                )
            )

    def adaptive_timestep(self, f_iP1: np.ndarray) -> None:
        D = np.nanmax(divide_with_zero(np.abs(f_iP1 - self.f_i), f_iP1))
        if D > self.accuracy["Dmax"]:
            self.dt /= 2
        elif D < self.accuracy["Dmin"]:
            self.dt *= 2

    def adaptive_grid(self) -> None:
        new_f = [self.f_i[0]]
        new_x = [self.x[0]]
        new_K = self.K
        for k in range(1, self.K):
            d_k = np.abs(
                divide_with_zero(self.f_i[k] - self.f_i[k - 1], self.f_i[k - 1])
            )
            if d_k > self.accuracy["dmax"]:
                # add a new grid point
                new_f.append(0.5 * (self.f_i[k] + self.f_i[k - 1]))
                new_x.append(0.5 * (self.x[k] + self.x[k - 1]))
                new_K += 1
            if (k < self.K - 1) and (d_k < self.accuracy["dmin"]):
                d_kP1 = np.abs(
                    divide_with_zero(self.f_i[k + 1] - self.f_i[k], self.f_i[k])
                )
                # remove a grid point
                if d_kP1 < self.accuracy["dmin"]:
                    new_K -= 1
                    continue
            new_f.append(self.f_i[k])
            new_x.append(self.x[k])
        self.f_i = np.array(new_f)
        self.x = np.array(new_x)
        assert (len(self.f_i) == new_K) and (len(self.x) == new_K)
        self.K = new_K

    def is_converged(self, df: np.ndarray) -> bool:
        return np.max(df) <= self.accuracy["eps"]

    def iteration(self) -> np.ndarray:
        f_s = self.f_i.copy()
        accuracy_reached = False
        for _ in range(self.accuracy["maxiter"]):
            a_coeffs = [self.coeff_a(k, f_s) for k in range(self.K - 1)]
            b_coeffs = [self.coeff_b(k, f_s) for k in range(self.K)]
            c_coeffs = [self.coeff_c(k, f_s) for k in range(self.K - 1)]
            d_coeffs = [self.coeff_d(k, f_s) for k in range(self.K)]
            ABC = np.diag(a_coeffs, -1) + np.diag(b_coeffs, 0) + np.diag(c_coeffs, 1)
            ABC_dot_f_s = np.dot(ABC, f_s)

            rhs = tf.constant(d_coeffs - ABC_dot_f_s)
            matrix = tf.constant(ABC)
            m = matrix.shape[0]
            dummy_idx = [0, 0]
            indices = [
                [[i, i + 1] for i in range(m - 1)] + [dummy_idx],
                [[i, i] for i in range(m)],
                [dummy_idx] + [[i + 1, i] for i in range(m - 1)],
            ]
            diagonals = tf.gather_nd(matrix, indices)
            df_s = tf.linalg.tridiagonal_solve(diagonals, rhs)
            f_s = (df_s + f_s).numpy()
            if np.max(np.abs(df_s)) < self.accuracy["eps"]:
                accuracy_reached = True
                break
        if not accuracy_reached:
            print("WARNING: accuracy goal not reached")
        return f_s

    def solve(self) -> None:
        f_iP1 = self.iteration()
        self.adaptive_timestep(f_iP1)
        self.f_i = f_iP1
        self.adaptive_grid()
        print(f"dt = {self.dt:.2e}, K = {self.K}")

    def Plot(self):
        fig = plt.figure(figsize=(6, 4), dpi=300)
        ax = fig.add_subplot(212)
        ax.plot(self.x, self.f_i)
        ax.plot(self.x, self.f_0, c="C1")
        ax.set(
            xlim=(self.grid["x_min"], self.grid["x_max"]),
            ylim=(0, np.max(self.f_i) * 1.2),
            xlabel="x",
            ylabel="f(x)",
        )
        ax = fig.add_subplot(211)
        ax.plot(self.x, np.arange(self.K))
        ax.set(ylabel="k", xticklabels=[])
        fig.suptitle(f"t = {self.t:.5f}, K = {self.K}")

In [4]:
Grid = {"dt0": 0.0001, "K0": 10000, "x_min": 0, "x_max": 1.5}
Accuracy = {
    "eps": 1e-5,
    "Dmax": 0.5,
    "Dmin": 1e-4,
    "dmax": 0.1,
    "dmin": 1e-4,
    "maxiter": 100,
}


def InitialConditions(x):
    C = 1
    e0 = 0.01
    x1 = 1
    return (C / (e0 * np.sqrt(np.pi))) * np.exp(-((x - x1) ** 2) / e0**2)


sim = Simulation(grid=Grid, initial_conditions=InitialConditions, accuracy=Accuracy)

In [5]:
for i in tqdm(range(10)):
    sim.solve()

sim.Plot()

  0%|          | 0/10 [00:00<?, ?it/s]

dt = 5.00e-05, K = 11529
dt = 2.50e-05, K = 13045
dt = 1.25e-05, K = 14618
dt = 6.25e-06, K = 16534
dt = 3.13e-06, K = 19054
dt = 1.56e-06, K = 22826
dt = 7.81e-07, K = 29579


: 

: 