![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [1]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn
from datetime import timedelta, datetime
qb= QuantBook()


# Setting up preliminary information

In [None]:
symbols = ["GOOGL", "MSFT"]
time_range = 365
end = datetime.now()
start = end - timedelta(days=time_range)
resolution = Resolution.Hour
for symbol in symbols:
    qb.add_equity(symbol)

In [23]:
history = qb.history(symbols, start, end, resolution)


In [24]:
if history.empty:
    raise RuntimeError("couldn't find any information")
else:
    print(history.head())
    print(history.tail())



In [25]:
closes = history["close"]
# print(closes.head())
# print(closes.tail())

closes = closes.unstack(level=0)
print(closes.head())


# Starting Analysis

In [None]:
X = closes["GOOGL"] # we'll use this as our independent variable
Y = closes["MSFT"]
print(X)

plt.scatter(X, Y, color='blue', label='GOOGL vs MSFT')

# We'll create a predictor function using OLS


In [29]:
X = np.vstack([np.ones(len(X)), X]).T
# print(X)

theta = np.linalg.lstsq(X, Y, rcond=None)
print(theta) # solution matrix is the first variable, with the first being our intercept and second being our slope, since we only have 1 independent variable, but many observations, our A matrix should be 2 columns
beta = theta[0][1]
alpha_hat = theta[0][0]


In [31]:
spread = closes['MSFT'] - (beta * closes['GOOGL'] + alpha_hat)
print((beta * closes['GOOGL'] + alpha_hat).head())
print(closes["MSFT"].head())
# print(spread.head())

In [None]:
# Rolling Statistics 
sma_window = 30 # same as SMA(30) in the algorithm (30 hourly bars)
spread_sma = spread.rolling(sma_window).mean()
spread_std = spread.rolling(sma_window).std()

z_score = (spread - spread_sma) / spread_std



In [None]:
# plotting the spread and moving average
plt.plot(spread.index, spread, label='spread(X - beta.Y)')
plt.plot(spread_sma.index, spread_sma, label=f'SMA({sma_window})')
plt.title('Constructed spread and rolling mean')
plt.legend()
plt.show()

In [None]:
def macd_series(series, fast=12, slow=26, signal=9):
    ema_fast = series.ewm(span=fast, adjust=False).mean()
    ema_slow = series.ewm(span=slow, adjust=False).mean()
    macd_line = ema_fast - ema_slow
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    hist = macd_line - signal_line
    return macd_line, signal_line, hist

macd_line, signal_line, macd_hist = macd_series(spread, fast=12, slow=26, signal=9)

plt.plot(spread.index, macd_line, label='MACD line')
plt.plot(spread.index, signal_line, label='Signal line')
plt.legend() 
plt.title('MACD on spread') 
plt.show()



In [38]:
s = spread.dropna()
s_lag = s.shift(1).dropna()

# print(s)

# need to add what these ones do
ds = (s - s_lag).dropna()
s_lag = s_lag.loc[ds.index]

B = np.vstack([np.ones(len(s_lag)), s_lag]).T
# print(B)

alpha = np.linalg.lstsq(B, ds.values, rcond=None)
coefs = alpha[0]
b = alpha[0][1]
# print(coefs)
# print(b)

dt = 1.0 # unit = 1 hour between observations

theta = np.nan
if (1 + b) > 0:
    theta = -np.log(1 + b) / dt

print(theta)
half_life = np.nan
if (theta > 0):
    half_life = np.log(2) / theta
    
print(half_life)

In [39]:
static_z = 2.0

hl = max(1.0, np.nan_to_num(half_life, 4.0))
dynamic_z = 2.0 * np.sqrt(hl / 50.0)
print(f"static_z={static_z:.2f}, dynamic_z={dynamic_z:.2f} (half-life based)")

z = z_score
macd_pos = macd_line > signal_line
macd_neg = macd_line < signal_line


In [None]:
#Backtest logic
start_nav = 100000.0
nav = start_nav
target_dollars = 0.01 * start_nav  # target 1% of portfolio per trade before scaling
positions = []   # list of (entry_time, entry_spread, sign, units)
trade_log = []

in_position = False
pos = None  # (entry_idx, sign, units, entry_spread)

for t in range(len(spread)-1):
    idx = spread.index[t]
    s_t = spread.iloc[t]
    z_t = z.iloc[t]
    macd_up = macd_pos.iloc[t]
    macd_down = macd_neg.iloc[t]

    # determine threshold (can use static or dynamic)
    z_threshold = dynamic_z  # choose dynamic; switch to static_z to compare

    # generate signals
    enter_long = (macd_up and (z_t < -z_threshold))
    enter_short = (macd_down and (z_t > z_threshold))

    if not in_position:
        if enter_long or enter_short:
            sign = 1 if enter_long else -1   # +1 => long spread (buy QQQ, short SPY)
            # volatility scaled units: target_dollars divided by abs(spread) (avoid divide by zero)
            units = (target_dollars / (abs(s_t) + 1e-9))
            # floor units to reasonable number (optional)
            in_position = True
            pos = (t, sign, units, s_t)
    else:
        entry_t, sign, units, entry_s = pos
        # exit if mean reversion (z crossing zero) or opposite signal
        z_entry = z.iloc[entry_t]
        exit_condition = ( (sign == 1 and z_t >= 0) or (sign == -1 and z_t <= 0) )
        opposite = ( (sign == 1 and enter_short) or (sign == -1 and enter_long) )
        if exit_condition or opposite:
            # P&L is units * (spread_exit - spread_entry) * sign
            spread_exit = s_t
            pnl = units * (spread_exit - entry_s) * sign
            nav += pnl
            trade_log.append({
                'entry_time': spread.index[entry_t],
                'exit_time': idx,
                'entry_spread': entry_s,
                'exit_spread': spread_exit,
                'sign': sign,
                'units': units,
                'pnl': pnl,
                'nav': nav
            })
            in_position = False
            pos = None

# Summarize
trade_df = pd.DataFrame(trade_log)
print("Trades:", len(trade_df))
print("Final NAV:", nav)
if not trade_df.empty:
    print("Avg PnL per trade:", trade_df['pnl'].mean())
    print("Win rate:", (trade_df['pnl'] > 0).mean())
    # cumulative NAV plot
    trade_df.set_index('exit_time', inplace=True)
    trade_df['nav'].plot(title='NAV after each closed trade')
    plt.show()


In [None]:
if trade_df.empty:
    print("No trades executed with these rules — try lowering threshold or using static_z.")
else:
    total_pnl = trade_df['pnl'].sum()
    returns = trade_df['pnl'] / start_nav
    ann_return = (1 + returns.sum()) ** (365.0 / time_range) - 1
    print(f"Total PnL: {total_pnl:.2f}, Trade count: {len(trade_df)}")
    print(f"Approx annualized return (trade-level sum approximation): {ann_return:.2%}")
    # simple equity curve using exit NAVs
    equity = trade_df['nav']
    plt.plot(equity.index, equity.values)
    plt.title('Equity curve (nav after each closed trade)')
    plt.show()
