In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display

In [2]:
def load_binance_klines_csv(path, tz=None):
    """
    Load a Binance-style kline CSV into a pandas DataFrame.

    Column names are lower-cased. The first column found among 'open_time',
    'timestamp', 'date', 'time', 'open time', 'close_time' (in that order) is
    parsed to datetimes and becomes the index of the returned frame.

    Numeric time columns are treated as epoch offsets. The unit is inferred
    from the largest absolute value so that second, millisecond, microsecond
    and nanosecond exports all land on the right date:

        >= 1e17 -> 'ns',  >= 1e14 -> 'us',  >= 1e11 -> 'ms',  >= 1e8 -> 's'

    Smaller numeric values and non-numeric columns go through generic
    ``pd.to_datetime`` parsing. Unparseable cells become NaT.

    Parameters
    ----------
    path : str, path-like or file-like
        Anything accepted by ``pandas.read_csv``.
    tz : str or tzinfo, optional
        Target timezone. Naive timestamps are assumed to be UTC before
        conversion; already tz-aware timestamps are converted directly.
        If the conversion fails a ``RuntimeWarning`` is emitted and the
        timestamps are left as parsed.

    Returns
    -------
    pandas.DataFrame
        The CSV contents with 'open', 'high', 'low', 'close', 'volume'
        coerced to numeric where present (invalid cells become NaN).
    """
    import warnings

    df = pd.read_csv(path)
    df.columns = [str(c).lower() for c in df.columns]

    time_candidates = ('open_time', 'timestamp', 'date', 'time', 'open time', 'close_time')
    time_col = next((cand for cand in time_candidates if cand in df.columns), None)

    if time_col is not None:
        ser = df[time_col]
        parsed = None

        # Numeric column: pick the epoch unit from the magnitude of the values.
        if pd.api.types.is_integer_dtype(ser) or pd.api.types.is_float_dtype(ser):
            max_abs = ser.abs().max()  # NaN when the column is empty / all-NaN
            unit = None
            if pd.notna(max_abs):
                for floor, name in ((1e17, 'ns'), (1e14, 'us'), (1e11, 'ms'), (1e8, 's')):
                    if max_abs >= floor:
                        unit = name
                        break
            if unit is not None:
                try:
                    parsed = pd.to_datetime(ser, unit=unit, errors='coerce')
                except (ValueError, TypeError, OverflowError):
                    parsed = None

        # Fallback: let pandas infer the format (strings, small numbers, ...).
        if parsed is None or parsed.isna().all():
            try:
                parsed = pd.to_datetime(ser, errors='coerce')
            except Exception as exc:  # best effort: keep the raw column
                warnings.warn(
                    f"could not parse time column '{time_col}': {exc}",
                    RuntimeWarning,
                    stacklevel=2,
                )
                parsed = None

        if parsed is not None:
            if tz is not None:
                try:
                    # tz_localize raises on tz-aware data, so only localize naive values.
                    aware = parsed if parsed.dt.tz is not None else parsed.dt.tz_localize('UTC')
                    parsed = aware.dt.tz_convert(tz)
                except Exception as exc:  # best effort: keep timestamps as parsed
                    warnings.warn(
                        f"could not convert '{time_col}' to timezone {tz!r}: {exc}",
                        RuntimeWarning,
                        stacklevel=2,
                    )
            df[time_col] = parsed
            df = df.set_index(time_col)

    # coerce common numeric columns to numeric types
    for col in ('open', 'high', 'low', 'close', 'volume'):
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    return df


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ipywidgets as widgets


class StreamBOSConfig:
    """Plain settings holder for the streaming BOS engine.

    Every constructor argument is stored unchanged as an attribute of the same
    name; no validation or conversion is performed.

    Attributes
    ----------
    window : swing confirmation look-ahead, in bars.
    rr : reward-to-risk multiple used to place the take-profit.
    use_next_open : enter at the next bar's open instead of the signal bar's close.
    fee : stored only; nothing in this file reads it.
    require_prior_swing : require the latest confirmed swing to oppose the trade side.
    allow_countertrend : skip the trend-agreement check before opening a trade.
    allow_micro_structure : accept every swing without the trend-based filter.
    """

    def __init__(self, window=5, rr=1.0, use_next_open=True, fee=0.00075,
                 require_prior_swing=True, allow_countertrend=False,
                 allow_micro_structure=True):
        # structure / target sizing
        self.window, self.rr = window, rr
        # execution
        self.use_next_open, self.fee = use_next_open, fee
        # signal filters
        self.require_prior_swing, self.allow_countertrend = require_prior_swing, allow_countertrend
        self.allow_micro_structure = allow_micro_structure


class StreamBOSEngine:
    """Bar-by-bar break-of-structure (BOS) engine.

    Feed bars with ``step(df, t)`` in increasing ``t``. Each step confirms
    swings on closes, detects closes beyond the latest swing high/low,
    resolves open trades against the bar's high/low and possibly opens a new
    trade on a fresh BOS.

    State
    -----
    swings        : list of ``(bar_idx, close, "high" | "low", confirm_idx)``
    bos           : list of ``(bar_idx, "up" | "down", broken_swing_idx)``
    open_trades   : list of dicts with ``side, entry_idx, entry, sl, tp``
    closed_trades : the same dicts plus ``exit_idx, exit, hit``
    """

    def __init__(self, cfg):
        self.cfg = cfg
        self.reset()

    def reset(self):
        """Drop all accumulated state so the engine can start again from bar 0."""
        self.t = -1
        self.swings = []
        self.last_swing_type = None

        # latest accepted swing high / swing low as (bar_idx, price)
        self.last_lh = None
        self.last_ll = None

        # bar indices of swings that already produced a BOS
        self.broken_high = set()
        self.broken_low = set()

        self.bos = []
        self.trend = None

        self.open_trades = []
        self.closed_trades = []

    def step(self, df, t):
        """Process bar ``t`` of ``df`` (needs 'open', 'high', 'low', 'close' columns)."""
        self.t = t
        self._confirm_swing(df)
        self._detect_bos(df)
        # Existing trades are resolved before a new one may be opened, so a
        # trade is never evaluated on the bar that generated its signal.
        self._update_open_trades(df)
        self._maybe_open_trades(df)

    def _confirm_swing(self, df):
        """Confirm bar ``t - window`` as a swing if its close is the extreme of closes[t-window .. t].

        Only the candidate bar and the bars after it are inspected. Swings
        alternate: a high is only recorded when the previous swing was not a
        high, and likewise for lows.
        """
        w = self.cfg.window
        i = self.t - w
        if i < 0:
            return

        closes = df["close"].values
        px = float(closes[i])
        ahead = closes[i:self.t + 1]

        is_high = px == ahead.max()
        is_low = px == ahead.min()

        if is_high and self.last_swing_type != "high":
            if self._allow_swing("high", px):
                self.swings.append((i, px, "high", self.t))
                self.last_lh = (i, px)
                self.last_swing_type = "high"

        if is_low and self.last_swing_type != "low":
            if self._allow_swing("low", px):
                self.swings.append((i, px, "low", self.t))
                self.last_ll = (i, px)
                self.last_swing_type = "low"

    def _allow_swing(self, typ, px):
        """Trend filter for swings; always True when ``allow_micro_structure`` is set.

        Otherwise, in an up-trend a new high must not be below the last swing
        high, and in a down-trend a new low must not be above the last swing low.
        """
        if self.cfg.allow_micro_structure:
            return True

        if typ == "high" and self.trend == "up":
            if self.last_lh is None:
                return True
            return px >= self.last_lh[1]

        if typ == "low" and self.trend == "down":
            if self.last_ll is None:
                return True
            return px <= self.last_ll[1]

        return True

    def _detect_bos(self, df):
        """Record a BOS when the current close is beyond the latest swing high/low.

        Each swing can trigger at most one BOS (tracked by its bar index), and
        every BOS sets ``self.trend`` to its direction.
        """
        if self.t <= 0:
            return

        cl = float(df["close"].iloc[self.t])

        if self.last_lh is not None:
            i, px = self.last_lh
            if cl > px and i not in self.broken_high:
                self.bos.append((self.t, "up", i))
                self.broken_high.add(i)
                self.trend = "up"

        if self.last_ll is not None:
            i, px = self.last_ll
            if cl < px and i not in self.broken_low:
                self.bos.append((self.t, "down", i))
                self.broken_low.add(i)
                self.trend = "down"

    def _last_confirmed_swing(self, before):
        """Most recently recorded swing confirmed at or before ``before`` and located before it."""
        # Swings are appended in confirmation order, so scan from the newest.
        for s in reversed(self.swings):
            if s[3] <= before and s[0] < before:
                return s
        return None

    def _last_protective_swing(self, side, before):
        """Latest swing usable as a stop: a swing low for longs, a swing high for shorts."""
        want = "low" if side == "long" else "high"
        for s in reversed(self.swings):
            if s[2] == want and s[3] <= before and s[0] < before:
                return s
        return None

    def _maybe_open_trades(self, df):
        """Open one trade if the newest BOS happened on the current bar and passes the filters."""
        if not self.bos:
            return

        t, d, _ = self.bos[-1]
        if t != self.t:
            return

        # NOTE(review): _detect_bos sets self.trend to the direction of the BOS it
        # just appended, so for a BOS on the current bar `d != self.trend` looks
        # like it can never be true - confirm the intended meaning of
        # allow_countertrend.
        if not self.cfg.allow_countertrend and d != self.trend:
            return

        last_sw = self._last_confirmed_swing(t)
        if self.cfg.require_prior_swing and last_sw is None:
            return

        side = "long" if d == "up" else "short"
        if self.cfg.require_prior_swing:
            if side == "long" and last_sw[2] != "low":
                return
            if side == "short" and last_sw[2] != "high":
                return

        prot = self._last_protective_swing(side, t)
        if prot is None:
            return

        prot_idx, sl, _, _ = prot
        i_entry = t + 1 if self.cfg.use_next_open else t
        if i_entry >= len(df):
            return

        entry = float(df["open"].iloc[i_entry]) if self.cfg.use_next_open else float(df["close"].iloc[t])

        # The stop must sit strictly on the losing side of the entry. Written as
        # "not (sl < entry)" so that a NaN entry price is rejected as well; a NaN
        # would otherwise open a trade that can never hit TP or SL.
        if side == "long" and not (sl < entry):
            return
        if side == "short" and not (sl > entry):
            return

        risk = abs(entry - sl)
        tp = entry + self.cfg.rr * risk if side == "long" else entry - self.cfg.rr * risk

        self.open_trades.append(dict(
            side=side,
            entry_idx=i_entry,
            entry=entry,
            sl=sl,
            tp=tp
        ))

    def _update_open_trades(self, df):
        """Close trades whose take-profit or stop-loss lies within the current bar's range.

        Exits are booked at the exact tp/sl level. The take-profit is tested
        first, so when both levels fall inside the same bar the trade is
        recorded as a win (the true intrabar order is unknown).
        """
        if not self.open_trades:
            return

        hi = float(df["high"].iloc[self.t])
        lo = float(df["low"].iloc[self.t])

        still_open = []

        for tr in self.open_trades:
            # not filled yet: entry happens on a later bar
            if self.t < tr["entry_idx"]:
                still_open.append(tr)
                continue

            hit = None
            px = None

            if tr["side"] == "long":
                if hi >= tr["tp"]:
                    hit = "tp"
                    px = tr["tp"]
                elif lo <= tr["sl"]:
                    hit = "sl"
                    px = tr["sl"]
            else:
                if lo <= tr["tp"]:
                    hit = "tp"
                    px = tr["tp"]
                elif hi >= tr["sl"]:
                    hit = "sl"
                    px = tr["sl"]

            if hit is None:
                still_open.append(tr)
                continue

            tr["exit_idx"] = self.t
            tr["exit"] = px
            tr["hit"] = hit
            self.closed_trades.append(tr)

        self.open_trades = still_open


def stepper_stream_ui(df, cfg):
    """Show an interactive bar-by-bar viewer for a ``StreamBOSEngine`` run.

    Displays a "Next" button and a two-panel figure: closes up to the current
    bar with swings, BOS markers and the SL/TP levels of open trades on top,
    and the running equity (realized + unrealized price PnL per unit) below.

    The engine is always stepped through the bar that is being displayed
    (bar 0 is processed before the first render), and equity is accumulated
    incrementally as bars are fed, so each click costs one engine step.

    Parameters
    ----------
    df : pandas.DataFrame with 'open', 'high', 'low', 'close' columns.
    cfg : configuration object passed to ``StreamBOSEngine``.
    """
    from IPython.display import Javascript, display

    # Avoid duplicate UIs when the cell is re-run. Note that this closes every
    # live ipywidget in the kernel and removes leftover widget DOM nodes.
    widgets.Widget.close_all()
    display(Javascript("document.querySelectorAll('.widget-area, .widget-subarea').forEach(n => n.remove())"))

    eng = StreamBOSEngine(cfg)
    n_bars = len(df)
    state = dict(t=0, realized=0.0, n_closed=0, fig=None)
    equity = []  # equity[k] = realized + unrealized PnL after processing bar k

    out = widgets.Output()
    btn = widgets.Button(description="Next")

    def price_pnl(tr, price):
        """Per-unit PnL of trade ``tr`` if it were valued at ``price``."""
        return price - tr["entry"] if tr["side"] == "long" else tr["entry"] - price

    def advance(t):
        """Feed bar ``t`` to the engine and append the resulting equity point."""
        eng.step(df, t)
        for tr in eng.closed_trades[state["n_closed"]:]:
            state["realized"] += price_pnl(tr, tr["exit"])
        state["n_closed"] = len(eng.closed_trades)

        px_now = float(df["close"].iloc[t])
        unreal = sum((price_pnl(tr, px_now) for tr in eng.open_trades), 0.0)
        equity.append(state["realized"] + unreal)

    def render():
        with out:
            out.clear_output(wait=True)
            t = state["t"]

            # Close only the figure from the previous render, not the user's other figures.
            if state["fig"] is not None:
                plt.close(state["fig"])
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), gridspec_kw={'height_ratios': [3, 1]})
            state["fig"] = fig

            # top: price + swings/trades (engine state is exactly at bar t)
            ax1.plot(df["close"].iloc[:t + 1].values)

            for i, px, typ, _ in eng.swings:
                ax1.scatter(i, px, marker='^' if typ == 'high' else 'v')

            for i, _, _ in eng.bos:
                ax1.scatter(i, df["close"].iloc[i], s=120)

            for tr in eng.open_trades:
                ax1.axhline(tr["sl"], linestyle='--', alpha=0.4)
                ax1.axhline(tr["tp"], linestyle='--', alpha=0.4)

            ax1.axvline(t)
            trend_text = eng.trend if eng.trend is not None else 'none'
            ax1.set_title(f"t={t} | trend={trend_text} | swings={len(eng.swings)} | bos={len(eng.bos)} | open={len(eng.open_trades)} | closed={len(eng.closed_trades)}")
            ax1.grid(True)

            # bottom: equity history
            ax2.plot(equity, color='tab:green')
            ax2.scatter([t], [equity[-1]], color='black')
            ax2.set_ylabel('Equity')
            ax2.grid(True)

            plt.tight_layout()
            plt.show()

    def on_next(_):
        if state["t"] >= n_bars - 1:
            return
        state["t"] += 1
        advance(state["t"])
        btn.disabled = state["t"] >= n_bars - 1  # nothing left to step through
        render()

    btn.on_click(on_next)
    display(btn)
    display(out)

    if n_bars == 0:
        btn.disabled = True
        with out:
            print("No bars to display (df is empty).")
        return

    advance(0)
    btn.disabled = n_bars == 1
    render()


def run_stream_no_plot(df, cfg):
    """Run the engine over every bar of ``df`` without plotting.

    Parameters
    ----------
    df : pandas.DataFrame with 'open', 'high', 'low', 'close' columns.
    cfg : configuration object passed to ``StreamBOSEngine``.

    Returns
    -------
    trades : pandas.DataFrame
        One row per closed trade with columns side, entry_idx, entry, sl, tp,
        exit_idx, exit, hit, pnl. The columns are present even when no trade
        was closed (the frame is then empty).
    equity : pandas.Series
        Indexed like ``df.index``; realized PnL of closed trades plus the
        unrealized PnL of open trades valued at each bar's close.

    Notes
    -----
    PnL is the raw per-unit price difference (exit - entry for longs,
    entry - exit for shorts); ``cfg.fee`` is not applied here.
    """
    trade_columns = ['side', 'entry_idx', 'entry', 'sl', 'tp', 'exit_idx', 'exit', 'hit', 'pnl']

    eng = StreamBOSEngine(cfg)
    equity_values = []
    realized = 0.0
    n_booked = 0  # number of closed trades whose pnl is already in `realized`

    for t in range(len(df)):
        eng.step(df, t)

        # book trades that closed on this bar
        for tr in eng.closed_trades[n_booked:]:
            if tr['side'] == 'long':
                pnl = tr['exit'] - tr['entry']
            else:
                pnl = tr['entry'] - tr['exit']
            tr['pnl'] = pnl
            realized += pnl
        n_booked = len(eng.closed_trades)

        # mark remaining open trades to this bar's close
        px_now = float(df['close'].iloc[t])
        unreal = 0.0
        for tr in eng.open_trades:
            if tr['side'] == 'long':
                unreal += px_now - tr['entry']
            else:
                unreal += tr['entry'] - px_now

        equity_values.append(realized + unreal)

    if eng.closed_trades:
        trades = pd.DataFrame(eng.closed_trades)
    else:
        # keep the documented schema so callers can use trades['pnl'] etc. safely
        trades = pd.DataFrame(columns=trade_columns)

    # one equity point per bar, so the series lines up with df.index
    equity = pd.Series(equity_values, index=df.index, dtype=float)

    return trades, equity


# Usage (headless, no plotting):
# trades, equity = run_stream_no_plot(df, cfg)
# trades.head(), equity.tail()


In [None]:
# Load the hourly BTCUSDT candles (the file name suggests ~1825 days of history).
df = load_binance_klines_csv(
    "candle_data/BTCUSDT/1h/BTCUSDT_1h_last1825d.csv"
)

# Engine settings for the interactive walkthrough: 3-bar swing confirmation, 1:1 reward/risk.
cfg = StreamBOSConfig(
    window=3,
    rr=1.0,
    require_prior_swing=True,
    allow_countertrend=False,
    allow_micro_structure=True,
)

# Launch the step-by-step viewer.
stepper_stream_ui(df, cfg)

<IPython.core.display.Javascript object>

Button(description='Next', style=ButtonStyle())

Output()

In [None]:
# Headless run over the full history: closed-trades table plus the per-bar equity series.
trades, equity = run_stream_no_plot(df, cfg)
