# Experiments and Results

In [1]:
# imports
import sys
BASE_PATH =  "/Users/audreymcmillion/Documents/acm-thesis"
sys.path.append(BASE_PATH) 
import duckdb
import matplotlib.pyplot as plt
from model_fitting import ModelFitting

from ev_scoring import ExtremeValueScoring
from market_utils import MarketUtilities
import pandas as pd

from tqdm import tqdm
import json 

ev = ExtremeValueScoring(wrds_username='audreymcmillion')
db = ev.wrds_db
conn = ev.sqlite_conn
mkt_utils = MarketUtilities(wrds_username='audreymcmillion', wrds_db = db, sqlite_conn = conn)

Loading library list...
Done


## Sources

- MAPIE Documentation: https://mapie.readthedocs.io/en/stable/theoretical_description_metrics.html

## Real Data Sets

$$
\text{MWI Score} = \frac{1}{n} \sum_{i=1}^n (\hat{y}_i^{\text{up}} - \hat{y}_i^{\text{low}} ) +\frac{2}{\alpha} \sum_{i=1}^n \max(0, |y_i- \hat{y}_i^{\text{boundary}}|)
$$

### Testing

In [None]:
covid_data = pd.read_csv(f"{BASE_PATH}/test_data/covid_results/covid_dist_shift.csv")
covid_shift = covid_data[covid_data.ks_identical == False].reset_index(drop=True)

In [None]:
covid_shift

In [None]:
anomaly_data = pd.read_sql("""
    with valid_symbols as (
    	select symbol, count(*)
    	from argarch_results ar 
    	where test_set = 'Real Anomaly'
    	group by symbol
    	having count(*) >= 100
    )
    select * 
    from manual_anomalies ma 
    where classification = 'Anomaly'
    and symbol in (select symbol from valid_symbols)
""", conn)

In [None]:
anomaly_data

In [None]:
dist_shift_query = """
    SELECT 
        t.*,
        ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY chunk_end_date) AS rn
    FROM iterative_ev_kl_5D t
    WHERE chunk_start_date > '2000-01-01'
    AND kl_divergence > 0.5
    AND chunk_end_date > '2020-01-31'
  """
dist_shift_df = pd.read_sql(dist_shift_query, conn)

In [None]:
dist_shift_df["symbol"].nunique()

In [None]:
interday_query_path = f"{BASE_PATH}/sql_lib/interday_highlow_query.sql"
with open(interday_query_path, "r") as file:
    interday_query = file.read()

**PARAMETERS:** 350 and 75

### AR-GARCH

In [None]:
argarch_coverage_lst = []

i = 0 
for ind, row in tqdm(covid_shift.iterrows()):
    symbol, cutoff_dt = row["symbol"], row["cutoff_dt"]
    start_dt, end_dt = mkt_utils.get_before_date(cutoff_dt, 350), mkt_utils.get_after_date(cutoff_dt, 75)

    # get interday df
    interday_df = db.raw_sql(interday_query.format(symbol=symbol, 
                                     start_dt=start_dt, 
                                     end_dt=end_dt))

    # create object
    if i == 0:
        model_test =  ModelFitting(data=interday_df)
    else:
        model_test.reset_data(interday_df)


    # fit and forecast an AR(1)-GARCH(1,1) model
    argarch_forecast_df = model_test.fit_forecast_ar_garch(window_size=300, garch_p = 1, garch_q = 1)

    if argarch_forecast_df.empty:
        i += 1
        continue

    # add columns
    argarch_forecast_df["symbol"]   = symbol
    argarch_forecast_df["start_dt"] = start_dt
    argarch_forecast_df["end_dt"]   = end_dt
    argarch_forecast_df["model"]    = "AR(1)-GARCH(1,1)"
    argarch_forecast_df["test_set"] = "Real Distibution Shift"
    argarch_forecast_df = argarch_forecast_df.reset_index()
    argarch_forecast_df.to_sql("argarch_results", conn, if_exists='append', index=False)
    
    # TODO: Save this in database in case we want to run future metrics
    argarch_coverage_stats = model_test.get_coverage_stats(argarch_forecast_df)

    # append symbol and cutoff date to coverage stats dictionary
    argarch_coverage_stats['symbol']     = symbol
    argarch_coverage_stats['start_dt']   = start_dt
    argarch_coverage_stats['end_dt']     = end_dt
    argarch_coverage_stats['test_set']   = "Real Distribution Shift"
    argarch_coverage_stats['model']      = "AR(1)-GARCH(1,1)"
    argarch_coverage_stats['base_model'] = None # no base model

    # convert to dataframe and add to database
    argarch_coverage_stats_df  = pd.DataFrame([argarch_coverage_stats])
    argarch_coverage_stats_df.to_sql("model_coverage_stats", conn, if_exists='append', index=False)
    
    # append to list
    argarch_coverage_lst.append(argarch_coverage_stats)

    i += 1

### Conformal Models

In [None]:
def process_symbol_conformal(symbol, conformity_score: str, test_set_name: str, start_dt: str, end_dt: str, earliest_dt: str, pred_ind: int):
    # get interday df
    interday_df = db.raw_sql(interday_query.format(symbol=symbol, 
                                     start_dt=earliest_dt, 
                                     end_dt=end_dt))
    interday_df['log_highlow_diff'] = interday_df['log_highlow_diff'].to_numpy(dtype=float)

    # create object
    model_test =  ModelFitting(data=interday_df)

    if conformity_score == "gamma":
        # fit and forecast Online Conformal and Naive Conformal dataframe
        online_df, naive_df = model_test.arma_conformal_forecasting(alpha=0.05, gamma=0.005) # NOTE: Adaptivity parameter of 0.005
    else:
        # fit and forecast Online Conformal and Naive Conformal dataframe
        online_df, naive_df = model_test.argarch_conformal_forecasting(alpha=0.05, gamma=0.005) # NOTE: Adaptivity parameter of 0.005

    if online_df.empty or naive_df.empty:
        return (None, None), (None, None)

    # key-value pairs
    online_df["symbol"] = symbol
    online_df["start_dt"] = start_dt
    online_df["end_dt"] = end_dt
    online_df["test_set"] = test_set_name
    online_df["conformal_mode"] = "online"
    
    if conformity_score == 'gamma':
        online_df['conformity_score'] = 'gamma'
        online_df['base_model'] = "AR(1)"
        online_df["forecast_std"] = None
    else:
        online_df['conformity_score'] = 'residual_normalized'
        online_df['base_model'] = "AR(1)-GARCH(1,1)"
    
    naive_df["symbol"] = symbol
    naive_df["start_dt"] = start_dt
    naive_df["end_dt"] = end_dt
    naive_df["test_set"] = test_set_name
    naive_df["conformal_mode"] = "naive"

    if conformity_score == 'gamma':
        naive_df['conformity_score'] = 'gamma'
        naive_df['base_model'] = "AR(1)"
        naive_df["forecast_std"] = None
    else:
        naive_df['conformity_score'] = 'residual_normalized'
        naive_df['base_model'] = "AR(1)-GARCH(1,1)"

    # get coverage stats
    if pred_ind <= len(online_df):
        online_coverage_stats = model_test.get_coverage_stats(online_df.iloc[-pred_ind:].copy())
    else:
        online_coverage_stats = model_test.get_coverage_stats(online_df.copy())

    # naive coverage stats - only last x observations
    if pred_ind <= len(naive_df):
        naive_coverage_stats = model_test.get_coverage_stats(naive_df.iloc[-pred_ind:].copy())
    else:
        naive_coverage_stats = model_test.get_coverage_stats(naive_df.copy())
    
    # append symbol and cutoff date to coverage stats dictionary
    online_coverage_stats['symbol'] = symbol
    online_coverage_stats['start_dt'] = start_dt
    online_coverage_stats['end_dt'] = end_dt
    online_coverage_stats['test_set'] = test_set_name

    if conformity_score == 'gamma':
        online_coverage_stats['model'] = "OC: Gamma Score"
        online_coverage_stats['base_model'] = "AR(1)"
    else:
        online_coverage_stats['model'] = "OC: Residual Normalized Score"
        online_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"

    naive_coverage_stats['symbol'] = symbol
    naive_coverage_stats['start_dt'] = start_dt
    naive_coverage_stats['end_dt'] = end_dt
    naive_coverage_stats['test_set'] = test_set_name
    
    if conformity_score == 'gamma':
        naive_coverage_stats['model'] = "NC: Gamma Score"
        naive_coverage_stats['base_model'] = "AR(1)"
    else:
        naive_coverage_stats['model'] = "NC: Residual Normalized Score"
        naive_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"

    # save as dataframe
    online_coverage_stats_df  = pd.DataFrame([online_coverage_stats])        
    naive_conformal_coverage_df  = pd.DataFrame([naive_coverage_stats])

    return (online_df.reset_index(), online_coverage_stats_df), (naive_df.reset_index(), naive_conformal_coverage_df)

In [None]:
forecast_dfs = []
coverage_dfs = []
for ind, row in tqdm(covid_shift.iterrows()):
    symbol, cutoff_dt = row["symbol"], row["cutoff_dt"]

    print("Iteration: ", ind, symbol, cutoff_dt)
    
    start_dt, end_dt = mkt_utils.get_before_date(cutoff_dt, 350), mkt_utils.get_after_date(cutoff_dt, 75)
    earliest_dt = mkt_utils.get_before_date(cutoff_dt, 450)
    online_pair, naive_pair = process_symbol_conformal(symbol, conformity_score = 'gamma', test_set_name="Real Distribution Shift", 
                                                       start_dt=start_dt, end_dt=end_dt, earliest_dt=earliest_dt, pred_ind=126)
    if online_pair[0] is not None:
        forecast_dfs.append(online_pair[0])
        coverage_dfs.append(online_pair[1])

    if naive_pair[0] is not None:
        forecast_dfs.append(naive_pair[0])
        coverage_dfs.append(naive_pair[1])

# --- Batch insert after loop ---
if forecast_dfs:
    all_forecasts = pd.concat(forecast_dfs, ignore_index=True)
    all_forecasts["symbol"] = all_forecasts["symbol"].astype(str)
    all_forecasts.to_sql("conformal_results", conn, if_exists="append", index=False)

if coverage_dfs:
    all_coverage = pd.concat(coverage_dfs, ignore_index=True)
    all_coverage.to_sql("model_coverage_stats", conn, if_exists="append", index=False)

### DtACI

In [None]:
from tqdm import tqdm
import json 
from DtACI import DtACI

def process_symbol_dtaci(symbol, test_set_name: str, start_dt: str, end_dt: str, earliest_dt: str, I: int):
    # get interday df
    interday_df = db.raw_sql(interday_query.format(symbol=symbol, 
                                     start_dt=earliest_dt,
                                     end_dt=end_dt))

    # create object
    dtaci_obj =  DtACI(data=interday_df)

    # fit and forecast Online Conformal and Naive Conformal dataframe
    # gamma_df = dtaci_obj.get_dtaci_forecast_df(score_type="gamma")
    resnorm_df = dtaci_obj.get_dtaci_forecast_df(score_type="residual_normalized", eta_adapt=False, I=I)

    # if gamma_df.empty or resnorm_df.empty:
    #     return (None, None), (None, None)
    if resnorm_df.empty:
        return (None, None)

    # key-value pairs
    # gamma_df["symbol"] = symbol
    # gamma_df["start_dt"] = start_dt
    # gamma_df["end_dt"] = end_dt
    # gamma_df["test_set"] = test_set_name
    # gamma_df['conformity_score'] = 'gamma'
    # gamma_df['model'] = "AR(1)-GARCH(1,1);sigma=0.1"

    resnorm_df["symbol"] = symbol
    resnorm_df["start_dt"] = start_dt
    resnorm_df["end_dt"] = end_dt
    resnorm_df["test_set"] = test_set_name
    resnorm_df['conformity_score'] = "residual_normalized"
    resnorm_df['model'] = f"AR(1)-GARCH(1,1);I={str(I)}"

    # coverage stat dataframes
    # dtaci_gamma_coverage_stats = dtaci_obj.get_coverage_stats(gamma_df, level = 0.95)
    dtaci_resnorm_coverage_stats = dtaci_obj.get_coverage_stats(resnorm_df, level = 0.95)
        
    # append symbol and cutoff date to coverage stats dictionary
    # dtaci_gamma_coverage_stats['symbol'] = symbol
    # dtaci_gamma_coverage_stats['start_dt'] = start_dt
    # dtaci_gamma_coverage_stats['end_dt'] = end_dt
    # dtaci_gamma_coverage_stats['test_set'] = test_set_name
    # dtaci_gamma_coverage_stats['model'] = "DtACI: Gamma Score;sigma=0.1"
    # dtaci_gamma_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"

    dtaci_resnorm_coverage_stats['symbol'] = symbol
    dtaci_resnorm_coverage_stats['start_dt'] = start_dt
    dtaci_resnorm_coverage_stats['end_dt'] = end_dt
    dtaci_resnorm_coverage_stats['test_set'] = test_set_name
    dtaci_resnorm_coverage_stats['model'] = f"DtACI: Residual Normalized Score;I={str(I)}"
    dtaci_resnorm_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"

    # save as dataframe
    # dtaci_gamma_coverage_stats_df  = pd.DataFrame([dtaci_gamma_coverage_stats])        
    dtaci_resnorm_coverage_stats_df  = pd.DataFrame([dtaci_resnorm_coverage_stats])

    # (gamma_df, dtaci_gamma_coverage_stats_df)
    return (resnorm_df, dtaci_resnorm_coverage_stats_df)

In [None]:
forecast_dfs = []
coverage_dfs = []
for ind, row in tqdm(covid_shift.iterrows()):
    symbol, cutoff_dt = row["symbol"], row["cutoff_dt"] # cutoff_dt

    print("Iteration: ", ind, symbol, cutoff_dt)
    
    start_dt, end_dt = mkt_utils.get_before_date(cutoff_dt, 350), mkt_utils.get_after_date(cutoff_dt, 75)
    earliest_dt = mkt_utils.get_before_date(cutoff_dt, 450)
    resnorm_pair = process_symbol_dtaci(symbol, test_set_name="Real Distribution Shift", 
                                                    start_dt=start_dt, end_dt=end_dt, earliest_dt = earliest_dt, I = 150)
    # if gamma_pair[0] is not None:
    #     forecast_dfs.append(gamma_pair[0])
    #     coverage_dfs.append(gamma_pair[1])

    if resnorm_pair[0] is not None:
        forecast_dfs.append(resnorm_pair[0])
        coverage_dfs.append(resnorm_pair[1])

# --- Batch insert after loop ---
if forecast_dfs:
    all_forecasts = pd.concat(forecast_dfs, ignore_index=True)
    all_forecasts["symbol"] = all_forecasts["symbol"].astype(str)
    all_forecasts = all_forecasts.rename(columns={"base_model": "model"})
    all_forecasts.to_sql("dtaci_results_new", conn, if_exists="append", index=False)

if coverage_dfs:
    all_coverage = pd.concat(coverage_dfs, ignore_index=True)
    all_coverage.to_sql("model_coverage_stats", conn, if_exists="append", index=False)

## Evaluation

In [None]:
eval_query = """
with symbol_dates as (
	SELECT symbol, start_dt, end_dt 
	FROM model_coverage_stats
	GROUP BY symbol, start_dt, end_dt 
	HAVING COUNT(DISTINCT model) = (
	    SELECT COUNT(DISTINCT model) 
	    FROM model_coverage_stats
	)
)

select mc.*
from symbol_dates sd
left join model_coverage_stats mc
on (sd.symbol, sd.start_dt, sd.end_dt) = (mc.symbol, mc.start_dt, mc.end_dt)
where mc.test_set = 'Real Distribution Shift'
"""

In [None]:
eval_df = pd.read_sql(eval_query, conn)

In [None]:
eval_df

In [None]:
import plotly.graph_objects as go

def plot_evaluation_metrics(eval_df, stat_col, model_col = "model", bin_size = 1):
    fig = go.Figure()

    bin_start, bin_end = eval_df[stat_col].min() - bin_size, eval_df[stat_col].max() + bin_size
    bin_settings = dict(start=bin_start, end=bin_end, size=bin_size)  # adjust values to your data range
    
    # Define each histogram: (dataframe, label, color)
    hist_traces = []
    colors = ['blue', 'red', 'purple', 'orange', 'green', 'brown', 'cyan', 'grey']
    distinct_models = list(eval_df[model_col].unique())

    #assert(len(colors) == len(distinct_models))

    i = 0
    for model in distinct_models:
        sub_df = eval_df[eval_df[model_col] == model]
        hist_traces.append((sub_df[stat_col], model, colors[i]))
        i += 1
    
    # Add each trace
    for data, name, color in hist_traces:
        fig.add_trace(go.Histogram(x=data, name=name, marker_color=color, opacity=0.6, xbins=bin_settings))
    
    # Layout settings
    fig.update_layout(
        barmode='overlay',  # use 'group' for side-by-side bars
        title=f'Comparison of {stat_col} for Different Models',
        xaxis_title=f'{stat_col}',
        yaxis_title='Frequency',
        legend_title='Model Type',
    )
    
    fig.show()

In [None]:
plot_evaluation_metrics(eval_df, "coverage", bin_size=0.75)

In [None]:
plot_evaluation_metrics(eval_df, "cwc_score", bin_size=0.1)

In [None]:
plot_evaluation_metrics(eval_df, "mwi_score", bin_size=2.5)

In [None]:
plot_evaluation_metrics(eval_df, "median_width", bin_size=2)

In [None]:
from scipy.stats import mannwhitneyu
import numpy as np

def mann_whitney_test(eval_df, model1, model2, stat_col, *, model_col = "model", alpha=0.1, alternative="less"):
    data1 = eval_df[eval_df[model_col] == model1][stat_col]
    data2 = eval_df[eval_df[model_col] == model2][stat_col]
    statistic, p_value = mannwhitneyu(data1, data2, alternative=alternative)

    decision = (
        f"Reject the null hypothesis: Distribution 1 is significantly {alternative} than Distribution 2."
        if p_value < alpha
        else f"Fail to reject the null hypothesis: No significant evidence that Distribution 1 is {alternative} than Distribution 2."
    )

    return {
        "U_statistic": statistic,
        "p_value": p_value,
        "alpha": alpha,
        "decision": decision
    }


In [None]:
eval_df["model"].unique()

In [None]:
mann_whitney_test(eval_df, 'AR(1)-GARCH(1,1)', 'DtACI: Residual Normalized Score', stat_col = "coverage")

In [None]:
mann_whitney_test(eval_df, 'DtACI: Residual Normalized Score', 'AR(1)-GARCH(1,1)', stat_col = "median_width")

In [None]:
mann_whitney_test(eval_df, 'DtACI: Residual Normalized Score', 'AR(1)-GARCH(1,1)', stat_col = "cwc_score")

In [None]:
mann_whitney_test(eval_df, 'DtACI: Residual Normalized Score', 'AR(1)-GARCH(1,1)', stat_col = "mwi_score")

## Synthetic Datasets

In [2]:
synth_data_df = pd.read_sql("""
select * 
from sim_data_distshift_anom_700 sdds
order by id, t
""", conn)
synth_data_df["log_highlow_diff"] = synth_data_df["data_ext"]

In [3]:
synth_data_df[["min_std", "max_std", "id"]].drop_duplicates()

Unnamed: 0,min_std,max_std,id
0,8,9,15801249959
700,8,9,18554195994
1400,5,6,48924892613
2100,8,9,51983899766
2800,5,6,73540246180
...,...,...,...
79801,7,8,958856916239
81200,8,9,964027918754
81900,6,7,969209905936
82600,6,7,974191752701


In [4]:
mkt_utils.get_after_date("1900-01-01", 200)

'1900-08-30'

In [5]:
synth_data_df[synth_data_df["id"] == 970901018586].iloc[100:].reset_index(drop=True)

Unnamed: 0,data,volatility,errors,max_flag,t,block,data_ext,data_anom,max_flag_anomaly,id,min_std,max_std,log_highlow_diff


In [6]:
int(synth_data_df.iloc[0]["min_std"])

8

### AR-GARCH

In [None]:
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
import pandas as pd

def process_symbol(symbol, dataset_name, min_std=None, max_std=None):
    # dummy dates
    start_dt, end_dt = "1900-01-01", '1900-08-30'

    # pull data
    if min_std is None and max_std is None:
        interday_df = synth_data_df[synth_data_df["id"] == symbol].iloc[100:].reset_index(drop=True)
    else:
        interday_df = synth_data_df[(synth_data_df["id"] == symbol) 
            & (synth_data_df["min_std"] == min_std) & (synth_data_df["max_std"] == max_std)].iloc[100:].reset_index(drop=True)

    df_cols = list(interday_df.columns)
    if 'min_std' in df_cols and 'max_std' in df_cols:
        symbol = str(symbol) + "|" + "std=" + str(int(min_std)) + ";" + str(int(max_std))

    if interday_df.empty:
        return None, None  # skip

    # fit AR(1)-GARCH(1,1)
    model_test = ModelFitting(data=interday_df)
    argarch_forecast_df = model_test.fit_forecast_ar_garch(
        window_size=300, garch_p=1, garch_q=1
    )

    if argarch_forecast_df.empty:
        return None, None

    # add identifying columns
    argarch_forecast_df["symbol"]   = symbol
    argarch_forecast_df["start_dt"] = start_dt
    argarch_forecast_df["end_dt"]   = end_dt
    argarch_forecast_df["model"] = "AR(1)-GARCH(1,1)"
    argarch_forecast_df["test_set"] = dataset_name

    # coverage stats
    argarch_coverage_stats = model_test.get_coverage_stats(argarch_forecast_df)
    argarch_coverage_stats["symbol"]   = symbol
    argarch_coverage_stats["start_dt"] = start_dt
    argarch_coverage_stats["end_dt"]   = end_dt
    argarch_coverage_stats["test_set"] = dataset_name
    argarch_coverage_stats["model"]    = "AR(1)-GARCH(1,1)"
    argarch_coverage_stats['base_model'] = None

    argarch_coverage_stats_df = pd.DataFrame([argarch_coverage_stats])

    return argarch_forecast_df.reset_index(), argarch_coverage_stats_df

In [None]:
from joblib import Parallel, delayed

results = Parallel(n_jobs=-1)(  # -1 = use all cores
    delayed(process_symbol)(sym_id, dataset_name = "Simulated DistShift+Anom 700", min_std=min_std, max_std=max_std)
    for sym_id, min_std, max_std in (synth_data_df[["id", "min_std", "max_std"]].drop_duplicates().to_numpy())
)

forecast_dfs = [f for f, c in results if f is not None]
coverage_dfs = [c for f, c in results if c is not None]

# --- Batch insert after loop ---
if forecast_dfs:
    all_forecasts = pd.concat(forecast_dfs, ignore_index=True)
    all_forecasts.to_sql("argarch_results", conn, if_exists="append", index=False)

if coverage_dfs:
    all_coverage = pd.concat(coverage_dfs, ignore_index=True)
    all_coverage.to_sql("model_coverage_stats", conn, if_exists="append", index=False)

### DtACI

In [13]:
from tqdm import tqdm
import json 
from DtACI import DtACI

def process_symbol_dtaci(symbol, dataset_name, *, min_std=None, max_std=None, I = None):
    # dummy dates
    start_dt, end_dt = "1900-01-01", '1900-08-30'

   # pull data
    if min_std is None and max_std is None:
        interday_df = synth_data_df[synth_data_df["id"] == symbol].iloc[100:].reset_index(drop=True)
    else:
        interday_df = synth_data_df[(synth_data_df["id"] == symbol) 
            & (synth_data_df["min_std"] == min_std) & (synth_data_df["max_std"] == max_std)].iloc[100:].reset_index(drop=True)

    if interday_df.empty:
        return (None, None), (None, None) # skip
        
    df_cols = list(interday_df.columns)
    if 'min_std' in df_cols and 'max_std' in df_cols:
        symbol = str(symbol) + "|" + "std=" + str(int(min_std)) + ";" + str(int(max_std))

    # create object
    if I is not None:
        dtaci_obj =  DtACI(data=interday_df)
    else:
        dtaci_obj =  DtACI(data=interday_df)

    # fit and forecast Online Conformal and Naive Conformal dataframe
    if I is None:
        gamma_df = dtaci_obj.get_dtaci_forecast_df(score_type="gamma", garch_p=1, garch_q=1, lookback=300, burn_ind=100)
        resnorm_df = dtaci_obj.get_dtaci_forecast_df(score_type="residual_normalized", lookback=300, burn_ind=100)
    else:
        resnorm_df = dtaci_obj.get_dtaci_forecast_df(score_type="residual_normalized", lookback=300, burn_ind=100, eta_adapt=False, I=I)

    if I is None:
        if gamma_df.empty or resnorm_df.empty:
            return (None, None), (None, None)
    else:
        if resnorm_df.empty:
            return (None, None), (None, None)

    # key-value pairs
    if I is None:
        gamma_df["symbol"] = symbol
        gamma_df["start_dt"] = start_dt
        gamma_df["end_dt"] = end_dt
        gamma_df["test_set"] = dataset_name
        gamma_df['conformity_score'] = 'gamma'
        gamma_df["model"] = "AR(1)"

    resnorm_df["symbol"] = symbol
    resnorm_df["start_dt"] = start_dt
    resnorm_df["end_dt"] = end_dt
    resnorm_df["test_set"] = dataset_name
    resnorm_df['conformity_score'] = "residual_normalized"
    
    if I is None:
        resnorm_df["model"] = "AR(1)-GARCH(1,1)"
    else:
        resnorm_df["model"] = f"AR(1)-GARCH(1,1);I={str(I)}"

    # coverage stat dataframes
    if I is None:
        dtaci_gamma_coverage_stats = dtaci_obj.get_coverage_stats(gamma_df, level = 0.95)
        
    dtaci_resnorm_coverage_stats = dtaci_obj.get_coverage_stats(resnorm_df, level = 0.95)
        
    # append symbol and cutoff date to coverage stats dictionary
    if I is None:
        dtaci_gamma_coverage_stats['symbol'] = symbol
        dtaci_gamma_coverage_stats['start_dt'] = start_dt
        dtaci_gamma_coverage_stats['end_dt'] = end_dt
        dtaci_gamma_coverage_stats['test_set'] = dataset_name
        dtaci_gamma_coverage_stats['model'] = "DtACI: Gamma Score"
        dtaci_gamma_coverage_stats['base_model'] = "AR(1)"

    dtaci_resnorm_coverage_stats['symbol'] = symbol
    dtaci_resnorm_coverage_stats['start_dt'] = start_dt
    dtaci_resnorm_coverage_stats['end_dt'] = end_dt
    dtaci_resnorm_coverage_stats['test_set'] = dataset_name
    dtaci_resnorm_coverage_stats['model'] = "DtACI: Residual Normalized Score"
    
    if I is None:
        dtaci_resnorm_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"
    else:
        dtaci_resnorm_coverage_stats['base_model'] = f"AR(1)-GARCH(1,1);I={str(I)}"

    # convert to dataframes
    if I is None:
        dtaci_gamma_coverage_stats_df = pd.DataFrame([dtaci_gamma_coverage_stats])
        
    dtaci_resnorm_coverage_stats_df = pd.DataFrame([dtaci_resnorm_coverage_stats])

    if I is None:
        return (gamma_df.reset_index(), dtaci_gamma_coverage_stats_df), (resnorm_df.reset_index(), dtaci_resnorm_coverage_stats_df)
    else:
        return (None, None), (resnorm_df.reset_index(), dtaci_resnorm_coverage_stats_df)

In [15]:
from joblib import Parallel, delayed

results = Parallel(n_jobs=-1)(  # -1 = use all cores
    delayed(process_symbol_dtaci)(sym_id, dataset_name = "Simulated DistShift+Anom 700", min_std=min_std, max_std=max_std, I = 150)
    for sym_id, min_std, max_std in (synth_data_df[["id", "min_std", "max_std"]].drop_duplicates().to_numpy())
)

forecast_dfs = [f[0] for f, c in results if f[0] is not None]
forecast_dfs += [c[0] for f, c in results if c[0] is not None]
coverage_dfs = [f[1] for f, c in results if f[1] is not None]
coverage_dfs += [c[1] for f, c in results if c[1] is not None]

# --- Batch insert after loop ---
if forecast_dfs:
    all_forecasts = pd.concat(forecast_dfs, ignore_index=True)
    all_forecasts.to_sql("dtaci_results_new", conn, if_exists="append", index=False)

if coverage_dfs:
    all_coverage = pd.concat(coverage_dfs, ignore_index=True)
    all_coverage.to_sql("model_coverage_stats", conn, if_exists="append", index=False)

estimating the model parameters. The scale of y is 0.03398. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 10 * y.

model or by setting rescale=False.

estimating the model parameters. The scale of y is 0.03399. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 10 * y.

model or by setting rescale=False.

estimating the model parameters. The scale of y is 0.034. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 10 * y.

model or by setting rescale=False.

estimating the model parameters. The scale of y is 0.03412. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 10 * y.

model or by setting rescale=False.

estimating the model parameters. The scale of y is 0.03426. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 10 * y.

model or

### Other Conformal Models

In [None]:
def process_symbol_conformal(symbol, conformity_score: str, dataset_name: str, 
                             min_std = None, max_std = None, start_dt = "1900-01-01", end_dt = "1900-08-30"):
    # pull data
    if min_std is None and max_std is None:
        interday_df = synth_data_df[synth_data_df["id"] == symbol].iloc[100:].reset_index(drop=True)
    else:
        interday_df = synth_data_df[(synth_data_df["id"] == symbol) 
            & (synth_data_df["min_std"] == min_std) & (synth_data_df["max_std"] == max_std)].iloc[100:].reset_index(drop=True)

    # convert to float
    interday_df['log_highlow_diff'] = interday_df['log_highlow_diff'].to_numpy(dtype=float)

    symbol = str(symbol)

    # create object
    model_test =  ModelFitting(data=interday_df)

    if conformity_score == "gamma":
        # fit and forecast Online Conformal and Naive Conformal dataframe
        online_df, naive_df = model_test.arma_conformal_forecasting(alpha=0.05, gamma=0.005) # NOTE: Adaptivity parameter of 0.005
    else:
        # fit and forecast Online Conformal and Naive Conformal dataframe
        online_df, naive_df = model_test.argarch_conformal_forecasting(alpha=0.05, gamma=0.005) # NOTE: Adaptivity parameter of 0.005

    if online_df.empty or naive_df.empty:
        return (None, None), (None, None)

    df_cols = list(interday_df.columns)
    if 'min_std' in df_cols and 'max_std' in df_cols:
        symbol = str(symbol) + "|" + "std=" + str(int(min_std)) + ";" + str(int(max_std))

    # key-value pairs
    online_df["symbol"] = symbol
    online_df["start_dt"] = start_dt
    online_df["end_dt"] = end_dt
    online_df["test_set"] = "Simulated Distribution Shift 700"
    online_df["conformal_mode"] = "online"
    
    if conformity_score == 'gamma':
        online_df['conformity_score'] = 'gamma'
        online_df['base_model'] = "AR(1)"
        online_df["forecast_std"] = None
    else:
        online_df['conformity_score'] = 'residual_normalized'
        online_df['base_model'] = "AR(1)-GARCH(1,1)"
    
    naive_df["symbol"] = symbol
    naive_df["start_dt"] = start_dt
    naive_df["end_dt"] = end_dt
    naive_df["test_set"] = "Simulated Distribution Shift 700"
    naive_df["conformal_mode"] = "naive"

    if conformity_score == 'gamma':
        naive_df['conformity_score'] = 'gamma'
        naive_df['base_model'] = "AR(1)"
        naive_df["forecast_std"] = None
    else:
        naive_df['conformity_score'] = 'residual_normalized'
        naive_df['base_model'] = "AR(1)-GARCH(1,1)"

    # get coverage stats
    if 300 <= len(online_df):
        online_coverage_stats = model_test.get_coverage_stats(online_df.iloc[-300:].copy())
    else:
        online_coverage_stats = model_test.get_coverage_stats(online_df.copy())

    # naive coverage stats - only last 300 observations
    if 300 <= len(naive_df):
        naive_coverage_stats = model_test.get_coverage_stats(naive_df.iloc[-300:].copy())
    else:
        naive_coverage_stats = model_test.get_coverage_stats(naive_df.copy())
    
    # append symbol and cutoff date to coverage stats dictionary
    online_coverage_stats['symbol'] = symbol
    online_coverage_stats['start_dt'] = start_dt
    online_coverage_stats['end_dt'] = end_dt
    online_coverage_stats['test_set'] = "Simulated Distribution Shift 700"

    if conformity_score == 'gamma':
        online_coverage_stats['model'] = "OC: Gamma Score"
        online_coverage_stats['base_model'] = "AR(1)"
    else:
        online_coverage_stats['model'] = "OC: Residual Normalized Score"
        online_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"

    naive_coverage_stats['symbol'] = symbol
    naive_coverage_stats['start_dt'] = start_dt
    naive_coverage_stats['end_dt'] = end_dt
    naive_coverage_stats['test_set'] = "Simulated Distribution Shift 700"
    
    if conformity_score == 'gamma':
        naive_coverage_stats['model'] = "NC: Gamma Score"
        naive_coverage_stats['base_model'] = "AR(1)"
    else:
        naive_coverage_stats['model'] = "NC: Residual Normalized Score"
        naive_coverage_stats['base_model'] = "AR(1)-GARCH(1,1)"

    # save as dataframe
    online_coverage_stats_df  = pd.DataFrame([online_coverage_stats])        
    naive_conformal_coverage_df  = pd.DataFrame([naive_coverage_stats])

    return (online_df.reset_index(), online_coverage_stats_df), (naive_df.reset_index(), naive_conformal_coverage_df)

In [None]:
from joblib import Parallel, delayed

# results = Parallel(n_jobs=-1)(  # -1 = use all cores
#     delayed(process_symbol_conformal)(sym_id, "gamma")
#     for sym_id in list(synth_data_df["id"].unique())
# )

results = Parallel(n_jobs=-1)(  # -1 = use all cores
    delayed(process_symbol_conformal)(sym_id, conformity_score = "residual_normalized", dataset_name = "Simulated DistShift+Anom 700", min_std=min_std, max_std=max_std)
    for sym_id, min_std, max_std in (synth_data_df[["id", "min_std", "max_std"]].drop_duplicates().to_numpy())
)

forecast_dfs = [f[0] for f, c in results if f[0] is not None]
forecast_dfs += [c[0] for f, c in results if c[0] is not None]
coverage_dfs = [f[1] for f, c in results if f[1] is not None]
coverage_dfs += [c[1] for f, c in results if c[1] is not None]

# --- Batch insert after loop ---
if forecast_dfs:
    all_forecasts = pd.concat(forecast_dfs, ignore_index=True)
    all_forecasts["symbol"] = all_forecasts["symbol"].astype(str)
    all_forecasts.to_sql("conformal_results", conn, if_exists="append", index=False)

if coverage_dfs:
    all_coverage = pd.concat(coverage_dfs, ignore_index=True)
    all_coverage.to_sql("model_coverage_stats", conn, if_exists="append", index=False)

### Evaluation

In [None]:
eval_query = """
select mc.*
from model_coverage_stats mc
where mc.test_set = 'Simulated Distribution Shift 700'
"""
eval_df = pd.read_sql(eval_query, conn)

In [None]:
eval_df[["model"]].groupby("model").value_counts()

In [None]:
plot_evaluation_metrics(eval_df, "coverage", bin_size=0.75)

In [None]:
plot_evaluation_metrics(eval_df, "median_width", bin_size=0.75)

In [None]:
plot_evaluation_metrics(eval_df, "cwc_score", bin_size=0.1)

In [None]:
plot_evaluation_metrics(eval_df, "mwi_score", bin_size=0.8)