In [3]:
import numpy as np
import pandas as pd
from script.data_pipeline import shinyDataFetcher
import plotly.graph_objects as go
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
from statsmodels.regression.rolling import RollingOLS
import matplotlib.pyplot as plt
import itertools
import warnings
import statsmodels.tsa.stattools as ts
from IPython.display import HTML, display
import itables

warnings.filterwarnings("ignore")

def write_table(df, filename):
    html = itables.to_html_datatable(df, display_logo_when_loading=True)
    with open(f'../docs/dataframe/{filename}.html', 'w') as f:
        f.write(html)

EV_stocks = ["TSLA", "RIVN", "LCID", "F", "GM", "NIO", "XPEV", "BYDDF", "LI"]
EV_stocks_data = {}
for stock in EV_stocks:
    EV_stocks_data[stock] = shinyDataFetcher(asset=stock, durationStr="3 Y", barSizeSetting="1 day").fetch_asset_data()

log_prices = {}
for ticker in EV_stocks:
    df_temp = EV_stocks_data[ticker].copy()
    df_temp["log_close"] = np.log(df_temp["close"])
    log_prices[ticker] = df_temp.set_index("timestamp")["log_close"].dropna()

# # Loop over all unique pairs
# for s1, s2 in itertools.combinations(EV_stocks, 2):
#     # Align datasets by timestamp
#     print(f"Processing pair: {s1} & {s2}")
#     df_pair = pd.concat([log_prices[s1], log_prices[s2]], axis=1).dropna()
#     if df_pair.empty:
#         continue

#     X = log_prices[s1]
#     Y = log_prices[s2]
#     X_const = sm.add_constant(X)  # Adds a constant term to the predictor
#     model = sm.OLS(Y, X_const).fit()
#     print(f"OLS Coefficients: {model.params.to_dict()}, OLS p-value: {model.pvalues[1]:.4f}, OLS R-squared: {model.rsquared:.4f}")

#     # Perform cointegration test
#     result = coint(df_pair.iloc[:, 0], df_pair.iloc[:, 1])
#     test_stat, p_value, crit_values = result

#     print(f"Test Statistic: {test_stat:.3f}, p-value: {p_value:.4f}", f"Critical Values: {crit_values}")
#     print(f"r-square > 0.5? {'Yes' if model.rsquared > 0.5 else 'No'}")
#     print(f"Is cointegrated? {'Yes' if p_value < 0.05 else 'No'}")
#     print("-" * 50)

In [4]:
fig = go.Figure()
asset_a = "TSLA"
asset_b = "RIVN"
fig.add_trace(go.Scatter(x=EV_stocks_data[asset_a]["timestamp"], y=np.log(EV_stocks_data[asset_a]["close"]), mode="lines", name=f"{asset_a}"))
fig.add_trace(go.Scatter(x=EV_stocks_data[asset_b]["timestamp"], y=np.log(EV_stocks_data[asset_b]["close"]), mode="lines", name=f"{asset_b}"))
fig.update_layout(title=f"{asset_a}  vs. {asset_b} Stock Log Prices", xaxis_title="timestamp", yaxis_title="Price (USD)", legend=dict(x=0, y=1))
fig.show()
fig.write_html("html_plot/log_price.html")

In [None]:
X = np.log(EV_stocks_data[asset_a]["close"])
Y = np.log(EV_stocks_data[asset_b]["close"])
X_const = sm.add_constant(X)  # Adds a constant term to the predictor
model = sm.OLS(Y, X_const).fit()
print(model.summary())


result = ts.coint(X, Y)
print("Cointegration test result:")
print("========================================")
print("Test Statistic:", result[0])
print("p-value:", result[1])

                            OLS Regression Results                            
Dep. Variable:                  close   R-squared:                       0.029
Model:                            OLS   Adj. R-squared:                  0.027
Method:                 Least Squares   F-statistic:                     22.17
Date:                Thu, 17 Apr 2025   Prob (F-statistic):           2.97e-06
Time:                        10:13:11   Log-Likelihood:                -371.88
No. Observations:                 753   AIC:                             747.8
Df Residuals:                     751   BIC:                             757.0
Df Model:                           1                                         
Covariance Type:            nonrobust                                         
                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const          1.4184      0.305      4.652      0.0

In [None]:
def rolling_adf(spread_series, time_index, window=90):
    adf_stats = []
    p_values = []
    dates = []

    for i in range(window, len(spread_series)):
        window_data = spread_series.iloc[i - window : i]
        result = adfuller(window_data, maxlag=1, autolag=None)
        adf_stats.append(result[0])  # Test statistic
        p_values.append(result[1])  # p-value
        dates.append(time_index.iloc[i])
    return pd.DataFrame({"timestamp": dates, "adf_stat": adf_stats, "p_value": p_values})


trade_df = pd.merge(EV_stocks_data["RIVN"][["timestamp", "open", "close"]], EV_stocks_data["TSLA"][["timestamp", "open", "close"]], on="timestamp", suffixes=("_RIVN", "_TSLA"))
trade_df["log_close_TSLA"] = np.log(trade_df["close_TSLA"])
trade_df["log_close_RIVN"] = np.log(trade_df["close_RIVN"])
X = trade_df["log_close_TSLA"].astype(float)
Y = trade_df["log_close_RIVN"].astype(float)
X_with_const = sm.add_constant(X)
model = RollingOLS(endog=Y, exog=X_with_const, window=20)
rres = model.fit()
trade_df["hedge_ratio"] = rres.params["log_close_TSLA"]
trade_df["spread"] = trade_df["log_close_RIVN"] - trade_df["hedge_ratio"] * trade_df["log_close_TSLA"]
trade_df["z_score"] = (trade_df["spread"] - trade_df["spread"].rolling(window=20).mean()) / trade_df["spread"].rolling(window=20).std()
trade_df.dropna(inplace=True)
rolling_adf_df = rolling_adf(trade_df["spread"], trade_df["timestamp"], window=60)
trade_df = pd.merge(trade_df, rolling_adf_df, on="timestamp", how="left")
trade_df.dropna(inplace=True)


In [43]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=trade_df["timestamp"], y=trade_df["spread"], mode="lines", name=f"spread"))
fig.add_trace(go.Scatter(x=trade_df["timestamp"], y=trade_df["z_score"], mode="lines", name=f"z-score", opacity=0.4))
fig.add_hline(y=2, line_dash="dash", line_color="red", annotation_text="z-score upper threshold", annotation_position="top right", opacity=0.4)
fig.add_hline(y=-2, line_dash="dash", line_color="red", annotation_text="z-score lower threshold", annotation_position="bottom right", opacity=0.4)
fig.update_layout(title="spread through time")
fig.show()
# fig.write_html("html_plot/log_price.html")

In [7]:
fig = go.Figure()
rolling_window = 60
fig.add_trace(go.Scatter(x=rolling_adf_df["timestamp"], y=rolling_adf_df["p_value"], mode="lines", name="Rolling ADF p-value"))
fig.add_hline(y=0.05, line_dash="dash", line_color="red", annotation_text="Significance threshold (0.05)", annotation_position="bottom right")
fig.update_layout(title=f"Rolling ADF Test on Spread (window={rolling_window})", xaxis_title="timestamp", yaxis_title="p-value")
fig.show()
fig.write_html('html_plot/adf_test.html')

In [62]:
df = trade_df.copy()
df["signal_z_score"] = df["z_score"].shift(1)
df["signal_spread"] = df["spread"].shift(1)
df["signal_p_value"] = df["p_value"].shift(1)
df["position"] = 0
df["trade_entry"] = False
df["trade_exit"] = False
df["pnl"] = 0.0



in_position = False
position_type = 0
entry_index = None
entry_tsla_price = None
entry_rivn_price = None
tsla_qty = 0
rivn_qty = 0



blotter = []
ledger = []


notional = 10000


account_value = 100000
cumulative_pnl = 0.0

for i in range(1, len(df)):
    z = df["signal_z_score"].iloc[i]
    spread = df["signal_spread"].iloc[i]
    p_value = df["signal_p_value"].iloc[i]
    tsla_open = df["open_TSLA"].iloc[i]
    rivn_open = df["open_RIVN"].iloc[i]
    tsla_close = df["close_TSLA"].iloc[i]
    rivn_close = df["close_RIVN"].iloc[i]
    timestamp = df["timestamp"].iloc[i]
    beta = df["hedge_ratio"].iloc[i]

    if not in_position and p_value < 0.05:
        is_entry = False
        if z > 2:
            if beta > 0:
                # short spread: short TSLA, long RIVN
                tsla_qty = -round(notional / tsla_open)
                rivn_qty = round((beta * notional) / rivn_open)
                direction = "short TSLA, long RIVN"
                position_type = -1
                is_entry = True
            else:
                # short spread with negative beta: long TSLA, short RIVN
                tsla_qty = round(notional / tsla_open)
                rivn_qty = -round((abs(beta) * notional) / rivn_open)
                direction = "long TSLA, short RIVN (β<0)"
                position_type = -1
                is_entry = True

        elif z < -2:
            if beta > 0:
                # long spread: long TSLA, short RIVN
                tsla_qty = round(notional / tsla_open)
                rivn_qty = -round((beta * notional) / rivn_open)
                direction = "long TSLA, short RIVN"
                position_type = 1
                is_entry = True
            else:
                # long spread with negative beta: short TSLA, long RIVN
                tsla_qty = -round(notional / tsla_open)
                rivn_qty = round((abs(beta) * notional) / rivn_open)
                direction = "short TSLA, long RIVN (β<0)"
                position_type = 1
                is_entry = True

        if is_entry:
            df.at[df.index[i], "position"] = position_type
            df.at[df.index[i], "trade_entry"] = True
            in_position = True
            entry_index = i

            entry_tsla_price = tsla_open
            entry_rivn_price = rivn_open

            blotter.append(
                {
                    "entry_time": timestamp,
                    "entry_tsla": tsla_open,
                    "entry_rivn": rivn_open,
                    "direction": direction,
                    "entry_spread": spread,
                    "hedge_ratio": beta,
                    "TSLA_qty": tsla_qty,
                    "RIVN_qty": rivn_qty,
                }
            )

    daily_pnl = 0.0
    if in_position:
        daily_pnl = (tsla_close - entry_tsla_price) * tsla_qty + (rivn_close - entry_rivn_price) * rivn_qty
        df.at[df.index[i], "pnl"] = daily_pnl
        # exit condition
        mean_reversion_triggered = abs(z) < 0.5
        timeout_triggered = i - entry_index >= 10 and (abs(z) > 3 or p_value > 0.05)
        if mean_reversion_triggered or timeout_triggered:
            reason = "mean_reversion" if mean_reversion_triggered else "timeout"
            df.at[df.index[i], "position"] = 0
            df.at[df.index[i], "trade_exit"] = True
            pnl_tsla = (df["open_TSLA"].iloc[i] - entry_tsla_price) * tsla_qty
            pnl_rivn = (df["open_RIVN"].iloc[i] - entry_rivn_price) * rivn_qty
            daily_pnl = pnl_tsla + pnl_rivn
            df.at[df.index[i], "pnl"] = daily_pnl
            cumulative_pnl += daily_pnl
            account_value += daily_pnl
            blotter[-1].update({"exit_time": timestamp, "exit_tsla": tsla_open, "exit_rivn": rivn_open, "exit_spread": df["spread"].iloc[i], "pnl": daily_pnl, "reason": reason})
            in_position = False
            position_type = 0
            entry_index = None
            entry_tsla_price = None
            entry_rivn_price = None
            tsla_qty = 0
            rivn_qty = 0
    else:
        df.at[df.index[i], "pnl"] = 0.0

    ledger.append(
        {
            "timestamp": timestamp,
            "position": position_type,
            "TSLA_qty": tsla_qty,
            "RIVN_qty": rivn_qty,
            "TSLA_close": tsla_close,
            "RIVN_close": rivn_close,
            "daily_pnl": daily_pnl,
            "account_value": account_value + daily_pnl,
            "realized_pnl": cumulative_pnl,
        }
    )



blotter_df = pd.DataFrame(blotter)
ledger_df = pd.DataFrame(ledger)

In [58]:
blotter_df
write_table(blotter_df, 'blotter')

In [63]:
blotter_df

Unnamed: 0,entry_time,entry_tsla,entry_rivn,direction,entry_spread,hedge_ratio,TSLA_qty,RIVN_qty,exit_time,exit_tsla,exit_rivn,exit_spread,pnl,reason
0,2023-01-19,127.28,16.27,"short TSLA, long RIVN",-0.820653,0.55482,-79,341,2023-02-02,187.33,20.44,0.813464,-3321.98,timeout
1,2023-07-31,267.41,27.02,"long TSLA, short RIVN (β<0)",5.509661,-1.096994,37,-406,2023-08-14,235.7,20.9,-1.243744,1311.45,mean_reversion
2,2023-09-28,240.02,23.02,"short TSLA, long RIVN",0.275336,0.485939,-42,211,2023-10-19,225.84,17.8,2.134464,-505.86,timeout
3,2023-11-17,231.97,16.48,"short TSLA, long RIVN",0.660076,0.36414,-43,221,2023-12-05,233.85,17.6,-0.389895,166.68,mean_reversion
4,2023-12-11,242.75,19.11,"long TSLA, short RIVN",-3.450797,1.374678,41,-719,2023-12-26,254.49,23.47,-19.175641,-2653.5,timeout
5,2024-04-16,156.77,8.38,"long TSLA, short RIVN",-6.620424,1.781544,64,-2126,2024-04-24,162.92,9.16,-3.107723,-1264.68,mean_reversion
6,2024-05-23,181.8,10.57,"long TSLA, short RIVN (β<0)",2.534002,-0.32362,55,-306,2024-06-03,178.13,11.12,1.306238,-370.15,mean_reversion
7,2025-01-07,405.81,15.93,"short TSLA, long RIVN",1.644824,0.031547,-25,20,2025-01-23,416.06,12.61,2.29016,-322.65,timeout


In [53]:
ledger_df
write_table(ledger_df, 'ledger')

In [54]:
itables.show(ledger_df)

timestamp,position,TSLA_qty,RIVN_qty,TSLA_close,RIVN_close,daily_pnl,account_value,realized_pnl
Loading ITables v2.3.0 from the internet... (need help?),,,,,,,,


In [55]:
# Create cumulative PnL column
df["cumulative_pnl"] = df["pnl"].cumsum()

# Trade summary metrics
total_trades = df["trade_exit"].sum()
total_pnl = df["pnl"].sum()
winning_trades = df[df["pnl"] > 0]["pnl"].count()
losing_trades = df[df["pnl"] < 0]["pnl"].count()
avg_return = df["pnl"][df["pnl"] != 0].mean()
win_rate = winning_trades / total_trades if total_trades > 0 else np.nan
entry_dates = df.loc[df["trade_entry"], "timestamp"].reset_index(drop=True)
exit_dates = df.loc[df["trade_exit"], "timestamp"].reset_index(drop=True)
holding_durations = [(exit_date - entry_date).days for entry_date, exit_date in zip(entry_dates, exit_dates)]
avg_holding_period = np.mean(holding_durations) if holding_durations else np.nan


# Summary dictionary
summary = {
    "Total Trades": total_trades,
    "Total PnL": round(total_pnl, 4),
    "Winning Trades": winning_trades,
    "Losing Trades": losing_trades,
    "Win Rate (%)": round(win_rate * 100, 2) if not np.isnan(win_rate) else "N/A",
    "Average Trade Return": round(avg_return, 4) if not np.isnan(avg_return) else "N/A",
    "Average Holding Period (days)": round(avg_holding_period, 2) if not np.isnan(avg_holding_period) else "N/A",
}



In [61]:
# Plot cumulative PnL with Plotly
fig = go.Figure()
fig.add_trace(go.Scatter(x=ledger_df.index, y=ledger_df["account_value"], mode="lines", name="account value"))
fig.update_layout(title="NAV Over Time", xaxis_title="Date", yaxis_title="account value")
fig.show()
fig.write_html("../docs/images/cumulative_pnl.html")

In [10]:
import pprint
pprint.pprint(summary)

{'Average Holding Period (days)': np.float64(9.69),
 'Average Trade Return': np.float64(0.5479),
 'Losing Trades': np.int64(16),
 'Total PnL': np.float64(19.1762),
 'Total Trades': np.int64(35),
 'Win Rate (%)': np.float64(54.29),
 'Winning Trades': np.int64(19)}
