# In [20]:
from dash import Dash, dcc, html, Input, Output, State, callback_context
import numpy as np
import plotly.graph_objs as go

# Length of the periodic domain; the grid spacing is derived from it as L / nx.
L = 2 * np.pi
# NOTE(review): not referenced anywhere in the visible code — the simulated
# time is dt * nt instead. Confirm whether it can be removed.
time_final = 1.0
# Constant advection velocity used by the flux schemes and the exact solution.
u = 1.0

def initial_condition(x):
    """Return the initial tracer profile, sin(x), evaluated at ``x``."""
    profile = np.sin(x)
    return profile

def compute_flux_up1(T, u, dx):
    """First-order upstream flux at the cell faces of a periodic grid.

    ``flux[i]`` is the flux through the face between cell ``i - 1`` and
    cell ``i`` (indices wrap around), so ``flux[0]`` and ``flux[-1]``
    describe the same physical face.

    Parameters
    ----------
    T : 1D array of cell values.
    u : scalar advection velocity; its sign selects the upstream side.
    dx : grid spacing. Unused here; kept so all flux functions share one
        signature.

    Returns
    -------
    Float array of length ``len(T) + 1``.
    """
    T_padded = np.pad(T, (1, 1), mode='wrap')
    # T_padded[k] holds T[k - 1]: face i lies between padded entries i
    # (left cell) and i + 1 (right cell). Slicing replaces the per-face loop.
    upstream = T_padded[:-1] if u > 0 else T_padded[1:]
    flux = np.zeros(len(T) + 1)
    flux[:] = u * upstream
    return flux

def compute_flux_up3(T, u, dx):
    """Third-order upstream-biased flux at the cell faces of a periodic grid.

    ``flux[i]`` is the flux through the face between cell ``i - 1`` and
    cell ``i`` (indices wrap around). The face value is interpolated from
    three cells, two of them on the upstream side:

    * ``u > 0``: ``(2*T[i] + 5*T[i-1] - T[i-2]) / 6``
    * otherwise: ``(2*T[i-1] + 5*T[i] - T[i+1]) / 6``

    The previous implementation used the second stencil for ``u > 0``
    (i.e. biased downstream) and indexed past the padded array in the
    other branch.

    Parameters
    ----------
    T : 1D array of cell values.
    u : scalar advection velocity; its sign selects the upstream side.
    dx : grid spacing. Unused here; kept so all flux functions share one
        signature.

    Returns
    -------
    Float array of length ``len(T) + 1``.
    """
    T_padded = np.pad(T, (2, 2), mode='wrap')
    n_faces = len(T) + 1
    # T_padded[k] holds T[k - 2]; the slices below line up, for every face i,
    # the four cells surrounding it.
    far_left = T_padded[0:n_faces]            # T[i-2]
    left = T_padded[1:n_faces + 1]            # T[i-1]
    right = T_padded[2:n_faces + 2]           # T[i]
    far_right = T_padded[3:n_faces + 3]       # T[i+1]
    if u > 0:
        T_half = (2 * right + 5 * left - far_left) / 6.
    else:
        T_half = (2 * left + 5 * right - far_right) / 6.
    flux = np.zeros(n_faces)
    flux[:] = u * T_half
    return flux

def compute_flux_cen2(T, u, dx):
    """Second-order centered flux at the cell faces of a periodic grid.

    ``flux[i]`` is ``u`` times the average of the two cells adjacent to the
    face between cell ``i - 1`` and cell ``i`` (indices wrap around).

    Parameters
    ----------
    T : 1D array of cell values.
    u : scalar advection velocity.
    dx : grid spacing. Unused here; kept so all flux functions share one
        signature.

    Returns
    -------
    Float array of length ``len(T) + 1``.
    """
    T_padded = np.pad(T, (1, 1), mode='wrap')
    # T_padded[k] holds T[k - 1]; adjacent padded entries are the two cells
    # on either side of each face, so one slice pair replaces the loop.
    T_half = 0.5 * (T_padded[1:] + T_padded[:-1])
    flux = np.zeros(len(T) + 1)
    flux[:] = u * T_half
    return flux

def compute_flux_cen4(T, u, dx):
    """Fourth-order centered flux at the cell faces of a periodic grid.

    ``flux[i]`` is the flux through the face between cell ``i - 1`` and
    cell ``i`` (indices wrap around), using the symmetric four-cell
    interpolation

        ``(7 * (T[i] + T[i-1]) - (T[i+1] + T[i-2])) / 12``.

    Differencing these face fluxes gives the classic fourth-order centered
    first derivative ``(T[i-2] - 8*T[i-1] + 8*T[i+1] - T[i+2]) / (12*dx)``.
    The previous formula evaluated a value centered on cell ``i`` instead of
    on the face, which made the flux divergence a first-order one-sided
    difference.

    Parameters
    ----------
    T : 1D array of cell values.
    u : scalar advection velocity.
    dx : grid spacing. Unused here; kept so all flux functions share one
        signature.

    Returns
    -------
    Float array of length ``len(T) + 1``.
    """
    T_padded = np.pad(T, (2, 2), mode='wrap')
    n_faces = len(T) + 1
    # T_padded[k] holds T[k - 2]; the slices below line up, for every face i,
    # the four cells surrounding it.
    far_left = T_padded[0:n_faces]            # T[i-2]
    left = T_padded[1:n_faces + 1]            # T[i-1]
    right = T_padded[2:n_faces + 2]           # T[i]
    far_right = T_padded[3:n_faces + 3]       # T[i+1]
    T_half = (7.0 * (right + left) - (far_right + far_left)) / 12.0
    flux = np.zeros(n_faces)
    flux[:] = u * T_half
    return flux

def compute_flux_fct(T, u, dx, base='cen4', theta=0.5):
    """Blend a first-order upstream flux with a centered high-order flux.

    The result is ``theta * high_order + (1 - theta) * low_order`` with a
    constant weight; no flux limiter is applied.

    Parameters
    ----------
    T : 1D array of cell values.
    u : scalar advection velocity.
    dx : grid spacing, forwarded to the high-order flux function.
    base : ``'cen4'`` or ``'cen2'``, the high-order scheme to blend in.
    theta : weight of the high-order flux.

    Raises
    ------
    ValueError
        If ``base`` is neither ``'cen2'`` nor ``'cen4'``.
    """
    # Low-order part: value of the upstream cell at every face.
    T_padded = np.pad(T, (1, 1), mode='wrap')
    upstream_cells = T_padded[:-1] if u > 0 else T_padded[1:]
    flux_lo = np.zeros(len(T) + 1)
    flux_lo[:] = u * upstream_cells

    # High-order part: delegate to the selected centered scheme.
    if base not in ('cen4', 'cen2'):
        raise ValueError("FCT base must be 'cen2' or 'cen4'.")
    high_order_scheme = compute_flux_cen4 if base == 'cen4' else compute_flux_cen2
    flux_ho = high_order_scheme(T, u, dx)

    return theta * flux_ho + (1 - theta) * flux_lo

def compute_flux_none(T, u, dx):
    """Return an all-zero flux array (one entry per cell face) for "no advection"."""
    n_faces = len(T) + 1
    return np.zeros(n_faces)

def laplacian_2nd(T: np.ndarray, dx: float) -> np.ndarray:
    """Three-point periodic second difference of ``T`` divided by ``dx**2``."""
    right_neighbour = np.roll(T, -1)   # T[i+1]
    left_neighbour = np.roll(T, 1)     # T[i-1]
    second_difference = right_neighbour - 2 * T + left_neighbour
    return second_difference / dx**2

def laplacian_4th(T: np.ndarray, dx: float) -> np.ndarray:
    """Five-point periodic second difference of ``T`` divided by ``12 * dx**2``.

    Stencil weights are (-1, 16, -30, 16, -1) on cells i-2 .. i+2.
    """
    two_left = np.roll(T, 2)      # T[i-2]
    one_left = np.roll(T, 1)      # T[i-1]
    one_right = np.roll(T, -1)    # T[i+1]
    two_right = np.roll(T, -2)    # T[i+2]
    weighted_sum = -two_left + 16*one_left - 30*T + 16*one_right - two_right
    return weighted_sum / (12 * dx**2)

def bilaplacian(T: np.ndarray, dx: float) -> np.ndarray:
    """Apply the second-order periodic Laplacian twice to ``T``."""
    inner = laplacian_2nd(T, dx)
    return laplacian_2nd(inner, dx)

def laplacian_none(T: np.ndarray, dx: float) -> np.ndarray:
    """Return zeros shaped like ``T``; stands in for "no diffusion"."""
    zero_tendency = np.zeros_like(T)
    return zero_tendency

def rk3_step(T, rhs, dt):
    """Advance ``T`` by one step ``dt`` with a three-stage Runge-Kutta scheme.

    Each stage is a forward-Euler update combined convexly with ``T``
    (weights 1, then 3/4 : 1/4, then 1/3 : 2/3). ``rhs`` maps a state to
    its time tendency and is called three times.
    """
    stage_one = T + dt * rhs(T)
    stage_two = 0.75 * T + 0.25 * (stage_one + dt * rhs(stage_one))
    return (1/3) * T + (2/3) * (stage_two + dt * rhs(stage_two))

def euler_step(T, rhs, dt):
    """Advance ``T`` by one explicit (forward) Euler step of size ``dt``."""
    tendency = rhs(T)
    return T + dt * tendency

def run_advection_diffusion_simulation(nx, flux_scheme, nu, laplacian_func, CFL_adv, CFL_diff, theta_fct, nt, time_scheme):
    """Integrate the 1D periodic advection-diffusion problem for ``nt`` steps.

    Advection and diffusion are applied one after the other in every step
    (Lie splitting), each with the same time integrator and the same ``dt``.

    Parameters
    ----------
    nx : number of grid cells on the domain ``[0, L)``.
    flux_scheme : key selecting the advective flux: ``"none"``, ``"up1"``,
        ``"up3"``, ``"cen2"``, ``"cen4"``, ``"fct_cen2"`` or ``"fct_cen4"``.
    nu : diffusion coefficient.
    laplacian_func : diffusion operator called as ``laplacian_func(T, dx)``;
        ``laplacian_none`` disables diffusion.
    CFL_adv : advective Courant number used to size the time step.
    CFL_diff : diffusive stability number used to size the time step.
    theta_fct : high-order weight passed to the ``fct_*`` flux schemes.
    nt : number of time steps.
    time_scheme : ``"rk3"`` or ``"euler"``.

    Returns
    -------
    tuple ``(DOM, SOL_num, SOL_exact, dx, dt, nt, time_used)`` where
    ``SOL_exact`` is the initial profile shifted by ``u * time_used`` (or the
    unshifted initial profile when ``flux_scheme`` is ``"none"``). It does
    not include any diffusive decay.

    Raises
    ------
    ValueError
        If ``time_scheme`` is not recognised.
    KeyError
        If ``flux_scheme`` is not recognised.
    """
    # Resolve the integrator once, so a bad name fails before any work is done.
    steppers = {"rk3": rk3_step, "euler": euler_step}
    if time_scheme not in steppers:
        raise ValueError(f"Unknown time scheme: {time_scheme}")
    step = steppers[time_scheme]

    dx = L / nx
    advection_active = flux_scheme != "none"
    diffusion_active = laplacian_func is not laplacian_none and nu != 0.0
    # The bilaplacian is a fourth derivative: it only damps when applied with
    # a negative sign, and its explicit stability limit scales with dx**4.
    is_hyperdiffusion = laplacian_func is bilaplacian

    dt_adv = CFL_adv * dx / abs(u) if (advection_active and abs(u) > 0) else np.inf
    if not diffusion_active:
        dt_diff = np.inf
    elif is_hyperdiffusion:
        dt_diff = CFL_diff * dx**4 / (8 * nu)
    else:
        dt_diff = CFL_diff * dx**2 / (2 * nu)
    dt = min(dt_adv, dt_diff)
    if not np.isfinite(dt):
        # Neither process constrains the step, so nothing evolves; an infinite
        # dt would turn the zero tendencies into NaN (inf * 0).
        dt = 0.0
    time_used = dt * nt

    DOM = np.linspace(0, L, nx, endpoint=False)
    SOL_num = initial_condition(DOM)

    # Build the advection and diffusion right-hand sides.
    flux_fn = {
        "none": compute_flux_none,
        "up1": compute_flux_up1,
        "up3": compute_flux_up3,
        "cen2": compute_flux_cen2,
        "cen4": compute_flux_cen4,
        "fct_cen2": lambda T, u, dx: compute_flux_fct(T, u, dx, base='cen2', theta=theta_fct),
        "fct_cen4": lambda T, u, dx: compute_flux_fct(T, u, dx, base='cen4', theta=theta_fct),
    }[flux_scheme]

    def adv_rhs(T):
        if not advection_active:
            return np.zeros_like(T)
        F = flux_fn(T, u, dx)
        # Flux-form divergence: right face minus left face of every cell.
        return - (F[1:] - F[:-1]) / dx

    diffusion_sign = -1.0 if is_hyperdiffusion else 1.0

    def diff_rhs(T):
        if laplacian_func is laplacian_none:
            return np.zeros_like(T)
        return diffusion_sign * nu * laplacian_func(T, dx)

    # Lie splitting: advection sub-step, then diffusion sub-step.
    for _ in range(nt):
        SOL_num = step(SOL_num, adv_rhs, dt)
        SOL_num = step(SOL_num, diff_rhs, dt)

    if advection_active:
        SOL_exact = initial_condition((DOM - u * time_used) % L)
    else:
        SOL_exact = initial_condition(DOM)
    return DOM, SOL_num, SOL_exact, dx, dt, nt, time_used

# Advection scheme key -> label shown in the dropdown and in plot legends.
# The keys match the flux-scheme names accepted by
# run_advection_diffusion_simulation.
flux_scheme_dict = {
    "none": "No advection",
    "up1": "Up1 (1st order upstream)",
    "up3": "Up3 (3rd order upstream)",
    "cen2": "Cen2 (2nd order centered)",
    "cen4": "Cen4 (4th order centered)",
    "fct_cen2": "FCT (Cen2)",
    "fct_cen4": "FCT (Cen4)"
}
# Diffusion operator key -> (label, function called as f(T, dx)).
laplacian_operator_dict = {
    "none": ("No diffusion", laplacian_none),
    "lap2": ("2nd-order Laplacian", laplacian_2nd),
    "lap4": ("4th-order Laplacian", laplacian_4th),
    "bi": ("Bilaplacian", bilaplacian)
}
# Time integrator key -> label; the keys match the time_scheme names accepted
# by run_advection_diffusion_simulation.
time_scheme_dict = {
    "rk3": "RK3",
    "euler": "Explicit Euler"
}

# Dash application object; the callback below is registered on it.
app = Dash(__name__)

# Page layout: a control panel (left, 30% width) and a tabbed plot area
# (right, 65% width).
app.layout = html.Div([
    html.H2("Interactive 1D Advection-Diffusion Explorer"),
    html.Div([
        # --- Control panel -------------------------------------------------
        html.Div([
            # Multi-select dropdowns: every combination of the selected
            # advection, diffusion and time-stepping schemes is simulated.
            html.Label("Advection Discretization Schemes:"),
            dcc.Dropdown(
                id='flux-schemes',
                options=[{"label": v, "value": k} for k, v in flux_scheme_dict.items()],
                value=["cen2"],
                multi=True
            ),
            html.Label("Diffusion Discretization Schemes:"),
            dcc.Dropdown(
                id='diff-schemes',
                options=[{"label": v[0], "value": k} for k, v in laplacian_operator_dict.items()],
                value=["lap2"],
                multi=True
            ),
            html.Label("Time stepping schemes:"),
            dcc.Dropdown(
                id='time-schemes',
                options=[{"label": v, "value": k} for k, v in time_scheme_dict.items()],
                value=["rk3"],
                multi=True
            ),
            # Numerical parameters passed to the simulation.
            html.Label("CFL Number (advection):"),
            dcc.Slider(
                id='cfl-adv-slider', min=0.0, max=2.0, step=0.01, value=1.0,
                marks={0.0: "0.0", 1.0: "1.0", 2.0: "2.0"}
            ),
            html.Label("CFL Number (diffusion):"),
            dcc.Slider(
                id='cfl-diff-slider', min=0.0, max=2.0, step=0.01, value=1.0,
                marks={0.0: "0.0", 1.0: "1.0", 2.0: "2.0"}
            ),
            html.Label("Diffusion coefficient (nu):"),
            dcc.Slider(
                id='nu-slider', min=0.0, max=0.1, step=0.001, value=0.01,
                marks={0.0: "0.0", 0.05: "0.05", 0.1: "0.1"}
            ),
            html.Label("Number of grid cells (nx):"),
            dcc.Slider(
                id='nx-slider', min=20, max=400, step=10, value=100,
                marks={20: "20", 100: "100", 200: "200", 400: "400"}
            ),
            html.Label("Theta for FCT schemes:"),
            dcc.Slider(
                id='theta-slider', min=0.0, max=1.0, step=0.05, value=0.5,
                marks={0.0: "0.0", 0.5: "0.5", 1.0: "1.0"}
            ),
            html.Label("Number of time steps (nt):"),
            dcc.Slider(
                id='nt-slider', min=1, max=1000, step=1, value=100,
                marks={1: "1", 100: "100", 500: "500", 1000: "1000"}
            ),
            html.Br(),
            # The convergence study only runs when this button is clicked;
            # its traces are kept in the Store so they can be redrawn later.
            html.Button("Recompute convergence", id='recompute-conv-btn', n_clicks=0),
            dcc.Store(id='convergence-cache')
        ], style={'width': '30%', 'display': 'inline-block', 'verticalAlign': 'top', 'padding': 20}),
        # --- Plot area -----------------------------------------------------
        html.Div([
            dcc.Tabs(id="tabs", value='tab-sol', children=[
                dcc.Tab(label='Solution plot', value='tab-sol'),
                dcc.Tab(label='Convergence analysis', value='tab-conv'),
            ]),
            html.Div(
                # The callback writes the active tab's content into
                # 'tabs-content'; Loading shows a spinner while it runs.
                dcc.Loading(
                    id="loading-conv",
                    type="dot",
                    children=html.Div(id='tabs-content')
                ),
                style={"height": "600px"}
            )
        ], style={'width': '65%', 'display': 'inline-block', 'verticalAlign': 'top'})
    ])
])

def _scheme_label(flux_key, lap_name, time_key):
    """Legend label for one advection/diffusion/time-stepping combination."""
    return f"{flux_scheme_dict[flux_key]} + {lap_name} + {time_scheme_dict[time_key]}"


def _build_solution_figure(flux_schemes, diff_schemes, time_schemes, CFL_adv, CFL_diff, nu, nx, theta_fct, nt):
    """Simulate every selected scheme combination and plot the final profiles.

    The dashed reference curves are the exact solutions returned by the
    simulator itself, so they always correspond to the time the numerical
    runs actually reached.
    """
    fig = go.Figure()
    static_exact = None      # (DOM, profile) for runs without advection
    advected_exact = {}      # simulated time -> (DOM, profile) for advecting runs
    for flux_key in flux_schemes:
        for lap_key in diff_schemes:
            lap_name, lap_func = laplacian_operator_dict[lap_key]
            for time_key in time_schemes:
                DOM, SOL_num, SOL_exact, _dx, _dt, _nt, time_used = run_advection_diffusion_simulation(
                    nx=nx,
                    flux_scheme=flux_key,
                    nu=nu,
                    laplacian_func=lap_func,
                    CFL_adv=CFL_adv,
                    CFL_diff=CFL_diff,
                    theta_fct=theta_fct,
                    nt=nt,
                    time_scheme=time_key
                )
                fig.add_trace(go.Scatter(
                    x=DOM,
                    y=SOL_num,
                    mode='lines',
                    name=_scheme_label(flux_key, lap_name, time_key),
                ))
                if flux_key == "none":
                    static_exact = (DOM, SOL_exact)
                else:
                    # Different diffusion operators can impose different time
                    # steps, hence different end times for the same nt.
                    advected_exact.setdefault(float(time_used), (DOM, SOL_exact))

    exact_style = dict(dash='dash', color='black')
    if static_exact is not None:
        fig.add_trace(go.Scatter(
            x=static_exact[0],
            y=static_exact[1],
            mode='lines',
            name='Exact (no advection)',
            line=exact_style
        ))
    several_times = len(advected_exact) > 1
    for t_end, (DOM, SOL_exact) in advected_exact.items():
        name = 'Exact (advection only)'
        if several_times:
            name = f'Exact (advection only, t={t_end:.4g})'
        fig.add_trace(go.Scatter(
            x=DOM,
            y=SOL_exact,
            mode='lines',
            name=name,
            line=exact_style
        ))
    fig.update_layout(
        title=f"1D Advection-Diffusion: Selected Schemes<br>ν={nu}, CFL_adv={CFL_adv}, CFL_diff={CFL_diff}, nx={nx}, nt={nt}",
        xaxis_title='x',
        yaxis_title='Tracer T(x)',
        legend_title='Scheme combinations',
        template='plotly_white'
    )
    return fig


def _compute_convergence_traces(flux_schemes, diff_schemes, time_schemes, CFL_adv, CFL_diff, nu, theta_fct, nt):
    """Run the grid-refinement study and return plain-data trace dicts.

    Each trace is a dict with ``x`` (grid spacings), ``y`` (max-norm errors
    against the simulator's exact solution) and ``name``; the three reference
    slopes additionally carry a ``line`` style. Values are plain Python
    lists/floats so the result can be stored in ``dcc.Store``.
    """
    nxs = [5, 10, 20, 40, 80, 160, 320]
    traces = []
    for flux_key in flux_schemes:
        for lap_key in diff_schemes:
            lap_name, lap_func = laplacian_operator_dict[lap_key]
            for time_key in time_schemes:
                spacings = []
                errors = []
                for nx_c in nxs:
                    _DOM, SOL_num, SOL_exact, dx, _dt, _nt, _time_used = run_advection_diffusion_simulation(
                        nx=nx_c,
                        flux_scheme=flux_key,
                        nu=nu,
                        laplacian_func=lap_func,
                        CFL_adv=CFL_adv,
                        CFL_diff=CFL_diff,
                        theta_fct=theta_fct,
                        nt=nt,
                        time_scheme=time_key
                    )
                    spacings.append(float(dx))
                    errors.append(float(np.linalg.norm(SOL_num - SOL_exact, np.inf)))
                traces.append({
                    "x": spacings, "y": errors,
                    "name": _scheme_label(flux_key, lap_name, time_key)
                })
    # Reference slopes are built from the grid list itself rather than from
    # whatever the last loop iteration happened to leave behind.
    ref_dx = [L / nx_c for nx_c in nxs]
    traces += [
        {"x": ref_dx, "y": [d for d in ref_dx], "name": "1st order", "line": dict(dash='dash', color='gray')},
        {"x": ref_dx, "y": [d**2 for d in ref_dx], "name": "2nd order", "line": dict(dash='dot', color='gray')},
        {"x": ref_dx, "y": [d**3 for d in ref_dx], "name": "3rd order", "line": dict(dash='dashdot', color='gray')}
    ]
    return traces


def _build_convergence_figure(traces, nt):
    """Draw stored convergence traces on log-log axes."""
    fig = go.Figure()
    for tr in traces:
        # Only the reference slopes carry a "line" entry; they get no markers.
        is_reference = "line" in tr
        fig.add_trace(go.Scatter(
            x=tr["x"],
            y=tr["y"],
            mode='lines' if is_reference else 'lines+markers',
            name=tr["name"],
            line=tr.get("line", {})
        ))
    fig.update_layout(
        title=f'Convergence Analysis (L∞-error vs Δx), nt={nt}',
        xaxis_type='log',
        yaxis_type='log',
        xaxis_title='Δx',
        yaxis_title='L∞ Error',
        template='plotly_white'
    )
    return fig


@app.callback(
    Output('tabs-content', 'children'),
    Output('convergence-cache', 'data'),
    Input('flux-schemes', 'value'),
    Input('diff-schemes', 'value'),
    Input('time-schemes', 'value'),
    Input('cfl-adv-slider', 'value'),
    Input('cfl-diff-slider', 'value'),
    Input('nu-slider', 'value'),
    Input('nx-slider', 'value'),
    Input('theta-slider', 'value'),
    Input('nt-slider', 'value'),
    Input('tabs', 'value'),
    Input('recompute-conv-btn', 'n_clicks'),
    State('convergence-cache', 'data')
)
def update_plots(flux_schemes, diff_schemes, time_schemes, CFL_adv, CFL_diff, nu, nx, theta_fct, nt, tab, recalc_clicks, cache_data):
    """Render the active tab and keep the convergence cache up to date.

    Returns a pair ``(tab content, cache data)``. The solution tab is
    recomputed on every input change. The convergence tab is recomputed only
    when the "Recompute convergence" button fired this callback (or nothing
    is cached yet and the button has been clicked before); otherwise the
    cached traces are redrawn.
    """
    if not flux_schemes or not diff_schemes or not time_schemes:
        return html.Div("Please select at least one advection, one diffusion, and one time stepping scheme."), cache_data

    if tab == 'tab-sol':
        fig = _build_solution_figure(
            flux_schemes, diff_schemes, time_schemes,
            CFL_adv, CFL_diff, nu, nx, theta_fct, nt
        )
        return dcc.Graph(figure=fig), cache_data

    prompt = html.Div("Click 'Recompute convergence' to compute convergence analysis.")
    if tab != 'tab-conv':
        return prompt, cache_data

    button_fired = any(
        t['prop_id'] == 'recompute-conv-btn.n_clicks' for t in callback_context.triggered
    )
    wants_recompute = button_fired or cache_data is None
    # n_clicks may be reported as None before the first click.
    if wants_recompute and (recalc_clicks or 0) > 0:
        cache_data = _compute_convergence_traces(
            flux_schemes, diff_schemes, time_schemes,
            CFL_adv, CFL_diff, nu, theta_fct, nt
        )

    if cache_data:
        return dcc.Graph(figure=_build_convergence_figure(cache_data, nt)), cache_data
    return prompt, cache_data

# Start the server and display the app inline in the notebook. `Dash.run`
# takes the display mode as `jupyter_mode`; unknown keywords such as `mode`
# are forwarded to the Flask server instead.
app.run(jupyter_mode="inline")