In [None]:
import ipywidgets as widgets
from ipywidgets import interact
from IPython.display import display, clear_output
import matplotlib.pyplot as plt
import pandas as pd
from main_functions import *
import ipywidgets as widgets
from ipywidgets import HBox, VBox, interactive_output, Output
import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import display
import pandas as pd
from tqdm import tqdm
import multiprocessing
multiprocessing.set_start_method("spawn", force=True)
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import dill
dill.settings['recurse'] = True

from main_functions import *



path = "Data/Historical_prices.xlsx"
df = pd.read_excel(path, sheet_name="data", header=[0, 1], index_col=0, parse_dates=True, skiprows=[2])
df = df.loc[:, pd.IndexSlice["XAU Curncy", :]]
df.columns = df.columns.droplevel(0)
df.columns = ["Close", "High", "Low", "Open"]

df.dropna(inplace=True)

In [None]:
import importlib
import main_functions

importlib.reload(main_functions)

from main_functions import *

In [None]:
fig_w = go.FigureWidget(make_subplots(specs=[[{"secondary_y": True}]]))
table_out = widgets.Output()

baseline_agg = None

def build_plot_df(prices_1d, vol_1d=None, atr_1d=None, position_1d=None, pnl_1d=None, bh_pnl_1d=None):
    df_plot = prices_1d.copy()
    if vol_1d is not None:
        df_plot['Volatility'] = vol_1d
    if atr_1d is not None:
        df_plot['ATR'] = atr_1d
    if position_1d is not None:
        df_plot['Position'] = position_1d
    if pnl_1d is not None:
        df_plot['Cumulative Return'] = pnl_1d
    if bh_pnl_1d is not None:
        df_plot['BuyAndHold Return'] = bh_pnl_1d
    return df_plot

def update_simulation(mu, sigma, kappa, theta, sigma_v, rho, v0, lambda_, nu, delta,
                      n_sims, n_days, mode, best_model, trender_sensitivity, sim_sample=0,
                      steps_per_day=24, initial_capital=1000, secondary_series='Cumulative Return'):
    global baseline_agg

    gbm_params = {'mu': mu, 'sigma': sigma}
    jump_diff_params = {'mu': mu, 'sigma': sigma, 'lambda': lambda_, 'nu': nu, 'delta': delta}
    heston_params = {'mu': mu, 'kappa': kappa, 'theta': theta, 'sigma_v': sigma_v, 'rho': rho, 'v0': v0}
    bates_params = {'mu': mu, 'kappa': kappa, 'theta': theta, 'sigma_v': sigma_v, 'rho': rho, 'v0': v0,
                    'lambda': lambda_, 'nu': nu, 'delta': delta}

    df_hist = df.copy()
    S0 = df_hist['Close'].iloc[0]

    gbm_paths = simulate_gbm_paths(gbm_params, S0, n_days, n_sims)
    jd_paths = simulate_jump_diffusion_paths(jump_diff_params, S0, n_days, n_sims)
    heston_paths, heston_vols = simulate_heston_paths(heston_params, S0, n_days, n_sims)
    bates_paths, bates_vols = simulate_bates_paths(bates_params, S0, n_days, n_sims)

    ks_gbm = backtest_historical_fit_paths(df_hist, gbm_paths)
    ks_jd = backtest_historical_fit_paths(df_hist, jd_paths)
    ks_heston = backtest_historical_fit_paths(df_hist, heston_paths)
    ks_bates = backtest_historical_fit_paths(df_hist, bates_paths)

    backtest_results = pd.DataFrame({
        "GBM": ks_gbm[:2],
        "Jump-Diffusion": ks_jd[:2],
        "Heston": ks_heston[:2],
        "Bates": ks_bates[:2]
    }, index=["KS_Stat", "P-Value"])

    if best_model == "None":
        computed_best_model = select_best_model(backtest_results)
        best_model_used = computed_best_model
        print(f"Automatically selected best model: {computed_best_model}")
    else:
        best_model_used = best_model
        print(f"Manually selected best model: {best_model}")

    if best_model_used == "GBM":
        synthetic_prices = gbm_paths
        params_used = gbm_params
        sim_vols = None
    elif best_model_used == "Jump-Diffusion":
        synthetic_prices = jd_paths
        params_used = jump_diff_params
        sim_vols = None
    elif best_model_used == "Heston":
        synthetic_prices = heston_paths
        params_used = heston_params
        sim_vols = heston_vols
    else:
        synthetic_prices = bates_paths
        params_used = bates_params
        sim_vols = bates_vols

    sigma_intraday = estimate_intraday_sigma(params_used, model_type=best_model_used, steps_per_day=steps_per_day)
    intraday_prices = brownian_bridge_paths(synthetic_prices, steps_per_day=steps_per_day, sigma_intraday=sigma_intraday)

    trender_results = apply_trender_to_simulations(intraday_prices, sensitivity=trender_sensitivity)

    aggregated_results, individual_results, equity_curves, positions = backtest_trender_multiple_paths(
        intraday_prices, trender_results, mode=mode, initial_capital=initial_capital
    )

    agg_bh, df_bh, eq_curves_bh = backtest_buy_and_hold_multiple_paths(intraday_prices, initial_capital=initial_capital)

    trender_0 = trender_results[sim_sample].drop(["Trend"], axis=1)
    intraday_0 = intraday_prices[sim_sample]["Close"].rename("Price")
    sample_price = pd.merge(trender_0, intraday_0, left_index=True, right_index=True)

    sample_equity_curve = equity_curves[sim_sample]
    sample_cumpnl = ((sample_equity_curve / initial_capital) - 1) * 100

    sample_atr = trender_results[sim_sample].get('ATR')
    sample_pos = positions[sim_sample]

    if sim_vols is not None:
        sim_vols = pd.DataFrame(sim_vols)[sim_sample] * 100

    sample_equity_bh = eq_curves_bh[sim_sample]
    sample_cumpnl_bh = ((sample_equity_bh / initial_capital) - 1) * 100

    plot_df = build_plot_df(
        prices_1d=sample_price, 
        atr_1d=sample_atr,
        position_1d=sample_pos,
        vol_1d=sim_vols,
        pnl_1d=sample_cumpnl,
        bh_pnl_1d=sample_cumpnl_bh
    )

    x_vals = plot_df.index
    with fig_w.batch_update():
        fig_w.data = []
        for col in ["Price", "TrenderUp", "TrenderDown"]:
            if col in plot_df.columns:
                fig_w.add_trace(
                    go.Scatter(
                        x=x_vals,
                        y=plot_df[col],
                        name=col
                    ),
                    secondary_y=False
                )
        if secondary_series != "None" and secondary_series in plot_df.columns:
            fig_w.add_trace(
                go.Scatter(
                    x=x_vals,
                    y=plot_df[secondary_series],
                    name=secondary_series
                ),
                secondary_y=True
            )
        fig_w.update_layout(
            title=f"Simulation Visualization ({best_model_used})",
            xaxis_title="Time",
            hovermode='x unified',
            xaxis=dict(rangeslider=dict(visible=True)),
            width=950,
            height=700
        )
        fig_w.update_yaxes(title_text="Price + Trender Lines", secondary_y=False)
        if secondary_series != "None":
            fig_w.update_yaxes(title_text=secondary_series, secondary_y=True)

    current_agg_df = pd.DataFrame(aggregated_results).T.drop(
        columns=["num_trades", "final_equity"], 
        errors='ignore'
    )

    bh_current_agg_df = pd.DataFrame(agg_bh).T.drop(
        columns=["num_trades", "final_equity"],
        errors='ignore'
    )

    bh_row = bh_current_agg_df.loc["mean",:]
    bh_row.name = "BuyAndHold Mean"
    bh_row = pd.DataFrame(bh_row).T
    combined_df = pd.concat([current_agg_df, bh_row])

    if baseline_agg is None:
        baseline_agg = combined_df.copy()

    def highlight_diff(df_):
        styles = pd.DataFrame("", index=df_.index, columns=df_.columns)
        for col in df_.columns:
            for idx in df_.index:
                if idx not in baseline_agg.index or col not in baseline_agg.columns:
                    continue
                base_val = baseline_agg.at[idx, col]
                curr_val = df_.at[idx, col]
                if pd.isnull(base_val) or pd.isnull(curr_val):
                    continue
                if curr_val > base_val:
                    styles.loc[idx, col] = 'background-color: lightgreen'
                elif curr_val < base_val:
                    styles.loc[idx, col] = 'background-color: lightcoral'
        return styles

    styled_agg = combined_df.style.apply(
        highlight_diff, axis=None
    ).format({
        "max_drawdown_duration": "{:.2f}",
        "sharpe_ratio": "{:.2f}",
        "max_drawdown": "{:.2%}",
        "total_return": "{:.2%}",
        "annual_return": "{:.2%}",
        "annual_vol": "{:.2%}",
        "win_ratio": "{:.2%}"
    })

    with table_out:
        table_out.clear_output()
        display(styled_agg)

bates_params = calibrate_bates(df)
gbm_params = calibrate_gbm(df)

controls = {
    "mu": widgets.FloatSlider(
        value=bates_params["mu"], min=0.0, max=0.001, step=1e-5,
        readout_format='.5f', description='mu'),
    "sigma": widgets.FloatSlider(
        value=gbm_params["sigma"], min=0.001, max=0.05, step=1e-4,
        readout_format='.5f', description='sigma'),
    "kappa": widgets.FloatSlider(
        value=bates_params["kappa"], min=0.0, max=1.0, step=1e-4,
        readout_format='.4f', description='kappa'),
    "theta": widgets.FloatSlider(
        value=bates_params["theta"], min=0.05, max=0.2, step=1e-3,
        readout_format='.4f', description='theta'),
    "sigma_v": widgets.FloatSlider(
        value=bates_params["sigma_v"], min=0.001, max=0.1, step=1e-3,
        readout_format='.4f', description='sigma_v'),
    "rho": widgets.FloatSlider(
        value=bates_params["rho"], min=-1.0, max=1.0, step=1e-3,
        readout_format='.4f', description='rho'),
    "v0": widgets.FloatSlider(
        value=bates_params["v0"], min=0.0001, max=0.01, step=1e-4,
        readout_format='.5f', description='v0'),
    "lambda_": widgets.FloatSlider(
        value=bates_params["lambda"], min=0.0, max=0.1, step=1e-4,
        readout_format='.5f', description='lambda'),
    "nu": widgets.FloatSlider(
        value=bates_params["nu"], min=-0.1, max=0.1, step=1e-4,
        readout_format='.5f', description='nu'),
    "delta": widgets.FloatSlider(
        value=bates_params["delta"], min=0.001, max=0.1, step=1e-4,
        readout_format='.5f', description='delta'),
    "n_sims": widgets.IntSlider(
        min=50, max=500, step=50, value=100, description='Simulations'),
    "n_days": widgets.IntSlider(
        min=100, max=10000, step=50, value=252*5, description='Days'),
    "sim_sample": widgets.IntSlider(
        min=0, max=100, step=1, value=0, description='Sim Sample'),
    "trender_sensitivity": widgets.FloatSlider(
        value=1, min=1, max=10, step=1, readout_format='.0f', description='Trender Sens'),
    "mode": widgets.Dropdown(
        options=['long_only', 'long_short'], value='long_only', description='Mode'),
    "best_model": widgets.Dropdown(
        options=["None", "GBM", "Jump-Diffusion", "Heston", "Bates"],
        value="None", description='Best Model'),
    "secondary_series": widgets.Dropdown(
        options=["None", "Price", "Volatility", "ATR", "Position", "Cumulative Return", "BuyAndHold Return"],
        value="Cumulative Return", description='Secondary Y'),
}

reset_button = widgets.Button(description="Reset", icon="refresh")

def reset_parameters(b):
    global baseline_agg
    baseline_agg = None
    controls["mu"].value = bates_params["mu"]
    controls["sigma"].value = gbm_params["sigma"]
    controls["kappa"].value = bates_params["kappa"]
    controls["theta"].value = bates_params["theta"]
    controls["sigma_v"].value = bates_params["sigma_v"]
    controls["rho"].value = bates_params["rho"]
    controls["v0"].value = bates_params["v0"]
    controls["lambda_"].value = bates_params["lambda"]
    controls["nu"].value = bates_params["nu"]
    controls["delta"].value = bates_params["delta"]
    controls["n_sims"].value = 100
    controls["n_days"].value = 252*5
    controls["mode"].value = "long_only"
    controls["best_model"].value = "None"
    controls["secondary_series"].value = "Cumulative Return"
    controls["sim_sample"].value = 0
    controls["trender_sensitivity"].value = 1

reset_button.on_click(reset_parameters)

interactive_out = widgets.interactive_output(update_simulation, controls)

control_box = widgets.VBox([reset_button] + list(controls.values()))
control_box.layout = widgets.Layout(width='25%')

right_side = widgets.VBox([fig_w, table_out])
right_side.layout = widgets.Layout(width='75%')

layout = widgets.HBox([control_box, right_side])
display(layout)
