In [None]:
import numpy as np
import scipy.optimize as opt
import matplotlib.pyplot as plt
import xarray as xr
import os
from typing import TypeVar, Literal, Callable

import bgk
import bgk.run_params as rp
import bgk.autofigs.util as util

# Helpers

In [None]:
T = TypeVar("T", float, np.ndarray)

def smooth(arr: np.ndarray,
           window: int,
           keep_edge: Literal["left", "right", "both", "none"]="none",
           keep_edge_half: bool=False, # if true, keep only half of each enabled edge
           pad: Literal["copy", "wrap", "trim_kernel", "none"]="none",
           kernel: Callable[[np.ndarray], float] | Literal["gauss", "mean"]="mean",
           extent: float=3.0 # extent along x-axis of gaussian if `kernel=="gauss"`
           ) -> np.ndarray:
    if kernel == "gauss":
        kernel = np.exp(-np.linspace(-extent,extent,window,True)**2)
        kernel = (kernel / sum(kernel)).dot
    elif kernel == "mean":
        kernel = np.mean
    keep_left = keep_edge in ["left", "both"]
    keep_right = keep_edge in ["right", "both"]
    ret = np.zeros(len(arr) + (window - 1) * (keep_left + keep_right) // (keep_edge_half + 1) - (window - 1))
    for i in range(0, len(ret)):
        start = i - (window - 1) * keep_left // (keep_edge_half + 1)
        stop = start + window
        subarr = arr[max(0, start):stop]
        if pad == "wrap":
            raise NotImplementedError()
        elif pad == "copy":
            subarr = np.concatenate([np.full(max(0, -start), arr[0]), subarr, np.full(max(0, stop - len(arr)), arr[-1])])
        elif pad == "trim_kernel":
            n_zeros_before = max(0, -start)
            n_zeros_after = max(0, stop - len(arr))
            if n_zeros_before + n_zeros_after > 0:
                rescale = kernel(np.concatenate([np.zeros(n_zeros_before), np.ones_like(subarr), np.zeros(n_zeros_after)]))
                subarr = np.concatenate([np.zeros(n_zeros_before), subarr, np.zeros(n_zeros_after)]) / rescale
        ret[i] = kernel(subarr)
    return ret

def _plot_curve(xs: list[float], ys: list[float] | Callable[[float], float], args: list, labels: bool):
        if callable(ys):
            ys = [ys(x) for x in xs]
        elif len(ys) < len(xs):
            ys = np.concatenate((ys, np.full(len(xs) - len(ys), np.nan)))
        label = args.pop(0) if labels and args else "_nolegend"
        plt.plot(xs, ys[:len(xs)], *args, label=label)


def plot(xs: list[float], *ys_argss: list, labels:bool=False):
    plt.close("all")
    for [ys, *args] in ys_argss:
        _plot_curve(xs, ys, args, labels)
    if labels:
        plt.legend()

def plots(*plots: list[list], labels:bool=False):
    plt.close("all")
    for [xs, ys, *args] in plots:
        _plot_curve(xs, ys, args, labels)
    if labels:
        plt.legend()

def print_errs(pnames: list[str], popts: np.ndarray, pcovs: np.ndarray):
    perrs = np.sqrt(np.diag(pcovs))
    perr_rels = abs(perrs / popts)
    for p, popt, perr_rel in zip(pnames, popts, perr_rels):
        print(f"{p:6s} = {popt:6.3f} ± {100*perr_rel:4.2f}%")


def sigmoid_gaussian(x: T, x0: float, y0: float, sx: float, sy_s: float, sy_g: float) -> T:
    return y0 + sy_s / (1 + np.exp(-(x - x0) / sx)) + sy_g * np.exp(-((x-x0)/sx)**2)

def jac_sigmoid_gaussian(x: T, x0: float, y0: float, sx: float, sy_s: float, sy_g: float) -> T:
    x = (x - x0) / sx
    ex = np.exp(-x)
    ex2 = np.exp(-x**2)

    j_x0 = -1/sx * (sy_s * ex / (1 + ex)**2 - 2 * x * sy_g * ex2)
    j_y0 = np.ones_like(x)
    j_sx = x * j_x0
    j_sy_s = 1 / (1 + ex)
    j_sy_g = ex2

    return np.array([j_x0, j_y0, j_sx, j_sy_s, j_sy_g]).T

def get_sigmoid_gaussian_fit(xs: list[float], ys: list[float]) -> tuple[np.ndarray, np.ndarray]:
    p0 = [xs[np.argmax(ys)], min(ys), max(xs) - min(xs), ys[-1] - ys[0], max(ys) - ys[-1]]
    return opt.curve_fit(sigmoid_gaussian, xs, ys, p0, method="dogbox", bounds=([-np.inf, -np.inf, 0, -np.inf, -np.inf], np.inf))

def get_growth_rate_sigmoid_gaussian(popts: np.ndarray, pcovs: np.ndarray) -> tuple[float, float]:
    return 1 / popts[2], np.sqrt(pcovs[2, 2]) / popts[2]**2


def exponential(x: T, y0: float, sx: float, sy: float) -> T:
    return y0 + sy * np.exp(x / sx)

def get_exponential_fit(xs: list[float], ys: list[float]) -> tuple[np.ndarray, np.ndarray]:
    p0 = [ys[0], xs[-1] - xs[0], ys[-1] - ys[0]]
    return opt.curve_fit(exponential, xs, ys, p0, method='dogbox')

def get_growth_rate_exponential(popts: np.ndarray, pcovs: np.ndarray) -> tuple[float, float]:
    return 1 / popts[1], np.sqrt(pcovs[1, 1]) / popts[1]**2

# Load Data

In [None]:
path = f"/mnt/lustre/IAM851/jm1667/psc-runs/case1/trials/exact/B00.25-n512-cont/"

run_manager = bgk.RunManager(path)
params_record = run_manager.params_record
run_diagnostics = run_manager.run_diagnostics

size = run_diagnostics.domain_size
struct_radius = run_diagnostics.hole_radius

whole_view = bgk.Bounds3D.whole()
center_view = bgk.Bounds3D.center_yz(struct_radius)

run_diagnostics.print_params()
run_diagnostics.check_params()

In [None]:
# fiddle with this until as many steps as possible are used (usually, they can all be used)
nframes = 201

videoMaker = bgk.VideoMaker(nframes, run_manager)

videoMaker.frame_manager.print_coverage()

In [None]:
videoMaker.set_param(rp.e_phi)
videoMaker.set_view_bounds(whole_view)

# Preprocessing

In [None]:
rho = 1.3 * run_diagnostics.hole_radius

In [None]:
drho = run_diagnostics.domain_size / 100

def getRslice(data: xr.DataArray, rho: float) -> xr.DataArray:
    return data.where((rho <= videoMaker.grid_rho) & (videoMaker.grid_rho < rho + drho))

def get_y(data: xr.DataArray, rho: float) -> float:
    return abs(getRslice(data, rho)).mean() * 1e5

def get_ys(rho: float) -> np.ndarray:
    return [get_y(data, rho) for data in videoMaker.datas]

In [None]:
%matplotlib inline
plt.close("all")
im = plt.imshow(getRslice(videoMaker.datas[-1], rho), origin="lower", extent=videoMaker.view_bounds.get_extent())
plt.colorbar(im)

In [None]:
smoother = lambda arr: smooth(arr, 25, keep_edge="both", keep_edge_half=True, pad="trim_kernel", kernel="gauss", extent=3)
ts = smoother(videoMaker.axis_t)
ys = smoother(get_ys(rho))

# Fits

## Fit Sigmoid

In [None]:
popts_sg, pcovs_sg = get_sigmoid_gaussian_fit(ts, ys)
plot(ts, [ys, "."], [sigmoid_gaussian(ts, *popts_sg), "-"])
print_errs(["t0", "y0", "st", "sy_s", "sy_g"], popts_sg, pcovs_sg)

## Fit Exponential

In [None]:
tstart = 0
tstop = 50
growth_phase = slice(np.argmax(ts > tstart), np.argmax(ts > tstop))

ts2 = ts[growth_phase]
ys2 = ys[growth_phase]

In [None]:
popts_exp, pcovs_exp = get_exponential_fit(ts2, ys2)
print_errs(["y0", "st", "sy"], popts_exp, pcovs_exp)
plot(ts2, [ys2, "."], [exponential(ts2, *popts_exp), "-"], [exponential(ts2 - popts_sg[0], *popts_sg[1:4]), "--"])
# plot(ts2, [ys2, "."], [exponential(ts2, *popts_exp), "-"])

In [None]:
growth_rate_exp, growth_rate_exp_err = get_growth_rate_exponential(popts_exp, pcovs_exp)
growth_rate_sg, growth_rate_sg_err = get_growth_rate_sigmoid_gaussian(popts_sg, pcovs_sg)

print(f"growth rate exp: {growth_rate_exp:.4f} ± {growth_rate_exp_err:.4f}")
print(f"growth rate sg:  {growth_rate_sg:.4f} ± {growth_rate_sg_err:.4f}")

## Rho Dependence

In [None]:
from dataclasses import dataclass
    
@dataclass
class Growth:
    time_start: float
    time_stop: float
    rate: float
    rate_err: float

    def exclude(self) -> bool:
        return self.rate < 5 * self.rate_err or self.rate_err < 1e-6
    
    @classmethod
    def empty(cls):
        return Growth(np.nan, np.nan, np.nan, np.inf)

def get_growth(ts: np.ndarray, ys: np.ndarray) -> Growth:
    try:
        popts, pcovs = get_exponential_fit(ts, ys)
        rate, rate_err = get_growth_rate_exponential(popts, pcovs)
        return Growth(ts[0], ts[-1], rate, rate_err)
    except:
        return Growth.empty()

def get_growth_opt(ts_raw: np.ndarray, ys_raw: np.ndarray) -> Growth:
    lbound = 0
    ubound = np.argmax(ys_raw)
    min_pts = 10

    best = Growth.empty()
    for istop in range(lbound + min_pts, ubound, min_pts//2):
        for istart in range(lbound, istop - min_pts, min_pts//2):
            g = get_growth(ts_raw[istart:istop], ys_raw[istart:istop])
            if not g.exclude() and g.rate_err < best.rate_err:
                best = g
    return best

In [None]:
rhos = np.linspace(run_diagnostics.hole_radius / 2, run_diagnostics.domain_size / 2, 100)

yss = [smoother(get_ys(rho)) for rho in rhos]
ts2 = smoother(videoMaker.axis_t)

In [None]:
# growths = [get_growth_opt(ts2, ys) for ys in yss]
growths = [get_growth(ts2[:50], ys[:50]) for ys in yss]

In [None]:
rhos_sparse = np.array(rhos)
for i, g in enumerate(growths):
    if g.exclude():
        rhos_sparse[i] = np.nan

In [None]:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.set_zorder(ax2.get_zorder() + 1)
ax1.set_facecolor('none')

color_ax1 = "b"
color_ax2 = "g"

ax1.set_title("Linear Growth Rates")
ax1.set_xlabel(rf"$\rho$")
ax1.set_ylabel("Growth Rate", color=color_ax1)
ax1.errorbar(rhos_sparse, [g.rate for g in growths], yerr=[g.rate_err for g in growths], color=color_ax1, capsize=1.7)
ax1.set_ylim(bottom=0)
ax2.set_ylabel("Period of Linear Growth (t)", color=color_ax2)
ax2.vlines(rhos_sparse, [g.time_start for g in growths], [g.time_stop for g in growths], colors=color_ax2, alpha=.4)
ax2.set_ylim(bottom=0)

In [None]:
os.makedirs("figs-test", exist_ok=True)
util.save_fig(fig, f"figs-test/growth2-{videoMaker._currentParam.name}-{params_record.init_strategy}-B{params_record.B0:4.2f}-n{params_record.res}.png")

## Detecting Linear Region

In [None]:
def grad_nice(ys: np.ndarray, xs: np.ndarray, rolling_window: int) -> tuple[np.ndarray, np.ndarray]:
    # grad = np.gradient(ys, xs)
    grad = (ys[1:] - ys[:-1]) / (xs[1:] - xs[:-1])
    xs = (xs[1:] + xs[:-1]) / 2
    # remove nans
    where_finite = np.isfinite(grad)
    grad = grad[where_finite]
    xs = xs[where_finite]
    # smooth
    # grad = moving_average(grad, rolling_window)
    # xs = moving_average(xs, rolling_window)
    # rescale to [-1, 1]
    grad /= max(abs(grad))
    return grad, xs

def grad2_conv(ys: np.ndarray, xs: np.ndarray, smooth_width: int) -> tuple[np.ndarray, np.ndarray]:
    smooth_width = 30
    x_conv = np.linspace(-3, 3, smooth_width)
    y_conv = (4 * x_conv**2 - 2) * np.exp(-x_conv**2) / smooth_width * 8
    return np.convolve(ys, y_conv, mode="valid")

def log_nice(ys: np.ndarray) -> np.ndarray:
    logys = np.log(ys)
    # filter infs
    logys[np.isinf(logys)] = np.nan
    return logys
    

In [None]:
rho = run_diagnostics.hole_radius * 1.3
ts = smoother(videoMaker.axis_t)
ys = smoother(get_ys(rho))

In [None]:
# y = y0 + Y * exp(t/T)
# dy = Y/T * exp(t/T)
# log dy = log(Y/T) * t/T
# d log dy = log(Y/T) / T

In [None]:
# logys = log_nice(ys - y0)
# dlogys = np.gradient(logys, ts)
# ddlogys = np.gradient(dlogys, ts)

dys, ts2 = grad_nice(ys, ts, 10)
logdys = log_nice(dys)
dlogdys, ts3 = grad_nice(logdys, ts2, 10)
ddlogdys, ts4 = grad_nice(dlogdys, ts3, 10)

In [None]:
# plot(ts, [np.zeros_like(ts)], [ys, "y"], [ddlogys / max(abs(ddlogys)), "ddlog"], [dlogys / max(abs(dlogys)), "dlog"], [logys, "log"], labels=True)
# plot(ts, [np.zeros_like(ts)], [ys, "y"], [dys, "d"], [logdys, "logd"], [dlogdys, "dlogd"], [ddlogdys, "ddlogd"], labels=True)
plt.close("all")
plt.plot(ts, np.zeros_like(ts))
plt.plot(ts, ys, label="y")
plt.plot(ts2, dys, label="dy")
plt.plot(ts2, logdys, label="logdy")
plt.plot(ts3, dlogdys, label="dlogdy")
plt.plot(ts4, ddlogdys, label="ddlogdy")
plt.legend()

In [None]:
smooth_width = 30
x_conv = np.linspace(-3,3,smooth_width)
y_conv = (4*x_conv**2 - 2) * np.exp(-x_conv**2) / smooth_width * 8

In [None]:
ddlogys_conv = np.convolve(logys, y_conv, mode="valid")
plot(ts, [ddlogys_conv])