In [17]:
# imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import adfuller
import statsmodels.api as sm
from ib_insync import *
from datetime import datetime, timedelta
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import yfinance as yf
from arch.unitroot import VarianceRatio
from pykalman import KalmanFilter

In [18]:
# lets look at the prices from both to visually see cointegration before testing:

# download data from yfinance as TWS is down at the time of writing this
start_date = '2007-01-01'
end_date = '2009-01-15'

# identify pair:
t1 = 'EWA'
t2 = 'EWC'

print(f"Downloading {t1} data...")
gld = yf.download(t1, start=start_date, end=end_date, auto_adjust=True)
print(f"Downloading {t2} data...")
gdx = yf.download(t2, start=start_date, end=end_date, auto_adjust=True)

# rename columns
gld_close = gld[['Close']].copy()
gld_close.columns = [t1]

gdx_close = gdx[['Close']].copy()
gdx_close.columns = [t2]

# concatenate the data for easier management
pair = pd.concat([gld_close, gdx_close], axis=1).dropna()
print("\nCombined data:")
print(pair.head())

# plot prices so we can visualize relationship
print("\nGenerating plot...")
fig = go.Figure()
fig.add_trace(go.Scatter(x=pair.index, y=pair[t1], mode='lines', name=f'{t1} Close'))
fig.add_trace(go.Scatter(x=pair.index, y=pair[t2], mode='lines', name=f'{t2} Close'))

fig.update_layout(
    title=f'{t1} and {t2} Closing Prices (2013-2017)',
    xaxis_title='Date',
    yaxis_title='Price (USD)',
    hovermode='x unified',
    template='plotly_white'
)

fig.show()

Downloading EWA data...


[*********************100%***********************]  1 of 1 completed


Downloading EWC data...


[*********************100%***********************]  1 of 1 completed


Combined data:
                  EWA        EWC
Date                            
2007-01-03  10.309549  16.954327
2007-01-04  10.116851  16.680313
2007-01-05   9.950424  16.625509
2007-01-08  10.007359  16.515907
2007-01-09  10.064295  16.550159

Generating plot...





In [19]:
# now lets prove that these two price series are cointegrated:

# cointegration test method
def cointegration_test(series1, series2):
    # step 1: Regress series1 on series2
    X = sm.add_constant(series2)
    model = sm.OLS(series1, X).fit()
    beta = model.params[1]
    spread = series1 - beta * series2
    
    # step 2: ADF test on residuals
    adf_result = adfuller(spread.dropna())
    print(f"ADF Statistic: {adf_result[0]}")
    print(f"p-value: {adf_result[1]}")
    print(f"Critical Values: {adf_result[4]}")

    return beta, spread, adf_result[1]

def hurst_exponent(series):
    """Calculates Hurst exponent for mean-reversion detection"""
    lags = range(2, 100)
    tau = [np.std(np.subtract(series[lag:], series[:-lag])) for lag in lags]
    poly = np.polyfit(np.log(lags), np.log(tau), 1)
    return poly[0]

def enhanced_cointegration_test(series1, series2):
    """Comprehensive cointegration check with diagnostics"""
    # use Engle-Granger cointegration test for pairwise cointegration
    beta, spread, p_value = cointegration_test(series1, series2)

    # do they cointegrate within a 95% confidence range?
    if p_value < 0.05:
        print("Cointegration exists (reject null hypothesis)")
    else:
        print("No cointegration - find different pair")
        
    # calculate Hurst exponent
    H = hurst_exponent(spread.values)
    
    # calculate half-life
    lagged_spread = spread.shift(1).dropna()
    delta = spread.diff().dropna()
    model = sm.OLS(delta, lagged_spread)
    half_life = -np.log(2) / model.fit().params[0]
    
    # run variance ratio test
    vr = VarianceRatio(spread).pvalue
    
    print(f"\nAdditional Diagnostics:")
    print(f"Hurst Exponent: {H:.4f} → {'Mean-reverting' if H<0.5 else 'Trending'}")
    print(f"Half-life: {half_life:.2f} days")
    print(f"Variance Ratio p-value: {vr:.4f}")
    
    return beta, spread, p_value, H, half_life

# run cointegration test:
beta, spread, p_value, H, half_life = enhanced_cointegration_test(pair[t1], pair[t2])

z_score = (spread - spread.mean()) / spread.std()

ADF Statistic: -1.6496722875468501
p-value: 0.4572113421497212
Critical Values: {'1%': np.float64(-3.443262740636999), '5%': np.float64(-2.8672350350046787), '10%': np.float64(-2.569803239025633)}
No cointegration - find different pair

Additional Diagnostics:
Hurst Exponent: 0.3341 → Mean-reverting
Half-life: 41.94 days
Variance Ratio p-value: 0.0003



Series.__getitem__ treating keys as positions is deprecated. In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior). To access a value by position, use `ser.iloc[pos]`


Series.__getitem__ treating keys as positions is deprecated. In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior). To access a value by position, use `ser.iloc[pos]`



In [20]:
# ======================
# BACKTEST PARAMETERS
# ======================

# Condition: Do not trade under 45 half life days, this would be part of the screening process

entry_std_score = 0.25
exit_std_score = 0.25
observation_cov = 0.001
delta = 0.0001
transition_cov = delta / (1 - delta)
initial_capital = 100000
allocated_capital = 35000
commission_pct = 0.0007
slippage_pct = 0.00002

In [21]:
# ======================
# BORROW FEE SIMULATION PARAMETERS
# ======================

borrow_init = 0.003  # lets start with a 0.3% fee
mu_annual = 0.0  # long run drift
sigma_annual = 0.25  # 25% annual volatility
dt = 1/252  # daily

mu_dt = (mu_annual - 0.5 * sigma_annual**2) * dt
sigma_dt = sigma_annual * np.sqrt(dt)

# ======================
# BORROW FEE SIMULATION
# ======================

borrow_A = pd.Series(index=pair.index, dtype=float)
borrow_B = pd.Series(index=pair.index, dtype=float)

for i in range(len(pair)):
    if i == 0:
        borrow_A[i] = borrow_init
        borrow_B[i] = borrow_init
    else:
        borrow_A.iloc[i] = borrow_A.iloc[i-1] * np.exp(mu_dt + sigma_dt * np.random.randn())
        borrow_B.iloc[i] = borrow_B.iloc[i-1] * np.exp(mu_dt + sigma_dt * np.random.randn())


Series.__setitem__ treating keys as positions is deprecated. In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior). To set a value by position, use `ser.iloc[pos] = value`


Series.__setitem__ treating keys as positions is deprecated. In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior). To set a value by position, use `ser.iloc[pos] = value`



In [22]:
# ======================
# DATA PREP
# ======================

# connect data from preprocessing:
data = pair

log_prices = np.log(data[[t1, t2]])
y = log_prices[t1]
x_obs = sm.add_constant(log_prices[[t2]])

# reshape for kalman filter:
observation_matrices_3d = x_obs.values.reshape(x_obs.shape[0], 1, x_obs.shape[1])

# ======================
# KALMAN FILTER
# ======================
kf = KalmanFilter(
    n_dim_obs=1,
    n_dim_state=2,
    initial_state_mean=np.zeros(2),
    initial_state_covariance=np.ones((2, 2)),
    transition_matrices=np.eye(2),
    observation_matrices=observation_matrices_3d,
    observation_covariance=observation_cov,
    transition_covariance=np.eye(2) * transition_cov
)

# get kalman filter results:
state_means, state_covs = kf.filter(y.values)

# ======================
# RESULT PROCESSING
# ======================
# get beta and intercept:
beta = pd.Series(state_means[:, 1], index=data.index, name='beta')
intercept = pd.Series(state_means[:, 0], index=data.index, name='intercept')

# calculate spread (forecast error):
expected_y = (observation_matrices_3d @ state_means.reshape(-1, 2, 1)).flatten()
spread = pd.Series(y - expected_y, name='spread')

# calculate variance of forecast error/spread (Q):
Q = (observation_matrices_3d @ state_covs @ observation_matrices_3d.transpose((0, 2, 1))).flatten()
z_score = pd.Series(spread / np.sqrt(Q), name='z_score')

# ======================
# INIT LEDGER
# ======================
ledger = pd.DataFrame(index=data.index)
ledger['position'] = 0           # +1 long spread, -1 short spread, 0 flat
ledger['shares_A'] = 0.0         
ledger['shares_B'] = 0.0         
ledger['cash'] = initial_capital
ledger['equity'] = initial_capital
ledger['spread'] = spread
ledger['z_score'] = z_score
ledger['hedge_ratio'] = beta

# ======================
# BACKTEST ENGINE
# ======================
for t in range(len(data) - 1):
    today = data.index[t]
    tomorrow = data.index[t+1]
    
    # get the current portfolio state:
    current_position = ledger.loc[today, 'position']
    current_shares_A = ledger.loc[today, 'shares_A']
    current_shares_B = ledger.loc[today, 'shares_B']
    current_cash = ledger.loc[today, 'cash']
    
    # get today's kalman filter results:
    today_beta = beta.loc[today]
    today_z_score = z_score.loc[today]
    
    # get today's prices
    t1_price_today = data.loc[today, t1]
    t2_price_today = data.loc[today, t2]
    
    # get tomorrow's prices (trade simulation):   NOTE: trade execution assumes the use of last close prices (daily), so if we get a signal in this iteration, we trade at tomorrow's prices
    t1_price_tomorrow = data.loc[tomorrow, t1]
    t2_price_tomorrow = data.loc[tomorrow, t2]
    
    # trade logic variables - nothing happens if not changed...
    signal_position = current_position
    target_shares_A = current_shares_A
    target_shares_B = current_shares_B
    
    # ======================
    # ENTRY LOGIC
    # ======================

    if current_position == 0:
        if today_z_score <= -entry_std_score:  # long spread detected
            signal_position = 1
        elif today_z_score >= entry_std_score:  # short spread detected
            signal_position = -1

    # ======================
    # EXIT LOGIC
    # ======================

    elif current_position == 1:  # in long spread
        if today_z_score >= -exit_std_score:  # exit condition hit
            signal_position = 0
    elif current_position == -1:  # in short spread
        if today_z_score <= exit_std_score:   # exit condition hit
            signal_position = 0

    # ======================
    # BORROW FEES
    # ======================

    if current_position == 1:  # long spread - shorting B
        borrow_fee = ((borrow_B.loc[today]) * abs(current_shares_B) * t1_price_today)/365
        current_cash -= borrow_fee

    elif current_position == -1:  # short spread - shorting A
        borrow_fee = ((borrow_A.loc[today]) * abs(current_shares_A) * t2_price_today)/365
        current_cash -= borrow_fee
            
    # ======================
    # TRADE EXECUTION
    # ======================

    # NOTE: Again -- just for clarification -- "tomorrow"'s prices are the prices at which we will actually be trading. "today"'s prices are the prices at yesterday's close where we are calculating signals.

    if signal_position != current_position:
        # close current position
        close_value = (current_shares_A * t1_price_today + 
                      current_shares_B * t2_price_today)
        current_cash += close_value
        
        # apply transaction costs (closing)
        close_trade_value = abs(current_shares_A * t1_price_today) + abs(current_shares_B * t2_price_today)
        current_cash -= close_trade_value * (commission_pct + slippage_pct)
        
        # open new position (if entering a position)
        if signal_position != 0:
            # calculate position units (N) using hedge ratio
            N = allocated_capital / (t1_price_today + abs(today_beta) * t2_price_today)
            
            if signal_position == 1:  # long spread --> long A, short Beta * B
                target_shares_A = N
                target_shares_B = -N * today_beta
            else:                     # short spread --> short A, long Beta * B
                target_shares_A = -N
                target_shares_B = N * today_beta
                
            # calculate position cost and deduct from cash
            position_cost = (target_shares_A * t1_price_today + 
                            target_shares_B * t2_price_today)
            current_cash -= position_cost
            
            # apply transaction costs on open
            open_trade_value = abs(target_shares_A * t1_price_today) + abs(target_shares_B * t2_price_today)
            current_cash -= open_trade_value * (commission_pct + slippage_pct)
        else:
            target_shares_A, target_shares_B = 0, 0

    # ======================
    # UPDATE LEDGER
    # ======================
    equity = current_cash + (target_shares_A * t1_price_tomorrow + 
                            target_shares_B * t2_price_tomorrow)
    ledger.at[tomorrow, 'position'] = signal_position
    ledger.at[tomorrow, 'shares_A'] = target_shares_A
    ledger.at[tomorrow, 'shares_B'] = target_shares_B
    ledger.at[tomorrow, 'cash'] = current_cash
    ledger.at[tomorrow, 'equity'] = equity

# ======================
# RESULTS
# ======================
results = ledger[ledger['equity'] != 0].dropna()
returns = results['equity'].pct_change().fillna(0)

# metrics:
initial_value = results['equity'].iloc[0]
final_value = results['equity'].iloc[-1]
years = (results.index[-1] - results.index[0]).days / 365.25

# returns:
total_return = (final_value / initial_value) - 1
annualized_return = (1 + total_return) ** (1 / years) - 1

# volatility and risk metrics:
volatility = returns.std() * np.sqrt(252)
running_max = results['equity'].cummax()
drawdown = (results['equity'] - running_max) / running_max
max_drawdown = drawdown.min()
negative_returns = returns[returns < 0]
downside_deviation = negative_returns.std() * np.sqrt(252) if not negative_returns.empty else 0

# Sharpe, Sortino, Calmar ratios:
sharpe_ratio = annualized_return / volatility if volatility != 0 else 0
sortino_ratio = annualized_return / downside_deviation if downside_deviation != 0 else 0
calmar_ratio = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0

print(f"Backtest Results ({t1}/{t2})")
print(f"Period: {results.index[0].date()} to {results.index[-1].date()} ({years:.2f} years)")
print(f"Total Return: {total_return:.2%}")
print(f"Annualized Return: {annualized_return:.2%}")
print(f"Annualized Volatility: {volatility:.2%}")
print(f"Max Drawdown: {max_drawdown:.2%}")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
print(f"Sortino Ratio: {sortino_ratio:.2f}")
print(f"Calmar Ratio: {calmar_ratio:.2f}")


Setting an item of incompatible dtype is deprecated and will raise an error in a future version of pandas. Value '100138.59398921077' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.


Setting an item of incompatible dtype is deprecated and will raise an error in a future version of pandas. Value '100190.36979906079' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.



Backtest Results (EWA/EWC)
Period: 2007-01-03 to 2009-01-14 (2.03 years)
Total Return: 14.75%
Annualized Return: 7.01%
Annualized Volatility: 2.72%
Max Drawdown: -1.40%
Sharpe Ratio: 2.58
Sortino Ratio: 4.40
Calmar Ratio: 5.00


In [23]:
# Plot trade markers on price
long_signals = ledger[(ledger['position'].diff() == 1)]
short_signals = ledger[(ledger['position'].diff() == -1)]
exit_signals = ledger[(ledger['position'].diff() != 0) & (ledger['position'] == 0)]

import plotly.graph_objects as go

fig = go.Figure()

# --- Left y-axis: Price series t1 ---
fig.add_trace(go.Scatter(
    x=data.index, y=data[t1],
    mode='lines',
    name=f'{t1} Price',
    line=dict(color='blue'),
    yaxis='y1'
))

# --- Left y-axis: Price series t2 ---
fig.add_trace(go.Scatter(
    x=data.index, y=data[t2],
    mode='lines',
    name=f'{t2} Price',
    line=dict(color='orange'),
    yaxis='y1'
))

# --- Trade markers ---
fig.add_trace(go.Scatter(
    x=long_signals.index, y=data.loc[long_signals.index, t1],
    mode='markers',
    marker=dict(symbol='triangle-up', color='green', size=10),
    name='Long Entry (t1)',
    yaxis='y1'
))

fig.add_trace(go.Scatter(
    x=short_signals.index, y=data.loc[short_signals.index, t1],
    mode='markers',
    marker=dict(symbol='triangle-down', color='red', size=10),
    name='Short Entry (t1)',
    yaxis='y1'
))

fig.add_trace(go.Scatter(
    x=exit_signals.index, y=data.loc[exit_signals.index, t1],
    mode='markers',
    marker=dict(symbol='circle', color='black', size=8),
    name='Exit (t1)',
    yaxis='y1'
))

fig.add_trace(go.Scatter(
    x=long_signals.index, y=data.loc[long_signals.index, t2],
    mode='markers',
    marker=dict(symbol='triangle-down', color='red', size=10),
    name='Long Entry (t2)',
    yaxis='y1'
))

fig.add_trace(go.Scatter(
    x=short_signals.index, y=data.loc[short_signals.index, t2],
    mode='markers',
    marker=dict(symbol='triangle-up', color='green', size=10),
    name='Short Entry (t2)',
    yaxis='y1'
))

fig.add_trace(go.Scatter(
    x=exit_signals.index, y=data.loc[exit_signals.index, t2],
    mode='markers',
    marker=dict(symbol='circle', color='black', size=8),
    name='Exit (t2)',
    yaxis='y1'
))

# --- Z-score on secondary y-axis ---
fig.add_trace(go.Scatter(
    x=z_score.index, y=z_score,
    mode='lines',
    name='Z-score',
    line=dict(color='green', dash='solid'),
    yaxis='y2'
))

# Threshold lines
for val, color, dash, label in [
    (entry_std_score, 'green', 'dash', 'Entry Threshold'),
    (-exit_std_score, 'red', 'dash', 'Exit Threshold')
]:
    fig.add_trace(go.Scatter(
        x=[data.index.min(), data.index.max()],
        y=[val, val],
        mode='lines',
        line=dict(color=color, dash=dash, width=1),
        name=label if label else '_nolegend_',
        yaxis='y2'
    ))

# --- Layout ---
fig.update_layout(
    title=f'{t1}/{t2} Price Series + Z-score with Trade Signals',
    xaxis=dict(title='Date'),
    yaxis=dict(title=f'Price ({t1}/{t2})', side='left'),
    yaxis2=dict(title='Z-score', overlaying='y', side='right'),
    legend=dict(x=0.01, y=0.99),
    width=1700,
    height=600
)

fig.show()

In [24]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --- Ensure all necessary variables from the backtest are available ---
# This code assumes the following exist:
# ledger, data, t1, t2, log_prices, beta, z_score, z_score_sma
# initial_capital, entry_threshold, exit_threshold, stop_loss_threshold

print("Generating interactive Plotly charts...")

# --- 1. Filter data to the actual backtesting period ---
results = ledger[ledger['equity'] != initial_capital].dropna()

if results.empty:
    print("No trades were made, cannot generate plots.")
else:
    start_date = results.index[0]
    end_date = results.index[-1]

    # Align all data to the same date range for clean plotting
    plot_data = data[start_date:end_date]
    plot_log_prices = log_prices[start_date:end_date]
    plot_beta = beta[start_date:end_date]
    plot_z_score = z_score[start_date:end_date]
    plot_results = results.copy()

    # --- 2. Calculate Drawdown for plotting ---
    plot_results['running_max'] = plot_results['equity'].cummax()
    plot_results['drawdown'] = (plot_results['equity'] - plot_results['running_max']) / plot_results['running_max']

    # --- 3. Identify Trade Entry and Exit points for markers ---
    plot_results['prev_position'] = plot_results['position'].shift(1).fillna(0)
    trades = plot_results[plot_results['position'] != plot_results['prev_position']]
    # We distinguish between entering a trade (from flat) and exiting a trade (to flat)
    entry_signals = trades[trades['prev_position'] == 0]
    exit_signals = trades[trades['position'] == 0]


    # ==========================================================
    # PLOT 1: Performance Summary (Equity Curve & Drawdown)
    # ==========================================================
    fig1 = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1,
                         subplot_titles=('Portfolio Equity Curve', 'Drawdown'),
                         row_heights=[0.7, 0.3])

    fig1.add_trace(go.Scatter(x=plot_results.index, y=plot_results['equity'],
                              mode='lines', name='Equity', line=dict(color='blue')),
                   row=1, col=1)

    fig1.add_trace(go.Scatter(x=plot_results.index, y=plot_results['drawdown'],
                              mode='lines', name='Drawdown', fill='tozeroy',
                              line=dict(color='red')),
                   row=2, col=1)

    fig1.update_layout(title_text=f'Backtest Performance: {t1}/{t2}',
                       xaxis_title='',
                       yaxis_title='Portfolio Value ($)',
                       yaxis2_title='Drawdown',
                       showlegend=False, template='plotly_white')
    fig1.update_yaxes(tickformat='.2%', row=2, col=1)
    fig1.show()


    # ==========================================================
    # PLOT 2: Prices and Dynamic Hedge Ratio (Beta)
    # ==========================================================
    fig2 = make_subplots(specs=[[{"secondary_y": True}]])

    fig2.add_trace(go.Scatter(x=plot_log_prices.index, y=plot_log_prices[t1],
                              mode='lines', name=f'Log Price ({t1})', line=dict(color='teal')),
                   secondary_y=False)
    fig2.add_trace(go.Scatter(x=plot_log_prices.index, y=plot_log_prices[t2],
                              mode='lines', name=f'Log Price ({t2})', line=dict(color='orange')),
                   secondary_y=False)

    fig2.add_trace(go.Scatter(x=plot_beta.index, y=plot_beta,
                              mode='lines', name='Kalman Filter Beta', line=dict(color='purple', dash='dash')),
                   secondary_y=True)

    fig2.update_layout(title_text='Log Prices and Dynamic Hedge Ratio (Beta)',
                       xaxis_title='', template='plotly_white')
    fig2.update_yaxes(title_text="Log Price", secondary_y=False)
    fig2.update_yaxes(title_text="Beta", secondary_y=True)
    fig2.show()


Generating interactive Plotly charts...
