## Load data

In [6]:
import numpy as np
import mplfinance as mpf
import pandas as pd
df = pd.read_csv('AUD_NZD_H4_2005_202206.csv')

## Find pivots

In [7]:
def pivotid(data,l,n1,n2):#n1 and n2 are the number of candlesticks on the left and right of l respectively
    if l-n1 <0 or l+n2 >= len(data):
        return 0
    
    pivotlow = 1 #true
    pivothigh = 1 #true
    
    for i in range(l-n1,l+n2+1):
        if (data.low[l]>data.low[i]):
            pivotlow = 0 #false
        if (data.high[l]<data.high[i]):
            pivothigh = 0 #false

    if pivotlow:
        return 1
    elif pivothigh:
        return 2
    else:
        return 0

df['pivot'] = df.apply(lambda x: pivotid(df,x.name,3,3),axis = 1)

In [8]:
def pivot_y(x): #pivot coordinate 
    if x['pivot'] == 1:
        return x['low']- 1e-3
    elif x['pivot'] == 2:
        return x['high'] + 1e-3
    else:
        return np.nan

df['pivot position'] = df.apply(lambda row: pivot_y(row),axis = 1)


## Visualization

In [15]:
dfpl = df[0:500]
import plotly.graph_objects as go
from datetime import datetime

fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['open'],
                high=dfpl['high'],
                low=dfpl['low'],
                close=dfpl['close'])])

fig.add_scatter(x=dfpl.index,y=dfpl['pivot position'],
                mode = "markers",
                marker = dict(size=5,color = "mediumpurple"),
                name="pivot")
fig.show()

## Rectangle detection

In [None]:
from scipy.stats import linregress
rect = np.array([0]*len(df))

for idx in range(0,len(df),2):
    ymax = np.array([])
    ymin = np.array([])
    xmin = np.array([])
    xmax = np.array([])
    for i in range(idx-15,idx+1):
        if (df.iloc[i].pivot == 1):
            ymin = np.append(ymin,df.iloc[i].low)
            xmin = np.append(xmin,i)
        if (df.iloc[i].pivot == 2):
            ymax = np.append(ymax,df.iloc[i].high)
            xmax = np.append(xmax,i)
    
    if (xmax.size <3 and xmin.size <3) or xmin.size ==0 or xmax.size ==0: #trendline contain at least two peaks or bottoms
        continue
        
    min_slope,intermin=np.polyfit(xmin,ymin,1)
    max_slope,intermax=np.polyfit(xmax,ymax,1)

    if abs(min_slope) <= 0.001 and abs(max_slope) <=0.001:
        rect[i-15:i+1] = 1

df["rectangle_pattern"]= rect

In [11]:
df.loc[df['rectangle_pattern']==1]

Unnamed: 0,datetime,open,high,low,close,pivot,pivot position,rectangle_pattern
3,2005-01-03T02:00:00.000000000Z,1.08905,1.09115,1.08365,1.08725,1,1.08265,1
4,2005-01-03T06:00:00.000000000Z,1.08735,1.08890,1.08440,1.08710,0,,1
5,2005-01-03T10:00:00.000000000Z,1.08700,1.08990,1.08590,1.08690,0,,1
6,2005-01-03T14:00:00.000000000Z,1.08680,1.09085,1.08520,1.08945,0,,1
7,2005-01-03T18:00:00.000000000Z,1.08945,1.09265,1.08945,1.09035,2,1.09365,1
...,...,...,...,...,...,...,...,...
28214,2022-06-05T21:00:00.000000000Z,1.10768,1.10902,1.10646,1.10708,0,,1
28215,2022-06-06T01:00:00.000000000Z,1.10710,1.10724,1.10612,1.10680,0,,1
28216,2022-06-06T05:00:00.000000000Z,1.10680,1.10727,1.10582,1.10640,0,,1
28217,2022-06-06T09:00:00.000000000Z,1.10634,1.10721,1.10566,1.10694,1,1.10466,1


In [17]:
candleid = 28218

xmin = np.array([])
ymin= np.array([])
xmax = np.array([])
ymax = np.array([])

for i in range(candleid-15,candleid+1):
    if (df.iloc[i].pivot == 1):
        ymin = np.append(ymin,df.iloc[i]['pivot position'])
        xmin = np.append(xmin,i)
    if (df.iloc[i].pivot == 2):
        ymax = np.append(ymax,df.iloc[i]['pivot position'])
        xmax = np.append(xmax,i)


min_slope,intermin=np.polyfit(xmin,ymin,1)
max_slope,intermax=np.polyfit(xmax,ymax,1)


dfpl = df[candleid-25:candleid+10]
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['open'],
                high=dfpl['high'],
                low=dfpl['low'],
                close=dfpl['close'])])


fig.add_scatter(x=dfpl.index,y=dfpl['pivot position'],
                mode = "markers",
                marker = dict(size=5,color = "mediumpurple"),
                name="pivot")

fig.add_trace(go.Scatter(x=xmin, y=min_slope*xmin + intermin, mode='lines', name='min slope'))
fig.add_trace(go.Scatter(x=xmax, y=max_slope*xmax + intermax, mode='lines', name='max slope'))
fig.show()

## Symmetric triangle detection

In [None]:
from scipy.stats import linregress
tri = np.array([0]*len(df))

for idx in range(0,len(df),2):
    ymax = np.array([])
    ymin = np.array([])
    xmin = np.array([])
    xmax = np.array([])
    for i in range(idx-15,idx+1):
        if (df.iloc[i].pivot == 1):
            ymin = np.append(ymin,df.iloc[i].low)
            xmin = np.append(xmin,i)
        if (df.iloc[i].pivot == 2):
            ymax = np.append(ymax,df.iloc[i].high)
            xmax = np.append(xmax,i)
    
    if (xmax.size <3 and xmin.size <3) or xmin.size ==0 or xmax.size ==0: #trendline contain at least two peaks or bottoms
        continue
        
    min_slope,intermin=np.polyfit(xmin,ymin,1)
    max_slope,intermax=np.polyfit(xmax,ymax,1)

    if min_slope > 0 and max_slope <0:
        tri[i-15:i+1] = 1

df["triangle_pattern"]= tri

In [33]:
df.loc[df['triangle_pattern']==1]

Unnamed: 0,datetime,open,high,low,close,pivot,pivot position,rectangle_pattern,triangle_pattern
11,2005-01-04T10:00:00.000000000Z,1.09090,1.09170,1.08660,1.09120,0,,1,1
12,2005-01-04T14:00:00.000000000Z,1.09120,1.09160,1.08410,1.08810,1,1.08310,1,1
13,2005-01-04T18:00:00.000000000Z,1.08790,1.09110,1.08480,1.08900,0,,1,1
14,2005-01-04T22:00:00.000000000Z,1.08900,1.09070,1.08720,1.08970,0,,1,1
15,2005-01-05T02:00:00.000000000Z,1.08980,1.09550,1.08940,1.09235,2,1.09650,1,1
...,...,...,...,...,...,...,...,...,...
28178,2022-05-26T21:00:00.000000000Z,1.09562,1.09628,1.09355,1.09550,1,1.09255,1,1
28179,2022-05-27T01:00:00.000000000Z,1.09546,1.09670,1.09459,1.09536,0,,1,1
28180,2022-05-27T05:00:00.000000000Z,1.09536,1.09670,1.09450,1.09480,2,1.09770,1,1
28181,2022-05-27T09:00:00.000000000Z,1.09480,1.09576,1.09465,1.09522,0,,1,1


In [37]:
candleid = 28182

xmin = np.array([])
ymin= np.array([])
xmax = np.array([])
ymax = np.array([])

for i in range(candleid-25,candleid+1):
    if (df.iloc[i].pivot == 1):
        ymin = np.append(ymin,df.iloc[i]['pivot position'])
        xmin = np.append(xmin,i)
    if (df.iloc[i].pivot == 2):
        ymax = np.append(ymax,df.iloc[i]['pivot position'])
        xmax = np.append(xmax,i)

min_slope,intermin=np.polyfit(xmin,ymin,1)
max_slope,intermax=np.polyfit(xmax,ymax,1)


dfpl = df[candleid-30:candleid+20]
fig = go.Figure(data=[go.Candlestick(x=dfpl.index,
                open=dfpl['open'],
                high=dfpl['high'],
                low=dfpl['low'],
                close=dfpl['close'])])


fig.add_scatter(x=dfpl.index,y=dfpl['pivot position'],
                mode = "markers",
                marker = dict(size=5,color = "mediumpurple"),
                name="pivot")

fig.add_trace(go.Scatter(x=xmin, y=min_slope*xmin + intermin, mode='lines', name='min slope'))
fig.add_trace(go.Scatter(x=xmax, y=max_slope*xmax + intermax, mode='lines', name='max slope'))
fig.show()

In [6]:
df.to_csv("4h.csv")

## winning percetage

#### Rectangle

In [18]:
rect_start_pnt = []
rect_end_pnt = []

for i in range(0,len(df)-1):
    if df.loc[i,"rectangle_pattern"] == 0 and df.loc[i+1,"rectangle_pattern"] == 1:
        rect_start_pnt = np.append(rect_start_pnt,i+1)

    if df.loc[i,"rectangle_pattern"] == 1 and df.loc[i+1,"rectangle_pattern"] == 0:
        rect_end_pnt = np.append(rect_end_pnt,i)

In [21]:
rect_start_pnt = []
rect_end_pnt = []

for i in range(0,len(df)-1):
    if df.loc[i,"rectangle_pattern"] == 0 and df.loc[i+1,"rectangle_pattern"] == 1:
        rect_start_pnt = np.append(rect_start_pnt,i+1)

    if df.loc[i,"rectangle_pattern"] == 1 and df.loc[i+1,"rectangle_pattern"] == 0:
        rect_end_pnt = np.append(rect_end_pnt,i)

winning_rect_up= 0
total_up = 0
for i in range(0,len(rect_end_pnt)-1):
    x = list(range(int(rect_end_pnt[i]),int(rect_end_pnt[i])+17))
    y = list(df.loc[rect_end_pnt[i]:rect_end_pnt[i]+16,"high"])

    x1 = list(range(int(max(0,rect_start_pnt[i]-16)),int(rect_start_pnt[i])))
    y1 = list(df.loc[max(0,rect_start_pnt[i]-16):rect_start_pnt[i]-1,"high"])

    slope,intercept = np.polyfit(x,y,1)
    slope1,intercept1 = np.polyfit(x1,y1,1)

    if slope1>=0:
        total_up += 1

    if slope1>=0 and slope>=0:
        winning_rect_up +=1

print(winning_rect_up/total_up)


winning_rect_down= 0
total_down = 0
for i in range(0,len(rect_end_pnt)-1):
    x2 = list(range(int(rect_end_pnt[i]),int(rect_end_pnt[i])+17))
    y2 = list(df.loc[rect_end_pnt[i]:rect_end_pnt[i]+16,"high"])

    x1 = list(range(int(max(0,rect_start_pnt[i]-16)),int(rect_start_pnt[i])))
    y1 = list(df.loc[max(0,rect_start_pnt[i]-16):rect_start_pnt[i]-1,"high"])

    slope2,intercept2 = np.polyfit(x2,y2,1)
    slope1,intercept1 = np.polyfit(x1,y1,1)

    if slope1<0:
        total_down += 1

    if slope1<0 and slope2<0:
        winning_rect_down +=1

print(winning_rect_down/total_down)

0.5631067961165048
0.5043859649122807


#### Triangle

In [34]:
tri_start_pnt = []
tri_end_pnt = []

for i in range(0,len(df)-1):
    if df.loc[i,"triangle_pattern"] == 0 and df.loc[i+1,"triangle_pattern"] == 1:
        tri_start_pnt = np.append(tri_start_pnt,i+1)

    if df.loc[i,"triangle_pattern"] == 1 and df.loc[i+1,"triangle_pattern"] == 0:
        tri_end_pnt = np.append(tri_end_pnt,i)

In [35]:
winning_tri_up= 0
total_tri_up = 0
for i in range(0,len(tri_end_pnt)-1):
    x = list(range(int(tri_end_pnt[i]),int(tri_end_pnt[i])+17))
    y = list(df.loc[tri_end_pnt[i]:tri_end_pnt[i]+16,"high"])

    x1 = list(range(int(max(0,tri_start_pnt[i]-16)),int(tri_start_pnt[i])))
    y1 = list(df.loc[max(0,tri_start_pnt[i]-16):tri_start_pnt[i]-1,"high"])

    slope,intercept = np.polyfit(x,y,1)
    slope1,intercept1 = np.polyfit(x1,y1,1)

    if slope1>0:
        total_tri_up += 1

    if slope1>0 and slope>0:
        winning_tri_up +=1

print(winning_tri_up/total_tri_up)


winning_tri_updown= 0
total_tri_down = 0
for i in range(0,len(tri_end_pnt)-1):
    x2 = list(range(int(tri_end_pnt[i]),int(tri_end_pnt[i])+17))
    y2 = list(df.loc[tri_end_pnt[i]:tri_end_pnt[i]+16,"high"])

    x1 = list(range(int(max(0,tri_start_pnt[i]-16)),int(tri_start_pnt[i])))
    y1 = list(df.loc[max(0,tri_start_pnt[i]-16):tri_start_pnt[i]-1,"high"])

    slope2,intercept2 = np.polyfit(x2,y2,1)
    slope1,intercept1 = np.polyfit(x1,y1,1)

    if slope1<=0:
        total_tri_down += 1

    if slope1<=0 and slope2<=0:
        winning_tri_updown +=1

print(winning_tri_updown/total_tri_down)

0.5041322314049587
0.5590551181102362
