# Triangle Price Patterns

In [685]:
import pandas as pd
import yfinance as yf
import numpy as np
from scipy.signal import argrelextrema
from scipy.stats import linregress
import numpy as np
from matplotlib import pyplot
from scipy.stats import linregress
import plotly.graph_objects as go
import pandas_ta as ta
import talib

In [686]:

def find_extrema(df, n):
    # Find local peaks
    df['min'] = df.iloc[argrelextrema(df['RSI'].values, np.less_equal, order=n)[0]]['low']
    df['max'] = df.iloc[argrelextrema(df['RSI'].values, np.greater_equal, order=n)[0]]['high']
    return df

In [687]:
def plot_chart(df, window, xxmin, xxmax, slmin, slmax, intercmin, intercmax):
#def plot_chart(df, window, xxmin, xxmax, yymin, yymax):
    candle_id = len(df)
    dfpl = df[candle_id-window-30:candle_id+window+30]
    # Candle sticks
    fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                    open=dfpl['open'],
                    high=dfpl['high'],
                    low=dfpl['low'],
                    close=dfpl['close']),
                    go.Scatter(x=dfpl.index, y=dfpl.EMA8, line=dict(color='green', width=1), name="EMA12"),
                    go.Scatter(x=dfpl.index, y=dfpl.EMA12, line=dict(color='black', width=1), name="EMA12"),
                    go.Scatter(x=dfpl.index, y=dfpl.EMA26, line=dict(color='orange', width=1), name="EMA26")
                    ])

    # Pivots
    fig.add_scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
    marker=dict(size=4, color="Yellow"),
    name="pivot", hovertext=df['Datetime'])

    # Breakout point
    breakout_bars = dfpl[dfpl['breakout'] != 0]
    fig.add_scatter(x=breakout_bars.index, y=breakout_bars['close'], mode="markers",
                    marker=dict(size=4 * (abs(breakout_bars['breakout']) *2), color="Blue"),
                    name="breakout",
                    hovertext=breakout_bars['Datetime']
                    )

    # lines
    xxmin = xxmin[1:]
    xxmin = xxmin[1:]
    xxmin = np.append(xxmin, xxmin[-1]+40)
    xxmax = np.append(xxmax, xxmax[-1]+40)
    
    fig.add_trace(go.Scatter(x=xxmin, y=slmin*xxmin + intercmin, mode='lines', name='min slope'))
    #fig.add_trace(go.Scatter(x=xxmin, y=yymin, mode='lines', name='min slope'))
    fig.add_trace(go.Scatter(x=xxmax, y=slmax*xxmax + intercmax, mode='lines', name='max slope'))
    #fig.add_trace(go.Scatter(x=xxmax, y=yymax, mode='lines', name='max slope'))
    fig.update_layout(xaxis_rangeslider_visible=False)
    fig.show()


In [688]:
def make_best_selection(xx, yy):
    sl, inter, corr, p, s = linregress(xx, yy)
    x1, y1 = xx, yy
    remove_idx = 0
    max_corr = 0
    max_id = -1
    while corr < 0.99 and remove_idx < len(xx):
        x1 = xx[0:remove_idx] + xx[remove_idx+1:]
        y1 = yy[0:remove_idx] + yy[remove_idx+1:]
        sl, inter, corr, p, s = linregress(x1, y1)
        if max_corr < corr:
            max_corr = max(max_corr, corr)
            max_id = remove_idx
        remove_idx = remove_idx + 1
    
    if max_id >= 0:
        xx = xx[0:max_id] + xx[max_id+1:]
        yy = yy[0:max_id] + yy[max_id+1:]
    
    # print(xx, yy, max_corr)
    return np.array(xx), np.array(yy)

# Test
# x = [1,2,3,4,5,6]
# y = [10,20,30,70,80,60]
# x2,y2 = make_best_selection(x, y)


In [689]:
def applyBreakout(l, id):
    return l[id]
    
def formed_tri_signal(frame, back_candles):
    # print(print(frame[frame['pivot']==1]))
    # print(print(frame[frame['pivot']==2]))
    df = frame
    breakout = [0]*len(df)
    df['breakout'] = breakout
    candle_id = len(df) - 2
    nearest_pivot_id = None
    maxim = np.array([])
    minim = np.array([])
    xxmin = np.array([])
    xxmax = np.array([])
    detected = False
    min_correlation = 0.80
    for i in range(candle_id, candle_id - back_candles, -1):
        if minim.size < 4 and df.iloc[i].pivot == 1 and (df.iloc[i].low  or df.iloc[i].low != np.nan) :
            minim = np.append(minim, df.iloc[i].low)
            xxmin = np.append(xxmin, i)     

        if maxim.size < 4 and df.iloc[i].pivot == 2 and (df.iloc[i].high  or df.iloc[i].high != np.nan) :
            maxim = np.append(maxim, df.iloc[i].high)
            xxmax = np.append(xxmax, i) 
        
        if (xxmin.size < 4 or xxmax.size < 4):
            continue
        
        # select best 3 points out of 4.
        xxmin, minim = make_best_selection(xxmin.tolist(), minim.tolist())
        xxmax, maxim = make_best_selection(xxmax.tolist(), maxim.tolist())


        # Finally, compute all the variable one more time with the remains.
        slmin, intercmin, rmin, pmin, semin = linregress(xxmin, minim)
        slmax, intercmax, rmax, pmax, semax = linregress(xxmax, maxim)
        correlated_bottom = abs(rmin) >= min_correlation
        correlated_top = abs(rmax)>=min_correlation 
        correlated = correlated_bottom or correlated_top
        if not correlated:
            continue

        nearest_pivot_id = int(max(xxmax[0], xxmin[0]))
        horizontal_bottom = (-0.0001 <= abs(slmin)<=0.0001)
        horizontal_top = (-0.0001 <= abs(slmax)<=0.0001)
        converging = slmin>=0.00001 and slmax<=-0.00001
        diverging = slmin<=-0.00001 and slmax>=0.00001
        rising = slmin>=0.00001 and slmax>=0.00001 and slmin > slmax
        falling = slmin<=-0.00001 and slmax<=0.00001 and slmin < slmax
        highLine = slmax * i + intercmax
        lowLine = slmin * i + intercmin
        
        if (converging or diverging or rising or falling):
            if xxmax.size > 2 and correlated_top and (horizontal_top or converging or falling):
                for inner_i in range(nearest_pivot_id + 1, candle_id + 1):
                    if ( 
                        df.iloc[i].close > highLine
                        and df.iloc[i].close > df.iloc[i].EMA8 
                        # and df.iloc[i].EMA8 > df.iloc[i].EMA12
                        # and df.iloc[i].EMA12 > df.iloc[i].EMA26
                        and df.iloc[i].SLOPE_EMA8 > 0
                        # and df.iloc[i].SLOPE_EMA12 > 0
                        # and df.iloc[i].SLOPE_EMA26 > 0
                    ): # closed above EMA8 for bullish breakout.
                        detected = True
                        breakout[inner_i] = breakout[inner_i]  + 1 # bullish triangle breakout ready to go!
                        break
                break
            if (
                xxmin.size > 2 
                and correlated_bottom 
                and (horizontal_bottom or converging or rising)
                ):
                for inner_i in range(nearest_pivot_id + 1, candle_id + 1):
                    if (
                        df.iloc[i].close < lowLine
                        and  df.iloc[i].close < df.iloc[i].EMA8 
                        # and df.iloc[i].EMA8 < df.iloc[i].EMA12
                        # and df.iloc[i].EMA12 < df.iloc[i].EMA26
                        and df.iloc[i].SLOPE_EMA8 < 0
                        # and df.iloc[i].SLOPE_EMA12 < 0
                        # and df.iloc[i].SLOPE_EMA26 < 0
                        ) : # closed below EMA8 for bearish breakout.
                        detected = True
                        breakout[inner_i] = breakout[inner_i]  - 1 # bearish triangle breakout ready to go!
                        break
                break
            
    if detected:
        df['breakout'] = df.apply(lambda x: applyBreakout(breakout, x.name), axis=1)
        print(f"min points: {xxmin}, max points: {xxmax}")
        print(slmin, slmax, intercmin, intercmax)
        # plot_chart(df, 50, xxmin, xxmax, slmin, slmax, intercmin, intercmax) 
        #plot_chart(df, 50, xxmin, xxmax, minim, maxim) 
        frame = df
        #print(df.tail(1))
        return {
            "frame": frame, 
            "xxmin": xxmin, 
            "xxmax": xxmax, 
            "slmin": slmin, 
            "slmax": slmax, 
            "intercmin": intercmin, 
            "intercmax": intercmax
        }


In [690]:
def get_data(symbol, interval, period):
    # print(f"Detecting Triangle for {symbol}")
    df = yf.download(symbol, period=period, interval=interval, progress=False)
    df.rename(
        columns={"Datetime": "time", 'Date': 'time', 'Close': 'close', "Low": "low", "High": "high", "Open": "open",
                 "Volume": "volume"}, inplace=True)

    if not df.shape[0]:
        return None

    df = df[df['volume'] != 0]
    df.reset_index(inplace=True)
    df['RSI'] = ta.rsi(df.close, length=14)
    df['EMA8'] = ta.sma(df.close, length=8)#sma ema
    df['EMA12'] = ta.sma(df.close, length=12)#sma ema
    df['EMA26'] = ta.sma(df.close, length=26)#sma ema
    df['SLOPE_EMA8'] = talib.LINEARREG_ANGLE(pd.to_numeric(df['EMA8']), 6)
    df['SLOPE_EMA12'] = talib.LINEARREG_ANGLE(pd.to_numeric(df['EMA12']), 6)
    df['SLOPE_EMA26'] = talib.LINEARREG_ANGLE(pd.to_numeric(df['EMA26']), 6)
    dfCopy = df.copy()
    df = find_extrema(df,5)
    df['pivot'] = np.where(df["max"].notna(), 2, np.where(df["min"].notna(), 1, 0))
    df['pointpos'] = np.where(df["max"].notna(), df["max"], np.where(df["min"].notna(), df["min"], np.nan))
    return df
    

SyntaxError: invalid syntax (134767324.py, line 13)

In [None]:
# detect_pattern("TMUS", "15m", "60d")

In [None]:
# resDF = get_data("SPY", "15m", "60d")
# resDF.set_index('Datetime')
# resDF

In [None]:
tickers = [
    'MU', 'NVDA', 'SPY', 'AAPL', 'AMZN', 'CAT', 'SPOT', 'BA', 'DIS', 'CCL', 'CHTR',
    'AMD', 'BABA', 'EFX', 'ABNB', 'OXY', 'ZS', 'NET', 'DDOG', 'DASH', 'CRWD','SNOW',
    'OKTA','PANW','ROKU','HOOD','ZM','WBD', 'F', 'GM', 'SQ', 'COIN', 'TSM', 'LRCX', 
    'ASML', 'INTC', 'ENPH', 'FSLR', 'AMD', 'PYPL','MCD','WING','UPS','ABNB','SBUX',
    'WMT', 'NFLX', 'UBER', 'DIS', 'BA', 'GOOG', 'TSLA', 'RIVN', 'GE', 'WFC','V','AXP',
    'CRM','LVS', 'PTON','LOW','HD','COST','NKE','TGT','TTD','ADBE','RBLX','LULU','AAL',
    'UAL', 'PLUG', 'FDX','ALGN','ALLY','SNAP','AFRM','UPST','GS','MTCH','NIO','LCID',
    'XPEV','PINS','QS','RKLB','PLTR','DKNG','BMBL','MTTR','LAZR','MCW','MA', 'QCOM','XLF',
    'BIDU','JD','PATH','Z','PFE','ABBV','XLE', 'SU','CVX','OXY','T', 'TMUS', 'VZ'
]


In [None]:
#tickers = ['AMZN', 'SPY', "QQQ", "IWM", 'TSLA', 'GOOG', 'PINS',  'BABA', "DIS", "BA", "WFC", "SPOT"]
symbol_file = "/Users/arifman/dev/RSI-divergence-detector/input/options_symbol.csv"
tickers = pd.read_csv(symbol_file)['ticker'].to_list()
timeframe = [
    # ['1m', '7d'], 
    ['5m', '60d'], 
    ['15m', '60d'], 
    ['1h', '60d'],
    # ['1d', '6mo']
]
for t in tickers:
    for tf in timeframe:
        resDF = get_data(t, tf[0], tf[1])
        #print(resDF["pointpos"])
        if resDF is None:
            continue
        res = formed_tri_signal(resDF.copy(), 100)
        if not res:
            continue

        df = res["frame"]
        if df.shape[0]:
            breakoutDF = df[df['breakout'] != 0]
            breakout_max = abs(breakoutDF.tail(10)['breakout'].max())
            breakout_min = abs(breakoutDF.tail(10)['breakout'].min())
            breakout = "Bullish" if breakout_max > breakout_min else "Bearish"
            print(f"{breakout} breakout detected for {t} on {tf[0]} timeframe")
            plot_chart(df, 50, res["xxmin"], res["xxmax"], res["slmin"], res["slmax"], res["intercmin"], res["intercmax"]) 
    




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [389. 373. 334.], max points: [388. 349. 340.]
0.49604946710029674 0.4389181063838085 298.4966060792541 327.35789403021795
Bearish breakout detected for UNH on 1h timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [356. 344. 317.], max points: [403. 360. 347. 334.]
0.672391963184328 0.3398401253285461 -45.81874417183096 80.40521246757649
Bearish breakout detected for TSLA on 1h timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [410. 402. 395. 389.], max points: [399. 387. 358. 334.]
-0.17853639183974848 -0.08522955811687274 255.12101866559283 220.54982065606924
Bearish breakout detected for UPS on 1h timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [400. 387. 373.], max points: [406. 396. 375.]
0.0291225426375757 0.011644528486757243 68.81428300484443 76.40146200126719
Bearish breakout detected for SCHW on 1h timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy





A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [1509. 1497. 1481. 1471.], max points: [1487. 1477. 1469. 1462.]
-0.17015256780855245 -0.05457858723019557 419.05225036119043 251.23019109944602
Bearish breakout detected for CRM on 15m timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [1538. 1531. 1497.], max points: [1544. 1533. 1523. 1516.]
0.054968788321449455 -0.05408091181596833 215.21416229649873 384.55846569249445
Bearish breakout detected for CI on 15m timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

min points: [1538. 1530. 1522.], max points: [1543. 1524. 1515. 1490.]
0.029062271118164062 -0.015825300465915685 35.16305923461914 104.35030549690845
Bearish breakout detected for TJX on 15m timeframe




A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

KeyboardInterrupt: 