# Libraries

In [1]:
import time
import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.core.api import Series as Series
import matplotlib.pyplot as plt
from pytrends.request import TrendReq
import yfinance as yf
import datetime
from statsmodels.tsa.stattools import grangercausalitytests
import warnings as wrn
import os
from enum import Enum
from typing import Tuple

# Setting up params

In [2]:
# Ticker universes. Only the crypto list is active; the three stock lists are
# kept commented out.
# NOTE: trend_corr and plot_stock_data still reference general_stocks,
# tech_stocks and finance_stocks, so those functions raise NameError unless the
# lists below are uncommented.
# general_stocks = ['KO', 'PFE', 'WMT', 'PG', 'JNJ', 'DIS', 'PEP', 'MCD', 'T', 'VZ']
# tech_stocks = ['AAPL', 'AMZN', 'MSFT', 'GOOGL', 'NVDA', 'TSLA', 'META', 'INTC', 'IBM', 'AMD']
# finance_stocks = ['GS', 'BAC', 'WFC', 'USB', 'JPM', 'MA', 'V', 'AXP', 'C', 'BLK']
decentralized_currencies = ['BTC', 'ETH', 'ADA', 'SOL', 'XRP', 'XMR', 'LTC', 'DOT', 'LINK', 'XTZ', 'DOGE', 'SHIB']

# Human-readable names, index-aligned with the ticker lists above; the name is
# what gets sent to Google Trends as the search keyword.
# general_stocks_names = ['Coca-Cola', 'Pfizer', 'Walmart', 'Procter & Gamble', 'Johnson & Johnson', 'Disney', 'Pepsi', 'McDonalds', 'AT&T', 'Verizon']
# tech_stocks_names = ['Apple', 'Amazon', 'Microsoft', 'Google', 'Nvidia', 'Tesla', 'Meta', 'Intel', 'IBM', 'AMD']
# finance_stocks_names = ['Goldman Sachs', 'Bank of America', 'Wells Fargo', 'US Bancorp', 'JPMorgan Chase', 'Mastercard', 'Visa', 'American Express', 'Citigroup', 'BlackRock']
decentralized_currencies_names = ['Bitcoin', 'Ethereum', 'Cardano', 'Solana', 'Ripple', 'Monero', 'Litecoin', 'Polkadot', 'Chainlink', 'Tezos', 'Dogecoin', 'Shiba Inu']

# Plot colour per asset category (commented out; plot_stock_data and the
# legacy scatter-plot cell read it).
# color_map = {
#     'general': 'deepskyblue',
#     'tech': 'limegreen',
#     'finance': 'darkorchid',
#     'crypto': 'red'
# }

# Date range ('YYYY-MM-DD') passed to getNormalizedData in the data collection cell.
start = '2019-06-30'
end = '2024-07-01'
# Largest lag passed to grangercausalitytests as maxlag.
max_lags = 7

# Starting cash balance passed to Strategy.backtest in the example cell.
balance = 10000

# Functions
## get_trends_data
gets the trend data using pytrends, given a certain timeframe

In [3]:
def get_trends_data(keyword, 
                    timeframe=None,
                    retries=5, 
                    backoff_factor=1.0,
                    verbose=True):
    """Download Google Trends interest-over-time data for one keyword.

    Parameters
    ----------
    keyword : list of str
        Keyword list handed to pytrends; the first entry's column is renamed
        to 'Trend' in the result.
    timeframe : str, optional
        'YYYY-MM-DD YYYY-MM-DD' (start, then end). Defaults to the 269 days
        ending today, evaluated at call time.
    retries : int
        Maximum number of attempts when the request is rate limited.
    backoff_factor : float
        Base of the exponential back-off: attempt i sleeps
        backoff_factor * 2**i seconds after a rate-limit response.
    verbose : bool
        Print progress messages.

    Returns
    -------
    pandas.DataFrame or None
        The pytrends frame with the index reset, a 'Date' column (midnight
        timestamps) and a 'Trend' column; None when nothing was retrieved or
        every attempt was rate limited.

    Raises
    ------
    RuntimeError
        For any request error that is not a rate-limit (HTTP 429) response.
    """
    if timeframe is None:
        # Computed per call (a default expression would be frozen at definition
        # time) and ordered start -> end.
        today = datetime.date.today()
        window_start = today - datetime.timedelta(days = 269)
        timeframe = window_start.strftime('%Y-%m-%d') + ' ' + today.strftime('%Y-%m-%d')

    pytrends = TrendReq(hl='en-US', tz=360, timeout=(10,25), )

    for i in range(retries):
        try:
            # build_payload issues a request as well, so it is retried together
            # with the actual data fetch.
            pytrends.build_payload(keyword, cat = 0, timeframe = timeframe, geo='')
            df = pytrends.interest_over_time()
        except Exception as e:
            if "429" not in str(e):
                # Raising a plain string is itself a TypeError; wrap the cause
                # in a real exception instead.
                raise RuntimeError(f"An error occurred: {e}") from e
            sleep_time = backoff_factor * (2 ** i)
            if verbose:
                print(f"Rate limit exceeded. Retrying in {sleep_time} seconds...")
            time.sleep(sleep_time)
            continue

        if df is None or df.empty:
            print("No data retrieved or DataFrame is empty.")
            return None

        if verbose:
            print(f"Trend Data for {keyword[0]} at timeframe {timeframe} retrieved successfully.")
        df.reset_index(inplace = True)
        df.rename(columns = {'date': 'Date', keyword[0]: 'Trend'}, inplace = True)
        # Round-trip through a date string to drop any time-of-day component.
        df['Date'] = pd.to_datetime(df['Date'].dt.strftime('%m/%d/%Y'))
        return df

    print("Failed to retrieve data after several retries.")
    return None

## get_stock_data
gets the prices of a certain stock in a certain timeframe

In [4]:
def get_stock_data(ticker, start, end, verbose = True):
    """Download the price history of `ticker` between `start` and `end` via yfinance.

    The 'Dividends', 'Stock Splits' and 'Repaired?' columns are dropped, the
    index is reset, and 'Date' is rewritten through a '%m/%d/%Y' string
    round-trip. Prints a confirmation message when `verbose` is true.
    Returns the resulting DataFrame.
    """
    history = yf.Ticker(ticker).history(repair = True, start = start, end = end, auto_adjust = False)
    prices = history.drop(columns = ['Dividends', 'Stock Splits', 'Repaired?']).reset_index()

    if verbose:
        print(f"Stock Data for {ticker} retrieved successfully.")

    # Format to a plain date string and parse back, as the trend loader does.
    date_text = prices['Date'].dt.strftime('%m/%d/%Y')
    prices['Date'] = pd.to_datetime(date_text)
    return prices

## trend_corr
gets the trend and price data of the given stock in a certain timeframe and calculates the correlation between the Close price and the trend delayed by a given number of days

In [5]:
def trend_corr(stock, days = 60, start = '2023-10-01', end = '2024-06-01', delay = 7):
    """Correlate an asset's Close price with its delayed Google Trends series.

    Trend and price data are read from CSV caches under ./Data when present,
    otherwise downloaded (and cached). Crypto tickers are priced as
    '<ticker>-USD'.

    NOTE: the name lookup reads the module-level lists general_stocks,
    tech_stocks, finance_stocks and decentralized_currencies (plus their
    *_names counterparts); they must be defined when the trend cache is
    missing.

    Parameters
    ----------
    stock : str
        Ticker symbol.
    days : int
        Rolling window (rows) for the 'Volatility' column.
    start, end : str
        Date range as 'YYYY-MM-DD'.
    delay : int
        Number of rows the trend is shifted by before correlating.

    Returns
    -------
    (float, pandas.DataFrame)
        Pearson correlation between 'Close' and 'Delay_<delay>', and the merged
        frame with 'log_returns', 'Volatility' and the 'Delay_*' columns.

    Raises
    ------
    Exception
        If the trend data could not be retrieved.
    """
    trends_path = f"./Data/{stock}_trends({start} - {end}).csv"
    prices_path = f"./Data/{stock}_Prices({start} - {end}).csv"

    if not os.path.exists(trends_path):
        if stock in general_stocks:
            name = general_stocks_names[general_stocks.index(stock)]
        elif stock in tech_stocks:
            name = tech_stocks_names[tech_stocks.index(stock)]
        elif stock in finance_stocks:
            name = finance_stocks_names[finance_stocks.index(stock)]
        else:
            name = decentralized_currencies_names[decentralized_currencies.index(stock)]
        t = get_trends_data([name], timeframe = f"{start} {end}")
        if t is None:
            raise Exception(f'Failed to retrieve Trend Data of {stock}.')
        t.to_csv(trends_path)
    else:
        t = pd.read_csv(trends_path)

    if not os.path.exists(prices_path):
        if stock in decentralized_currencies:
            p = get_stock_data(f'{stock}-USD', start = start, end = end)
        else:
            p = get_stock_data(stock, start = start, end = end)
        p.to_csv(prices_path)
    else:
        p = pd.read_csv(prices_path)

    t['Date'] = pd.to_datetime(t['Date'])
    p['Date'] = pd.to_datetime(p['Date'])

    full_data = pd.merge(p, t, on='Date')

    full_data['log_returns'] = np.log(full_data.Close / full_data.Close.shift(1))
    full_data['Volatility'] = full_data['log_returns'].rolling(window=days).std() * np.sqrt(days)

    for i in range(1, 8):
        full_data[f'Delay_{i}'] = full_data['Trend'].shift(i)

    # Delays outside 1..7 used to fail with a KeyError; build the column on demand.
    delay_column = f'Delay_{delay}'
    if delay_column not in full_data.columns:
        full_data[delay_column] = full_data['Trend'].shift(delay)

    # Correlate just the two series: a frame-wide corr() would also try to
    # convert the Date / isPartial columns to float.
    rho_c = full_data['Close'].corr(full_data[delay_column])
    return rho_c, full_data

## plot_stock_data
gets the data of the trends and the prices of the stock given to it in a certain timeframe and plots its close and its delayed trend

In [6]:
def plot_stock_data(stock, days = 60, start = '2023-10-01', end = '2024-06-01', delay = 7, download = False):
    """Plot an asset's Close price next to its delayed Google Trends series.

    Trend and price data are read from CSV caches under ./Data when present,
    otherwise downloaded (and cached). Crypto tickers are priced as
    '<ticker>-USD'.

    NOTE: relies on the module-level ticker/name lists (general_stocks,
    tech_stocks, finance_stocks, decentralized_currencies and their *_names
    counterparts) and on color_map being defined.

    Parameters
    ----------
    stock : str
        Ticker symbol.
    days : int
        Rolling window (rows) for the 'Volatility' column.
    start, end : str
        Date range as 'YYYY-MM-DD'.
    delay : int
        Number of rows the trend is shifted by before plotting.
    download : bool
        When true, also save the figure under ./Plots.

    Raises
    ------
    Exception
        If the trend data could not be retrieved.
    """
    # Resolve colour and display name once, based on the asset category.
    if stock in general_stocks:
        color = color_map['general']
        name = general_stocks_names[general_stocks.index(stock)]
    elif stock in tech_stocks:
        color = color_map['tech']
        name = tech_stocks_names[tech_stocks.index(stock)]
    elif stock in finance_stocks:
        color = color_map['finance']
        name = finance_stocks_names[finance_stocks.index(stock)]
    else:
        color = color_map['crypto']
        name = decentralized_currencies_names[decentralized_currencies.index(stock)]

    trends_path = f"./Data/{stock}_trends({start} - {end}).csv"
    prices_path = f"./Data/{stock}_Prices({start} - {end}).csv"

    if not os.path.exists(trends_path):
        t = get_trends_data([name], timeframe = f"{start} {end}")
        if t is None:
            raise Exception(f'Failed to retrieve Trend Data of {stock}.')
        t.to_csv(trends_path)
    else:
        t = pd.read_csv(trends_path)

    if not os.path.exists(prices_path):
        # Crypto pairs are listed with a '-USD' suffix, as in trend_corr.
        if stock in decentralized_currencies:
            p = get_stock_data(f'{stock}-USD', start = start, end = end)
        else:
            p = get_stock_data(stock, start = start, end = end)
        p.to_csv(prices_path)
    else:
        p = pd.read_csv(prices_path)

    t['Date'] = pd.to_datetime(t['Date'])
    p['Date'] = pd.to_datetime(p['Date'])

    full_data = pd.merge(p, t, on='Date')

    full_data['log_returns'] = np.log(full_data.Close / full_data.Close.shift(1))
    full_data['Volatility'] = full_data['log_returns'].rolling(window=days).std() * np.sqrt(days)

    # Shift by the requested delay (previously always 7, whatever the label said).
    full_data[f'Delay_{delay}'] = full_data.Trend.shift(delay)

    # Create subplots
    fig, axes = plt.subplots(1, 2, figsize=(16, 7))

    # Plot Close price
    axes[0].plot(full_data['Date'], full_data['Close'], label = 'Close Price', color = color)
    axes[0].set_xlabel('Date')
    axes[0].set_ylabel('Close Price')
    axes[0].set_title(f'{stock}: Close Price')
    legend = axes[0].legend(loc='upper left')
    legend.get_frame().set_alpha(0.3)

    # Plot the delayed trend
    axes[1].plot(full_data['Date'], full_data[f'Delay_{delay}'], label = f'{delay}-Days Delayed Trend', color = 'black')
    axes[1].set_xlabel('Date')
    axes[1].set_ylabel(f'{delay}-Days Delayed Trend')
    axes[1].set_title(f'{stock}: {delay}-Days Delayed Trend')
    legend = axes[1].legend(loc='upper right')
    legend.get_frame().set_alpha(0.3)

    fig.suptitle(f'{name} ({stock})', fontsize=20, verticalalignment = 'bottom', fontweight = 'bold')
    plt.tight_layout(pad=2.0)
    plt.subplots_adjust(top=0.95)
    if download:
        plt.savefig(f"./Plots/{stock}_plot({start} - {end}).png", bbox_inches='tight')
    plt.show()

## time_jump
adds time in days to a given date that was accepted as string, returns as string

In [7]:
# function to find the date in string format after a certain number of days
def time_jump(start, days = 7 * 38):
    """Return the 'YYYY-MM-DD' date that lies `days` days after `start`.

    `start` is a 'YYYY-MM-DD' string; `days` may be negative to step backwards.
    """
    date_format = '%Y-%m-%d'
    origin = datetime.datetime.strptime(start, date_format)
    shifted = origin + datetime.timedelta(days = days)
    return shifted.strftime(date_format)

## get_breakpoints
calculates the breakpoints needed for the multiple requests of data to make the data as long as possible for us

In [8]:
def get_breakpoints(start, end, days = 7 * 38):
    """Split [start, end] into consecutive breakpoints `days` apart.

    Parameters
    ----------
    start, end : str
        Dates as 'YYYY-MM-DD'.
    days : int or float
        Step between breakpoints; must be at least 1.

    Returns
    -------
    list of str
        `start`, then every date reached by stepping `days` that is strictly
        before `end`, then `end`. When `start` is not before `end` the list is
        just [start].

    Raises
    ------
    ValueError
        If `days` is smaller than 1 (the walk would never reach `end`).
    """
    if days < 1:
        raise ValueError("days should be at least 1")

    date_format = '%Y-%m-%d'
    # Parse once instead of re-parsing both dates on every iteration.
    end_date = datetime.datetime.strptime(end, date_format)
    cursor = datetime.datetime.strptime(start, date_format)
    step = datetime.timedelta(days = days)

    breakpoints = [start]
    while cursor < end_date:
        # Truncate to midnight so a fractional `days` behaves like time_jump's
        # date-string round-trip.
        candidate = (cursor + step).replace(hour = 0, minute = 0, second = 0, microsecond = 0)
        if candidate < end_date:
            breakpoints.append(candidate.strftime(date_format))
            cursor = candidate
        else:
            breakpoints.append(end)
            cursor = end_date
    return breakpoints

## connectNnormalizeTrends
gets the data of the trends and prices daily in the whole time frame and in parts, and connects them by estimating an approximation of their absolute amount of searches

In [9]:
def connectNnormalizeTrends(Dfs, stock):
    """Stitch per-period daily trend frames into one rescaled daily series.

    Parameters
    ----------
    Dfs : list of pandas.DataFrame
        Frames with at least 'Date' and 'Trend' columns, concatenated in the
        order given.
    stock : str
        Name used to locate the weekly reference file
        ./Data/glimpse_{stock}_5Y.csv, which must contain the columns
        'Time (week of)' and 'Absolute Google Search Volume'.

    Returns
    -------
    pandas.DataFrame
        One row per input row with 'Date' (as a 'YYYY-MM-DD' string),
        'Search_Volume', 'check_ratio' and 'Normalized_Searches' (min-max
        scaled to 0..100), plus any extra columns the input frames carried.

    Raises
    ------
    Exception
        If the weekly reference file does not exist.
    """
    # setting up the data for step 1
    if not os.path.exists(f'./Data/glimpse_{stock}_5Y.csv'):
        raise Exception(f'Failed to retrieve Glimpse Data of {stock}.')
    glmpsDf = pd.read_csv(f'./Data/glimpse_{stock}_5Y.csv')
    glmpsDf.rename(columns={'Time (week of)': 'Date', 'Absolute Google Search Volume': 'Absolute_Volume'}, inplace=True)
    glmpsDf['Date'] = pd.to_datetime(glmpsDf['Date'])

    df_concat = pd.concat(Dfs).reset_index(drop = True)
    df_concat['Date'] = pd.to_datetime(df_concat['Date'])

    # calculating the mean trend for each week and merging it into glmpsDf
    # (the rolling mean looks back 7 rows; shift(-6) turns it into the mean of
    # the row itself and the 6 rows after it, i.e. a forward-looking week)
    df_concat['MeanTrend'] = df_concat['Trend'].rolling(window=7, min_periods=1).mean().shift(-6)
    glmpsDf = pd.merge(glmpsDf, df_concat, on='Date', how='left').drop(columns = ['Trend'])

    # setting up the data for step 2
    glmpsDf.rename(columns={'Date': 'Date_Week'}, inplace=True)
    df_concat = df_concat.drop(columns = ['MeanTrend'])
    # map every day to the Sunday on or before it (weekday: Monday=0 ... Sunday=6)
    df_concat['Date_Week'] = (df_concat['Date'] - pd.to_timedelta((df_concat['Date'].dt.weekday + 1) % 7, unit='d')).dt.strftime('%Y-%m-%d')
    df_concat['Date_Week'] = pd.to_datetime(df_concat['Date_Week'])

    # calculating the ratio and search volume for each week:
    # Ratio is the day's share of its week's summed trend (MeanTrend * 7), and
    # that share of the week's absolute volume is the day's Search_Volume
    df_concat = pd.merge(df_concat, glmpsDf[['Date_Week', 'MeanTrend', 'Absolute_Volume']], on='Date_Week', how='left')
    df_concat['Ratio'] = df_concat['Trend'] / (df_concat['MeanTrend'] * 7)
    df_concat['Search_Volume'] = df_concat['Ratio'] * df_concat['Absolute_Volume']

    # adding to df_concat the check ratio to check validity of the data
    # (estimated searches per trend point; getNormalizedData reports its mean
    # and standard deviation per period when verbose)
    df_concat['check_ratio'] = df_concat['Search_Volume'] / df_concat['Trend']

    # renormalizing the data (min-max scaling of Search_Volume to 0..100)
    df_concat['Normalized_Searches'] = ((df_concat['Search_Volume'] - df_concat['Search_Volume'].min()) / (df_concat['Search_Volume'].max() - df_concat['Search_Volume'].min())) * 100

    # cleaning out unnecessary columns
    df_concat = df_concat.drop(columns = ['Trend', 'Date_Week', 'MeanTrend', 'Absolute_Volume', 'Ratio'])

    # dates are handed back as strings
    df_concat['Date'] = df_concat['Date'].dt.strftime('%Y-%m-%d')

    return df_concat

## getNormalizedData
gets the data normalized, and performs validity checks (optional)

In [10]:
def _load_trend_chunks(stock, breakpoints, verbose):
    """Load (from the CSV cache) or download the trend data between each pair of consecutive breakpoints."""
    chunks = []
    for chunk_start, next_start in zip(breakpoints, breakpoints[1:]):
        # each chunk ends the day before the next one starts
        chunk_end = time_jump(next_start, days=-1)
        path = f"./Data/{stock}_trends({chunk_start} - {chunk_end}).csv"
        if os.path.exists(path):
            t = pd.read_csv(path).drop(columns = ['Unnamed: 0'])
        else:
            t = get_trends_data([stock], timeframe = f"{chunk_start} {chunk_end}", verbose = verbose)
            # check for failure before touching the frame
            if t is None:
                raise Exception(f'Failed to retrieve Trend Data of {stock}.')
            t = t.drop(columns = ['isPartial'])
            t.to_csv(path)
        chunks.append(t)
    return chunks


def _print_check_ratio_report(normalized, chunks, first_period):
    """Print mean, sd and mean/sd of 'check_ratio' for the rows belonging to each chunk."""
    bottom = 0
    for offset, chunk in enumerate(chunks):
        top = bottom + chunk.shape[0]
        ratios = normalized.iloc[bottom:top]['check_ratio']
        mean = ratios.mean()
        sd = ratios.std()
        print(f'period {first_period + offset}:\n=========\nmean: {mean:.4f}\nsd: {sd:.4f}\n\nmean/sd: {mean / sd:.4f}\n')
        bottom = top


def getNormalizedData(stock, ticker, start = '2019-06-23', end = '2024-06-01', weeks = 38, do_double = True, verbose = False):
    """Build the merged daily price / normalized-search frame for one asset.

    Trend data is fetched in chunks of `weeks` weeks, stitched together with
    connectNnormalizeTrends and merged with the '<ticker>-USD' price history.
    With `do_double`, a second set of chunks (breakpoints placed half a chunk
    apart) is normalized the same way and the two normalized series are
    averaged.

    Parameters
    ----------
    stock : str
        Search keyword; also used in the cache file names.
    ticker : str
        Ticker symbol; prices are requested for f'{ticker}-USD'.
    start, end : str
        Date range as 'YYYY-MM-DD'.
    weeks : int
        Chunk length in weeks; capped at 38, and rounded down to an even
        number when `do_double` is set.
    do_double : bool
        Average two differently chunked normalizations.
    verbose : bool
        Print download messages and the per-period check_ratio report.

    Returns
    -------
    pandas.DataFrame
        Price rows left-joined with the search columns on 'Date', including
        'log_returns' and 'log_searches', with incomplete rows dropped.

    Raises
    ------
    ValueError
        If `weeks` ends up smaller than 1.
    Exception
        If trend data for a chunk could not be retrieved.
    """
    if weeks > 38:
        wrn.warn('The maximum number of weeks is 38. \nThe number of weeks will be set to 38.', category=Warning)
        weeks = 38
    if do_double and weeks % 2 != 0:
        wrn.warn('The number of weeks must be even to use the double method. \nThe number of weeks will be rounded down to the nearest even number.', category=Warning)
        weeks -= 1
    if weeks < 1:
        # a zero-length step would never reach `end`
        raise ValueError("weeks should be at least 1 (at least 2 when do_double is used)")

    breakpoints = get_breakpoints(start, end, days = 7 * weeks)
    Dfs = _load_trend_chunks(stock, breakpoints, verbose)
    df_concat = connectNnormalizeTrends(Dfs, stock)

    Dfs2 = []
    df_concat2 = None
    if do_double:
        # weeks is even here, so the half step is a whole number of days
        breakpoints2 = get_breakpoints(start, end, days = (7 * weeks) // 2)
        # drop the points shared with the first set so the chunks interleave
        breakpoints2 = [x for x in breakpoints2 if x not in breakpoints[1:-1]]
        Dfs2 = _load_trend_chunks(stock, breakpoints2, verbose)
        df_concat2 = connectNnormalizeTrends(Dfs2, stock)

        df_concat["Normalized_Searches"] = (df_concat["Normalized_Searches"] + df_concat2["Normalized_Searches"]) / 2

    if not os.path.exists(f"./Data/{stock}_Prices({start}-{end}).csv"):
        stockData = get_stock_data(f'{ticker}-USD', start = start, end = end, verbose = verbose)
        stockData.to_csv(f"./Data/{stock}_Prices({start}-{end}).csv")
    else:
        stockData = pd.read_csv(f"./Data/{stock}_Prices({start}-{end}).csv").drop(columns = ['Unnamed: 0'])

    # Avoid log(0): lift zero search values only. (Assigning to the whole row
    # also overwrote Date, Search_Volume and check_ratio.)
    df_concat.loc[df_concat.Normalized_Searches == 0, 'Normalized_Searches'] = 0.1

    df_concat['log_searches'] = np.log(df_concat.Normalized_Searches / df_concat.Normalized_Searches.shift(1))
    stockData['log_returns'] = np.log(stockData.Close / stockData.Close.shift(1))

    # Bring both Date columns to 'YYYY-MM-DD' strings so the merge keys match,
    # whether they arrive as datetimes (fresh download) or strings (CSV cache).
    df_concat['Date'] = pd.to_datetime(df_concat['Date']).dt.strftime('%Y-%m-%d')
    stockData['Date'] = pd.to_datetime(stockData['Date']).dt.strftime('%Y-%m-%d')

    finalDf = pd.merge(stockData, df_concat, on='Date', how='left').dropna()

    if verbose:
        # data normalization validity check
        _print_check_ratio_report(df_concat, Dfs, first_period = 1)
        if do_double:
            # the second set of periods is checked against its own normalization
            _print_check_ratio_report(df_concat2, Dfs2, first_period = len(Dfs) + 1)
    return finalDf

# Backtesting
## Collecting The Data

In [23]:
# Suppress UserWarning / FutureWarning output for the rest of the session.
wrn.filterwarnings('ignore', category=UserWarning)
wrn.filterwarnings('ignore', category=FutureWarning)

# One merged price / normalized-search frame per coin, in the same order as
# decentralized_currencies.
coinsDFs = []

for i in range(len(decentralized_currencies)):
    # the coin's name is the search keyword, its ticker selects the price series
    coinsDFs.append(getNormalizedData(decentralized_currencies_names[i], decentralized_currencies[i], 
                                      start = start, end = end, 
                                      do_double = True))

## Granger Causality Tests

In [24]:
# Suppress UserWarning / FutureWarning output for the rest of the session.
wrn.filterwarnings('ignore', category=UserWarning)
wrn.filterwarnings('ignore', category=FutureWarning)

# Selected lag per coin, in the same order as decentralized_currencies.
lags = []

for i in range(len(decentralized_currencies)):
    # Column order matters: the test asks whether the second column
    # (log_searches) Granger-causes the first (log_returns).
    cause = grangercausalitytests(coinsDFs[i][['log_returns', 'log_searches']], 
                                  maxlag = max_lags, 
                                  verbose = False)

    min_p_value = float('inf')
    min_p_lag = None

    # keep the lag whose SSR-based F-test has the smallest p-value
    for lag, result in cause.items():
        p_value = result[0]['ssr_ftest'][1]
        if p_value < min_p_value:
            min_p_value = p_value
            min_p_lag = lag

    lags.append(min_p_lag)

    # Add the search columns shifted by the selected lag and drop the rows the
    # shift left incomplete. NOTE: this modifies coinsDFs[i] in place, so
    # re-running the cell shifts and trims the already-trimmed frames again.
    coinsDFs[i]['Normalized_Searches_delayed'] = coinsDFs[i]['Normalized_Searches'].shift(min_p_lag)
    coinsDFs[i]['log_searches_delayed'] = coinsDFs[i]['log_searches'].shift(min_p_lag)
    coinsDFs[i].dropna(inplace = True)

    print(f'{decentralized_currencies_names[i]}: {min_p_lag} lags, p-value = {min_p_value:.4f}')
        

Bitcoin: 7 lags, p-value = 0.1163
Ethereum: 1 lags, p-value = 0.1807
Cardano: 2 lags, p-value = 0.0776
Solana: 3 lags, p-value = 0.0782
Ripple: 1 lags, p-value = 0.5286
Monero: 1 lags, p-value = 0.1025
Litecoin: 2 lags, p-value = 0.2933
Polkadot: 3 lags, p-value = 0.0056
Chainlink: 1 lags, p-value = 0.0093
Tezos: 3 lags, p-value = 0.4993
Dogecoin: 5 lags, p-value = 0.0005
Shiba Inu: 2 lags, p-value = 0.0000


## Examples for checks code

In [None]:
# see data validity checks of a certain stock
# Asset to inspect: the name is the search keyword, the ticker selects the
# price series.
stock_name = 'Bitcoin' # change to any stock name that is in the list at the top
stock_ticker = 'BTC' # change to the corresponding ticker of the stock

# verbose = True prints the per-period check_ratio report; the returned frame
# is discarded here.
getNormalizedData(stock_name, stock_ticker, 
                  start = start, end = end, 
                  do_double = True, verbose = True)

# see the granger causality test results of a certain stock
df = getNormalizedData(stock_name, stock_ticker, start = start, end = end, do_double = True)

# tests whether log_searches (second column) Granger-causes log_returns (first
# column) for every lag up to max_lags, printing each result
grangercausalitytests(df[['log_returns', 'log_searches']], 
                      maxlag = max_lags, 
                      verbose = True)

# Backtesting
## Data Split
* 4 years train
* 1 year test

## Steps
1. get the data normalized
2. perform granger causality test on the train data (first 1461 samples) to determine the appropriate lag
3. calculate the 

### hyper-parameters:
* bb_window: size of the window of MA of the normalized searches and the bollinger bands on that MA
* bb_window_min: minimum size of the window of MA of the normalized searches and the bollinger bands on that MA
* bb_threshold: how many sds should be added to/subtracted from the bollinger bands
* rsi_window: size of the window of the RSI on the Close
* rsi_window_min: minimum size of the window of RSI on the Close
* rsi_limits: how sensitive the rsi is, 
    - the higher the lower value (the first value of rsi_limits) the less the RSI blocks a BUY signal
    - the lower the higher value (the second value of rsi_limits) the less the RSI blocks a SELL signal
* sell_all: a boolean value that indicates whether the strategy sells the entire held quantity of the asset when it gives a SELL signal, or only a relative amount of it
* qty_scale: represents the scale of the quantity that is calculated by the strategy

### additional parameters:
* starting_balance: the starting balance
* commission: the commission for every market dealing
* commission_type: whether the commission is a fixed amount (`"scalar"`) or a percentage of the traded value (spelled `"precentage"` in the code)
* slippage_factor: how much slippage exists in the market 

In [29]:
# Enums
class ActionType(Enum):
    """Direction of a simulated order; used when pricing it with slippage."""
    BUY = 1
    SELL = -1
    DONOTHING = 0

# strategy class
class Strategy():
    """Search-volume Bollinger-band strategy gated by the RSI of the Close price.

    Signals (see calc_signal): BUY when the delayed normalized search value is
    above its upper band and the RSI is above rsi_limits[1]; SELL when it is
    below the lower band and the RSI is below rsi_limits[0].

    Parameters
    ----------
    bb_window, rsi_window : int
        Rolling window sizes for the search-volume bands and for the RSI.
    bb_window_min, rsi_window_min : int, optional
        Minimum observations per window; default to the full window size.
    bb_threshold : float
        Number of standard deviations between the moving average and each band.
    rsi_limits : pair of float
        (lower, upper) RSI limits with 0 < lower < upper <= 100.
    sell_all : bool
        Sell the whole position on a SELL signal instead of a scaled part.
    qty_scale : float
        Multiplier applied to the computed trade quantity.
    commission_type : str
        "scalar" (fixed amount per trade) or "precentage" (fraction of the
        traded value).
    slippage_factor : float
        Divisor applied to the day's open-to-close move when estimating
        slippage; np.inf means no slippage.

    Raises
    ------
    ValueError
        If any parameter is outside its documented range.
    """

    def __init__(self, bb_window: int, rsi_window: int, 
                 bb_window_min: int=None, bb_threshold: float=0.5, rsi_window_min: int=None, rsi_limits: Tuple[float,float]=(30.0, 70.0), sell_all: bool=False, qty_scale: float=0.1,
                 commission_type: object="scalar", slippage_factor=np.inf) -> None:
        self.bb_window = bb_window
        self.rsi_window = rsi_window

        if bb_window_min is None:
            bb_window_min = bb_window
        elif bb_window_min > bb_window:
            raise ValueError("bb_window_min should be less than or equal to bb_window")
        elif bb_window_min <= 0:
            raise ValueError("bb_window_min should be a value greater than 0")
        # stored in both cases; it used to be skipped when the default was taken
        self.bb_window_min = bb_window_min

        if bb_threshold <= 0:
            raise ValueError("bb_threshold should be a value greater than 0")
        self.bb_threshold = bb_threshold

        if rsi_window_min is None:
            rsi_window_min = rsi_window
        elif rsi_window_min > rsi_window:
            raise ValueError("rsi_window_min should be less than or equal to rsi_window")
        elif rsi_window_min <= 0:
            raise ValueError("rsi_window_min should be a value greater than 0")
        self.rsi_window_min = rsi_window_min

        if rsi_limits[0] <= 0 or rsi_limits[1] > 100 or rsi_limits[0] >= rsi_limits[1]:
            raise ValueError("rsi_limit should be a tuple of two values between 0 and 100 where the first value is less than the second")
        self.rsi_limits = rsi_limits

        if commission_type not in ["scalar", "precentage"]:
            raise ValueError("commision_type should be either 'scalar' or 'precentage'")
        self.commission_type = commission_type
        self.sell_all = sell_all
        self.slippage_factor = slippage_factor

        if qty_scale <= 0:
            raise ValueError("qty_scale should be a value greater than 0")
        self.qty_scale = qty_scale

    def calc_signal(self, data: pd.DataFrame) -> None:
        """Add the band columns and the 'strategy' signal column to `data` in place.

        Reads 'Normalized_Searches_delayed' and 'Close'. Writes
        'norm_search_MA_upper_band', 'norm_search_MA_lower_band' and 'strategy'
        (1 = buy, -1 = sell, 0 = no signal).

        NOTE(review): as written, SELL requires RSI < rsi_limits[0] and BUY
        requires RSI > rsi_limits[1]; confirm this is the intended direction.
        """
        norm_search = data['Normalized_Searches_delayed']
        close = data['Close']

        # Bollinger-style bands around the moving average of the search series
        search_window = norm_search.rolling(window=self.bb_window, min_periods=self.bb_window_min)
        norm_search_MA = search_window.mean()
        norm_search_sd = search_window.std()
        upper_band = norm_search_MA + self.bb_threshold * norm_search_sd
        lower_band = norm_search_MA - self.bb_threshold * norm_search_sd

        data['norm_search_MA_upper_band'] = upper_band
        data['norm_search_MA_lower_band'] = lower_band

        # RSI of the close price (simple moving averages of gains and losses)
        delta = close.diff()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)
        avg_gain = gains.rolling(window=self.rsi_window, min_periods=self.rsi_window_min).mean()
        avg_loss = losses.rolling(window=self.rsi_window, min_periods=self.rsi_window_min).mean()
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

        data['strategy'] = 0
        data.loc[(norm_search < lower_band) & (rsi < self.rsi_limits[0]), 'strategy'] = -1 # Sell signal
        data.loc[(norm_search > upper_band) & (rsi > self.rsi_limits[1]), 'strategy'] = 1 # Buy signal

    def calc_realistic_price(self, row: pd.Series, action: ActionType) -> float:
        """Return the execution price for `action` on the day described by `row`.

        The Open price is moved by the day's relative open-to-close change
        divided by slippage_factor. A BUY never executes below the Open; any
        other action never executes above it.
        """
        slippage_rate = ((row['Close'] - row['Open']) / row['Open']) / self.slippage_factor
        slippage_price = row['Open'] + row['Open'] * slippage_rate

        if action == ActionType.BUY:
            return max(slippage_price, row['Open'])
        else:
            return min(slippage_price, row['Open'])

    def backtest(self, data: pd.DataFrame, starting_balance: float, commission: float=0.0) -> pd.DataFrame:
        """Simulate the strategy over `data` and return the annotated frame.

        `data` is modified in place: the signal columns are added, the index is
        reset (the old index becomes a column), and 'qty', 'balance' and
        'portfolio_value' are filled in row by row. Purchases are not limited
        by the available balance, so 'balance' can become negative. Any open
        position is sold on the last row.

        Parameters
        ----------
        data : pandas.DataFrame
            Needs 'Open', 'Close' and 'Normalized_Searches_delayed'.
        starting_balance : float
            Cash available before the first row.
        commission : float
            Fixed amount per trade for commission_type "scalar", otherwise a
            fraction of the traded value.
        """
        data['qty'] = 0.0
        data['balance'] = 0.0

        self.calc_signal(data)

        data.reset_index(inplace=True)

        last_row = data.shape[0] - 1
        scalar_commission = self.commission_type == "scalar"

        # Running position and cash; recorded per row and written back once,
        # instead of reading and writing single cells inside the loop.
        curr_qty = 0.0
        curr_balance = starting_balance
        qty_path = []
        balance_path = []

        for i, row in data.iterrows():
            is_last = i == last_row
            signal = row['strategy']

            # Buy when the strategy says so, except on the final row
            if (not is_last) and signal == 1:
                buy_price = self.calc_realistic_price(row, ActionType.BUY)
                buy_qty = (row['Normalized_Searches_delayed'] / row['norm_search_MA_upper_band'] - 1) * self.qty_scale
                if scalar_commission:
                    cost = buy_price * buy_qty + commission
                else:
                    cost = buy_price * buy_qty * (1 + commission)
                curr_balance = curr_balance - cost
                curr_qty = curr_qty + buy_qty

            # Sell on a sell signal, or liquidate on the final row, when holding
            elif (signal == -1 or is_last) and curr_qty > 0:
                sell_price = self.calc_realistic_price(row, ActionType.SELL)
                liquidate = self.sell_all or is_last
                if liquidate:
                    sell_qty = curr_qty
                else:
                    sell_qty = min((row['norm_search_MA_lower_band'] / row['Normalized_Searches_delayed'] - 1) * self.qty_scale, curr_qty)
                if scalar_commission:
                    proceeds = sell_price * sell_qty - commission
                else:
                    proceeds = sell_price * sell_qty * (1 - commission)
                curr_balance = curr_balance + proceeds
                curr_qty = 0.0 if liquidate else curr_qty - sell_qty

            # Otherwise carry the previous state forward unchanged
            qty_path.append(curr_qty)
            balance_path.append(curr_balance)

        data['qty'] = pd.Series(qty_path, index=data.index, dtype=float)
        data['balance'] = pd.Series(balance_path, index=data.index, dtype=float)
        data['portfolio_value'] = data['balance'] + data['Close'] * data['qty']

        return data

def calc_total_return(portfolio_values):
    """Return the relative change from the first to the last portfolio value."""
    first_value = portfolio_values.iloc[0]
    final_value = portfolio_values.iloc[-1]
    growth = final_value / first_value
    return growth - 1.0

def calc_annualized_return(portfolio_values, yearly_trading_days=252):
    """Return the compound annual growth rate of a portfolio value series.

    Parameters
    ----------
    portfolio_values : pandas.Series
        One portfolio value per trading period, oldest first.
    yearly_trading_days : int
        Number of rows that make up one year. The default of 252 matches an
        equity calendar; pass 365 for series that have a row for every
        calendar day.

    Returns
    -------
    float
        (last / first) ** (1 / years) - 1, where years is the number of rows
        divided by `yearly_trading_days`.
    """
    portfolio_trading_days = portfolio_values.shape[0]
    portfolio_trading_years = portfolio_trading_days / yearly_trading_days
    growth = portfolio_values.iloc[-1] / portfolio_values.iloc[0]
    return growth ** (1 / portfolio_trading_years) - 1.0

def calc_annualized_sharpe(portfolio_values: pd.Series, rf: float=0.0):
    """Return the annualized Sharpe ratio of a portfolio value series.

    The standard deviation of the period-to-period returns is scaled by
    sqrt(252). Returns 0 when that deviation is zero or undefined (NaN, e.g.
    fewer than three values), instead of propagating NaN.

    Parameters
    ----------
    portfolio_values : pandas.Series
        One portfolio value per trading period, oldest first.
    rf : float
        Annual risk-free rate subtracted from the annualized return.
    """
    yearly_trading_days = 252
    annualized_std = portfolio_values.pct_change().std() * np.sqrt(yearly_trading_days)
    # std() yields NaN rather than None when it is undefined
    if pd.isna(annualized_std) or annualized_std == 0:
        return 0
    annualized_return = calc_annualized_return(portfolio_values)
    return (annualized_return - rf) / annualized_std

def calc_downside_deviation(portfolio_values):
    """Return the standard deviation of the negative period-to-period returns."""
    period_returns = portfolio_values.pct_change().dropna()
    losing_periods = period_returns < 0
    return period_returns[losing_periods].std()

def calc_sortino(portfolio_values, rf=0.0):
    """Return the annualized Sortino ratio of a portfolio value series.

    The downside deviation from calc_downside_deviation is scaled by sqrt(252).
    Returns 0 when that deviation is zero or undefined (NaN, which happens
    whenever the series has fewer than two negative returns), instead of
    propagating NaN.

    Parameters
    ----------
    portfolio_values : pandas.Series
        One portfolio value per trading period, oldest first.
    rf : float
        Annual risk-free rate subtracted from the annualized return.
    """
    yearly_trading_days = 252
    down_deviation = calc_downside_deviation(portfolio_values) * np.sqrt(yearly_trading_days)
    # std() yields NaN rather than None when it is undefined
    if pd.isna(down_deviation) or down_deviation == 0:
        return 0
    annualized_return = calc_annualized_return(portfolio_values)
    return (annualized_return - rf) / down_deviation

def calc_max_drawdown(portfolio_values):
    """Return the largest relative drop of the series below its running maximum."""
    running_peak = portfolio_values.cummax()
    shortfall = running_peak - portfolio_values
    return (shortfall / running_peak).max()

def calc_calmar(portfolio_values):
    """Return the Calmar ratio: annualized return divided by maximum drawdown.

    Returns 0 when the maximum drawdown is zero or undefined (NaN), matching
    how calc_annualized_sharpe and calc_sortino treat a zero denominator,
    instead of dividing by zero.
    """
    max_drawdown = calc_max_drawdown(portfolio_values)
    if pd.isna(max_drawdown) or max_drawdown == 0:
        return 0
    annualized_return = calc_annualized_return(portfolio_values)
    return annualized_return / max_drawdown

def evaluate_strategy(b_df, strat_name):
    """Print the performance summary of a backtest result.

    `b_df` must contain a 'portfolio_value' column; `strat_name` is used in the
    heading. All metrics are computed before anything is printed.
    """
    # (label, value, format spec) in the order the lines are printed
    report = [
        ("Total Return", calc_total_return(b_df['portfolio_value']), ".2%"),
        ("Annualized Return", calc_annualized_return(b_df['portfolio_value']), ".2%"),
        ("Annualized Sharpe Ratio", calc_annualized_sharpe(b_df['portfolio_value']), ".2f"),
        ("Sortino Ratio", calc_sortino(b_df['portfolio_value']), ".2f"),
        ("Max Drawdown", calc_max_drawdown(b_df['portfolio_value']), ".2%"),
        ("Calmar Ratio", calc_calmar(b_df['portfolio_value']), ".2f"),
    ]

    print(f"Results for {strat_name}:")
    for label, metric, spec in report:
        print(f"{label}: {metric:{spec}}")

In [30]:
# Example run: 14-row windows that start producing values from the first row
# (window minimums of 1), partial sells, fixed ("scalar") commission left at
# the backtest default, and slippage_factor=1.0.
example_strategy = Strategy(bb_window=14, rsi_window=14, bb_window_min=1, bb_threshold=0.5, rsi_window_min=1, rsi_limits=[30.0, 70.0], sell_all=False, qty_scale=0.1, commission_type="scalar", slippage_factor=1.0)
# Backtest on a copy of the first coin's frame, because backtest modifies the
# frame it is given.
tempDF = example_strategy.backtest(coinsDFs[0].copy(), balance)

In [31]:
# Display the frame returned by the example backtest.
tempDF

Unnamed: 0,index,Date,Open,High,Low,Close,Adj Close,Volume,log_returns,Search_Volume,...,Normalized_Searches,log_searches,Normalized_Searches_delayed,log_searches_delayed,qty,balance,norm_search_MA_upper_band,norm_search_MA_lower_band,strategy,portfolio_value
0,8,2019-07-08,11446.596680,12345.833008,11393.374023,12285.958008,12285.958008,23482551458,0.070393,284665.003145,...,6.298411,0.440920,10.119808,0.218669,0.000000,10000.000000,,,0,10000.000000
1,9,2019-07-09,12284.326172,12779.131836,12233.261719,12573.812500,12573.812500,28167921523,0.023159,324385.701258,...,7.963752,0.234603,10.200057,0.007899,0.000115,9998.549613,10.188305,10.131560,1,10000.000000
2,10,2019-07-10,12571.537109,13129.529297,11710.978516,12156.512695,12156.512695,33627574244,-0.033751,364106.399371,...,9.553251,0.181981,8.972541,-0.128224,0.000115,9998.549613,10.107493,9.420778,0,9999.951865
3,11,2019-07-11,12139.713867,12144.623047,11158.922852,11358.662109,11358.662109,28595327690,-0.067884,337625.933962,...,8.392463,-0.129547,7.745025,-0.147118,0.000000,9999.859833,9.836763,8.681953,-1,9999.859833
4,12,2019-07-12,11354.299805,11905.487305,11179.144531,11815.986328,11815.986328,23534692797,0.039473,304525.352201,...,7.093161,-0.168203,6.984886,-0.103302,0.000000,9999.859833,9.517702,8.091225,0,9999.859833
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1814,1822,2024-06-25,60266.281250,62258.261719,60239.750000,61804.640625,61804.640625,29201215431,0.025021,336946.076087,...,8.348660,-0.110554,7.453743,0.055626,3.542072,-98191.512122,7.908932,6.313679,0,120724.985091
1815,1823,2024-06-26,61789.675781,62434.136719,60695.187500,60811.277344,60811.277344,22506003064,-0.016203,293469.163043,...,6.570504,-0.239511,6.647134,-0.114531,3.542072,-98191.512122,7.568608,6.188879,0,117206.420663
1816,1824,2024-06-27,60811.226562,62293.863281,60585.332031,61604.800781,61604.800781,21231745045,0.012965,271730.706522,...,5.768274,-0.130218,6.243830,-0.062592,3.542072,-98191.512122,7.260522,6.061894,0,120017.137943
1817,1825,2024-06-28,61612.804688,62126.097656,59985.402344,60320.136719,60320.136719,24952866877,-0.021074,271730.706522,...,5.768274,0.000000,6.647134,0.062592,3.542072,-98191.512122,6.901861,6.016571,0,115466.765127


# Previously used chunks

In [4]:
# Calculate correlations for each category
# Correlation (first element of trend_corr's result) for every asset, per category.
# NOTE: needs general_stocks, tech_stocks and finance_stocks to be defined;
# they are commented out in the params cell.
volt_del_corr_general = [trend_corr(stock)[0] for stock in general_stocks]
volt_del_corr_tech = [trend_corr(stock)[0] for stock in tech_stocks]
volt_del_corr_finance = [trend_corr(stock)[0] for stock in finance_stocks]
volt_del_corr_crypto = [trend_corr(crypto)[0] for crypto in decentralized_currencies]

# Combine the results (general, tech, finance, crypto - same order as `labels`)
volt_del_corr = volt_del_corr_general + volt_del_corr_tech + volt_del_corr_finance + volt_del_corr_crypto

# Create labels for the scatter plot
labels = general_stocks + tech_stocks + finance_stocks + decentralized_currencies

Trend Data for ['Shiba Inu'] retrieved successfully.
Stock Data for SHIB-USD retrieved successfully.


In [None]:
# Plot the scatter plot
# Scatter plot of the per-asset correlations. Each category occupies a
# consecutive block of x positions (general, tech, finance, crypto) and is
# drawn in its color_map colour.
# NOTE: needs color_map and the stock lists to be defined; they are commented
# out in the params cell.
plt.figure(figsize=(16, 8))
plt.scatter(range(len(general_stocks)), volt_del_corr_general, color = color_map['general'], label = 'General Stocks')
for i in range(len(general_stocks)):
    plt.axvline(x = i, color = color_map['general'], linestyle = ':', alpha = 0.3)  # Add vertical lines to separate the stocks
plt.scatter(range(len(general_stocks), len(general_stocks) + len(tech_stocks)), volt_del_corr_tech, color = color_map['tech'], label = 'Tech Stocks')
for i in range(len(general_stocks), len(general_stocks) + len(tech_stocks)):
    plt.axvline(x = i, color = color_map['tech'], linestyle = ':', alpha = 0.3)  # Add vertical lines to separate the stocks
plt.scatter(range(len(general_stocks) + len(tech_stocks), len(general_stocks) + len(tech_stocks) + len(finance_stocks)), volt_del_corr_finance, color = color_map['finance'], label = 'Finance Stocks')
for i in range(len(general_stocks) + len(tech_stocks), len(general_stocks) + len(tech_stocks) + len(finance_stocks)):
    plt.axvline(x = i, color = color_map['finance'], linestyle = ':', alpha = 0.3)  # Add vertical lines to separate the stocks
plt.scatter(range(len(general_stocks) + len(tech_stocks) + len(finance_stocks), len(general_stocks) + len(tech_stocks) + len(finance_stocks) + len(decentralized_currencies)), volt_del_corr_crypto, color = color_map['crypto'], label = 'Decentralized Currencies')
for i in range(len(general_stocks) + len(tech_stocks) + len(finance_stocks), len(general_stocks) + len(tech_stocks) + len(finance_stocks) + len(decentralized_currencies)):
    plt.axvline(x = i, color = color_map['crypto'], linestyle = ':', alpha = 0.3)  # Add vertical lines to separate the stocks
plt.axhline(y = 0, color = 'black', linestyle = '--')  # Add a horizontal line at y = 0
plt.xlabel('Assets')
plt.ylabel('Correlation with 7-Day Delayed Trend')
plt.title('Correlation of Close Price and 7-Day Delayed Trend')
legend = plt.legend()
legend.get_frame().set_alpha(0.3)
# one tick per asset, labelled with its ticker
plt.xticks(range(len(labels)), labels, rotation = 60)
plt.tight_layout(pad = 2)
# written to the current working directory
plt.savefig('Correlation_Scatter_Plot.png')
plt.show()

# Print the correlation values and their mean
print(volt_del_corr, np.mean(volt_del_corr))

In [None]:
# Plot for each stock and decentralized currency
# Plot every asset and save each figure (download = True).
# NOTE: needs the stock lists to be defined; they are commented out in the
# params cell.
for stock in general_stocks + tech_stocks + finance_stocks + decentralized_currencies:
    plot_stock_data(stock, download = True)