In [13]:
import yfinance as yf
import numpy as np
from collections import deque
import pandas as pd
from datetime import datetime

In [14]:
ticker = "MSFT"  # 005930.KS
df = yf.download(ticker, start="2010-01-01")

df = df.reset_index()
df['Date'] = df['Date'].dt.strftime('%Y-%m-%d')

date_array = df['Date'].values
open_array = df['Open'].values
high_array = df['High'].values
low_array = df['Low'].values
close_array = df['Close'].values

df

[*********************100%%**********************]  1 of 1 completed


Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2010-01-04,30.620001,31.100000,30.590000,30.950001,23.474915,38409100
1,2010-01-05,30.850000,31.100000,30.639999,30.959999,23.482504,49749600
2,2010-01-06,30.879999,31.080000,30.520000,30.770000,23.338396,58182400
3,2010-01-07,30.629999,30.700001,30.190001,30.450001,23.095680,50559700
4,2010-01-08,30.280001,30.879999,30.240000,30.660000,23.254957,51197400
...,...,...,...,...,...,...,...
3545,2024-02-05,409.899994,411.160004,403.989990,405.649994,405.649994,25352300
3546,2024-02-06,405.880005,407.970001,402.910004,405.489990,405.489990,18382600
3547,2024-02-07,407.440002,414.299988,407.399994,414.049988,414.049988,22340500
3548,2024-02-08,414.049988,415.559998,412.529999,414.109985,414.109985,21225300


In [15]:
from collections import deque

class Zigzag:
    def __init__(self, pivot_range, fluctuation_rate):
        self.pivot_range = pivot_range

        self.fluctuation_rate = fluctuation_rate

        self.buffer_size = pivot_range * 2 + 1

        self.opens = deque(maxlen = self.buffer_size)
        self.highs = deque(maxlen = self.buffer_size)
        self.lows = deque(maxlen = self.buffer_size)
        self.closes = deque(maxlen = self.buffer_size)
        self.dates = deque(maxlen = self.buffer_size)

        self.last_pivot = [None, None, None, None, None] # (date, is high pivot?, pivot price, is this pivot confirmed to zigzag pivot?, is pivot on bullish candle?)

    def update_zigzag_pivot(self, open, high, low, close, date):
        self.zigzag_pivot = []

        self.opens.append(open)
        self.highs.append(high)
        self.lows.append(low)
        self.closes.append(close)
        self.dates.append(date)

        if len(self.opens) == self.buffer_size:
            pivot_list = []
            pivot_high = self.is_pivot(self.highs, self.pivot_range, True)
            pivot_low = self.is_pivot(self.lows, self.pivot_range, False)

            bullish = self.opens[self.pivot_range] < self.closes[self.pivot_range]

            if pivot_high is not None:
                pivot_list.append([self.dates[self.pivot_range], True, pivot_high, False, bullish])
            if pivot_low is not None:
                pivot_list.append([self.dates[self.pivot_range], False, pivot_low, False, bullish])

            if len(pivot_list) == 2 and bullish:
                pivot_list = [pivot_list[1], pivot_list[0]]

            self.update_last_pivot(pivot_list, high, low, date)

            if self.last_pivot != [None, None, None, None, None]:
                self.last_pivot_to_zigzag(True)

        return self.zigzag_pivot


    def is_pivot(self, array, candle, is_high):
        if is_high and candle == np.argmax(array):
            return array[candle]

        elif is_high == False and candle == np.argmin(array):
            return array[candle]

        return None

    def is_valid_reversals(self, pivot_price, last_pivot_price, last_pivot_type):
        change_percentage = (pivot_price - last_pivot_price) / last_pivot_price * 100
        is_valid_reversals = (not last_pivot_type and change_percentage >= self.fluctuation_rate) or (last_pivot_type and change_percentage <= -self.fluctuation_rate)
        return is_valid_reversals


    def update_last_pivot(self, pivot_list, high, low, date):
        for pivot in pivot_list:
            if self.last_pivot == [None, None, None, None, None]:
                self.last_pivot = pivot
                self.last_pivot_to_zigzag(False)

            else:
                if self.last_pivot[1] == pivot[1]:
                    if (self.last_pivot[1] and self.last_pivot[2] < pivot[2]) or (not self.last_pivot[1] and self.last_pivot[2] > pivot[2]):
                        self.last_pivot = pivot
                        self.last_pivot_to_zigzag(False)

                else:
                    reversals = self.is_valid_reversals(pivot[2], self.last_pivot[2], self.last_pivot[1])
                    if reversals:
                        self.last_pivot = pivot
                        self.last_pivot_to_zigzag(False)


    def last_pivot_to_zigzag(self, one_candle):
        if self.last_pivot[3] == True:
            return

        if one_candle:
            start_idx = self.buffer_size - self.pivot_range - 1
            end_idx = self.buffer_size
            self.is_zigzag_pivot(start_idx, end_idx)

        else:
            start = 0 if (self.last_pivot[4] and self.last_pivot[1] == False) or (self.last_pivot[4] == False and self.last_pivot[1]) else 1
            for i in range(start,self.pivot_range):
                if self.last_pivot[3] == False:
                    start_idx = i
                    end_idx = i + self.pivot_range + 1
                    self.is_zigzag_pivot(start_idx, end_idx)



    def is_zigzag_pivot(self, start_idx, end_idx):
        if self.last_pivot[1]:
            lows_list = list(self.lows)
            pivot = self.is_pivot(lows_list[start_idx:end_idx], self.pivot_range, False)

        else:
            highs_list = list(self.highs)
            pivot = self.is_pivot(highs_list[start_idx:end_idx], self.pivot_range, True)

        if pivot is not None:
            reversals = self.is_valid_reversals(pivot, self.last_pivot[2], self.last_pivot[1])
            if reversals:
                self.zigzag_pivot.append([self.last_pivot[0], self.last_pivot[2], self.dates[-1]]) # zigzag pivot date, zigzag pivot price, zigzag pivot confirmed date
                self.last_pivot[3] = True

In [16]:
zigzag_array = []
range5_fluctuation5 = Zigzag(3, 3)
for i in range(len(date_array)):
    date = date_array[i]
    open = open_array[i]
    high = high_array[i]
    low = low_array[i]
    close = close_array[i]

    zigzag_pivot = range5_fluctuation5.update_zigzag_pivot(open, high, low, close, date)
    if zigzag_pivot:
        zigzag_array.extend(zigzag_pivot)


zigzag_array

[['2010-01-12', 29.90999984741211, '2010-01-15'],
 ['2010-01-15', 31.239999771118164, '2010-01-21'],
 ['2010-01-22', 28.84000015258789, '2010-01-28'],
 ['2010-01-29', 29.920000076293945, '2010-02-03'],
 ['2010-02-05', 27.56999969482422, '2010-02-11'],
 ['2010-02-18', 29.030000686645508, '2010-02-23'],
 ['2010-02-25', 28.020000457763672, '2010-03-02'],
 ['2010-03-25', 30.56999969482422, '2010-03-30'],
 ['2010-04-01', 28.6200008392334, '2010-04-08'],
 ['2010-04-23', 31.579999923706055, '2010-04-28'],
 ['2010-05-07', 27.31999969482422, '2010-05-13'],
 ['2010-05-13', 29.729999542236328, '2010-05-18'],
 ['2010-05-26', 24.559999465942383, '2010-06-02'],
 ['2010-06-03', 26.93000030517578, '2010-06-08'],
 ['2010-06-08', 24.649999618530273, '2010-06-11'],
 ['2010-06-21', 26.889999389648438, '2010-06-24'],
 ['2010-07-01', 22.729999542236328, '2010-07-07'],
 ['2010-07-29', 26.40999984741211, '2010-08-03'],
 ['2010-08-31', 23.31999969482422, '2010-09-03'],
 ['2010-09-17', 25.530000686645508, '2010

In [17]:
class HarmornicPattern(Zigzag):
    def __init__(self, pivot_range, fluctuation_rate, tolerance_percentage):
        super().__init__(pivot_range, fluctuation_rate)

        self.tolerance_percentage = tolerance_percentage

        self.ABCD_array = deque(maxlen = 4)
        self.XABCD_array = deque(maxlen = 5)

    def update_harmonic_pattern(self, open, high, low, close, date):
        self.harmornic_pattern = [] # pattern name, dates, prices, is bullish pattern?, confirmed date
        zigzag_pivot = self.update_zigzag_pivot(open, high, low, close, date)

        if zigzag_pivot:
            self.ABCD_array.extend(zigzag_pivot)
            self.XABCD_array.extend(zigzag_pivot)


        if len(self.ABCD_array) == 4 and self.ABCD_array[-1][-1] == date:
            A, B, C, D = self.ABCD_array
            bullish, pattern_name = self.is_ABCD_pattern(A[1], B[1], C[1], D[1])
            if pattern_name is not None:
                self.harmornic_pattern.append([pattern_name,
                                         [A[0], B[0], C[0], D[0]],
                                         [A[1], B[1], C[1], D[1]],
                                         bullish,
                                         D[2]])

        if len(self.XABCD_array) == 5 and self.XABCD_array[-1][-1] == date:
            X, A, B, C, D = self.XABCD_array
            bullish, pattern_name = self.is_XABCD_pattern(X[1], A[1], B[1], C[1], D[1], self.tolerance_percentage)

            if pattern_name is not None:
                self.harmornic_pattern.append([pattern_name,
                                         [X[0], A[0], B[0], C[0], D[0]],
                                         [X[1], A[1], B[1], C[1], D[1]],
                                         bullish,
                                         D[2]])

        return self.harmornic_pattern

    def is_ratio_valid(self, ratio, true_ratio):
        lower_bound = true_ratio * (1 - self.tolerance_percentage / 100)
        upper_bound = true_ratio * (1 + self.tolerance_percentage / 100)

        if lower_bound <= ratio <= upper_bound:
            return True
        else:
            return False

    def is_ABCD_pattern(self, A, B, C, D):

        if A - B != 0 and B - C !=0:
            ABC = abs((C - B)/(A - B))
            BCD = abs((D - C)/(B - C))

            if (self.is_ratio_valid(ABC, 0.618) or self.is_ratio_valid(ABC, 0.786)) and (self.is_ratio_valid(BCD,1.272) or self.is_ratio_valid(BCD,1.618)):
                if A < B:
                    return 0, 'ABCD'
                else:
                    return 1, 'ABCD'
        return None, None

    def is_XABCD_pattern(self, X, A, B, C, D, tolerance_percentage):

        XAB = abs((B - A)/(X - A))
        ABC = abs((C - B)/(A - B))
        BCD = abs((D - C)/(B - C))
        XAD = abs((D - A)/(X - A))
        if X - A != 0 and A - B !=0 and B - C != 0 and X - C != 0:
            if ((self.is_ratio_valid(XAB, 0.382) or self.is_ratio_valid(XAB, 0.5)) \
                    and (self.is_ratio_valid(ABC, 0.382) or self.is_ratio_valid(ABC, 0.886)) \
                    and (self.is_ratio_valid(BCD, 1.618) or self.is_ratio_valid(BCD, 2.618)) \
                    and (self.is_ratio_valid(XAD, 0.886))):
                return (1, 'Bat') if X < A else (0, 'Bat')

            if ((self.is_ratio_valid(XAB, 0.618)) \
                    and (self.is_ratio_valid(ABC, 0.382) or self.is_ratio_valid(ABC, 0.886)) \
                    and (self.is_ratio_valid(BCD, 1.272) or self.is_ratio_valid(BCD, 1.618)) \
                    and (self.is_ratio_valid(XAD, 0.786))):
                return (1, 'Gartley') if X < A else (0, 'Gartley')

            if ((self.is_ratio_valid(XAB, 0.382) or self.is_ratio_valid(XAB, 0.618)) \
                    and (self.is_ratio_valid(ABC, 0.382) or self.is_ratio_valid(ABC, 0.886)) \
                    and (self.is_ratio_valid(BCD, 2.242) or self.is_ratio_valid(BCD, 3.618)) \
                    and (self.is_ratio_valid(XAD, 1.618))):
                return (1, 'Crab') if X < A else (0, 'Crab')

            if ((self.is_ratio_valid(XAB, 0.786)) \
                    and (self.is_ratio_valid(ABC, 0.382) or self.is_ratio_valid(ABC, 0.886)) \
                    and (self.is_ratio_valid(BCD, 1.618) or self.is_ratio_valid(BCD, 2.242)) \
                    and (self.is_ratio_valid(XAD, 1.272) or self.is_ratio_valid(XAD, 1.618))):
                return (1, 'Butterfly') if X < A else (0, 'Butterfly')
        return None, None





In [18]:
harmornic_pattern_array = []
range5_fluctuation5_additional_range2 = HarmornicPattern(3, 3, 12)
for i in range(len(date_array)):
    date = date_array[i]
    open = open_array[i]
    high = high_array[i]
    low = low_array[i]
    close = close_array[i]

    harmornic_pattern = range5_fluctuation5_additional_range2.update_harmonic_pattern(open, high, low, close, date)

    if harmornic_pattern:
        harmornic_pattern_array.extend(harmornic_pattern)

result = pd.DataFrame(harmornic_pattern_array, columns = ['pattern_name', 'date', 'price', 'bullish', 'confirmed_date'])
result

Unnamed: 0,pattern_name,date,price,bullish,confirmed_date
0,ABCD,"[2010-02-25, 2010-03-25, 2010-04-01, 2010-04-23]","[28.020000457763672, 30.56999969482422, 28.620...",0,2010-04-28
1,ABCD,"[2010-08-31, 2010-09-17, 2010-10-04, 2010-10-18]","[23.31999969482422, 25.530000686645508, 23.780...",0,2010-10-21
2,ABCD,"[2011-09-20, 2011-09-22, 2011-09-28, 2011-10-04]","[27.5, 24.600000381469727, 26.3700008392334, 2...",1,2011-10-07
3,Bat,"[2012-02-14, 2012-03-16, 2012-03-23, 2012-03-2...","[29.850000381469727, 32.95000076293945, 31.719...",1,2012-04-16
4,ABCD,"[2012-06-21, 2012-06-28, 2012-07-05, 2012-07-12]","[31.139999389648438, 29.420000076293945, 30.78...",1,2012-07-17
5,ABCD,"[2013-09-05, 2013-09-19, 2013-09-24, 2013-10-02]","[30.950000762939453, 33.68000030517578, 32.150...",0,2013-10-08
6,ABCD,"[2013-09-24, 2013-10-02, 2013-10-08, 2013-10-15]","[32.150001525878906, 34.029998779296875, 32.79...",0,2013-10-23
7,ABCD,"[2013-12-10, 2013-12-18, 2013-12-27, 2014-01-14]","[38.900001525878906, 35.529998779296875, 37.61...",1,2014-01-17
8,ABCD,"[2014-01-14, 2014-01-16, 2014-01-23, 2014-01-24]","[34.630001068115234, 37.0, 35.52000045776367, ...",0,2014-02-05
9,ABCD,"[2014-04-02, 2014-04-14, 2014-04-28, 2014-05-07]","[41.65999984741211, 38.900001525878906, 41.290...",1,2014-05-12


In [19]:
result['pattern_name'].value_counts()

ABCD         36
Bat           3
Gartley       2
Butterfly     2
Name: pattern_name, dtype: int64

In [20]:
def back_test(stock, result, risk_reward_ratio, bull_only):

    result_copy = result.copy()
    result_copy['profit(%)'] = 0
    result_copy['holding_days'] = 0
    result_copy['buy_date'] = pd.NaT
    result_copy['sell_date'] = pd.NaT

    for i in range(len(result)):
        last_date = result.loc[i, 'date'][-1]
        confirmed_date = result.loc[i, 'confirmed_date']
        bullish = result.loc[i, 'bullish']

        if bull_only and not bullish: # if bull_only is True, consider only bullish patterns
            continue

        entry_price = stock.loc[stock['Date'] == confirmed_date, 'Close'].item()


        if bullish:
            stop_loss_price = stock.loc[stock['Date'] == last_date, 'Low'].item()
            risk_amount = abs(entry_price - stop_loss_price)
            profit_amount = risk_amount * risk_reward_ratio
            take_profit_price = entry_price + profit_amount
        else:
            stop_loss_price = stock.loc[stock['Date'] == last_date, 'High'].item()
            risk_amount = abs(entry_price - stop_loss_price)
            profit_amount = risk_amount * risk_reward_ratio
            take_profit_price = entry_price - profit_amount


        entry_idx = stock.index[stock['Date'] == confirmed_date].item()

        profit, sold_date = is_sold(stock, entry_idx+1, len(stock), entry_price, bullish, stop_loss_price, take_profit_price, risk_amount, profit_amount)
        if profit is not None and profit:
            result_copy.loc[i, 'profit(%)'] = profit
            result_copy.loc[i, 'buy_date'] = confirmed_date
            result_copy.loc[i, 'sell_date'] = sold_date
            delta = datetime.strptime(sold_date, '%Y-%m-%d') - datetime.strptime(confirmed_date, '%Y-%m-%d')
            result_copy.loc[i, 'holding_days'] = delta.days

    return result_copy



def is_sold(stock, start, end, entry_price, bullish, stop_loss_price, take_profit_price, risk_amount, profit_amount):
    for idx in range(start, end):
        low = stock.loc[idx, 'Low']
        high = stock.loc[idx, 'High']
        date = stock.loc[idx, 'Date']

        if (bullish and low <= stop_loss_price and high >= take_profit_price) or \
        (not bullish and high >= stop_loss_price and low <= take_profit_price):
            return None, None

        elif (bullish and low <= stop_loss_price) or (not bullish and high >= stop_loss_price):
            return round(-risk_amount / entry_price * 100, 2), date

        elif (bullish and high >= take_profit_price) or (not bullish and low <= take_profit_price):
            return round(profit_amount / entry_price * 100, 2), date

    return False, None

In [21]:
report = back_test(df, result, 3, True)
report

Unnamed: 0,pattern_name,date,price,bullish,confirmed_date,profit(%),holding_days,buy_date,sell_date
0,ABCD,"[2010-02-25, 2010-03-25, 2010-04-01, 2010-04-23]","[28.020000457763672, 30.56999969482422, 28.620...",0,2010-04-28,0.0,0,NaT,NaT
1,ABCD,"[2010-08-31, 2010-09-17, 2010-10-04, 2010-10-18]","[23.31999969482422, 25.530000686645508, 23.780...",0,2010-10-21,0.0,0,NaT,NaT
2,ABCD,"[2011-09-20, 2011-09-22, 2011-09-28, 2011-10-04]","[27.5, 24.600000381469727, 26.3700008392334, 2...",1,2011-10-07,22.74,146,2011-10-07,2012-03-01
3,Bat,"[2012-02-14, 2012-03-16, 2012-03-23, 2012-03-2...","[29.850000381469727, 32.95000076293945, 31.719...",1,2012-04-16,-2.73,22,2012-04-16,2012-05-08
4,ABCD,"[2012-06-21, 2012-06-28, 2012-07-05, 2012-07-12]","[31.139999389648438, 29.420000076293945, 30.78...",1,2012-07-17,-3.78,94,2012-07-17,2012-10-19
5,ABCD,"[2013-09-05, 2013-09-19, 2013-09-24, 2013-10-02]","[30.950000762939453, 33.68000030517578, 32.150...",0,2013-10-08,0.0,0,NaT,NaT
6,ABCD,"[2013-09-24, 2013-10-02, 2013-10-08, 2013-10-15]","[32.150001525878906, 34.029998779296875, 32.79...",0,2013-10-23,0.0,0,NaT,NaT
7,ABCD,"[2013-12-10, 2013-12-18, 2013-12-27, 2014-01-14]","[38.900001525878906, 35.529998779296875, 37.61...",1,2014-01-17,14.43,75,2014-01-17,2014-04-02
8,ABCD,"[2014-01-14, 2014-01-16, 2014-01-23, 2014-01-24]","[34.630001068115234, 37.0, 35.52000045776367, ...",0,2014-02-05,0.0,0,NaT,NaT
9,ABCD,"[2014-04-02, 2014-04-14, 2014-04-28, 2014-05-07]","[41.65999984741211, 38.900001525878906, 41.290...",1,2014-05-12,10.96,66,2014-05-12,2014-07-17


In [22]:
filtered_report=report.copy()
filtered_report.dropna(subset=['sell_date'], inplace=True)

filtered_report['buy_date'] = pd.to_datetime(filtered_report['buy_date'], format='%Y-%m-%d')
filtered_report['sell_date'] = pd.to_datetime(filtered_report['sell_date'], format='%Y-%m-%d')

filtered_report = filtered_report.sort_values(by='buy_date').reset_index(drop=True)

valid_indices = []
last_sell_date = None

for i in range(len(filtered_report)):
    buy_date = filtered_report.loc[i, 'buy_date']
    if last_sell_date is None or last_sell_date <= buy_date:
        valid_indices.append(i)
        last_sell_date = filtered_report.loc[i, 'sell_date']

filtered_report = filtered_report.loc[valid_indices]

filtered_report


Unnamed: 0,pattern_name,date,price,bullish,confirmed_date,profit(%),holding_days,buy_date,sell_date
0,ABCD,"[2011-09-20, 2011-09-22, 2011-09-28, 2011-10-04]","[27.5, 24.600000381469727, 26.3700008392334, 2...",1,2011-10-07,22.74,146,2011-10-07,2012-03-01
1,Bat,"[2012-02-14, 2012-03-16, 2012-03-23, 2012-03-2...","[29.850000381469727, 32.95000076293945, 31.719...",1,2012-04-16,-2.73,22,2012-04-16,2012-05-08
2,ABCD,"[2012-06-21, 2012-06-28, 2012-07-05, 2012-07-12]","[31.139999389648438, 29.420000076293945, 30.78...",1,2012-07-17,-3.78,94,2012-07-17,2012-10-19
3,ABCD,"[2013-12-10, 2013-12-18, 2013-12-27, 2014-01-14]","[38.900001525878906, 35.529998779296875, 37.61...",1,2014-01-17,14.43,75,2014-01-17,2014-04-02
4,ABCD,"[2014-04-02, 2014-04-14, 2014-04-28, 2014-05-07]","[41.65999984741211, 38.900001525878906, 41.290...",1,2014-05-12,10.96,66,2014-05-12,2014-07-17
5,ABCD,"[2014-12-23, 2015-01-07, 2015-01-13, 2015-01-16]","[48.79999923706055, 45.4900016784668, 47.90999...",1,2015-01-22,-4.16,5,2015-01-22,2015-01-27
6,ABCD,"[2015-02-24, 2015-03-13, 2015-03-24, 2015-04-02]","[44.29999923706055, 40.61000061035156, 43.1699...",1,2015-04-08,9.42,16,2015-04-08,2015-04-24
7,ABCD,"[2015-05-15, 2015-05-26, 2015-05-28, 2015-06-09]","[48.90999984741211, 46.189998626708984, 48.020...",1,2015-06-12,-1.11,3,2015-06-12,2015-06-15
8,ABCD,"[2015-05-28, 2015-06-09, 2015-06-11, 2015-06-15]","[48.02000045776367, 45.459999084472656, 46.919...",1,2015-06-18,-3.64,11,2015-06-18,2015-06-29
9,ABCD,"[2016-05-31, 2016-06-13, 2016-06-23, 2016-06-27]","[53.0, 49.060001373291016, 52.060001373291016,...",1,2016-06-30,18.35,116,2016-06-30,2016-10-24


In [23]:
start_price = df['Close'][0]
end_price = df['Close'][len(df)-1]

buyhold_profit = ((end_price - start_price) / start_price) * 100
print(f'buy&hold return: {round(buyhold_profit,2)}%')

start_day = df['Date'][0]
end_day = df['Date'][len(df)-1]
delta = datetime.strptime(end_day, '%Y-%m-%d') - datetime.strptime(start_day, '%Y-%m-%d')

total_return = 1
for r in filtered_report['profit(%)']:
    total_return *= (1 + r / 100)



total_return_percentage = (total_return - 1) * 100
print(f'strategy return: {round(total_return_percentage, 2)}%')


total_days= filtered_report['holding_days'].sum()
print(f'buy&hold holding period: {delta.days} days')
print(f'strategy holding period: {total_days} days')


buy_hold_daily_return = (((1 + buyhold_profit / 100) ** (1 / delta.days)) - 1) * 100
strategy_daily_return = (((1 + total_return_percentage / 100) ** (1 / total_days)) - 1) * 100

print(f'buy&hold daily return: {round(buy_hold_daily_return, 4)}%')
print(f'strategy daily return: {round(strategy_daily_return, 4)}%')

buy&hold return: 1258.8%
strategy return: 225.89%
buy&hold holding period: 5149 days
strategy holding period: 1312 days
buy&hold daily return: 0.0507%
strategy daily return: 0.0901%
