In [None]:
# Parameters
hurst_power = 8   # Number of steps used is 2^power
ticker_list = [
    '^TNX',
    'TBT',
    #'SPY',
    'QQQ',
    #'SOXL',
    #'TECL'
]
tickers_to_plot = [
    '^TNX',
    'QQQ'
]

rsi_window = 14
macd_fast_window = 12
macd_slow_window = 26
macd_signal = 9

n_days = 2 ** hurst_power + 400
resolution='1d'

In [None]:
# Installs (only need to run once)
!pip install numpy pandas pandas_ta yfinance scipy statsmodels matplotlib

In [None]:
# Imports
import math
import time
import pandas as pd
import numpy as np
import yfinance as yf
import pandas_ta as ta
import scipy.stats as sps
import statsmodels.api as sm
import matplotlib.pyplot as plt
from datetime import date, datetime, timedelta

from google.colab import output
output.no_vertical_scroll()
from IPython.display import display, HTML
pd.options.display.max_rows = None
from google.colab import data_table
data_table.enable_dataframe_formatter()

pd.options.mode.chained_assignment = None

In [None]:
start_date = date.today() - timedelta(days=n_days)
end_date = date.today()

final_df = pd.DataFrame()

def rma(x, n, y0):
    a = (n-1) / n
    ak = a**np.arange(len(x)-1, -1, -1)
    return np.r_[np.full(n, np.nan), y0, np.cumsum(ak * x) / ak / n + y0 * a**np.arange(1, len(x)+1)]


for ticker in ticker_list:
    # Download data and convert to numpy array
    df = yf.download(ticker, start=start_date, end=end_date + timedelta(days=2), interval=resolution)


    # Compute MACD using pandas_ta and get sign changes over signal line
    df.ta.macd(close=round(df['Close'], 2), fast=12, slow=26, signal=9, append=True)
    df['MACDsignChange'] = np.sign(df['MACDh_12_26_9']).diff(1).ne(0)
    df['Signal'] = ''
    # Drop NA values and reset index
    df.dropna(subset=['MACDh_12_26_9'], inplace=True)
    df = df.iloc[1:,:]
    df.reset_index(drop=False, inplace=True)

    # Generate signal based on value of MACD and sign change
    for index, row in df.loc[1:,:].iterrows():
      # For TNX, multiply MACD by 100
      if ticker == '^TNX':
          df.loc[index, 'MACDh_12_26_9'] = df.loc[index, 'MACDh_12_26_9'] * 100

      if df.loc[index, 'MACDh_12_26_9'] < 0 and df.loc[index, 'MACDsignChange'] == True:
          df.loc[index, 'Signal'] = 'Short'
      elif df.loc[index, 'MACDh_12_26_9'] > 0 and df.loc[index, 'MACDsignChange'] == True:
          df.loc[index, 'Signal'] = 'Long'
      else:
          df.loc[index, 'Signal'] = df.loc[index-1, 'Signal']

    # Add RSI to the dataframe
    n = rsi_window
    df.loc[:, 'change'] = df['MACDh_12_26_9'].diff()
    df.loc[:, 'gain'] = df.change.mask(df.change < 0, 0.0)
    df.loc[:, 'loss'] = -df.change.mask(df.change > 0, -0.0)
    df.loc[:, 'avg_gain'] = rma(df['gain'].iloc[n+1:].to_numpy(), n, np.nansum(df['gain'].to_numpy()[:n+1])/n)
    df.loc[:, 'avg_loss'] = rma(df['loss'].iloc[n+1:].to_numpy(), n, np.nansum(df['loss'].to_numpy()[:n+1])/n)
    df.loc[:, 'rs'] = df['avg_gain'] / df['avg_loss']
    df.loc[:, 'rsi_14'] = round(100 - (100 / (1 + df['rs'])))

    df = df.dropna()

    # Add Hurst Exponent and P-value to the dataframe
    n = 2**hurst_power
    prices = df['Close'].values
    hursts = np.full(len(df), np.nan)  # Preallocate hursts array with NaN
    tstats = np.full(len(df), np.nan)  # Preallocate tstats array with NaN
    pvalues = np.full(len(df), np.nan)

    # Compute returns
    returns = prices[1:] / prices[:-1] - 1

    # Start sliding the rolling window from n to the end of array
    for t in np.arange(n, len(returns) + 1):
        # Rolling window sample
        window_data = returns[t - n:t]

        # Generate list of powers of two
        X = np.arange(2, hurst_power + 1)
        Y = np.array([])

        for p in X:
            m = 2**p
            s = 2**(hurst_power - p)
            rs_array = np.array([])
            for i in np.arange(0, s):
                subsample = window_data[i * m:(i + 1) * m]
                mean = np.average(subsample)
                deviate = np.cumsum(subsample - mean)
                difference = max(deviate) - min(deviate)
                stdev = np.std(subsample)
                rescaled_range = difference / stdev
                rs_array = np.append(rs_array, rescaled_range)
            Y = np.append(Y, np.log2(np.average(rs_array)))

        reg = sm.OLS(Y, sm.add_constant(X))
        res = reg.fit()
        hurst = res.params[1]
        tstat = (res.params[1] - 0.5) / res.bse[1]
        pvalue = 2 * (1 - sps.t.cdf(abs(tstat), res.df_resid))

        hursts[t - 1] = hurst
        tstats[t - 1] = tstat
        pvalues[t - 1] = pvalue

    df['hursts'] = hursts.tolist()
    df['pvalues'] = pvalues.tolist()

    df = df.dropna()

    columns_to_keep = [
        'Date',
        'Close',
        'MACDh_12_26_9',
        'rsi_14',
        'hursts',
        'pvalues',
    ]

    df_final = df[columns_to_keep].copy()
    df = pd.DataFrame()
    df_final = df_final.rename(columns={
        'Close': f"{ticker} Price",
        "MACDh_12_26_9": f"{ticker} Hist",
        "rsi_14": f"{ticker} RSI",
        "hursts": f"{ticker} Hurst",
        "pvalues": f"{ticker} P-Value"
    })

    final_df = pd.concat([final_df, df_final], axis=1)

final_df = final_df.round(3)
final_df = final_df.loc[:,~final_df.columns.duplicated()].copy()

fig, axs = plt.subplots(nrows=4, ncols=1, figsize=(14, 14), sharex=True)

# Plot all prices on a log scale
for ticker in tickers_to_plot:
    price_col = f'{ticker} Price'
    if price_col in final_df.columns:
        axs[0].plot(final_df['Date'], final_df[price_col], label=ticker)
axs[0].set_title('Prices (Log Scale)')
axs[0].set_ylabel('Price')
axs[0].set_yscale('log')
axs[0].legend()
axs[0].grid(True)

# Plot all histograms
for ticker in tickers_to_plot:
    hist_col = f'{ticker} Hist'
    if hist_col in final_df.columns:
        axs[1].bar(final_df['Date'], final_df[hist_col], label=ticker, alpha=0.5)
axs[1].set_title('Histograms')
axs[1].set_ylabel('Histogram')
axs[1].legend()
axs[1].grid(True)

# Plot all RSIs
for ticker in tickers_to_plot:
    rsi_col = f'{ticker} RSI'
    if rsi_col in final_df.columns:
        axs[2].plot(final_df['Date'], final_df[rsi_col], label=ticker)
axs[2].set_title('RSIs')
axs[2].set_ylabel('RSI')
axs[2].legend()
axs[2].grid(True)

# Plot Hurst Exponent and P-Values
for ticker in tickers_to_plot:
    hurst_col = f'{ticker} Hurst'
    pvalue_col = f'{ticker} P-Value'

    if hurst_col in final_df.columns:
        axs[3].plot(final_df['Date'], final_df[hurst_col], label=f'{ticker} Hurst')
    if pvalue_col in final_df.columns:
        axs[3].plot(final_df['Date'], final_df[pvalue_col], label=f'{ticker} P-Value')

axs[3].set_title('Hurst Exponent and P-Values')
axs[3].axhline(y=0.5, label='Hurst = 0.5', color='b')
axs[3].axhline(y=0.05, label='P-value = 0.05', color='r')
axs[3].set_xlabel('Date')
axs[3].set_ylabel('Value')
axs[3].legend()
axs[3].grid(True)

plt.tight_layout()
plt.show()

display(final_df)