In [1]:
#This is the code for Table4.8 in Chapter4.4:Comparison of Trading Strategies Based on the HMM Model and trading Strategies Based on the ARIMA Model
#After modifying the dataset file path, it can be directly reproduce the results.

#!!! IMPORTANT REMINDER !!! 
#!!! IMPORTANT REMINDER !!! 
#!!! IMPORTANT REMINDER !!! 
# Before running this code, make sure to modify the input dataset path in the # === 1. Load and preprocess data ===
import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
import warnings
warnings.filterwarnings("ignore")

# -------------------------
# 1. Load and preprocess data
# -------------------------
#!!! IMPORTANT REMINDER !!! 
#!!! IMPORTANT REMINDER !!! 
#!!! IMPORTANT REMINDER !!! 
# !!! Modify "C:/Users/ZhangYinhang/ES_F_data.csv"  to your save path in the below !!!

df = pd.read_csv("C:/Users/ZhangYinhang/ES_F_data.csv", parse_dates=['Date'])
df = df.sort_values('Date')
df['log_ret'] = np.log(df['Close_ES=F']).diff()
df.dropna(inplace=True)

# -------------------------
# 2. Set backtest and training parameters
# -------------------------
train_start = '2022-06-23'
train_end = '2024-12-23'
backtest_start = '2024-12-24'
rolling_window = 631       # rolling training window
roll_step = 10             # step size for rolling
predict_horizon = 10       # forecast horizon

train_mask = (df['Date'] >= train_start) & (df['Date'] <= train_end)
backtest_mask = df['Date'] >= backtest_start

train_df = df.loc[train_mask].copy()
backtest_df = df.loc[backtest_mask].copy().iloc[:100]  # backtest first 100 days only

# -------------------------
# 3. Rolling ARIMA forecast
# -------------------------
all_pred_mean = []

for start in range(0, len(backtest_df), roll_step):
    # rolling training set
    train_roll = pd.concat([train_df, backtest_df.iloc[:start]])
    if len(train_roll) > rolling_window + predict_horizon:
        train_roll = train_roll.iloc[-(rolling_window + predict_horizon):]

    train_series = train_roll['log_ret']

    # ---------- automatic order selection ----------
    best_aic = float("inf")
    best_order = (1,0,1)
    p_range = range(0,4)
    d_range = range(0,2)
    q_range = range(0,4)

    for p in p_range:
        for d in d_range:
            for q in q_range:
                try:
                    model = ARIMA(train_series, order=(p,d,q))
                    res = model.fit()
                    if res.aic < best_aic:
                        best_aic = res.aic
                        best_order = (p,d,q)
                except:
                    continue

    # ---------- forecast using best order ----------
    model = ARIMA(train_series, order=best_order)
    res = model.fit()
    pred = res.get_forecast(steps=predict_horizon)
    pred_mean = pred.predicted_mean.values
    all_pred_mean.extend(pred_mean)

# truncate to match backtest length
pred_series = pd.Series(all_pred_mean[:len(backtest_df)], index=backtest_df.index)

# -------------------------
# 4. Generate strategy signals
# -------------------------
signal = [0]*len(backtest_df)
lookback_window = predict_horizon
prev_mean = train_df['Close_ES=F'].iloc[-lookback_window:].mean()  # initial reference mean

for i in range(0, len(backtest_df), lookback_window):
    end_idx = min(i + lookback_window, len(backtest_df))
    curr_pred_mean = pred_series.iloc[i:end_idx].mean()
    
    if curr_pred_mean > prev_mean:
        signal[i:end_idx] = [1]*(end_idx - i)
    else:
        signal[i:end_idx] = [-1]*(end_idx - i)
    
    prev_mean = curr_pred_mean

signals = pd.Series(signal, index=backtest_df.index)

# -------------------------
# 5. Backtest strategy
# -------------------------
returns = backtest_df['log_ret'].copy()
strategy_ret = returns * signals

cum_return_list = []
max_drawdown_list = []
win_rate_list = []

for i in [20, 40, 60, 80, 100]:
    strat_slice = strategy_ret.iloc[:i]
    cum_return = (strat_slice + 1).prod() - 1

    cum_nav = (strat_slice + 1).cumprod()
    max_dd = (cum_nav.cummax() - cum_nav).max()

    trades = strat_slice[strat_slice != 0]
    win_rate = (trades > 0).sum() / len(trades) if len(trades) > 0 else np.nan

    cum_return_list.append(f"{cum_return*100:.2f}%")
    max_drawdown_list.append(f"{max_dd*100:.2f}%")
    win_rate_list.append(f"{win_rate*100:.2f}%" if not np.isnan(win_rate) else 'nan%')

# -------------------------
# 6. Output results for selected windows
# -------------------------
result_df = pd.DataFrame({
    'Trading days': [20, 40, 60, 80, 100],
    'CumulativeReturn': cum_return_list,
    'MaxDrawdown': max_drawdown_list,
    'WinRate': win_rate_list
})

print(result_df)


   Trading days CumulativeReturn MaxDrawdown WinRate
0            20           -1.96%       4.80%  45.00%
1            40           -1.50%       6.89%  47.50%
2            60            5.54%       6.89%  51.67%
3            80           -3.51%      15.49%  50.00%
4           100           -0.08%      15.49%  51.00%
