In [1]:
import yahooquery as yq
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import itertools
import operator
import kaleido
from datetime import timedelta, datetime

import warnings
warnings.filterwarnings("ignore")

In [29]:
# put images into 'images' folder in the same directory
def ants_indicator(tickers:list, period:str='1y', purpose:str='present', verbose:bool=True, graph:bool=True, **kwargs) -> list:
    buy_list = []  # final result

    main_df = yq.Ticker(tickers, formatted=True, validate=True).history(period=period, interval="1d")
    main_df = main_df.reset_index()
    main_df['date'] = [datetime.strptime(x.strftime('%Y-%m-%d'), ('%Y-%m-%d')) for x in main_df['date']]

    for n, ticker in enumerate(tickers):
        df = main_df[main_df['symbol'] == ticker]
        df.reset_index(drop=True, inplace=True)

        df['momentum'] = df['adjclose'] > df['adjclose'].shift(1)
        df['momentum'] = [df['momentum'][x:15+x].value_counts()[True] if True in df['momentum'][x:15+x].value_counts().keys() 
                        else 0 for x in range(len(df))]

        df['avg_volume'] = df['volume'].rolling(30).mean()

        df['last_15_price'] = df['adjclose'].shift(15)

        # momentum only
        grey_index = set()
        for x in range(len(df) - 15):
            if df['momentum'][x] >= 12:
                grey_index.add(x)

        # momentum and volume
        orange_index = set()
        for x in grey_index:
            if df['volume'][x:15+x].mean() > df['avg_volume'][x] * 1.25:
                orange_index.update(range(x, x+15))

        # momentum and price
        blue_index = set()
        for x in grey_index:
            if df['adjclose'][x:15+x].mean() > df['last_15_price'][x] * 1.2:
                blue_index.update(range(x, x+15))

        green_index = orange_index.intersection(blue_index)

        # add on for momentum only
        temp = grey_index.copy()
        for x in temp:
            grey_index.update(range(x, x+15))

        # make sure indexes are not duplicated
        blue_index -= green_index
        orange_index -= green_index
        grey_index = grey_index - green_index - blue_index - orange_index

        indexes = [grey_index, blue_index, orange_index, green_index]
        new_indexes = []

        for index in indexes:
            consec_count = [0] + list(itertools.accumulate([sum(1 for _ in g) for _, g in 
                                                            itertools.groupby([e-i for i, e in enumerate(sorted(list(index)))])], operator.add))
            temp = sorted(list(index.copy()))
            cnt = 0
            for i in range(1, len(consec_count)):
                if consec_count[i] - consec_count[i-1] < 15:
                    del temp[consec_count[i-1] - cnt : consec_count[i] - cnt]
                    cnt += consec_count[i] - consec_count[i-1]
            new_indexes.append(temp)

        buy_signal = ""
        if purpose == 'present':
            if len(new_indexes[3]) > 0:
                if max(new_indexes[3]) > len(df) - 30:
                    buy_signal = "[BUY]"
                    buy_list.append([ticker,
                                    df['date'][max(new_indexes[3]).strftime('%Y-%m-%d')],
                                    round(df['adjclose'][max(new_indexes[3])], 4)])
              
        elif purpose == 'backtest':  # require at least 2 years of data
            if len(new_indexes[3]) > 0 and len(df) >= 252 * 2:
                if min(new_indexes[3])+15 < len(df) - 252:  # get the oldest green ant trend
                    buy_signal = "[BUY]"
                    # ticker, green ant date, 1yo price, 1yl price, returns, current price
                    buy_list.append([ticker, 
                                    df['date'][min(new_indexes[3])+15].strftime('%Y-%m-%d'),
                                    df['date'][min(new_indexes[3])+15+252].strftime('%Y-%m-%d'),
                                    round(df['adjclose'][min(new_indexes[3])+15], 4), 
                                    round(df['adjclose'][min(new_indexes[3])+15+252], 4),
                                    (round(df['adjclose'][min(new_indexes[3])+15+252], 4) - round(df['adjclose'][min(new_indexes[3])+15], 4)) 
                                    / round(df['adjclose'][min(new_indexes[3])+15], 4),
                                    round(df['adjclose'].iloc[-1], 4)])
    
        ant_dates, ant_price = [[] for _ in range(4)], [[] for _ in range(4)]

        for i in range(4):
            for x in new_indexes[i]:
                ant_dates[i].append(df['date'][x])
                ant_price[i].append(df['high'][x] * 1.03)

        missing_dates = sorted(set(df.date[0] + timedelta(x) for x in range((df.date[len(df.date)-1] - df.date[0]).days)) - set(df.date))
        
        if graph:
            fig = make_subplots(rows=2, cols=1, shared_xaxes=True, 
                    vertical_spacing=0.03, row_width=[0.2, 0.7])
            
            # price candlestick
            fig.add_trace(go.Candlestick(x=df['date'],
                            open=df['open'],
                            high=df['high'],
                            low=df['low'],
                            close=df['close']),
                            row=1, col=1)
            
            # volume
            fig.add_trace(go.Bar(x=df['date'], y=df['volume'], 
                                showlegend=False,
                                marker_color='skyblue',
                                marker_line=dict(width=1,
                                            color='darkslategrey')), 
                                row=2, col=1)

            fig.update_layout(
                xaxis=dict(
                    rangeselector=dict(
                        buttons=list([
                            dict(count=1,
                                label="1m",
                                step="month",
                                stepmode="backward"),
                            dict(count=6,
                                label="6m",
                                step="month",
                                stepmode="backward"),
                            dict(count=1,
                                label="YTD",
                                step="year",
                                stepmode="todate"),
                            dict(count=1,
                                label="1y",
                                step="year",
                                stepmode="backward"),
                            dict(step="all")
                        ])
                    ),
                    rangeslider=dict(
                        visible=True
                    ),
                    type="date"
                ),
                autosize=False,
                width=1500,
                height=750,
                margin=dict(l=20, r=20, t=20, b=20),
                xaxis_rangeslider_visible=False)

            fig.update_xaxes(
                    rangebreaks=[
                        dict(bounds=["sat", "mon"]),
                        dict(values=missing_dates)
                        # dict(bounds=[16, 9.5], pattern="hour"),  # hide hours outside of 9.30am-4pm
                    ]
                )

            color = ['grey', 'navy', 'goldenrod', 'lightgreen']

            for i in range(4):
                fig.add_trace(
                    go.Scatter(
                        x=ant_dates[i],
                        y=ant_price[i],
                        mode="markers",
                        marker=dict(symbol='circle', 
                                    size=6, 
                                    color=color[i],
                                    line=dict(width=1,
                                                color='darkslategrey'))))
                
            fig.write_image(f"images/{buy_signal} {ticker}.png", format='png', engine='kaleido')

        # fig.show()
        if verbose:
            print(f'{n+1}: {ticker}')

        if purpose == 'present':
            df = pd.DataFrame(buy_list, columns=['Ticker', 'Buy_Date', 'Close'])
        elif purpose == 'backtest':
            df = pd.DataFrame(buy_list, columns=['Ticker', 'Buy_Date', 'Sell_Date', 'Buy_Close', 'Sell_Close', 'Returns', 'Close'])
  
    return df

In [3]:
market_ret = yq.Ticker('SPY', formatted=True, validate=True).history(period='5y', interval='1d')['adjclose']
market_ret = (market_ret[-1] - market_ret[0]) / market_ret[0]
market_ret

0.9917615717560463

In [24]:
AAPL = yq.Ticker('AAPL', formatted=True, validate=True).history(period='5y', interval='1d')['adjclose']
AAPL = (AAPL[-1] - AAPL[0]) / AAPL[0]

MSFT = yq.Ticker('MSFT', formatted=True, validate=True).history(period='5y', interval='1d')['adjclose']
MSFT = (MSFT[-1] - MSFT[0]) / MSFT[0]

AAPL, MSFT

(3.245008665455029, 2.888285540486125)

In [30]:
tickers = pd.read_csv('large_cap.csv').Symbol.to_list()
result = ants_indicator(tickers=tickers, period='5y', purpose='backtest', verbose=True, graph=False)
result.to_csv('result.csv')

1: A


KeyError: -1

In [None]:
rets = (np.sum(result.Close) - np.sum(result.Buy_Close)) / np.sum(result.Buy_Close)
stdev = np.std(result.Returns)
sharpe = (rets - market_ret) / stdev

rets, stdev, sharpe

(0.19637775059414558, 1.0895356694050335, -0.7300209102802835)

In [None]:
result

Unnamed: 0,Ticker,Buy_Date,Sell_Date,Buy_Close,Sell_Close,Returns
0,AAL,2020-12-08,2021-12-08,17.6300,18.2300,0.034033
1,ACGL,2020-06-08,2021-06-08,35.6500,39.5700,0.109958
2,AEM,2020-08-07,2021-08-09,74.1234,54.0410,-0.270932
3,ALB,2021-08-10,2022-08-10,234.6565,256.3238,0.092336
4,ALGN,2023-02-03,2024-02-06,343.1000,288.4400,-0.159312
...,...,...,...,...,...,...
119,WSM,2021-03-29,2022-03-28,168.3673,141.5217,-0.159447
120,WSO,2020-08-04,2021-08-04,210.4821,259.7401,0.234025
121,Z,2020-05-26,2021-05-25,58.8700,112.5500,0.911840
122,ZG,2020-05-28,2021-05-27,57.2100,116.6700,1.039329
