The True Strength Index (TSI) is a momentum oscillator that traders primarily use to determine whether a market is in upward or downward momentum, and to trade along with it.

In [43]:
from datetime import date, datetime
from typing import Any, Dict, Iterator, List, Optional, Union

import pandas as pd
import pytz
from polygon.rest import RESTClient
from polygon.rest.models import (
    Agg,
    DailyOpenCloseAgg,
    GroupedDailyAgg,
    PreviousCloseAgg,
    Sort,
)
from polygon.rest.models.request import RequestOptionBuilder
from urllib3 import HTTPResponse


def get_client():
    """
    Build the Polygon REST client used by the data helpers in this notebook.

    The API key is taken from the ``POLYGON_API_KEY`` environment variable when
    it is set and non-empty; otherwise the key that was previously embedded in
    this notebook is used, so existing calls keep working unchanged.

    :return: A RESTClient constructed with the selected API key.
    """
    import os

    # SECURITY: a literal API key committed to source is visible to anyone who
    # can read the file. Prefer setting POLYGON_API_KEY and rotate this key.
    api_key = os.environ.get("POLYGON_API_KEY") or "ocunxnOqC0pnltRqT3VkOiKeCmPE49L7"
    return RESTClient(api_key)

# TODO: Account for market holidays and half-days
# TODO: Determine timezone from stock/equity ticker
def within_trading_hours(
    timestamp: Union[str, int, datetime, date], timezone: str
) -> bool:
    """
    Tell whether an instant falls inside the regular weekday trading session.

    The session is taken to be Monday-Friday, 09:30:00 through 16:00:00
    (both ends inclusive) on the wall clock of the given timezone.

    :param timestamp: Unix timestamp in milliseconds. Although the annotation is
        wider, the value is divided by 1000, so only numeric input works; a str,
        datetime or date would raise TypeError at that division.
    :param timezone: IANA Time Zone Database name of the exchange's timezone.
    :return: True if the instant is inside the session, False otherwise.
    """
    exchange_tz = pytz.timezone(timezone)
    moment = datetime.fromtimestamp(timestamp / 1000, tz=exchange_tz)

    # weekday() is 5 for Saturday and 6 for Sunday: never a trading day.
    if moment.weekday() >= 5:
        return False

    session_open = moment.replace(hour=9, minute=30, second=0, microsecond=0)
    session_close = moment.replace(hour=16, minute=0, second=0, microsecond=0)
    return session_open <= moment <= session_close


# TODO: Determine timezone from stock/equity ticker
def parse_timestamp(timestamp: Union[str, int, datetime, date], timezone: str) -> str:
    """
    Render a millisecond timestamp as a human-readable local date-time string.

    :param timestamp: Unix timestamp in milliseconds. The value is divided by
        1000, so only numeric input works despite the wider annotation.
    :param timezone: IANA Time Zone Database name of the timezone to render in.
    :return: The instant formatted as ``%Y-%m-%d %H:%M:%S %Z``.
    """
    target_tz = pytz.timezone(timezone)
    seconds = timestamp / 1000  # fromtimestamp expects seconds, not milliseconds
    localized = datetime.fromtimestamp(seconds, tz=target_tz)
    return localized.strftime("%Y-%m-%d %H:%M:%S %Z")


def agg_to_dict(agg: Agg) -> dict[str, Union[None, float, int, bool]]:
    """
    Flatten an aggregate bar into a plain dictionary, adding a readable date-time.

    Keys are produced in this order: open, high, low, close, volume, vwap,
    timestamp, datetime, transactions, otc. The ``datetime`` entry is the
    string returned by ``parse_timestamp`` for ``agg.timestamp`` in the
    "America/New_York" timezone; every other entry is copied from the
    attribute of the same name.

    :param agg: The aggregate object to convert.
    :return: Dictionary keyed by attribute name, plus the derived ``datetime``.
    """
    leading_fields = ("open", "high", "low", "close", "volume", "vwap", "timestamp")
    record = {field: getattr(agg, field) for field in leading_fields}

    # Inserted here (not at the end) so the key order, and therefore the
    # column order of any DataFrame built from these dicts, is unchanged.
    record["datetime"] = parse_timestamp(agg.timestamp, "America/New_York")
    record["transactions"] = agg.transactions
    record["otc"] = agg.otc
    return record


def list_aggs(
    client: RESTClient,
    ticker: str,
    multiplier: int,
    timespan: str,
    # "from" is a keyword in python https://www.w3schools.com/python/python_ref_keywords.asp
    from_: Union[str, int, datetime, date],
    to: Union[str, int, datetime, date],
    include_extended_hours: bool = True,
    adjusted: Optional[bool] = None,
    sort: Optional[Union[str, Sort]] = None,
    limit: Optional[int] = None,
    params: Optional[Dict[str, Any]] = None,
    raw: bool = False,
    options: Optional[RequestOptionBuilder] = None,
) -> pd.DataFrame:
    """
    Fetch aggregate bars for a ticker and return them as a DataFrame.

    Every bar yielded by ``client.list_aggs`` is converted with ``agg_to_dict``.
    When ``include_extended_hours`` is False, only bars whose timestamp passes
    ``within_trading_hours`` for "America/New_York" are kept.

    NOTE(review): the day bars shown in this notebook are stamped at 00:00 local
    time, so with ``timespan="day"`` and ``include_extended_hours=False`` it looks
    like every bar would be filtered out - confirm before relying on that combination.

    :param client: The RESTClient object to perform the request with.
    :param ticker: The ticker symbol.
    :param multiplier: The size of the timespan multiplier.
    :param timespan: The size of the time window.
    :param from_: The start of the aggregate time window as YYYY-MM-DD, a date, Unix MS Timestamp, or a datetime.
    :param to: The end of the aggregate time window as YYYY-MM-DD, a date, Unix MS Timestamp, or a datetime.
    :param include_extended_hours: True if pre-market and after-hours trading data are to be included, False otherwise.
    :param adjusted: Whether or not the results are adjusted for splits. Forwarded unchanged to the client.
    :param sort: Sort order of the results by timestamp (asc = oldest first, desc = newest first). Forwarded unchanged to the client.
    :param limit: Limits the number of base aggregates queried to create the aggregate results. Forwarded unchanged to the client.
    :param params: Any additional query params.
    :param raw: Forwarded unchanged to the client. NOTE(review): this function reads
        ``.timestamp`` and other bar attributes from each yielded item, so it presumably
        only works with ``raw=False`` - confirm.
    :param options: Request options, forwarded unchanged to the client.
    :return: Pandas DataFrame with one row per retained aggregate bar.
    """
    bars = client.list_aggs(
        ticker,
        multiplier,
        timespan,
        from_,
        to,
        adjusted=adjusted,
        sort=sort,
        limit=limit,
        params=params,
        raw=raw,
        options=options,
    )
    rows = [
        agg_to_dict(bar)
        for bar in bars
        if include_extended_hours
        or within_trading_hours(bar.timestamp, "America/New_York")
    ]
    return pd.DataFrame(rows)

# Lift pandas' fixed display width so wide DataFrames are not wrapped.
# Note: this option controls the width only; it is not the option that
# limits how many columns are shown.
pd.set_option('display.width', None)  # Allow display of wide DataFrames

# Fetch daily SPY bars for 2000-01-01 through 2018-12-31 (extended-hours
# filtering is left at its default, i.e. no bars are dropped).
data = list_aggs(
        get_client(), "SPY", 1, "day", "2000-01-01", "2018-12-31", limit=50000
    )

# Bare expression: in a notebook cell this renders the DataFrame as the output.
data

Unnamed: 0,open,high,low,close,volume,vwap,timestamp,datetime,transactions,otc
0,102.530,102.8000,101.55,101.96,45417600.0,102.2064,1063166400000,2003-09-10 00:00:00 EDT,25951,
1,102.100,102.7600,101.84,102.26,38038300.0,102.3182,1063252800000,2003-09-11 00:00:00 EDT,22580,
2,101.910,102.6400,101.35,102.45,41159600.0,101.9165,1063339200000,2003-09-12 00:00:00 EDT,26091,
3,102.520,102.6300,101.95,102.09,20710600.0,102.2676,1063598400000,2003-09-15 00:00:00 EDT,15670,
4,102.232,103.6400,102.17,103.58,37103300.0,103.0466,1063684800000,2003-09-16 00:00:00 EDT,19074,
...,...,...,...,...,...,...,...,...,...,...
3849,239.040,240.8355,234.27,234.34,147311588.0,237.2807,1545627600000,2018-12-24 00:00:00 EST,683157,
3850,235.970,246.1800,233.76,246.18,218930378.0,239.5597,1545800400000,2018-12-26 00:00:00 EST,1032190,
3851,242.570,248.2900,238.96,248.07,186492032.0,243.4997,1545886800000,2018-12-27 00:00:00 EST,978525,
3852,249.580,251.4000,246.45,247.75,153456587.0,248.5814,1545973200000,2018-12-28 00:00:00 EST,800102,


In [47]:


import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import numpy as np
from math import floor


def get_tsi(close, long, short, signal, start='2020-01-01'):
    """
    Compute the True Strength Index (TSI) and its signal line.

    The one-bar price change and its absolute value are each smoothed twice
    with exponential moving averages (first with span ``long``, then with span
    ``short``). The TSI is 100 times the ratio of the two double-smoothed
    series, and the signal line is an EMA of the TSI with span ``signal``.

    :param close: pandas Series of closing prices.
    :param long: Span of the first (slower) EMA.
    :param short: Span of the second (faster) EMA.
    :param signal: Span of the EMA applied to the TSI to obtain the signal line.
    :param start: Only index labels ``>= start`` are kept in the result. Defaults
        to '2020-01-01', which was previously hard-coded; pass None to keep the
        whole series. With the default, input that ends before 2020 yields
        empty results.
    :return: Tuple ``(tsi, signal_line)`` of pandas Series with NaN entries
        dropped (the first bar is always NaN because it has no previous close).
    """
    diff = close.diff()
    abs_diff = diff.abs()

    def _double_smooth(series):
        # EMA with span `long`, then EMA of that result with span `short`.
        once = series.ewm(span=long, adjust=False).mean()
        return once.ewm(span=short, adjust=False).mean()

    tsi = (_double_smooth(diff) / _double_smooth(abs_diff)) * 100
    # Separate local name: the original reused `signal` for the series,
    # shadowing the span parameter.
    signal_line = tsi.ewm(span=signal, adjust=False).mean()

    if start is not None:
        tsi = tsi[tsi.index >= start]
        signal_line = signal_line[signal_line.index >= start]

    return tsi.dropna(), signal_line.dropna()




def implement_tsi_strategy(prices, tsi, signal_line):
    """
    Generate buy/sell events from crossovers of the TSI and its signal line.

    A buy is emitted on a bar where the TSI moves from below the signal line
    (previous bar) to above it (current bar); a sell is emitted on the opposite
    crossing. Consecutive events of the same kind are suppressed: after a buy,
    further upward crossings are ignored until a sell occurs, and vice versa.

    All three inputs are read by position. The first bar never produces an
    event because it has no previous bar to compare against (the previous
    implementation indexed ``i - 1 == -1`` there, which compared the first bar
    with the last one and relied on integer keys being treated as positions).

    :param prices: Sequence or Series of prices, one per bar.
    :param tsi: Sequence or Series of TSI values, same length as ``prices``.
    :param signal_line: Sequence or Series of signal-line values, same length.
    :return: Tuple ``(buy_price, sell_price, tsi_signal)`` of equal-length lists.
        ``buy_price``/``sell_price`` hold the bar's price on an event and NaN
        otherwise; ``tsi_signal`` holds 1 for a buy, -1 for a sell, 0 otherwise.
    """
    # Positional float arrays: independent of whatever index a Series carries.
    price_values = np.asarray(prices, dtype=float)
    tsi_values = np.asarray(tsi, dtype=float)
    line_values = np.asarray(signal_line, dtype=float)

    buy_price = []
    sell_price = []
    tsi_signal = []
    signal = 0  # last emitted event: 1 = buy, -1 = sell, 0 = none yet

    for i in range(len(price_values)):
        buy, sell, event = np.nan, np.nan, 0

        if i > 0:
            crossed_up = (
                tsi_values[i - 1] < line_values[i - 1]
                and tsi_values[i] > line_values[i]
            )
            crossed_down = (
                tsi_values[i - 1] > line_values[i - 1]
                and tsi_values[i] < line_values[i]
            )
            if crossed_up and signal != 1:
                buy = price_values[i]
                signal = event = 1
            elif crossed_down and signal != -1:
                sell = price_values[i]
                signal = event = -1

        buy_price.append(buy)
        sell_price.append(sell)
        tsi_signal.append(event)

    return buy_price, sell_price, tsi_signal

# STOCK POSITION for TSI or RSI
def get_strategy(tsi_signal, close, tsi, signal_line):
    """
    Turn per-bar trade events into a held-position series and bundle the results.

    The position is 1 after a buy event (signal 1), 0 after a sell event
    (signal -1), and otherwise carries over from the previous bar.

    NOTE(review): positions are seeded with 1 for every signal that is not
    greater than 1, so with signals in {-1, 0, 1} the position is 1 until the
    first sell event - confirm that starting "in the market" is intended.

    :param tsi_signal: Per-bar events (1 buy, -1 sell, 0 none), indexable by position.
    :param close: Series of closing prices; its index is used for the new columns.
    :param tsi: Series of TSI values.
    :param signal_line: Series of signal-line values.
    :return: DataFrame joining ``close``, ``tsi``, ``signal_line`` and the new
        ``tsi_signal`` and ``tsi_position`` columns on their common index.
    """
    # Seed values; each entry up to len(close) is overwritten below.
    position = [0 if flag > 1 else 1 for flag in tsi_signal]

    for i in range(len(close)):
        flag = tsi_signal[i]
        if flag in (1, -1):
            position[i] = 1 if flag == 1 else 0
        else:
            # No event: keep the previous bar's position. For i == 0 this
            # reads position[-1], i.e. the seed value of the last bar.
            position[i] = position[i - 1]

    signal_frame = (
        pd.DataFrame(tsi_signal)
        .rename(columns={0: 'tsi_signal'})
        .set_index(close.index)
    )
    position_frame = (
        pd.DataFrame(position)
        .rename(columns={0: 'tsi_position'})
        .set_index(close.index)
    )

    return pd.concat(
        [close, tsi, signal_line, signal_frame, position_frame],
        join='inner',
        axis=1,
    )

   
    


class MyStrategy:
    """
    TSI crossover strategy driven by a Backtest-style ``init()``/``next()`` loop.

    ``next()`` computes everything in a single call and returns None, so a
    driver that loops ``while strategy.next()`` runs it exactly once.
    """

    def __init__(self, data):
        # DataFrame with at least a 'close' column; next() adds columns to it.
        self.data = data
        # Per-bar events (1 buy, -1 sell, 0 none); filled in by next().
        self.tsi_signal = None
        # Combined result frame produced by get_strategy(); filled in by next().
        self.strategy = None

    def init(self):
        """No set-up is required; present to satisfy the driver's interface."""
        return None

    def next(self):
        """
        Compute the TSI, the trade events and the resulting position frame.

        Side effects: adds 'tsi' and 'signal_line' columns to the DataFrame that
        was passed to the constructor, then rebinds ``self.data`` to a filtered
        copy carrying the extra 'buy_price' and 'sell_price' columns, and sets
        ``self.tsi_signal`` and ``self.strategy``.

        :return: None (falsy), which ends a ``while strategy.next()`` loop.
        """
        # TSI with spans 25 (long), 13 (short) and a 12-span signal line.
        # NOTE(review): get_tsi applies its own date cutoff to what it returns;
        # rows outside that range end up NaN here after index alignment - confirm
        # the cutoff matches the data being backtested.
        self.data['tsi'], self.data['signal_line'] = get_tsi(self.data['close'], 25, 13, 12)

        # Take an explicit copy: assigning new columns to a boolean-filtered
        # slice is ambiguous in pandas (chained-assignment warning).
        self.data = self.data[self.data.index >= '2000-01-01'].copy()

        # Implement TSI strategy
        self.data['buy_price'], self.data['sell_price'], self.tsi_signal = implement_tsi_strategy(
            self.data['close'], self.data['tsi'], self.data['signal_line']
        )

        # Derive the held position from the events and bundle everything.
        self.strategy = get_strategy(
            self.tsi_signal, self.data['close'], self.data['tsi'], self.data['signal_line']
        )

        return None  # Adjust this based on your logic

    



class Backtest:
    """
    Drive a strategy over a data set and report basic timing metrics.

    The strategy object must provide ``init()`` and ``next()``; ``next()`` is
    called repeatedly for as long as it returns a truthy value.
    """

    def __init__(self, data, strategy):
        self.data = data
        self.strategy = strategy

    def run(self):
        """
        Run the strategy, then print and return the metrics.

        Reads the first and last labels of ``self.data.index`` (so the data must
        be non-empty) and converts them with ``pd.to_datetime`` to measure the
        covered period.

        :return: Dict with "Start" and "End" (the raw first/last index labels)
            and "Duration" (whole days between them).
        """
        self.strategy.init()

        # Keep stepping until the strategy reports it is done (falsy return).
        while True:
            if not self.strategy.next():
                break

        first_label = self.data.index[0]
        last_label = self.data.index[-1]
        started_at = pd.to_datetime(first_label)
        ended_at = pd.to_datetime(last_label)

        # Planned metrics, not computed yet; each needs an equity curve
        # (``self.strategy.equity``) to be available on the strategy:
        #   "Exposure Time [%]"       len(self.data) as a placeholder
        #   "Equity Final [$]"        equity.iloc[-1]
        #   "Equity Peak [$]"         equity.max()
        #   "Return [%]"              (final - initial) / initial * 100
        #   "Buy & Hold Return [%]"   (close.iloc[-1] - close.iloc[0]) / close.iloc[0] * 100
        #   "Return (Ann.) [%]"       ((1 + daily_returns.mean()) ** 252 - 1) * 100
        #   "Volatility (Ann.) [%]"   daily_returns.std() * np.sqrt(252) * 100
        #   "Max. Drawdown [%]"       (1 - equity / equity.cummax()).max() * 100
        #   "Avg. Drawdown [%]"       (1 - equity / equity.cummax()).mean() * 100
        #   "Max. Drawdown Duration"  (dd[dd == 0].diff().fillna(0) * -1).max()
        #   "Avg. Drawdown Duration"  (dd[dd == 0].diff().fillna(0) * -1).mean()
        #   "_strategy"               self.strategy.__class__.__name__
        #   "_equity_curve"           equity
        # where daily_returns = equity.pct_change().dropna()
        # and   dd = 1 - equity / equity.cummax().
        metrics = {}
        metrics["Start"] = first_label
        metrics["End"] = last_label
        metrics["Duration"] = (ended_at - started_at).days

        # One "<name padded to 30 chars> <value>" line per metric.
        report_lines = [f"{key.ljust(30)} {value}" for key, value in metrics.items()]
        print("\n".join(report_lines))

        return metrics

# Example usage: fetch daily SPY bars, index them by their readable local
# date-time string, and run the TSI strategy through the backtest driver.
data = list_aggs(
    get_client(), "SPY", 1, "day", "2000-01-01", "2018-12-31", limit=50000
)
# list_aggs already returns a DataFrame; this builds `df` from it.
df = pd.DataFrame(data)
# The 'datetime' column holds strings formatted as "%Y-%m-%d %H:%M:%S %Z",
# so the resulting index is made of strings, not timestamps.
df.set_index('datetime', inplace=True)
# The same DataFrame object is handed to both the strategy and the backtest.
my_strategy = MyStrategy(df)
bt = Backtest(df, my_strategy)
# Prints the metrics and, as the last expression of the cell, echoes the dict.
bt.run()



Start                          2003-09-10 00:00:00 EDT
End                            2018-12-31 00:00:00 EST
Duration                       5591


  if tsi[i-1] < signal_line[i-1] and tsi[i] > signal_line[i]:
  elif tsi[i-1] > signal_line[i-1] and tsi[i] < signal_line[i]:
  start_date = pd.to_datetime(self.data.index[0])
  end_date = pd.to_datetime(self.data.index[-1])


{'Start': '2003-09-10 00:00:00 EDT',
 'End': '2018-12-31 00:00:00 EST',
 'Duration': 5591}