In [None]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from scipy import stats
import yfinance as yf
import datetime
from scipy.stats import linregress
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.pipeline import make_pipeline

In [None]:
symbol = 'NVDA'
start_date = datetime.datetime.now() - datetime.timedelta(days=7)
end_date = datetime.datetime.now()
data = yf.download(symbol, start=start_date, end=end_date, interval='1m')
data.reset_index(inplace=True)
data

In [None]:
def isPivot(candle, window):
    if candle - window < 0 or candle + window >= len(data):
        return 0

    pivotHigh = 1
    pivotLow = 2

    for i in range(candle - window, candle + window + 1):
        if data.iloc[candle].Low > data.iloc[i].Low:
            pivotLow = 0
        if data.iloc[candle].High < data.iloc[i].High:
            pivotHigh = 0

    if pivotLow and pivotHigh:
        return 3
    elif pivotHigh:
        return pivotHigh
    elif pivotLow:
        return pivotLow
    else:
        return 0

In [None]:
window = 3
data['ispivot'] = data.apply(lambda x: isPivot(x.name, window), axis = 1)

In [None]:
 def pointspos(x):
        if x['ispivot'] == 2:
            return x['Low']-1e-3
        elif x['ispivot'] == 1:
            return x['High']+1e-3
        else:
            return np.nan
data['pointspos'] = data.apply(lambda row: pointspos(row), axis = 1)

In [None]:
dfpl = data[0:800]
fig = go.Figure(data=[go.Candlestick(x = dfpl.index,
                                    open = dfpl['Open'],
                                    high = dfpl['High'],
                                    low = dfpl['Low'],
                                    close = dfpl['Close'])])
fig.add_scatter(x=dfpl.index, y=dfpl['pointspos'], mode='markers',
               marker=dict(size=7, color='MediumPurple'),
               name='Pivot')

In [None]:
def collect_channel(candle, backcandles, window):
    localdf = data[candle-backcandles-window:candle-window]
    localdf['isPivot'] = localdf.apply(lambda x: isPivot(x.name,window), axis=1)
    highs = localdf[localdf['isPivot']==1].High.values
    idxhighs = localdf[localdf['isPivot']==1].High.index
    lows = localdf[localdf['isPivot']==2].Low.values
    idxlows = localdf[localdf['isPivot']==2].Low.index
    
    if len(lows)>=2 and len(highs)>=2:
        sl_lows, interc_lows, r_value_l, _, _ = stats.linregress(idxlows,lows)
        sl_highs, interc_highs, r_value_h, _, _ = stats.linregress(idxhighs,highs)
    
        return(sl_lows, interc_lows, sl_highs, interc_highs, r_value_l**2, r_value_h**2)
    else:
        return(0,0,0,0,0,0)

In [None]:
candle = 100
backcandle = 50
window = 3

fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                                    open=dfpl['Open'],
                                    high = dfpl['High'],
                                    low = dfpl['Low'],
                                    close = dfpl['Close'])])
fig.add_scatter(x=dfpl.index, y=dfpl['pointspos'], mode='markers',
               marker=dict(size=5, color='MediumPurple'),
               name='Pivot')
            
sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandle, window)
print(r_sq_l, r_sq_h)
x = np.array(range(candle-backcandle-window, candle+1))
fig.add_trace(go.Scatter(x=x, y=sl_lows*x+interc_lows, mode='lines', name='Lower Slope'))
fig.add_trace(go.Scatter(x=x, y=sl_highs*x+interc_highs, mode='lines', name='Higher Slope'))
fig.show()

In [None]:
def isBreakout(candle, backcandle, window):
    if (candle-backcandle-window)<0:
        return 0
    sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandle, window)
    prev_index = candle-1
    prev_high = data.iloc[candle-1].High
    prev_low = data.iloc[candle-1].Low
    prev_close = data.iloc[candle-1].Close
    
    current_idx = candle
    current_high = data.iloc[candle].High
    current_low = data.iloc[candle].Low
    current_close = data.iloc[candle].Close
    current_open = data.iloc[candle].Open
    
    if (prev_high > (sl_lows*prev_index+interc_lows) and
       prev_close < (sl_lows*prev_index+interc_lows) and
       current_open < (sl_lows*current_idx+interc_lows) and
       current_close < (sl_lows*prev_index+interc_lows)):
        return 1
    
    elif (prev_low < (sl_highs*prev_index+interc_highs) and 
          prev_close > (sl_highs*prev_index+interc_highs) and
          current_open > (sl_highs*current_idx+interc_highs) and
          current_close > (sl_highs*prev_index+interc_highs)):
        return 2
    else:
        return 0

In [None]:
def breakpointpos(x):
    if (x['isBreakout']==2):
        return x['Low']-3e-3
    elif (x['isBreakout']==1):
        return x['High']+3e-3
    else:
        return np.nan
candle = 75
backcandle = 40
window = 3
dfpl = data[candle-backcandle-window-5:candle+20]
dfpl['isBreakout'] = [isBreakout(candle, backcandle, window) for candle in dfpl.index]
dfpl['breakpointpos'] = dfpl.apply(lambda row: breakpointpos(row), axis = 1)

In [None]:
candle = 60
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                                    open=dfpl['Open'],
                                    high=dfpl['High'],
                                    low=dfpl['Low'],
                                    close=dfpl['Close'])])
fig.add_scatter(x=dfpl.index, y=dfpl['pointspos'], mode='markers',
               marker=dict(size=5, color='MediumPurple'), 
               name='Pivot')
fig.add_scatter(x=dfpl.index, y=dfpl['breakpointpos'], mode='markers', 
               marker=dict(size=8, color='Black'), marker_symbol='hexagram',
               name='Pivot')
sl_lows, interc_lows, sl_highs, interc_highs, r_sq_l, r_sq_h = collect_channel(candle, backcandle, window)
print(r_sq_l, r_sq_h)
x = np.array(range(candle-backcandle-window, candle+1))
fig.add_trace(go.Scatter(x=x, y=sl_lows*x + interc_lows, mode='lines', name='Lower Slope'))
fig.add_trace(go.Scatter(x=x, y=sl_highs*x + interc_highs, mode='lines', name='Higher Slope'))
fig.show()

In [None]:
data["isBreakOut"] = [isBreakout(candle, backcandle, window) for candle in data.index]
def SIGNAL():
    return data.isBreakOut

In [None]:
from backtesting import Strategy
from backtesting import Backtest
import backtesting

class BreakOut(Strategy):
    initsize = 0.1
    mysize = initsize
    def init(self):
        super().init()
        self.signal1 = self.I(SIGNAL)

    def next(self):
        super().next()
        TPSLRatio = 1.2

        if self.signal1==2 and len(self.trades)==0:   
            sl1 = self.data.Low[-2]
            tp1 = self.data.Close[-1] + abs(self.data.Close[-1]-sl1)*TPSLRatio
            tp2 = self.data.Close[-1] + abs(self.data.Close[-1]-sl1)*TPSLRatio/3
            self.buy(sl=sl1, tp=tp1, size=self.mysize)
            self.buy(sl=sl1, tp=tp2, size=self.mysize)
        
        elif self.signal1==1 and len(self.trades)==0:         
            sl1 = self.data.High[-2]
            tp1 = self.data.Close[-1] - abs(sl1-self.data.Close[-1])*TPSLRatio
            tp2 = self.data.Close[-1] - abs(sl1-self.data.Close[-1])*TPSLRatio/3
            self.sell(sl=sl1, tp=tp1, size=self.mysize)
            self.sell(sl=sl1, tp=tp2, size=self.mysize)

bt = Backtest(data, BreakOut, cash=1000, margin=1/50, commission=.000)
stat = bt.run()
stat

In [None]:
bt.plot()

In [None]:
df =data.reset_index()