In [25]:
import numpy as np
import pandas as pd
from collections import Counter
from datetime import datetime

In [26]:
class CandleStick:
    def __init__(self,date_time: str, open: float, high: float, low: float, close: float, volume: int=None):
        self.open = open
        self.high = high
        self.low = low
        self.close = close
        self.date_time = date_time
        self.volume = volume
        self._validate_candle()

    def __repr__(self):
        return f"Candle({self.date_time}, {self.open}, {self.high}, {self.low}, {self.close}, {self.volume})"

    def __str__(self):
        return f"DT: {self.date_time}, O: {self.open}, H: {self.high}, L: {self.low}, C: {self.close}, V: {self.volume}"

    @property
    def date_time(self) -> str:
        """
        forwards the date and time of the candlestick
        :return: str: date and time
        """
        return self._date_time

    @property
    def open(self) -> float:
        """
        forwards the open price of the candlestick
        :return: float: open price
        """
        return self._open

    @property
    def high(self) -> float:
        """
        forwards the high of the candlestick
        :return: float: high
        """
        return self._high

    @property
    def low(self) -> float:
        """
        forwards the low of the candlestick
        :return: float: low
        """
        return self._low

    @property
    def close(self) -> float:
        """
        forwards the close price of the candlestick
        :return: float: close price
        """
        return self._close

    @property
    def volume(self) -> int:
        """
        forwards the open Volume of the candlestick
        :return: int: Volume
        """
        return self._volume

    @date_time.setter
    def date_time(self, value: str) -> None:
        """
        method to validate the date_time
        :param value: str: date_time
        :return: None
        """
        if not isinstance(value, (str, datetime)):
            raise TypeError("date_time must be str or datetime not {}".format(type(value)))
        self._date_time = value
        return None

    @open.setter
    def open(self, value: float) -> None:
        """
        method to validate the open price
        :param value: float: open price
        :return: None
        """
        if not isinstance(value, (int, float)):
            raise TypeError("open must be int or float not {}".format(type(value)))
        if value < 0:
            raise ValueError("open must be positive")
        self._open = value
        return None

    @high.setter
    def high(self, value: float) -> None:
        """
        method to validate the high
        :param value: float: high
        :return: None
        """
        if not isinstance(value, (int, float)):
            raise TypeError("high must be int or float not {}".format(type(value)))
        if value < 0:
            raise ValueError("high must be positive")
        self._high = value
        return None

    @low.setter
    def low(self, value: float) -> None:
        """
        method to validate the low
        :param value: float: low
        :return: None
        """
        if not isinstance(value, (int, float)):
            raise TypeError("low must be int or float not {}".format(type(value)))
        if value < 0:
            raise ValueError("Low must be positive")
        self._low = value
        return None

    @close.setter
    def close(self, value: float) -> None:
        """
        method to validate the close price
        :param value: float: close price
        :return: None
        """
        if not isinstance(value, (int, float)):
            raise TypeError("close must be int or float not {}".format(type(value)))
        if value < 0:
            raise ValueError("close must be positive")
        self._close = value
        return None

    @volume.setter
    def volume(self, value: int) -> None:
        """
        method to validate the volume
        :param value: int: volume
        :return: None
        """
        if not isinstance(value, (int, float, type(None))):
            raise TypeError("volume must be int, float or NoneType not {}".format(type(value)))
        if value is not None:
            if value < 0:
                raise ValueError("volume must be positive")
        self._volume = value
        return None

    def _validate_candle(self) -> None:
        """
        method to validate the candlestick
        :return: None
        """
        if self.open > self.high:
            raise ValueError("open cannot be greater than high")
        if self.open < self.low:
            raise ValueError("open cannot be less than low")
        if self.close > self.high:
            raise ValueError("close cannot be greater than high")
        if self.close < self.low:
            raise ValueError("close cannot be less than low")

    def type(self) -> str:
        """
        method to determine the type of the candlestick
        :return: str: type of the candlestick
        """
        if self.open < self.close:
            return 'bullish'
        elif self.open > self.close:
            return 'bearish'
        else:
            return 'doji'

    def cs_size(self) -> float:
        """
        method to determine the size of the candlestick
        :return: float: size of the candlestick
        """
        return abs(self.high - self.low)

    def upper_shadow_size(self) -> float:
        """
        method to determine the size of the upper shadow
        :return: float: size of the upper shadow
        """
        return self.high - max(self.open, self.close)

    def lower_shadow_size(self) -> float:
        """
        method to determine the size of the lower shadow
        :return: float: size of the lower shadow
        """
        return min(self.open, self.close) - self.low

    def body_size(self) -> float:
        """
        method to determine the size of the body
        :return: float: size of the body
        """
        return abs(self.close - self.open)

    def is_bullish(self) -> bool:
        """
        method to determine if the candlestick is bullish
        :return: bool: True if bullish, False otherwise
        """
        return self.type() == 'bullish'

    def is_bearish(self) -> bool:
        """
        method to determine if the candlestick is bearish
        :return: bool: True if bearish, False otherwise
        """
        return self.type() == 'bearish'

    def is_doji(self) -> bool:
        """
        method to determine if the candlestick is a doji
        :return: bool: True if doji, False otherwise
        """
        return self.type() == 'doji'

    def cs_body_ratio(self) -> float:
        """
        method to determine the ratio of the body to the candlestick
        :return: float: ratio of the body to the candlestick, range [0, 1]
        """
        return self.body_size() / self.cs_size() if self.cs_size() > 0 else 0

    def body_upper_shadow_ratio(self) -> float:
        """
        method to determine the ratio of the upper shadow to the body
        can be between 0 and infinity if the body is 0 (doji)
        shows how big the upper shadow is compared to the body size
        :return: float: ratio of the upper shadow to the body, range [0, ∞]
        """
        return self.upper_shadow_size() / self.body_size() if self.body_size() > 0 else np.inf

    def body_lower_shadow_ratio(self) -> float:
        """
        method to determine the ratio of the lower shadow to the body
        can be between 0 and infinity if the body is 0 (doji)
        shows how big the lower shadow is compared to the body size
        :return: float: ratio of the lower shadow to the body, range [0, ∞)
        """
        return self.lower_shadow_size() / self.body_size() if self.body_size() > 0 else np.inf

    def body_position(self) -> float:
        """
        method to determine the position of the body
        can be between -1 and 1
        -1 = body totally at the bottom of cs, 0 = middle, 1 = body totally at the top of cs
        :return: float: position of the body, range [-1, 1]
        """
        upper_shadow = self.high - max(self.open, self.close)
        lower_shadow = min(self.open, self.close) - self.low
        shadows = upper_shadow + lower_shadow
        return  (2 * lower_shadow) / shadows -1 if shadows != 0 else 0 # -1 = lower shadow, 0 = middle, 1 = upper shadow

In [57]:
class CandleStickFrame:
    def __init__(self,date_time:list, open: list, high: list, low: list, close: list, volume: list = None):
        date_time, open, high, low, close, volume = self._validate_input(date_time, open, high, low, close, volume)
        self.candle_sticks = [CandleStick(dt, o, h, l, c, v) for dt, o, h, l, c, v in zip(date_time, open, high, low, close, volume)]
        self.df = pd.DataFrame({"open": open, "high": high, "low": low, "close": close})
        self._bullish_count, self._bearish_count, self._doji_count = self._type_count()

    def __repr__(self):
        return f"CandleFrame({self.df})"

    def __str__(self):
        return f"{self.df}"

    def __len__(self):
        return len(self.candle_sticks)

    def __getitem__(self, index):
        return self.candle_sticks[index]

    def __iter__(self):
        return iter(self.candle_sticks)

    def __reversed__(self):
        return reversed(self.candle_sticks)

    @staticmethod
    def _validate_input(date_time: str, open: float, high: float, low: float, close: float, volume: int) -> tuple:
        """
        method to validate the input
        :param date_time: list, np.ndarray, or pd.core.series.Series of date_time of the candlesticks
        :param open: list, np.ndarray, or pd.core.series.Series of open of the candlesticks
        :param high: list, np.ndarray, or pd.core.series.Series of high of the candlesticks
        :param low: list, np.ndarray, or pd.core.series.Series of low of the candlesticks
        :param close: list, np.ndarray, or pd.core.series.Series of close of the candlesticks
        :param volume: list, np.ndarray, or pd.core.series.Series of volume of the candlesticks or None
        :return: touple: date_time, open, high, low, close, volume
        """
        if not isinstance(date_time, (list, np.ndarray, pd.core.series.Series)):
            raise TypeError("date_time must be list, np.ndarry, or pd.core.series.Series not {}".format(type(date_time)))
        if not isinstance(open, (list, np.ndarray, pd.core.series.Series)):
            raise TypeError("open must be list, np.ndarry, or pd.core.series.Series not {}".format(type(open)))
        if not isinstance(high, (list, np.ndarray, pd.core.series.Series)):
            raise TypeError("high must be list, np.ndarry, or pd.core.series.Series not {}".format(type(high)))
        if not isinstance(low, (list, np.ndarray, pd.core.series.Series)):
            raise TypeError("low must be list, np.ndarry, or pd.core.series.Series not {}".format(type(low)))
        if not isinstance(close, (list, np.ndarray, pd.core.series.Series)):
            raise TypeError("close must be list, np.ndarry, or pd.core.series.Series not {}".format(type(close)))
        if not isinstance(volume, (list, np.ndarray, pd.core.series.Series, type(None))):
            raise TypeError("volume must be list, np.ndarry, pd.core.series.Series or NoneType not {}".format(type(volume)))
        date_time = list(date_time)
        open = list(open)
        high = list(high)
        low = list(low)
        close = list(close)
        volume = list(volume) if volume is not None else [None] * len(date_time)
        if len(date_time) != len(open) != len(high) or len(open) != len(low) or len(open) != len(close):
            raise ValueError("date_time, open, high, low, and close must be the same length")
        if not all(isinstance(x, (str, datetime)) for x in date_time):
            raise TypeError("date_time must be list of str or time.datetime not {}".format(type(date_time)))
        if not all(isinstance(x, (int, float)) for x in open):
            raise TypeError("open must be list of int or float not {}".format(type(open)))
        if not all(isinstance(x, (int, float)) for x in high):
            raise TypeError("high must be list of int or float not {}".format(type(high)))
        if not all(isinstance(x, (int, float)) for x in low):
            raise TypeError("low must be list of int or float not {}".format(type(low)))
        if not all(isinstance(x, (int, float)) for x in close):
            raise TypeError("close must be list of int or float not {}".format(type(close)))
        if not all(isinstance(x, (int, float, type(None))) for x in volume):
            raise TypeError("volume must be list of int, float or None not {}".format(type(volume)))
        return date_time, open, high, low, close, volume

    def _type_count(self) -> tuple:
        """
        method to count the number of bullish, bearish, and doji candlesticks
        :return: touple: bullish, bearish, doji count
        """
        bullish = 0
        bearish = 0
        doji = 0
        for candle in self.candle_sticks:
            if candle.type() == 'bullish':
                bullish += 1
            elif candle.type() == 'bearish':
                bearish += 1
            elif candle.type() == 'doji':
                doji += 1
        return bullish, bearish, doji

    def _bullish_ratio(self) -> float:
        """
        method to calculate the ratio of bullish candlesticks
        :return: float: bullish ratio range [0, 1]
        """
        return self._bullish_count / len(self.candle_sticks)

    def _bearish_ratio(self) -> float:
        """
        method to calculate the ratio of bearish candlesticks
        :return: float: bearish ratio range [0, 1]
        """
        return self._bearish_count / len(self.candle_sticks)

    def _doji_ratio(self) -> float:
        """
        method to calculate the ratio of doji candlesticks
        :return: float: doji ratio range [0, 1]
        """
        return self._doji_count / len(self.candle_sticks)

    def type_ratio(self) -> str:
        """
        method to return the ratio of bullish, bearish, and doji candlesticks
        :return: str: bullish, bearish, and doji ratio
        """
        return 'bullish: {:.2f}, bearish: {:.2f}, doji: {:.2f}'.format(self._bullish_ratio(), self._bearish_ratio(), self._doji_ratio())

In [58]:
class Parameter:
    candle_stick_pattern = dict(
        bullish=dict(
            hammer = dict(
                trend_window = 10,
                trend_strength = 0.25,
                cs_body_position = 0.25,
                body_ls_ratio = 0.25,
                body_us_ratio = 0.25,
            ),
            piercing = dict(
                trend_window = 10,
                trend_strength = 0.25,
            ),
            bullish_engulfing = dict(
                trend_window = 10,
                trend_strength = 0.25,
            ),
            morning_star = dict(
                trend_window = 10,
                trend_strength = 0.25,
                cs_body_ratio = 0.25,
                cs_m2_body_ratio = 0.25,
            ),
            three_white_soldiers = dict(
                trend_window = 10,
                trend_strength = 0.25,
                cs_body_ratio = 0.25,
                cs_m2_body_ratio = 0.25,
                cs_m3_body_ratio = 0.25,
            ),
            bullish_marubozu = dict(
                trend_window = 10,
                trend_strength = 0.25,
                cs_body_ratio = 0.25,
            ),
        ),
        bearish=dict(
            hanging_man = dict(),
            dark_cloud_cover = dict(),
            bearish_engulfing = dict(),
            evening_star = dict(),
            three_black_crows = dict(),
            bearish_marubozu = dict(),
        ),
    )

In [59]:
class CandleStickPattern:
    def __init__(self, candle_stick_frame: CandleStickFrame):
        self.candle_stick_frame = self._validate_csf(candle_stick_frame)

    def _validate_csf(self, candle_stick_frame: CandleStickFrame):
        """
        method to validate the candle stick frame
        :param candle_stick_frame: CandleStickFrame: candle stick frame to validate
        :return: CandleStickFrame: validated candle stick frame
        """
        if not isinstance(candle_stick_frame, CandleStickFrame):
            raise TypeError("candle_stick_frame must be CandleStickFrame not {}".format(type(candle_stick_frame)))
        if len(candle_stick_frame) < 1:
            raise ValueError("candle_stick_frame must have at least 1 candle stick")
        return candle_stick_frame

    def _prepare_window(self, index:int, window: int) -> CandleStickFrame:
        """
        method to prepare the window for the candle stick frame
        :param index: int: index of the candle stick
        :param window: int: window to look back
        :return: CandleStickFrame: window of candle sticks
        """
        if window < 1:
            raise ValueError("window must be greater than 0")
        if window > len(self.candle_stick_frame):
            raise ValueError("window must be less than or equal to the length of the candle stick frame")
        return self.candle_stick_frame[index-window:index]

    def z_score_cs(self, index: int, window: int=10) -> float:
        """
        method to calculate the z score of the candle stick at index
        cs size normal scaled in respect to the cs of the candle sticks in the window
        can be in range [-inf, inf] with a mean of 0 and standard deviation of 1
        :param index: int: index of the candle stick to calculate the z score of
        :param window: int: window to calculate the z score over
        :return: float: z score of the candle stick at index [-inf, inf]
        """
        cs_window = self._prepare_window(index, window)
        if len(cs_window) < window or index < 0:
            return None
        else:
            mean = np.mean([cs.cs_size() for cs in cs_window])
            std = np.std([cs.cs_size() for cs in cs_window])
            # cs size normal scaled in respect to the cs of the candle sticks in the window
            return (self.candle_stick_frame[index].cs_size() - mean) / std

    def z_score_body(self, index: int, window: int=11) -> float:
        """
        method to calculate the z score of the body of a candle stick at index
        body size normal scaled in respect to the bodies of the candle sticks in the window
        can be in range [-inf, inf] with a mean of 0 and standard deviation of 1
        :param index: int: index of the candle stick to calculate the z score of
        :param window: int: window to calculate the z score over
        :return: float or None: z score of the body of the candle stick at index [-inf, inf]
        """
        # the current cs should be included in the window -> += 1
        index += 1
        cs_window = self._prepare_window(index, window)
        if len(cs_window) < window or index < 0:
            return None
        else:
            mean = np.mean([cs.body_size() for cs in cs_window])
            std = np.std([cs.body_size() for cs in cs_window])
            # body size normal scaled in respect to the bodies of the candle sticks in the window
            return (self.candle_stick_frame[index].body_size() - mean) / std

    def min_max_cs(self, index: int, window: int=11) -> float:
        """
        method to calculate the min max of the candle stick at index
        cs size min max scaled in respect to the cs of the candle sticks in the window
        can be in range [0, 1]
        :param index: int: index of the candle stick to calculate the min max of
        :param window: int: window to calculate the min max over
        :return: float: min max of the candle stick at index [0, 1]
        """
        # the current cs should be included in the window -> += 1
        index += 1
        cs_window = self._prepare_window(index, window)
        if len(cs_window) < window or index < 0:
            return None
        else:
            min = np.min([cs.cs_size() for cs in cs_window])
            max = np.max([cs.cs_size() for cs in cs_window])
            # cs size min max scaled in respect to the cs of the candle sticks in the window
            return (self.candle_stick_frame[index].cs_size() - min) / (max - min)

    def min_max_body(self, index: int, window: int=11) -> float:
        """
        method to calculate the min max of the body of a candle stick at index
        body size min max scaled in respect to the bodies of the candle sticks in the window
        can be in range [0, 1]
        :param index: int: index of the candle stick to calculate the min max of
        :param window: int: window to calculate the min max over
        :return: float or None: min max of the body of the candle stick at index [0, 1]
        """
        # the current cs should be included in the window -> += 1
        index += 1
        cs_window = self._prepare_window(index, window)
        if len(cs_window) < window or index < 0:
            return None
        else:
            min = np.min([cs.body_size() for cs in cs_window])
            max = np.max([cs.body_size() for cs in cs_window])
            # body size min max scaled in respect to the bodies of the candle sticks in the window
            return (self.candle_stick_frame[index].body_size() - min) / (max - min)

    def trend(self, index: int, window:int=10) -> float:
        """
        method to calculate the trend of the candle stick at index
        can be in range [-1, 1] with -1 being down and 1 being up
        :param index: int: index of the candle stick to calculate the trend of
        :param window: int: window to calculate the trend over
        :return: float or None: trend of the candle stick at index [-1, 1]
        """
        cs_window = self._prepare_window(index, window)
        if len(cs_window) < window or index < 0:
            return None
        trend, sum = 0, 0
        for cs in cs_window:
            sum += cs.cs_size()
            if cs.type() == 'bullish':
                trend += cs.cs_size()
            elif cs.type() == 'bearish':
                trend -= cs.cs_size()
        # weighted average of the trend
        return trend / sum


    # Bullish Reversal Candlestick Patterns classes:
    # Hammer (1)
    class Hammer:
        def __init__(self, trend, cs, param:dict):
            self.param = param
            self.pattern_name = 'hammer'
            self.pattern_type = 'bullish'
            self.trend_strength = trend
            self.pattern = [cs]
            self._cs = cs
            self.is_pattern = self._is_hammer()

        def __str__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def __repr__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def _is_hammer(self):
            if self.trend_strength is None:
                return None
            if self.trend_strength >= self.param['trend_strength']:
                if self._cs.type() == 'bullish' and self._cs.body_position() > self.param['cs_body_position'] and self._cs.body_lower_shadow_ratio() < self.param['body_ls_ratio'] and self._cs.body_upper_shadow_ratio() < self.param['body_us_ratio']:
                    return True
            return False

    def is_hammer(self, param:dict = None, is_boolean = False) -> list:
        if param is None:
            param = Parameter.candle_stick_pattern['bullish']['hammer']
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-1, param['trend_window'])
            if is_boolean:
                result.append(self.Hammer(trend, self.candle_stick_frame[i], param).is_pattern())
            else:
                result.append(self.Hammer(trend, self.candle_stick_frame[i], param))
        return result

    # Bullish Piercing (2)
    class Piercing:
        def __init__(self, trend, cs, cs_m1, param:dict):
            self.param = param
            self.pattern_name = 'piercing'
            self.pattern_type = 'bullish'
            self.trend_strength = trend
            self.pattern = [cs_m1, cs]
            self._cs = cs
            self._cs_m1 = cs_m1
            self.is_pattern = self._is_piercing()

        def __str__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def __repr__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def _is_piercing(self):
            if self.trend_strength is None:
                return None
            if self.trend_strength >= self.param['trend_strength']:
                if self._cs.type() == 'bullish' and self._cs_m1.type() == 'bearish':
                    if self._cs.open < self._cs_m1.close and self._cs_m1.open - self._cs_m1.body_size() / 2 < self._cs.close < self._cs_m1.open:
                        return True
            return False

    def is_piercing(self, param:dict = None, is_boolean = False) -> list:
        if param is None:
            param = Parameter.candle_stick_pattern['bullish']['piercing']
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-1, param['trend_window'])
            if is_boolean:
                result.append(self.Piercing(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], param).is_pattern())
            else:
                result.append(self.Piercing(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], param))
        return result

    # Bullish Engulfing (3)
    class BullishEngulfing:
        def __init__(self, trend, cs, cs_m1, param:dict):
            self.param = param
            self.pattern_name = 'bullish_engulfing'
            self.pattern_type = 'bullish'
            self.trend_strength = trend
            self.pattern = [cs_m1, cs]
            self._cs = cs
            self._cs_m1 = cs_m1
            self.is_pattern = self._is_bullish_engulfing()

        def __str__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def __repr__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def _is_bullish_engulfing(self):
            if self.trend_strength is None:
                return None
            if self.trend_strength >= self.param['trend_strength']:
                if self._cs.type() == 'bullish' and self._cs_m1.type() == 'bearish':
                    if self._cs.open < self._cs_m1.close and self._cs.close > self._cs_m1.open:
                        return True
            return False

    def is_bullish_engulfing(self, param:dict = None, is_boolean = False) -> list:
        if param is None:
            param = Parameter.candle_stick_pattern['bullish']['bullish_engulfing']
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-1, param['trend_window'])
            if is_boolean:
                result.append(self.BullishEngulfing(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], param).is_pattern())
            else:
                result.append(self.BullishEngulfing(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], param))

    # Morning Star (4)
    class MorningStar:
        def __init__(self, trend, cs, cs_m1, cs_m2, param:dict):
            self.param = param
            self.pattern_name = 'morning_star'
            self.pattern_type = 'bullish'
            self.trend_strength = trend
            self.pattern = [cs_m2, cs_m1, cs]
            self._cs = cs
            self._cs_m1 = cs_m1
            self._cs_m2 = cs_m2
            self.is_pattern = self._is_morning_star()

        def __str__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def __repr__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def _is_morning_star(self):
            if self.trend_strength is None:
                return None
            if self.trend_strength >= self.param['trend_strength']: # adding relative_size
                if self._cs_m2.type() == 'bearish' and self._cs_m2.cs_body_ratio() >= self.param['cs_m2_body_ratio']: # adding relative_size
                    if self._cs.type() == 'bullish' and self._cs.cs_body_ratio() >= self.param['cs_body_ratio']: # adding relative_size
                        return True
            return False

    def is_morning_star(self, param:dict = None, is_boolean = False) -> list:
        if param is None:
            param = Parameter.candle_stick_pattern['bullish']['morning_star']
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-2, param['trend_window'])

            if is_boolean:
                result.append(self.MorningStar(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], self.candle_stick_frame[i-2], param).is_pattern())
            else:
                result.append(self.MorningStar(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], self.candle_stick_frame[i-2], param))
        return result

    # Three White Soldiers (5)
    class ThreeWhiteSoldiers:
        def __init__(self, trend, cs, cs_m1, cs_m2, param:dict):
            self.param = param
            self.pattern_name = 'three_white_soldiers'
            self.pattern_type = 'bullish'
            self.trend_strength = trend
            self.pattern = [cs_m2, cs_m1, cs]
            self._cs = cs
            self._cs_m1 = cs_m1
            self._cs_m2 = cs_m2
            self.is_pattern = self._is_three_white_soldiers()

        def __str__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def __repr__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def _is_three_white_soldiers(self):
            if self.trend_strength is None:
                return None
            if self.trend_strength >= self.param['trend_strength']:
                if self._cs_m2.type() == 'bullish' and self._cs_m2.cs_body_ratio() >= self.param['cs_m2_body_ratio']: # adding relative_size
                    if self._cs_m1.type() == 'bullish' and self._cs_m1.cs_body_ratio() >= self.param['cs_m1_body_ratio']: # adding relative_size
                        if self._cs.type() == 'bullish' and self._cs.cs_body_ratio() >= self.param['cs_body_ratio']: # adding relative_size
                            return True
            return False

    def is_three_white_soldiers(self, param:dict = None, is_boolean = False) -> list:
        if param is None:
            param = Parameter.candle_stick_pattern['bullish']['three_white_soldiers']
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-2, param['trend_window'])
            if is_boolean:
                result.append(self.ThreeWhiteSoldiers(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], self.candle_stick_frame[i-2], param).is_pattern())
            else:
                result.append(self.ThreeWhiteSoldiers(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1], self.candle_stick_frame[i-2], param))
        return result

    # Bullish Marubozu (6)
    class BullishMarubozu:
        def __init__(self, trend, cs, param:dict):
            self.param = param
            self.pattern_name = 'bullish_marubozu'
            self.pattern_type = 'bullish'
            self.trend_strength = trend
            self.pattern = [cs]
            self._cs = cs
            self.is_pattern = self._is_bullish_marubozu()

        def __str__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def __repr__(self):
            return f'{self.pattern_name} -> ({self.is_pattern})'

        def _is_bullish_marubozu(self):
            if self.trend_strength is None:
                return None
            if self.trend_strength >= self.param['trend_strength']:
                if self._cs.type() == 'bullish' and self._cs.cs_body_ratio() >= self.param['cs_body_ratio']: # adding relative_size
                    return True
            return False

    def is_bullish_marubozu(self, param:dict = None, is_boolean = False) -> list:
        if param is None:
            param = Parameter.candle_stick_pattern['bullish']['bullish_marubozu']
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i, param['trend_window'])
            if is_boolean:
                result.append(self.BullishMarubozu(trend, self.candle_stick_frame[i], param).is_pattern())
            else:
                result.append(self.BullishMarubozu(trend, self.candle_stick_frame[i], param))
        return result


    # Bearish Reversal Candlestick Patterns Classes:
    # Hanging Man (14)
    class HangingMan:
        pass



    def _is_hanging_man(self, trend, cs, cs_body_position=0.25, body_ls_ratio=0.35, body_us_ratio=1.5):
        if trend is None:
            return None
        if trend == 'up':
            if cs.type() == 'bearish' and cs.body_position() > cs_body_position \
            and cs.body_lower_shadow_ratio() < body_ls_ratio and cs.body_upper_shadow_ratio() < body_us_ratio:
                return True
        return False

    def is_hanging_man(self, trend_window=10, cs_body_position=0.25, body_ls_ratio=0.35, body_us_ratio=1.5):
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-1, trend_window)
            result.append(self._is_hanging_man(trend, self.candle_stick_frame[i], cs_body_position, body_ls_ratio, body_us_ratio))
        return result

    # Dark Cloud (15)
    def _is_dark_cloud(self, trend, cs, cs_m1):
        if trend is None:
            return None
        if trend == 'up':
            if cs.type() == 'bearish' and cs_m1.type() == 'bullish':
                if cs.open > cs_m1.close and cs_m1.open + cs_m1.body_size() / 2 > cs.close > cs_m1.open:
                    return True
        return False

    def is_dark_cloud(self, trend_window=10):
        result = []
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-1, trend_window)
            result.append(self._is_dark_cloud(trend, self.candle_stick_frame[i], self.candle_stick_frame[i-1]))
        return result

    # Bearish Engulfing (16)
    def _is_bearish_engulfing(self, trend, cs, cs_m1):
        if trend is None:
            return None
        if trend == 'up':
            if cs.type() == 'bearish' and cs_m1.type() == 'bullish':
                if cs.open > cs_m1.close and cs.close < cs_m1.open:
                    return True
        return False

    def is_bearish_engulfing(self, trend_window=10):
        result = []
        for i in range(len(self.candle_stick_frame)):
            cs = self.candle_stick_frame[i]
            cs_m1 = self.candle_stick_frame[i-1]
            trend = self.trend(i-1, trend_window)
            result.append(self._is_bearish_engulfing(trend, cs, cs_m1))
        return result

    # Evening Star (17)
    def _is_evening_star(self, trend, cs_m2, cs_m1, cs, cs_m2_body_ratio=0.6, cs_body_ratio=0.6,
                         cs_m2_relative_size=0.6, cs_m1_relative_size=0.6, cs_relative_size=0.3):
        if trend is None:
            return None
        elif trend == 'up':
            if cs_m2.is_bullish() and cs_m2.cs_body_ratio() >= cs_m2_body_ratio and cs_m2.cs_size() >= cs_m2_relative_size * self._average_candle_stick_size:
                if cs_m1.cs_size() <= cs_m1_relative_size * self._average_candle_stick_size:
                    if cs.is_bearish() and cs.cs_body_ratio() >= cs_body_ratio and cs.cs_size() >= cs_relative_size * self._average_candle_stick_size:
                        return True
        return False

    def is_evening_star(self,trend_window=10, cs_m2_body_ratio=0.6, cs_body_ratio=0.6,
                        cs_m2_relative_size=0.6, cs_m1_relative_size=0.6, cs_relative_size=0.3):
        results = list()
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-3, trend_window)
            cs_m2 = self.candle_stick_frame[i-2]
            cs_m1 = self.candle_stick_frame[i-1]
            cs = self.candle_stick_frame[i]
            results.append(self._is_evening_star(trend, cs_m2, cs_m1, cs, cs_m2_body_ratio, cs_body_ratio,
                                                 cs_m2_relative_size, cs_m1_relative_size, cs_relative_size))
        return results

    # Three Black Crows (18)
    def _is_three_black_crows(self, trend, cs_m2, cs_m1, cs, cs_body_ratio=0.6, cs_relative_size=0.5):
        if trend is None:
            return None
        elif trend == 'up':
            if cs_m2.is_bearish() and cs_m2.cs_body_ratio() >= cs_body_ratio and cs_m2.cs_size() >= cs_relative_size * self._average_candle_stick_size:
                if cs_m1.is_bearish() and cs_m1.cs_body_ratio() >= cs_body_ratio and cs_m1.cs_size() >= cs_relative_size * self._average_candle_stick_size:
                    if cs.is_bearish() and cs.cs_body_ratio() >= cs_body_ratio and cs.cs_size() >= cs_relative_size * self._average_candle_stick_size:
                        return True
        return False

    def is_three_black_crows(self, trend_window=10, cs_body_ratio=0.6, cs_relative_size=0.5):
        results = list()
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-3, trend_window)
            cs_m2 = self.candle_stick_frame[i-2]
            cs_m1 = self.candle_stick_frame[i-1]
            cs = self.candle_stick_frame[i]
            results.append(self._is_three_black_crows(trend, cs_m2, cs_m1, cs, cs_body_ratio, cs_relative_size))
        return results

    # Bearish Marubozu (19)
    def _is_bearish_marubozu(self, trend, cs, cs_body_ratio=0.9):
        if trend is None:
            return None
        else:
            return cs.is_bearish() and cs.cs_body_ratio() >= cs_body_ratio and trend == 'up'

    def is_bearish_marubozu(self, trend_window=10, cs_body_ratio=0.9):
        results = list()
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-1, trend_window)
            cs = self.candle_stick_frame[i]
            results.append(self._is_bearish_marubozu(trend, cs, cs_body_ratio))
        return results

In [60]:
# load data
df = pd.read_csv('EURUSD_Daily.csv')
df

Unnamed: 0,<DATE>,<OPEN>,<HIGH>,<LOW>,<CLOSE>,<TICKVOL>,<VOL>,<SPREAD>
0,2006.01.02,1.18490,1.18690,1.18010,1.18210,5870,0.000000e+00,20
1,2006.01.03,1.18220,1.20330,1.18100,1.20150,9455,0.000000e+00,20
2,2006.01.04,1.20140,1.21460,1.20120,1.21080,10123,0.000000e+00,20
3,2006.01.05,1.21070,1.21230,1.20650,1.20970,9584,0.000000e+00,20
4,2006.01.06,1.21010,1.21820,1.20780,1.21430,9219,0.000000e+00,20
...,...,...,...,...,...,...,...,...
4406,2022.12.23,1.05934,1.06323,1.05861,1.06150,762777,1.144170e+11,5
4407,2022.12.26,1.06196,1.06359,1.06140,1.06335,224659,3.369885e+10,5
4408,2022.12.27,1.06324,1.06689,1.06110,1.06385,757183,1.135770e+11,5
4409,2022.12.28,1.06385,1.06738,1.06058,1.06115,742676,1.114010e+11,5


In [61]:
# create candlestick frame
candle_stick_frame = CandleStickFrame(df['<DATE>'], df['<OPEN>'], df['<HIGH>'], df['<LOW>'], df['<CLOSE>'], df['<VOL>'])
len(candle_stick_frame)

4411

In [62]:
res = CandleStickPattern(candle_stick_frame).trend(1053, 10)
res

-0.061730000000000285 0.12205000000000021


-0.5057763211798457

In [66]:
a = CandleStickPattern(candle_stick_frame)
b = a.is_piercing()
b

0.022199999999999775 0.11499999999999977
0.02179999999999982 0.11539999999999973
0.008999999999999675 0.10259999999999958
0.004799999999999693 0.0983999999999996
0.0031999999999998696 0.09999999999999942
0.0030999999999998806 0.09989999999999943
0.03079999999999994 0.10679999999999956
0.03170000000000006 0.10589999999999944
0.012400000000000189 0.10459999999999936
0.02069999999999994 0.09629999999999961
-0.00670000000000015 0.09929999999999972
-0.0048000000000001375 0.09739999999999971
-0.0029999999999998916 0.09919999999999995
-0.024199999999999555 0.10199999999999987
-0.008999999999999675 0.10240000000000005
-0.0338999999999996 0.10670000000000002
-0.061799999999999855 0.10000000000000009
-0.047599999999999865 0.10080000000000022
-0.04590000000000005 0.09910000000000041
-0.033199999999999896 0.09680000000000044
-0.03159999999999985 0.0952000000000004
-0.02179999999999982 0.09440000000000026
-0.026299999999999768 0.08990000000000031
-0.02279999999999993 0.08640000000000048
-0.02429999

[piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (None),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),
 piercing -> (False),

In [None]:
candle_pattern = CandleStickPattern(candle_stick_frame)
df['is_bullish_marubozu'] = candle_pattern.is_bullish_marubozu()
df['is_bearish_marubozu'] = candle_pattern.is_bearish_marubozu()
df['is_morning_star'] = candle_pattern.is_morning_star()
df['is_evening_star'] = candle_pattern.is_evening_star()
df['is_bullish_engulfing'] = candle_pattern.is_bullish_engulfing()
df['is_bearish_engulfing'] = candle_pattern.is_bearish_engulfing()
df['is_hammer'] = candle_pattern.is_hammer()
df['is_hanging_man'] = candle_pattern.is_hanging_man()
df['is_piercing'] = candle_pattern.is_piercing()
df['is_dark_cloud'] = candle_pattern.is_dark_cloud()
df['is_three_white_soldiers'] = candle_pattern.is_three_white_soldiers()
df['is_three_black_crows'] = candle_pattern.is_three_black_crows()
df

In [None]:
import itertools
import tqdm


class CandelStickPatternFinder(CandleStickPattern):
    def __init__(self, candle_stick_frame):
        super().__init__(candle_stick_frame)
        self.patterns = list()

    def _get_n_cs_pattern(self, n_cs, trend_window, permutation, ts_confirm):
        cs_body_ratio = list()
        cs_relative_size = list()
        cs_body_position = list()
        body_ls_ratio = list()
        body_us_ratio = list()

        for n in permutation:
            cs_body_ratio.append(n[0])
            cs_relative_size.append(n[1])
            cs_body_position.append(n[2])
            body_ls_ratio.append(n[3])
            body_us_ratio.append(n[4])

        bearish_results = list()
        bullish_results = list()
        for i in range(len(self.candle_stick_frame)):
            trend = self.trend(i-n_cs, trend_window)
            last_n_cs = self.candle_stick_frame[i-n_cs:i]
            boolean_results = list()
            for n in range(len(last_n_cs)):
                try:
                    if last_n_cs[n].body_position() >= cs_body_position[n] and \
                            last_n_cs[n].cs_body_ratio() >= cs_body_ratio[n] and \
                            last_n_cs[n].cs_size() >= cs_relative_size[n] * self._average_candle_stick_size and \
                            last_n_cs[n].body_lower_shadow_ratio() >= body_ls_ratio[n] and \
                            last_n_cs[n].body_upper_shadow_ratio() >= body_us_ratio[n]:
                        boolean_results.append(True)
                    else:
                        boolean_results.append(False)
                except:
                    boolean_results.append(False)

            if trend == 'up' and all(boolean_results):
                bearish_results.append(True)
                bullish_results.append(False)
            elif trend == 'down' and all(boolean_results):
                bullish_results.append(True)
                bearish_results.append(False)
            else:
                bearish_results.append(False)
                bullish_results.append(False)
        confirmations = self._evaluate_cs_pattern((bearish_results, bullish_results), ts_confirm)
        return confirmations

    def _evaluate_cs_pattern(self, results, ts_confirm):
        bearish_confirmation_counter = 0
        bullish_confirmation_counter = 0
        for i in range(len(self.candle_stick_frame)):
            if results[0][i] is True:
                if self.trend(i-ts_confirm, ts_confirm) == 'down':
                    bearish_confirmation_counter += 1
            elif results[1][i] is True:
                if self.trend(i-ts_confirm, ts_confirm) == 'up':
                    bullish_confirmation_counter += 1
        return bearish_confirmation_counter, bullish_confirmation_counter


    def find_n_cs_pattern(self,
                          n_cs=1,
                          trend_window=10,
                          cs_body_ratio=[(0, 1, 0.2)],
                          cs_relative_size=[(0, 1, 0.2)],
                          cs_body_position=[(-1, 1, 0.2)],
                          body_ls_ratio=[(0, 1, 0.2)],
                          body_us_ratio=[(0, 1, 0.2)],
                          ts_confirm=10,
                          ):
        permutations = list()
        for n in range(n_cs):
            # permutation of all parametes with the given bordes and step size
            permutations.append(list(itertools.product(*[np.arange(*cs_body_ratio[n]),
                                                         np.arange(*cs_relative_size[n]),
                                                         np.arange(*cs_body_position[n]),
                                                         np.arange(*body_ls_ratio[n]),
                                                         np.arange(*body_us_ratio[n])])))

        # combine all permutations
        permutations = list(itertools.product(*permutations))
        print(len(permutations))

        for permutation in tqdm.tqdm(permutations):
            confirmations = self._get_n_cs_pattern(n_cs, trend_window, permutation, ts_confirm)
            self.patterns.append(('bearish', permutation, confirmations[0]))
            self.patterns.append(('bullish', permutation, confirmations[1]))
        return self.patterns

In [10]:
#save the df
df.to_csv('EURUSD_Daily_with_patterns.csv', index=False)

In [11]:
CandelStickPatternFinder(candle_stick_frame).find_n_cs_pattern(n_cs=1)

200000


  0% 744/200000 [01:55<8:33:25,  6.47it/s] 


KeyboardInterrupt: 