![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.signal import argrelextrema
from collections import deque
from matplotlib.lines import Line2D
from datetime import timedelta

'''
    Much of this code is adapted from: https://raposa.trade/blog/higher-highs-lower-lows-and-calculating-price-trends-in-python/
'''

def getHigherLows(data: np.ndarray, order=5, K=2):
  '''
  Finds runs of K consecutive higher lows in a price series.

  A low is a strict local minimum: a point lower than every neighbour
  within `order` samples on each side (argrelextrema with np.less).
  Walking the lows from left to right, a run keeps growing while each low
  is not below the one before it (equal lows keep the run alive) and is
  restarted at the current low otherwise.

  Parameters:
    data: numpy array of prices (intended for 1-D input).
    order: how many samples on each side a low has to undercut.
    K: how many consecutive lows make up one reported run.

  Returns:
    A list of deques, each holding the K positions in data of one run.
    Runs overlap: every low that completes a full window adds an entry.
  '''
  # Positions of the local minima and the prices found there
  low_idx = argrelextrema(data, np.less, order=order)[0]
  lows = data[low_idx]

  extrema = []
  ex_deque = deque(maxlen=K)  # sliding window over the current run
  for i, idx in enumerate(low_idx):
    if i == 0:
      # The first low only seeds the window; there is nothing to compare it to
      ex_deque.append(idx)
      continue
    if lows[i] < lows[i-1]:
      # Sequence broken: start a new run at this low
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema

def getLowerHighs(data: np.array, order=5, K=2):
  '''
  Finds runs of K consecutive lower highs in a price series.

  A high is a strict local maximum: a point above every neighbour within
  `order` samples on each side. A run keeps growing while each high is
  not above the one before it and restarts at the current high otherwise.
  K is the number of consecutive highs that make up one reported run.

  Returns a list of deques holding the positions in data of each run.
  '''
  peak_positions = argrelextrema(data, np.greater, order=order)[0]
  peak_values = data[peak_positions]

  runs = []
  # The first peak (if any) only seeds the window and is never reported alone
  window = deque(peak_positions[:1], maxlen=K)
  # Pair every later peak with the value of the peak just before it
  later_peaks = zip(peak_positions[1:], peak_values[1:], peak_values[:-1])
  for position, value, prior_value in later_peaks:
    if value > prior_value:
      # A higher high breaks the sequence, so the run starts over here
      window.clear()
    window.append(position)
    if len(window) == K:
      runs.append(window.copy())

  return runs

def getHigherHighs(data: np.ndarray, order=5, K=2):
  '''
  Finds runs of K consecutive higher highs in a price series.

  A high is a strict local maximum: a point above every neighbour within
  `order` samples on each side (argrelextrema with np.greater).
  Walking the highs from left to right, a run keeps growing while each
  high is not below the one before it (equal highs keep the run alive)
  and is restarted at the current high otherwise.

  Parameters:
    data: numpy array of prices (intended for 1-D input).
    order: how many samples on each side a high has to exceed.
    K: how many consecutive highs make up one reported run.

  Returns:
    A list of deques, each holding the K positions in data of one run.
    Runs overlap: every high that completes a full window adds an entry.
  '''
  # Positions of the local maxima and the prices found there
  high_idx = argrelextrema(data, np.greater, order=order)[0]
  highs = data[high_idx]

  extrema = []
  ex_deque = deque(maxlen=K)  # sliding window over the current run
  for i, idx in enumerate(high_idx):
    if i == 0:
      # The first high only seeds the window; there is nothing to compare it to
      ex_deque.append(idx)
      continue
    if highs[i] < highs[i-1]:
      # Sequence broken: start a new run at this high
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema

def getLowerLows(data: np.ndarray, order=5, K=2):
  '''
  Finds runs of K consecutive lower lows in a price series.

  A low is a strict local minimum: a point lower than every neighbour
  within `order` samples on each side (argrelextrema with np.less).
  Walking the lows from left to right, a run keeps growing while each low
  is not above the one before it (equal lows keep the run alive) and is
  restarted at the current low otherwise.

  Parameters:
    data: numpy array of prices (intended for 1-D input).
    order: how many samples on each side a low has to undercut.
    K: how many consecutive lows make up one reported run.

  Returns:
    A list of deques, each holding the K positions in data of one run.
    Runs overlap: every low that completes a full window adds an entry.
  '''
  # Positions of the local minima and the prices found there
  low_idx = argrelextrema(data, np.less, order=order)[0]
  lows = data[low_idx]

  extrema = []
  ex_deque = deque(maxlen=K)  # sliding window over the current run
  for i, idx in enumerate(low_idx):
    if i == 0:
      # The first low only seeds the window; there is nothing to compare it to
      ex_deque.append(idx)
      continue
    if lows[i] > lows[i-1]:
      # Sequence broken: start a new run at this low
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema

def get_trend(close_data, order, K, plot):
    '''
    Summarise the swing structure of a closing-price series.

    Detects higher-high, higher-low, lower-low and lower-high runs with
    getHigherHighs / getHigherLows / getLowerLows / getLowerHighs,
    optionally plots them, and adds up the price change between the first
    two points of every detected run.

    Parameters:
      close_data: sequence of closing prices (list or array-like).
      order: neighbourhood size forwarded to the extrema helpers.
      K: run length forwarded to the extrema helpers. Only the first two
         points of each run are read here, so K must be at least 2.
      plot: when true, draw the series with the detected runs and print
            the swing total.

    Returns:
      (trend, recent_swing). trend is always 0: it is a placeholder that
      this function never computes. recent_swing is the summed price
      change over all detected runs.
    '''
    # The extrema helpers index with integer arrays, so they need an ndarray
    close = np.asarray(close_data)

    hh = getHigherHighs(close, order, K)
    hl = getHigherLows(close, order, K)
    ll = getLowerLows(close, order, K)
    lh = getLowerHighs(close, order, K)

    if plot:
        styles = (
            ('HH', hh, 'green'),
            ('HL', hl, 'blue'),
            ('LL', ll, 'red'),
            ('LH', lh, 'orange'),
        )
        fig, ax = plt.subplots()
        for _, found, colour in styles:
            for pattern in found:
                idx = list(pattern)
                ax.plot(idx, close[idx], color=colour)
                # plot a dot on the second point
                ax.plot(pattern[1], close[pattern[1]], 'o', color=colour)
        ax.plot(close, color='black')
        ax.set_title('Price Patterns')
        ax.set_xlabel('Period')
        ax.set_ylabel('Price')
        # Positional legend labels would attach to whichever artists were
        # drawn first (the pattern segments), so build explicit handles.
        handles = [Line2D([0], [0], color='black', label='Close')]
        handles += [Line2D([0], [0], color=colour, label=name)
                    for name, _, colour in styles]
        ax.legend(handles=handles)
        plt.show()

    # format for tuples inside patterns:
    # (type, location first price, location second price, first price, second price)
    patterns = []
    for label, found in (('hh', hh), ('hl', hl), ('ll', ll), ('lh', lh)):
        for pattern in found:
            first, second = pattern[0], pattern[1]
            patterns.append((label, first, second, close[first], close[second]))

    # Placeholder: no trend classification is done here; callers derive
    # their own from recent_swing.
    trend = 0

    # Up-type and down-type runs are totalled separately and then combined.
    recent_swing_up = sum(p[4] - p[3] for p in patterns if p[0] in ('hh', 'hl'))
    recent_swing_down = sum(p[4] - p[3] for p in patterns if p[0] not in ('hh', 'hl'))
    recent_swing = recent_swing_up + recent_swing_down

    if plot:
        print("recent_swing: ", recent_swing)

    return (trend, recent_swing)





In [2]:
# QuantBook Analysis Tool
import numpy as np
import matplotlib.pyplot as plt
# For more information see https://www.quantconnect.com/docs/research/overview
from AlgorithmImports import *



def run(fast_length, slow_length, trailing_stop, ticker, price_smoothing, rebuy_period, atr_period, atr_multiple, atr_define_interval, trend_length, trend_thresh, ma_band_size):
    '''
    Replay minute bars for one ticker through the moving-average band
    strategy below and return the accumulated profit.

    Day-based parameters are multiplied by 390 (minutes in a trading day)
    to turn them into a number of minute bars.

    Parameters:
      fast_length: fast SMA period, in trading days.
      slow_length: slow SMA period, in trading days.
      trailing_stop: fractional distance from the running extreme at which
        the trailing stop closes a position.
      ticker: equity symbol to load.
      price_smoothing: number of most recent minute closes averaged before
        the trailing-stop comparison.
      rebuy_period: two-element sequence (min_bars, max_bars); re-entry
        after a trailing-stop exit is only considered strictly inside this
        window of bars after the exit.
      atr_period: ATR period, in trading days. ATR values are collected in
        atr_list but never read by the trading logic.
      atr_multiple, atr_define_interval: accepted but not used here.
      trend_length: look-back handed to get_trend, in trading days; it is
        also the index of the first bar that is traded.
      trend_thresh: threshold applied to recent_swing / current price to
        classify the trend as up (1), down (-1) or flat (0).
      ma_band_size: fractional half-width of the band around the slow SMA.

    Returns:
      profit: sum of the fractional returns of all closed trades (added,
      not compounded).
      NOTE(review): only this scalar is returned. The names after the '#'
      on the return line appear to be a previous tuple return; code in
      this file that indexes the result (run(...)[0], vals[1], ...)
      depends on that tuple shape - confirm which one is intended.
    '''

    qb = QuantBook()
    # Locally Lean installs free sample data, to download more data please visit https://www.quantconnect.com/docs/v2/lean-cli/datasets/downloading-data
    qb.SetStartDate(2020, 3, 25)
    # The variable is called AAPL but holds the Symbol of whatever ticker was passed in
    AAPL = qb.AddEquity(ticker, Resolution.Minute).Symbol
    sma_fast = qb.sma(AAPL,fast_length*390, Resolution.Minute)
    sma_slow = qb.sma(AAPL, slow_length*390, Resolution.Minute)
    atr = qb.atr(AAPL, atr_period*390, MovingAverageType.Simple, Resolution.Minute)


    # Per-bar series, all filled in lockstep by the history loop below
    sma_slow_list = []
    sma_slow_band_lower_list = []
    sma_slow_band_upper_list = []
    sma_fast_list = []
    atr_list = []
    price_list = []

    # Request 180 * 390 minute bars of history
    history_bars = qb.History[TradeBar](AAPL, 180*390, Resolution.Minute) 

    # Feed every bar to the indicators by hand and record their values
    for bar in history_bars:
        sma_slow.Update(bar.EndTime, bar.Close)
        sma_fast.Update(bar.EndTime, bar.Close)
        atr.Update(bar)
        atr_list.append(atr.Current.Value)
        price_list.append(bar.Close)
        sma_slow_list.append(sma_slow.Current.Value)
        sma_slow_band_lower_list.append(sma_slow.Current.Value * ( 1 - ma_band_size))
        sma_slow_band_upper_list.append(sma_slow.Current.Value * ( 1 + ma_band_size))
        sma_fast_list.append(sma_fast.Current.Value)
    
    # NOTE(review): this prints the entire per-bar list on every call
    print("sma_slow_list: ", sma_slow_list)

    #sma_slow_derivative = np.gradient([x for x in sma_slow])
    #sma_slow_derivative = np.gradient([x for x in sma_slow])
    # make plot bigger


    # relation of the previous bar: 1 above the band, -1 below it, 0 inside
    above_below = 0
    # Entry prices of the open long / short position; None means no position
    buy_price = None
    short_price = None

    profit = 0



    #buys = [None] * len(price_list)
    #shorts = [None] * len(price_list)
    # Highest price seen while long / lowest price seen while short
    # (peak_while_short is maintained with min(), so it is a trough)
    peak_while_long = None
    peak_while_short = None
    #trailing_stop_sells = [None] * len(price_list)
    #trailing_stop_covers = [None] * len(price_list)
    #atr_band = [None] * len(price_list)
    # (bar index, extreme price) of the latest trailing-stop exit, used by the re-entry rules
    sell_price = None
    cover_price = None
    #trailing_stop_short_band = [None] * len(price_list)
    #trailing_stop_long_band = [None] * len(price_list)
    #peak_while_shorts = [None] * len(price_list)
    #total_profit = [0] * len(price_list)
    #trends = [0] * len(price_list)

    #relations = [None] * len(price_list)


    # Trading starts once a full trend look-back window is available.
    # If trend_index >= len(price_list) the loop below never runs and profit stays 0.
    trend_index = trend_length * 390

    for i in range(trend_index, len(price_list)):
        

        # Where the fast SMA sits relative to the band around the slow SMA
        if sma_fast_list[i] > sma_slow_band_upper_list[i]:
            relation = 1
        elif sma_fast_list[i] < sma_slow_band_lower_list[i]:
            relation = -1
        else:
            relation = 0

        # Fast SMA has just left the band (previous bar was inside).
        # The direction of the breakout is not checked in this branch.
        if relation and above_below == 0:
            trend_info = get_trend(price_list[i-trend_index:i], 5*60, 2, False)
            # Swing total normalised by the current price
            recent_swing = trend_info[1]/ price_list[i]
            if recent_swing > trend_thresh:
                trend = 1
            elif recent_swing < -trend_thresh:
                trend = -1
            else:
                trend = 0

            # Close any open short and book its fractional return
            if short_price != None:
                    short_profit = (short_price - price_list[i])/short_price
                    profit += short_profit
                    short_price = None
                    peak_while_short = None

            # Go long only when the swing structure points up
            if trend == 1:
                buy_price = price_list[i]
                peak_while_long = price_list[i]
                
        # Fast SMA is inside the band on this bar and on the previous one.
        # NOTE(review): this branch runs on every such bar, so get_trend is
        # recomputed each time and short_price is reset to the current
        # price whenever trend == -1 - confirm that is intended.
        elif not relation and above_below == 0:
            
            trend_info = get_trend(price_list[i-trend_index:i], 5*60, 2, False)
            recent_swing = trend_info[1]/price_list[i]
            if recent_swing > trend_thresh:
                trend = 1
            elif recent_swing < -trend_thresh:
                trend = -1
            else:
                trend = 0

            # Close any open long and book its fractional return
            if buy_price != None:
                    long_profit = (price_list[i] - buy_price)/buy_price
                    profit += long_profit
                    buy_price = None
                    peak_while_long = None

            # Go short only when the swing structure points down
            if trend == -1:
                short_price = price_list[i]
                peak_while_short = price_list[i]
            
        # Previous bar was outside the band: manage trailing stops
        else:
            if buy_price != None:
                # Mean of the last price_smoothing closes, current bar included
                smoothed_price = []
                for j in range(price_smoothing):
                    smoothed_price.append(price_list[i-j])
                smoothed_price = sum(smoothed_price)/price_smoothing

                peak_while_long = max(peak_while_long, price_list[i]) if peak_while_long != None else price_list[i]
                # Stop out when the smoothed price falls trailing_stop below the peak
                if smoothed_price < (peak_while_long * (1-trailing_stop)): #or price_list[i] < begin_price - atr_stop:
                    profit += (price_list[i] - buy_price)/buy_price
                    # Remember when and at what peak we were stopped out
                    sell_price = (i, peak_while_long)
                    buy_price = None
                    peak_while_long = None
                
            if short_price != None:
                # Mean of the last price_smoothing closes, current bar included
                smoothed_price = []
                for j in range(price_smoothing):
                    smoothed_price.append(price_list[i-j])
                smoothed_price = sum(smoothed_price)/price_smoothing
                peak_while_short = min(peak_while_short, price_list[i]) if peak_while_short != None else price_list[i]

                # Stop out when the smoothed price rises trailing_stop above the trough
                if smoothed_price > (peak_while_short * (1 + trailing_stop)): #or price_list[i] > begin_price + atr_stop:
                    profit += (short_price - price_list[i])/short_price
                    # Remember when and at what trough we were stopped out
                    cover_price = (i, peak_while_short)
                    short_price = None
                    peak_while_short = None
        
        # Re-entry after a long trailing stop: flat, inside the rebuy window,
        # and price above the old peak and above both SMAs
        if sell_price is not None and i > sell_price[0] + rebuy_period[0] and i < sell_price[0] + rebuy_period[1] and buy_price == None and short_price == None:
            if price_list[i] > sell_price[1] and price_list[i] > sma_fast_list[i] and price_list[i] > sma_slow_list[i]:
                # buy back in
                buy_price = price_list[i]
                peak_while_long = price_list[i]
                sell_price = None
        # Mirror image for shorts: price below the old trough and below both SMAs
        if cover_price is not None and i > cover_price[0] + rebuy_period[0] and i < cover_price[0] + rebuy_period[1] and buy_price == None and short_price == None:
            if price_list[i] < cover_price[1] and price_list[i] < sma_fast_list[i] and price_list[i] < sma_slow_list[i]:
                # short back in
                peak_while_short = price_list[i]
                short_price = price_list[i]
                cover_price = None
        
        # Remember this bar's relation for the next iteration
        above_below = relation
    
        # On the last bar, close whatever is still open at the final price
        if i == len(price_list)-1:
            if buy_price != None:
                profit += (price_list[i] - buy_price)/buy_price
            if short_price != None:
                profit += (short_price - price_list[i])/short_price

    return profit#, price_list, sma_slow_list, sma_fast_list, buys, shorts, trailing_stop_sells, trailing_stop_covers, atr_band, trailing_stop_long_band, trailing_stop_short_band, peak_while_shorts, total_profit, trends, sma_slow_band_lower_list, sma_slow_band_upper_list, relations



In [6]:
# grid search for optimal parameters
# run() arguments: fast_length, slow_length, trailing_stop, ticker, price_smoothing, rebuy_period, atr_period, atr_multiple, atr_define_interval, trend_length, trend_thresh, ma_band_size
import itertools

fast_lengths = [9, 18, 27]
slow_lengths = [25, 35, 45]
trailing_stops = [.08, .1, .12]
price_smoothings = [4*60]
rebuy_periods = [[10 * 390, 30*390]] # 390 minutes in a trading day
tickers = ["AAPL", "INTC", "COST", "TSLA", "MSFT", "DIS"] #"NVDA", "AMZN", "FB", "INTC", "AMD"]
atr_periods = [1]
atr_multiples = [1]
atr_define_intervals = [1]
# NOTE(review): run() multiplies trend_length by 390 to get its first bar
# index, so 5*60 becomes 117000 while run() requests 180*390 = 70200 history
# bars. If the history call returns no more than it asks for, run()'s trading
# loop is empty and every profit is 0 - confirm the intended unit.
trend_lengths = [5*60]
trend_threshs = [.1, .2, .3]
ma_band_size = [.02]

# Every list that is swept per ticker, in the order run() receives the values
parameter_grid = [
    fast_lengths,
    slow_lengths,
    trailing_stops,
    price_smoothings,
    rebuy_periods,
    atr_periods,
    atr_multiples,
    atr_define_intervals,
    trend_lengths,
    trend_threshs,
]

grid_search_results = {}
total_combos = len(tickers)
for candidates in parameter_grid:
    total_combos *= len(candidates)
count = 0

for ticker in tickers:
    grid_search_results[ticker] = []
    # itertools.product visits combinations in the same order as nested for
    # loops over the same lists (last list varies fastest)
    for (fast_length, slow_length, trailing_stop, price_smoothing, rebuy_period,
         atr_period, atr_multiple, atr_define_interval, trend_length,
         trend_thresh) in itertools.product(*parameter_grid):
        outcome = run(fast_length, slow_length, trailing_stop, ticker, price_smoothing, rebuy_period, atr_period, atr_multiple, atr_define_interval, trend_length, trend_thresh, ma_band_size[0])
        # run() may hand back the bare profit or a tuple that starts with it
        profit = outcome[0] if isinstance(outcome, (tuple, list)) else outcome
        # Store the single trend_thresh used for this run; profit stays last
        grid_search_results[ticker].append((fast_length, slow_length, trailing_stop, price_smoothing, rebuy_period, trend_thresh, profit))
        count += 1
        #print(f"Finished {count}/{total_combos}")
for key in grid_search_results:
    print("- " + str(key) + " -")
    # sort by profit (last field of each result tuple), best first
    grid_search_results[key] = sorted(grid_search_results[key], key=lambda x: x[-1], reverse=True)
    # show the five best rows, or fewer if there are not that many
    for result in grid_search_results[key][:5]:
        print("***** ", result)

#import pandas as pd
#import plotly.express as px

# Convert to DataFrame
#df = pd.DataFrame(grid_search_results, columns=['Var1', 'Var2', 'Var3', 'Output'])

# 3D Scatter Plot
#fig = px.scatter_3d(df, x='Var1', y='Var2', z='Var3', color='Output', title='Grid Search Output 3D Scatter Plot')
#fig.show()

In [None]:
# Print the 15 most profitable parameter sets for every ticker
for key in grid_search_results:
    print("- " + str(key) + " -")
    # sort by profit, which is the last field of every result tuple
    grid_search_results[key] = sorted(grid_search_results[key], key=lambda x: x[-1], reverse=True)
    # slicing shows up to 15 rows without failing when fewer are available
    for result in grid_search_results[key][:15]:
        print("***** ", result)

In [9]:
# Single run with hand-picked parameters, followed by diagnostic plots.
# NOTE(review): everything below indexes vals as a 17-element tuple, but run()
# as defined in this file returns only the profit value (the rest of its
# return line is commented out). Restore that tuple before using this cell.
vals = run(fast_length = 15, slow_length = 35, trailing_stop = .04, ticker =  "SPY", price_smoothing = 3*60, rebuy_period =  [390*5, 390*20], atr_period = 28, atr_multiple = 10, atr_define_interval = 7, trend_length = 25, trend_thresh = 0, ma_band_size = .01)
# fast_length, slow_length, trailing_stop, ticker, price_smoothing, rebuy_period, atr_period, atr_multiple, atr_define_interval, trend_length, trend_thresh

print("profit: ", vals[0])
# vals structure: profit, price_list, sma_slow_list, sma_fast_list, buys, shorts, trailing_stop_sells, trailing_stop_covers
# Index layout taken from the commented-out part of run()'s return line:
#   0 profit, 1 price_list, 2 sma_slow_list, 3 sma_fast_list, 4 buys, 5 shorts,
#   6 trailing_stop_sells, 7 trailing_stop_covers, 8 atr_band,
#   9 trailing_stop_long_band, 10 trailing_stop_short_band, 11 peak_while_shorts,
#   12 total_profit, 13 trends, 14 sma_slow_band_lower_list,
#   15 sma_slow_band_upper_list, 16 relations



# Price with both moving averages, the trailing-stop bands and the slow-SMA band
fig = plt.figure(figsize=(40, 20))
plt.plot(vals[1], label='Price', color='black')
plt.plot(vals[2], label='sma slow', color='yellow')
plt.plot(vals[3], label='sma fast', color='green')
plt.plot(vals[9], label='trailing_long', color='red')
plt.plot(vals[10], label='trailing_short', color='blue')
plt.plot(vals[14], label='sma slow lower band', color='purple')
plt.plot(vals[15], label='sma slow upper band', color='purple')

print("peak_while_shorts: ", vals[11])

# plot buys and shorts as triangles over price but make them big
# (the x markers are the trailing-stop sells and covers)
plt.scatter(range(len(vals[1])), vals[4], color='green', s=700, marker='^')
plt.scatter(range(len(vals[1])), vals[5], color='red', s=700, marker='v')
plt.scatter(range(len(vals[1])), vals[6], color='purple', s=700, marker='x')
plt.scatter(range(len(vals[1])), vals[7], color='orange', s=700, marker='x')

#plt.plot(sma_fast_list[slice*7:], label='sma fast')
plt.legend()
plt.show()


# plot trends
fig = plt.figure(figsize=(40, 10))
plt.plot(vals[13])
plt.show()

# plot total profit over time
fig = plt.figure(figsize=(40, 10))
plt.plot(vals[12])
plt.show()

# plot relations
fig = plt.figure(figsize=(40, 10))
plt.plot(vals[16])
plt.show()

