In [1]:
from yahoo_fin import stock_info as si

def get_tickers():
    return si.tickers_sp500()

In [2]:
def add_trend_signal(df):
    df['Trend'] = 0
    for i in range(5, len(df)):
        window = df.iloc[i-5:i]
        if all(window['Close'] > window['EMA_200']):
            df.loc[i, 'Trend'] = 1
        elif all(window['Close'] < window['EMA_200']):
            df.loc[i, 'Trend'] = -1

    return df

In [3]:
def add_macd_signal(df):
    df['MACD_cross'] = 0
    for i in range(2, len(df)):
        before = df.iloc[i-2:i]
        if all(before['MACD_12_26_9'] < before['MACDs_12_26_9']) and df.loc[i, 'MACD_12_26_9'] > df.loc[i, 'MACDs_12_26_9']:
            df.loc[i, 'MACD_cross'] = 1
        elif all(before['MACD_12_26_9'] > before['MACDs_12_26_9']) and df.loc[i, 'MACD_12_26_9'] < df.loc[i, 'MACDs_12_26_9']:
            df.loc[i, 'MACD_cross'] = -1

    return df
            

In [4]:
def add_sma_signal(df):
    df['SMA_cross'] = 0
    for i in range(5, len(df)):
        window = df.iloc[i-5:i]
        if all(window['SMA_50'] < window['SMA_200']):
            if df.loc[i, 'SMA_50'] > df.loc[i, 'SMA_200']:
                df.loc[i, 'SMA_cross'] = 1
        elif all(window['SMA_50'] > window['SMA_200']):
            if df.loc[i, 'SMA_50'] < df.loc[i, 'SMA_200']:
                df.loc[i, 'SMA_cross'] = -1

    return df

In [5]:
def add_total_signal(df):
    # Strategy: Trade with trend, rsi > 50, and engulfing candle
    # Trade Management: Stop loss @ 2x candle width, 1.5 risk to reward ratio
    # -------------------------------------------------------------------------------
    # Trial 1: Very negative returns, going to try just buys
    # Trial 2: Only did buy positions, positive returns, not as high as buy & hold returns
    # Results: ~35% win rate with positive decent returns (14.5%) for 2.0 ratio
    
    df['engulfing_signal'] = 0
    df.loc[ (df['Close'] > df['EMA_200']) & (df['RSI'] > 50) & (df['Engulfing'] == 100), 'engulfing_signal'] = 1

    # Strategy: Trade with trend, if candle closes outside of bollinger bands, make a limit order at 3% below closing price
    # Trade Management: Limit order is valid for 5 days, stop loss @ 3% below limit, 1.5 risk to reward ratio
    # -------------------------------------------------------------------------------
    # Trial 1: ~40% win rate with positive decent returns (11.5%) for 1.9 ratio
    # Trail 2: Sell when RSI crosses 50 line, much better returns (~27%)
    # Trial 3: Same trade management, only buy signals. ~45% return rate and 72% win rate
    df['bb_signal'] = 0
    df.loc[ (df['Trend'] == 1) & (df['Close'] < df['BBL_20_2.0']), 'bb_signal'] = 1
    # df.loc[ (df['Trend'] == -1) & (df['Close'] > df['BBU_20_2.0']), 'bb_signal'] = -1

    # Strategy: Trade with trend, look for MACD crossover above or below the zero line
    # Trade Management: Optimize parameters for sl and tp
    # -------------------------------------------------------------------------------
    # Trial 1: ~ 60% win rate with ~16% return rate, begin paper trade testing
    df['macd_signal'] = 0
    df.loc[ (df['Trend'] == 1) & (df['MACD_cross'] == 1) & (df['MACD_12_26_9'] < 0), 'macd_signal'] = 1
    # df.loc[ (df['Trend'] == -1) & (df['MACD_cross'] == -1) & (df['MACD_12_26_9'] > 0), 'macd_signal'] = -1

    #Strategy: Trade with trend, buy if 50 SMA crosses above 200 SMA, sell is 50 SMA crosses below 200 SMA
    #Trade Management: Trailing stop loss @ 2x ATR
    # -------------------------------------------------------------------------------
    # Trial 1:

    return df

In [6]:
import yfinance as yf
from yahoo_fin import stock_info as si
import pandas_ta as ta
import numpy as np
import pandas as pd

def import_data(stock, startDate):
    data = yf.download(stock, start=startDate, interval='1d')

    data['EMA_200'] = ta.ema(data['Close'], length=200)
    data['SMA_50'] = ta.sma(data['Close'], length=50)
    data['SMA_200'] = ta.sma(data['Close'], length=200)
    data['RSI'] = ta.rsi(data['Close'], length=14)
    data['ATR'] = ta.atr(data['High'], data['Low'], data['Close'], length=14)
    data['Engulfing'] = data.ta.cdl_pattern(name="engulfing")
    bbands = ta.bbands(data['Close'], length=20, std=2)
    macd = ta.macd(data['Close'])
    data = data.join(bbands)
    data = data.join(macd)
    
    data.reset_index(inplace=True)
    
    data = add_trend_signal(data)
    data = add_macd_signal(data)
    data = add_total_signal(data)
    data = add_sma_signal(data)

    return data

In [7]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def print_candles(df):
    # Create a figure with two rows for subplots
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, 
                        vertical_spacing=0.3, 
                        row_heights=[0.7, 0.3],
                        subplot_titles=("Candlestick Chart with Indicators", "RSI"))

    # Candlestick chart
    fig.add_trace(go.Candlestick(x=df.index,
                                 open=df['Open'],
                                 high=df['High'],
                                 low=df['Low'],
                                 close=df['Close']),
                  row=1, col=1)

    # EMA and Bollinger Bands on the first plot
    fig.add_trace(go.Scatter(x=df.index, y=df['SMA_50'], 
                             line=dict(color='red', width=1), 
                             name="EMA 200"),
                  row=1, col=1)
    fig.add_trace(go.Scatter(x=df.index, y=df['SMA_200'], 
                             line=dict(color='blue', width=1), 
                             name="BB Upper"),
                  row=1, col=1)

    # Buy and sell signals on the first plot
    buys = df.loc[(df['SMA_cross'] == 1)].index
    sells = df.loc[(df['SMA_cross'] == -1)].index

    for date in buys:
        fig.add_vline(x=date, line_width=1, line_dash="dash", line_color="green", row=1, col=1)
    for date in sells:
        fig.add_vline(x=date, line_width=1, line_dash="dash", line_color="red", row=1, col=1)

    # RSI on the second plot
    fig.add_trace(go.Scatter(x=df.index, y=df['MACD_12_26_9'], 
                             line=dict(color='blue', width=1), 
                             name="RSI"),
                  row=2, col=1)
    fig.add_trace(go.Scatter(x=df.index, y=df['MACDs_12_26_9'], 
                             line=dict(color='orange', width=1), 
                             name="RSI"),
                  row=2, col=1)

    # Update layout
    fig.update_layout(width=1200, height=800)
    fig.show()

In [8]:
# Strategy #1 Backtest
from backtesting import Strategy
from backtesting import Backtest
def optimize_engulfing(df):
    def SIGNAL():
        return df['engulfing_signal']
    
    class MyStrat(Strategy):
        mysize = 0.1
        ratio = 1

        def init(self):
            super().init()
            self.signal1 = self.I(SIGNAL)

        def next(self):
            super().next()        
            if self.signal1==1 and len(self.trades)==0:
                candle_width = self.data.Close[-1] - self.data.Open[-1]
                stop_loss = self.data.Close[-1] - (candle_width * 2)
                take_profit = self.data.Close[-1] + (candle_width * 2 * self.ratio)
                self.buy(sl=stop_loss, tp=take_profit, size=self.mysize)
            
            elif self.signal1==-1 and len(self.trades)==0:         
                candle_width = self.data.Open[-1] - self.data.Close[-1]
                stop_loss = self.data.Close[-1] + (candle_width * 2)
                take_profit = self.data.Close[-1] - (candle_width * 2 * self.ratio)
                self.sell(sl=stop_loss, tp=take_profit, size=self.mysize)

    bt = Backtest(df, MyStrat, cash=10000, margin=1/30)
    # results = bt.run()
    # print(results)
    # print(results._trades)

    stats, heatmap = bt.optimize(ratio=[i/10 for i in range(10, 21)],
                        maximize='Return [%]', max_tries=300,
                            random_state=0,
                            return_heatmap=True)
    # print(stats)
    # print(stats['_strategy'])
    # print(stats['_trades'])

    return stats['_strategy']

  from .autonotebook import tqdm as notebook_tqdm


In [9]:
# Strategy #2 Backtest
from backtesting import Strategy
from backtesting import Backtest
def get_parameters(df, percent=0.03):
    def SIGNAL():
        return df['bb_signal']
    
    class MyStrat(Strategy):
        mysize = 0.1

        def init(self):
            super().init()
            self.signal1 = self.I(SIGNAL)
            self.order_date = None

        def next(self):
            super().next()

            if len(self.trades) == 0 and len(self.orders) > 0 and (len(self.data) - self.order_date > 5):
                self.orders[0].cancel()

            if self.signal1==1 and len(self.trades)==0:
                if len(self.orders) == 0: 
                    limit_price = self.data.Close[-1] - (self.data.Close[-1] * percent)
                    self.buy(limit=limit_price, size=self.mysize)
                    self.order_date = len(self.data)
            
            # elif self.signal1==-1 and len(self.trades)==0:         
            #     limit_price = self.data.Close[-1] + (self.data.Close[-1] * percent)
            #     # stop_loss = limit_price + (self.data.Close[-1] * percent)
            #     # take_profit = limit_price - (self.data.Close[-1] * percent * ratio)
            #     if len(self.orders) > 0:
            #         for i in range (0, len(self.orders)):
            #             self.orders[i].cancel()
            #     self.sell(limit=limit_price, size=self.mysize)
            #     self.order_date = len(self.data)

            for trade in self.trades:
                if trade.is_long and self.data.RSI[-1] > 50:
                    trade.close()
                elif trade.is_short and self.data.RSI[-1] < 50:
                    trade.close()
            
    bt = Backtest(df, MyStrat, cash=10000, margin=1/30)
    results = bt.run()
    # print(results)
    # print(results._trades)

    return results

In [10]:
# # Strategy #3 Backtest
# from backtesting import Strategy
# from backtesting import Backtest
# def get_parameters(df, sl_best, ratio_best):
#     def SIGNAL():
#         return df['macd_signal']
    
#     class MyStrat(Strategy):
#         mysize = 0.1
#         sl = sl_best
#         ratio = ratio_best

#         def init(self):
#             super().init()
#             self.signal1 = self.I(SIGNAL)

#         def next(self):
#             super().next()        
#             if self.signal1==1 and len(self.trades)==0:
#                 sl_value = self.sl * self.data.ATR[-1]
#                 stop_loss = self.data.Close[-1] - sl_value
#                 take_profit = self.data.Close[-1] + (sl_value * self.ratio)
#                 self.buy(sl=stop_loss, tp=take_profit, size=self.mysize)
            
#             elif self.signal1==-1 and len(self.trades)==0:         
#                 sl_value = self.sl * self.data.ATR[-1]
#                 stop_loss = self.data.Close[-1] + sl_value
#                 take_profit = self.data.Close[-1] - (sl_value * self.ratio)
#                 self.sell(sl=stop_loss, tp=take_profit, size=self.mysize)

#     bt = Backtest(df, MyStrat, cash=10000, margin=1/30)
#     results = bt.run()
#     # print(results)
#     # print(results._trades)

#     # stats, heatmap = bt.optimize(sl=[i/10 for i in range(10, 36)],
#     #                     ratio=[i/10 for i in range(10, 26)],
#     #                     maximize='Return [%]', max_tries=300,
#     #                         random_state=0,
#     #                         return_heatmap=True)
#     # print(stats)
#     # print(stats['_strategy'])
#     # print(stats['_trades'])

#     return results

In [11]:
# Strategy #3 Backtest
from backtesting import Strategy
from backtesting import Backtest
def optimize(df):
    def SIGNAL():
        return df['macd_signal']
    
    class MyStrat(Strategy):
        mysize = 0.1
        sl = 1.5
        ratio = 2

        def init(self):
            super().init()
            self.signal1 = self.I(SIGNAL)

        def next(self):
            super().next()        
            if self.signal1==1 and len(self.trades)==0:
                sl_value = self.sl * self.data.ATR[-1]
                stop_loss = self.data.Close[-1] - sl_value
                take_profit = self.data.Close[-1] + (sl_value * self.ratio)
                self.buy(sl=stop_loss, tp=take_profit, size=self.mysize)
            
            elif self.signal1==-1 and len(self.trades)==0:         
                sl_value = self.sl * self.data.ATR[-1]
                stop_loss = self.data.Close[-1] + sl_value
                take_profit = self.data.Close[-1] - (sl_value * self.ratio)
                self.sell(sl=stop_loss, tp=take_profit, size=self.mysize)

    bt = Backtest(df, MyStrat, cash=10000, margin=1/30)
    # results = bt.run()
    # print(results)
    # print(results._trades)

    stats, heatmap = bt.optimize(sl=[i/10 for i in range(10, 26)],
                        ratio=[i/10 for i in range(10, 21)],
                        maximize='Return [%]', max_tries=300,
                            random_state=0,
                            return_heatmap=True)
    # print(stats)
    # print(stats['_strategy'])
    # print(stats['_trades'])

    return stats['_strategy']

In [12]:
import math

def test_full_strategy():
    # for ratio in range(10, 30):
        my_results = []
        buy_hold = []
        win_rates = []
        sp500_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
        sp500_table = pd.read_html(sp500_url)
        TICKERS = sp500_table[0]['Symbol'].tolist()
        print(TICKERS)
        print(len(TICKERS))
        for ticker in TICKERS:
            try:
                df = import_data(ticker, '2023-09-27')
                # results = get_parameters(df)

                # Strategy 3 testing
                # half = df.shape[0] // 2
                # train = df.iloc[:half]
                # test = df.iloc[half:]
                # optimal = optimize(train)
                # sl_best = optimal.sl
                # ratio_best = optimal.ratio
                results = get_parameters(df)

                my_results.append(results['Return [%]'])
                buy_hold.append(results['Buy & Hold Return [%]'])
                win_rates.append(results['Win Rate [%]'])
            except Exception as e:
                print(ticker, e)

        print(f'My average results: {sum(my_results) / len(my_results)} \n')
        print(f'Buy and Hold average results: {sum(buy_hold) / len(buy_hold)} \n')
        win_rates = [rate for rate in win_rates if rate is not None and not math.isnan(rate)]
        print(f'Win Rate: {sum(win_rates) / len(win_rates)} \n \n')

        with open('equity.txt', 'a') as f:
            f.write(f'My average results: {sum(my_results) / len(my_results)} \n')
            f.write(f'Buy and Hold average results: {sum(buy_hold) / len(buy_hold)} \n')
            win_rates = [rate for rate in win_rates if rate is not None and not math.isnan(rate)]
            f.write(f'Win Rate: {sum(win_rates) / len(win_rates)} \n \n')

In [13]:
# def calc_bb(ticker, risk):
#     df = import_data(ticker, '2022-01-01')
#     today = df.iloc[-1]
#     curr = today.Close
#     order = curr - (curr * .03)
#     stop = order - (order * .03)
#     shares = risk / (order - stop)

#     order = round(order, 2)
#     shares = round(shares, 2)

#     print(f'Buy {ticker} at {order}, Shares: {shares}')

In [14]:
def calc_engulfing(ticker):
    df = import_data(ticker, '2022-01-01')
    # results = get_parameters(df)

    optimal = optimize_engulfing(df)
    ratio_best = optimal.ratio

    today = df.iloc[-1]
    close = today.Close
    open = today.Open

    sl_value = (close - open) * 2
    stop = close - (sl_value)
    tp = close + (sl_value * ratio_best)

    shares = 10 / (close - stop)

    print(f'Buy {ticker} at {round(close, 2)}, stop = {round(stop, 2)}, tp = {round(tp, 2)}, shares = {round(shares, 2)}')

In [15]:
def calc_macd(ticker):
    df = import_data(ticker, '2022-01-01')
    # results = get_parameters(df)

    optimal = optimize(df)
    sl_best = optimal.sl
    ratio_best = optimal.ratio

    today = df.iloc[-1]
    close = today.Close
    atr = today.ATR

    sl_value = atr * sl_best
    stop = close - (sl_value)
    tp = close + (sl_value * ratio_best)

    shares = 10 / (close - stop)

    print(f'Buy {ticker} at {round(close, 2)}, stop = {round(stop, 2)}, tp = {round(tp, 2)}, shares = {round(shares, 2)}')

In [16]:
# def find_buys():
#     bb_buys = []
#     engulfing_buys = []
#     macd_buys = []
#     sp500_url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
#     sp500_table = pd.read_html(sp500_url)
#     TICKERS = sp500_table[0]['Symbol'].tolist()

#     for ticker in TICKERS:
#         try:
#             df = import_data(ticker, '2023-01-01')
#             engulfing_signal = df.iloc[-1]['engulfing_signal']
#             bb_signal = df.iloc[-1]['bb_signal']
#             macd_signal = df.iloc[-1]['macd_signal']
#             if engulfing_signal == 1:
#                 engulfing_buys.append(ticker)
#             if bb_signal != 0:
#                 bb_buys.append(ticker)
#             if macd_signal != 0:
#                 macd_buys.append(ticker)
#         except Exception as e:
#             print(ticker, e)

#     print(f'BB Buy Signals: {bb_buys}')
#     for ticker in bb_buys:
#         calc_bb(ticker, 20)
#         calc_bb(ticker, 1000)
#     print(f'Engulfing Buy Signals: {engulfing_buys}')
#     for ticker in engulfing_buys:
#         calc_engulfing(ticker)
#     print(f'MACD Signals: {macd_buys}')
#     for ticker in macd_buys:
#         calc_macd(ticker)

In [17]:
find_buys()

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%*******

AMTM 'NoneType' object is not iterable


[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%*******

In [18]:
df = import_data('ADBE', '2023-01-01')
today = df.iloc[-1]
close = today.Close
atr = today.ATR
stop_loss = close - 2*atr
print(f'Stop loss: {round(stop_loss, 2)}')

[*********************100%%**********************]  1 of 1 completed


Stop loss: 487.99
