# Momentum Studies

Study momentum (SMA, EMA, RSI) strategies on ETFs.

- [ ] KDE of fast/slow windows
- [ ] Add RSI strategies
- [ ] Investigate leverged and inverse (levered) ETFs
- [ ] Cross check with SPY and IWM
- [ ] Moving X years window for MA windows
- [ ] Real backtest with fees and ask prices
- [ ] Deploy with half Kelly

## Fetch data

In [None]:
# Alpha Vantage (no used)

import os
import dotenv
from urllib.parse import urlencode
import requests

dotenv.load_dotenv()
av_key = os.environ.get('ALPHAVANTAGE_API_KEY')
av_url = 'https://www.alphavantage.co/query'

params = {
    'function': 'TIME_SERIES_DAILY',
    'outputsize': 'full',
    'apikey': av_key,
    'datatype': 'csv',
}

for symbol in ['TQQQ', 'SQQQ', 'QQQ', 'PSQ']:
    p = os.path.join('cache', f'{symbol}.csv')
    if os.path.isfile(p):
        continue
    params['symbol'] = symbol
    resp = requests.get(f'{av_url}?{urlencode(params, doseq=True)}')
    resp.raise_for_status()
    with open(p, "w") as fd:
        fd.write(resp.text)

In [None]:
# Yahoo Finance

import pandas as pd
from datetime import date
import os

now = date.today().isoformat()

for s in ['TQQQ', 'SQQQ', 'QQQ', 'PSQ']:
    df = yf.download(s, period='max',interval='1d',multi_level_index=False,auto_adjust=True)
    p = os.path.join('cache',f'{s.lower()}-{now}.csv')
    df.to_csv(p)

In [None]:
import functools as fc
import itertools as it
import pandas as pd
import numpy as np

# raw csv
df = pd.read_csv("cache/tqqq-2025-11-10.csv")
df.rename(columns={
    'Open':'open',
    'High':'high',
    'Low':'low',
    'Close':'close',
    'Volume':'volume',
    'Date':'timestamp'},inplace=True)

# index by date
df.timestamp = pd.to_datetime(df.timestamp)
df.set_index('timestamp', inplace=True)
df.sort_index(inplace=True)

df['absret'] = df.open - df.shift(1).open
windows = list(range(1,30)) + list(range(30,100,5)) + list(range(100,500,10))

# compute sma
for w in windows:
    df[f'sma{w}'] = df.open.rolling(w, min_periods=1).mean()    
    df[f'sma{w}'] = df[f'sma{w}'].where(df[f'sma{w}'].notna(), df.open)

# compute ema
for w in windows:
    df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()    
    df[f'ema{w}'] = df[f'ema{w}'].where(df[f'ema{w}'].notna(), df.open)
df = df.copy()

# compute rsi
for w in windows:
    up = df.absret.where(df.absret > 0.0).rolling(w,min_periods=0).mean().fillna(0)
    down = df.absret.where(df.absret < 0.0).rolling(w,min_periods=0).mean().fillna(0)
    df[f'rsi{w}'] = 100 - (100 / (1 + (up / down)))
df = df.copy()

signals = []

# open and sma/ema
for w in windows:
    df[f'open_sma{w}'] = df.open - df[f'sma{w}']
    df[f'open_ema{w}'] = df.open - df[f'ema{w}']
    signals += [f'open_sma{w}',f'open_ema{w}']
df = df.copy()

# fast and slow sma/ema
for [fast, slow] in filter(lambda w: w[0] < w[1], it.product(windows, windows)):
    df[f'sma{fast}_{slow}'] = df[f'sma{fast}'] - df[f'sma{slow}']
    df[f'ema{fast}_{slow}'] = df[f'ema{fast}'] - df[f'ema{slow}']
    df[f'emasma{fast}_{slow}'] = df[f'ema{fast}'] - df[f'sma{slow}']
    df = df.copy()
    signals += [f'sma{fast}_{slow}',f'ema{fast}_{slow}',f'emasma{fast}_{slow}']

df.head()

  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean()
  df[f'ema{w}'] = df.open.ewm(span=w, adjust=False).mean

## Compute SMA, EMA, RSI Indicators

## Compute Information Ratio For Log Returns

In [None]:
import matplotlib.pyplot as plt

eodret = np.log(df.open / df.shift(1).open).shift(-1)

res = pd.DataFrame(np.nan, index=signals, columns=('ret','sd','avg'))
res.loc['benchmark'] = [eodret.sum(), eodret.std(), eodret.mean()]

for s in signals:
    perf = eodret.where(df[s] > 0, 0)
    res.loc[s] = [perf.sum(), perf.std(), perf.mean()]

res['ir'] = np.nan_to_num((res.avg - res.loc['benchmark'].avg) / res.sd)

res.sort_values(['ir'],inplace=True, ascending=False)
# best strategies
res.head()

## Heatmaps For Best Fast and Slow MAs

In [None]:
import seaborn as sns
import matplotlib.pylab as plt
from matplotlib.patches import Rectangle

# fast / slow
sma_ir = np.full((len(windows) + 1, len(windows) + 1), np.nan)
ema_ir = np.full((len(windows) + 1, len(windows) + 1), np.nan)
mix_ir = np.full((len(windows) + 1, len(windows) + 1), np.nan)

ew = list(enumerate(windows,start=1))
for [i,w] in ew:
    sma_ir[0,i] = res.loc[f'open_sma{w}'].ir
    ema_ir[0,i] = res.loc[f'open_ema{w}'].ir
    
for [[i,fast], [j,slow]] in filter(lambda w: w[0][1] < w[1][1], it.product(ew,ew)):
    sma_ir[i,j] = res.loc[f'sma{fast}_{slow}'].ir
    ema_ir[i,j] = res.loc[f'ema{fast}_{slow}'].ir
    mix_ir[i,j] = res.loc[f'emasma{fast}_{slow}'].ir

cm = sns.diverging_palette(h_neg=10, h_pos=133, s=75, l=50, as_cmap=True)
fig, (ax1, ax2, ax3) = plt.subplots(nrows=3, ncols=1, figsize=(6, 18)) 
plt.subplots_adjust(wspace=0.3,hspace=.5) 

for [nam, df, ax] in [['sma',sma_ir,ax1],['ema',ema_ir,ax2],['ema/sma',mix_ir,ax3]]:
    ax = sns.heatmap(
        df,
        ax=ax,
        mask=np.isnan(sma_ir),
        vmin=np.min(sma_ir),
        vmax=np.max(sma_ir),
        cmap=cm)
    
    for idx in np.argsort(np.nan_to_num(df),axis=None)[::-1][:6]:
        pos = (idx % df.shape[0], idx / df.shape[0])
        ax.add_patch(Rectangle(pos,1,1,fill=False,edgecolor='red',lw=4,clip_on=False))
        
    ax.set_yticklabels(['open'] + windows, rotation=0)
    ax.set_xticklabels(['open'] + windows, rotation=45, ha='right')
    ax.set(ylabel="fast", xlabel="slow")
    ax.set_title(f'{nam} ir')
    
plt.show()

In [None]:
lookfwd = 60 # trading days
runs = []
active = None
for date in df.index:
    go_long = df.long_signal[date]
    go_short = df.short_signal[date]
    
    if active == None:
        if go_long:
            active = date
        continue
    if not go_short and len(df.open[active:date]) <= lookfwd:
        continue

    entry = df.open[active]
    run = df.open[active:date].apply(lambda x: x/entry).values[1:]
    runs.append(np.pad(run, (0,lookfwd-len(run)), mode='constant',constant_values=np.nan))
    active = None


signal = pd.DataFrame(
    {
        'std': np.nanstd(runs,axis=0),
        'mean': np.nanmean(runs,axis=0),
        'min': np.nanmin(runs,axis=0),
        'max': np.nanmax(runs,axis=0),
    } + np.quartile(runs,[.25,.5,.75],axis=0
    columns=['std','mean','min','max'],
    index=range(lookfwd))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection

open_segs = np.column_stack((range(len(df.open)), np.log(df.open)))
open_segs = list(zip(open_segs, open_segs[1:]))
color_segs = np.where(df.above_sma, 'green', 'red')

px = 1/plt.rcParams['figure.dpi']  # pixel in inches
fig, [ax0, ax1] = plt.subplots(nrows=2,figsize=(1500*px, 1000*px))

# price
ax0.set_xlim(0, len(df.open))
ax0.set_ylim(0, np.log(df.open.max()))
ax0.scatter(range(len(df.index)), np.log(df.open.mask(~df.long_signal)), color='black', s=20)
ax0.add_collection(LineCollection(open_segs,colors=color_segs))
ax0.plot(range(len(df.sma)), np.log(df.sma))

# signal returns
ax1.set_xlim(1, lookfwd)
maxabs = max(abs(1-signal['min'].min()), abs(1-signal['max'].max()))
ax1.set_ylim(1-maxabs, 1+maxabs)

ax1.plot(range(1, lookfwd+1), np.ones(lookfwd), color='red')
# 1 sigma error bars
ax1.errorbar(range(1, lookfwd+1), signal['mean'], yerr=signal['std']*2)
plt.show()

In [36]:
from lightweight_charts import JupyterChart



chart = JupyterChart(toolbox=True) 
chart.set(df[['open','high','low','close','volume']])


chart.horizontal_line(0)
for col in ['sma20']:
    line = chart.create_line(col)
    line.set(df[[col]])
chart.load()
#res.head()
