In [3]:
import pandas as pd
from pandas.core.indexes.datetimes import DatetimeIndex

In [4]:
data_5m = pd.read_csv("BTCUSDT-5m-2025-04.csv")
data_5m["date"] = pd.to_datetime(data_5m["date"], unit="us") # Micro-seconds
data_5m

Unnamed: 0,date,open,high,low,close,volume,close_time,quote_asset_volume,num_trades,taker_buy_base_asset_volume,taker_buy_quote_asset_volume,unused
0,2025-04-01 00:00:00,82550.00,82617.15,82486.40,82617.15,48.69432,1743465899999999,4.019235e+06,7314,18.96382,1.565362e+06,0
1,2025-04-01 00:05:00,82617.15,82712.54,82612.00,82660.01,36.94084,1743466199999999,3.053496e+06,9704,21.29983,1.760501e+06,0
2,2025-04-01 00:10:00,82660.01,82660.01,82483.49,82483.49,28.18349,1743466499999999,2.326894e+06,7970,13.32000,1.099706e+06,0
3,2025-04-01 00:15:00,82483.50,82599.62,82432.74,82596.36,48.77984,1743466799999999,4.025642e+06,6889,38.41559,3.170880e+06,0
4,2025-04-01 00:20:00,82596.35,82697.14,82596.35,82643.76,29.77446,1743467099999999,2.461204e+06,6771,21.48218,1.775804e+06,0
...,...,...,...,...,...,...,...,...,...,...,...,...
8635,2025-04-30 23:35:00,94333.33,94333.34,94233.98,94296.21,17.00259,1746056399999999,1.603034e+06,6008,8.90212,8.392952e+05,0
8636,2025-04-30 23:40:00,94296.21,94300.00,94173.94,94173.95,24.39299,1746056699999999,2.298223e+06,7145,10.59523,9.981500e+05,0
8637,2025-04-30 23:45:00,94173.94,94231.79,94142.85,94231.79,21.10164,1746056999999999,1.987349e+06,7039,13.86686,1.306002e+06,0
8638,2025-04-30 23:50:00,94231.78,94283.04,94201.75,94217.39,13.32211,1746057299999999,1.255641e+06,5741,5.70634,5.378154e+05,0


In [5]:
from dataclasses import dataclass
from pandas import Timestamp
from typing import Literal

@dataclass
class BreakerBlock:
    direction: Literal["bullish","bearish"]
    key_points: tuple[Timestamp, Timestamp, Timestamp, Timestamp] # Main high/low timestamps in ascending order
    breaker_candle: Timestamp

    @property
    def highs(self):
        if self.direction == "bullish":
            return [self.key_points[1], self.key_points[3]]
        else:
            return [self.key_points[0], self.key_points[2]]
        
    @property
    def lows(self):
        if self.direction == "bullish":
            return [self.key_points[0], self.key_points[2]]
        else:
            return [self.key_points[1], self.key_points[3]]




In [6]:
import plotly.graph_objects as go

def plot_ohlc(
    df: pd.DataFrame,
    fig: go.Figure | None = None,
    breaker: BreakerBlock | None = None,
    plot_breaker_candle: bool = False,
    breaker_candle_extension_bars: int = 10,
    high_indices: list[pd.Timestamp] | None = None,
    low_indices: list[pd.Timestamp] | None = None,
):
    
    assert isinstance(df.index, DatetimeIndex), "Index of data must be datetime"
    
    if fig is None:
        # Create OHLC plot
        fig = go.Figure(data=go.Candlestick(
            x=df.index,
            open=df['open'],
            high=df['high'],
            low=df['low'],
            close=df['close'],
        ))
    else:
        fig=fig

    if breaker is not None:
        high_indices = breaker.highs
        low_indices = breaker.lows

        breakthrough_pos = df.index.get_loc(breaker.key_points[-1])
        assert isinstance(breakthrough_pos, int), "get_loc did not return an int"
        breaker_extension_end = df.index[breakthrough_pos + breaker_candle_extension_bars]

        if plot_breaker_candle == True:
            fig.add_shape(
                type="rect",
                x0=breaker.breaker_candle, x1=breaker_extension_end,
                y0=df["low"].loc[breaker.breaker_candle], y1=df["high"].loc[breaker.breaker_candle],
                fillcolor="rgba(0, 200, 0, 0.2)",
                line=dict(color="rgba(0, 200, 0, 0.4)"),
                layer="below",
            )

    if high_indices is not None:
        # Process highlights
        highlight_df = df[df.index.isin(high_indices)].copy()

        if highlight_df.empty:
            print("No valid highlights found.")
        else:
            # Determine Y position (above or below candle)
            highlight_df['Y'] = highlight_df['high'] + 1

            # Add scatter plot for highlight markers
            fig.add_trace(go.Scatter(
                x=highlight_df.index,
                y=highlight_df['Y'],
                mode='markers',
                marker=dict(size=8, symbol='circle', color='red'),
                name='Highlight'
            ))

    if low_indices is not None:
        # Process highlights
        highlight_df = df[df.index.isin(low_indices)].copy()

        if highlight_df.empty:
            print("No valid highlights found.")
        else:
            # Determine Y position (above or below candle)
            highlight_df['Y'] = highlight_df['low'] - 1

            # Add scatter plot for highlight markers
            fig.add_trace(go.Scatter(
                x=highlight_df.index,
                y=highlight_df['Y'],
                mode='markers',
                marker=dict(size=8, symbol='circle', color='blue'),
                name='Highlight'
            ))

    fig.update_layout(
        xaxis_rangeslider_visible=False
    )

    fig.update_layout(
        margin=dict(l=0, r=0, t=0, b=0),
        xaxis=dict(visible=False),
        yaxis=dict(visible=False),
        showlegend=False,
        template='plotly_dark',
        width=600, 
        height=400,
    )
    return fig

## Breaker Detection

- Bullish = low, high, lower low, large displacement to higher high

In [7]:
from pandas import DatetimeIndex
from typing import cast

def detect_bullish_breakers(
    df: pd.DataFrame,
    swing_length: int = 2,
    min_price_body_gap: float = 0.5,
    displacement_timer: int = 3,
) -> list[BreakerBlock]:
    
    assert isinstance(df.index, DatetimeIndex), "Index of data must be datetime"
    
    window = 2 * swing_length + 1

    df['swing_high'] = df['high'] == df['high'].rolling(window=window, center=True).max()
    df['swing_low'] = df['low'] == df['low'].rolling(window=window, center=True).min()

    df['body_size'] = (df['close'] - df['open']).abs()
    df['avg_body_size'] = df['body_size'].rolling(window=20).mean()
    df['min_price_gap'] = df['avg_body_size'] * min_price_body_gap

    high_df = df[df["swing_high"] == True]["high"].rename("price").to_frame()
    low_df = df[df["swing_low"] == True]["low"].rename("price").to_frame()

    high_df["high"] = True
    low_df["high"] = False

    combined_df = pd.concat([high_df, low_df]).sort_index()

    # Remove all entries that are both highs and lows
    combined_df = combined_df[~combined_df.index.duplicated(keep=False)]

    assert combined_df.index.has_duplicates == False
    combined_df = combined_df[combined_df['high'] != combined_df['high'].shift()]

    bullish_breakers = []
    min_price_gap = 0 

    # Bullish
    for i in range(len(combined_df)):

        if i + 3 > len(combined_df) - 1:
            continue
    
        # We must start on a low
        if combined_df["high"].iloc[i] != False:
            continue

        min_price_gap = df['min_price_gap'].loc[combined_df.index[i]]

        # Check for lower low
        if combined_df["price"].iloc[i] <= combined_df["price"].iloc[i+2] + min_price_gap:
            continue

        # Check the "high" isn't lower than any lows
        if combined_df["price"].iloc[i+1] < combined_df["price"].iloc[i] + min_price_gap:
            continue
        if combined_df["price"].iloc[i+1] < combined_df["price"].iloc[i+2] + min_price_gap:
            continue

        # Identify breakthrough candle
        breakthrough = False
        ll_index = combined_df.index[i+2]
        high_value = combined_df["price"].iloc[i+1]
        chopped_df = df[ df.index > ll_index]
        breakthrough_index = pd.Timestamp(0)
        for j in range(swing_length, swing_length + displacement_timer):
            if chopped_df["close"].iloc[j] > high_value + min_price_gap:
                breakthrough = True
                breakthrough_index = chopped_df.index[j]
                break
        
        if breakthrough == False:
            continue

        # Identify Breaker Candle
        chopped_df = df[ df.index <= combined_df.index[i+1]]
        breaker_index = -1
        while (chopped_df["close"].iloc[breaker_index] - chopped_df["open"].iloc[breaker_index]) < 0:
            breaker_index -= 1

        breaker_index = chopped_df.index[breaker_index]

        bullish_breakers.append(
            BreakerBlock(
                direction="bullish",
                key_points=(
                    combined_df.index[i],
                    combined_df.index[i+1],
                    combined_df.index[i+2],
                    breakthrough_index
                ),
                breaker_candle=breaker_index
            )
        )

    return bullish_breakers


In [8]:
data = data_5m.set_index("date")

bullish_breakers = detect_bullish_breakers(
    df = data,
    swing_length=2
)

fig = plot_ohlc(df=data)

for breaker in bullish_breakers:
    plot_ohlc(
        df = data,
        breaker = breaker,
        fig = fig,
        plot_breaker_candle=True,
    )

fig.show()