![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [None]:
# region imports
from AlgorithmImports import *
# endregion
from collections import deque
import pandas as pd
import numpy as np

def swing_highs_lows_online(
    ohlc: pd.DataFrame,
    N_candidates: list = [5, 10, 20, 50],
    N_confirmation: int = 3,
    min_move_threshold: float = 0.0,
    min_bars_between_swings: int = 3
) -> pd.DataFrame:
    """
    Online swing high/low detection with confirmation (no repainting).

    Parameters:
    -----------
    ohlc : pd.DataFrame with 'close', 'high', 'low'
    N_candidates : list of int - windows to detect candidate swings
    N_confirmation : int - number of future bars to confirm candidate
    min_move_threshold : float - minimal move in % to accept swing
    min_bars_between_swings : int - minimal bars between consecutive swings

    Returns:
    --------
    pd.DataFrame with columns:
        'HighLow' : 1 for swing high, -1 for swing low, NaN otherwise
        'Level'   : price level of the swing
    """
    
    swings = pd.DataFrame(index=ohlc.index, columns=["HighLow", "Level"], dtype=float)
    last_swing_index = -min_bars_between_swings - 1  # initialize for spacing
    
    # For each candidate window size
    for N in N_candidates:
        window = N
        closes = ohlc['close'].values
        highs = ohlc['high'].values
        lows = ohlc['low'].values
        
        # deque to hold future bars for confirmation
        future_window = deque(maxlen=N_confirmation)
        
        for i in range(len(ohlc)):
            # update future_window
            future_window.append(closes[i])
            
            # only start confirming when we have enough bars
            if len(future_window) < N_confirmation:
                continue
            
            # candidate index to check
            idx = i - N_confirmation
            if idx < 0 or (idx - last_swing_index) < min_bars_between_swings:
                continue
            
            # get window for candidate
            left = max(0, idx - window)
            right = idx + 1  # do not include future bars beyond candidate
            candidate_close = closes[idx]
            window_values = closes[left:right]
            
            # check swing high
            if candidate_close == max(window_values):
                # optional min_move_threshold check
                if min_move_threshold == 0 or (candidate_close - min(window_values)) / candidate_close >= min_move_threshold:
                    swings.at[ohlc.index[idx], "HighLow"] = 1
                    swings.at[ohlc.index[idx], "Level"] = highs[idx]
                    last_swing_index = idx
            
            # check swing low
            elif candidate_close == min(window_values):
                if min_move_threshold == 0 or (max(window_values) - candidate_close) / candidate_close >= min_move_threshold:
                    swings.at[ohlc.index[idx], "HighLow"] = -1
                    swings.at[ohlc.index[idx], "Level"] = lows[idx]
                    last_swing_index = idx
                    
    return swings

In [6]:
import math
from entry_exit import Bar, PositionDirection, has_level
from typing import Optional


class StopLossManager:
    """
    Stop loss manager.

    Computes SL on entry and checks its trigger on every bar.
    Supports three modes:
    - fixed: fixed percent from entry
    - structural: behind last swing level + buffer
    - bos: beyond the signal candle extreme (Break of Structure) + buffer

    Stop loss is fixed once at entry and then stays unchanged.
    """

    VALID_MODES = {"fixed", "structural", "bos"}

    def __init__(
        self,
        mode: str = "fixed",
        fixed_pct: float = 0.01,
        buffer_pct: float = 0.001,
    ):
        """
        Initialize stop loss manager.

        Args:
            mode: SL mode ("fixed" | "structural" | "bos")
            fixed_pct: Percent of entry for fixed mode (e.g., 0.01 = 1%)
            buffer_pct: Buffer percent of entry for structural and bos

        Raises:
            ValueError: If mode is invalid or parameters are non-positive
        """
        if mode not in self.VALID_MODES:
            raise ValueError(
                f"Invalid mode: '{mode}'. Allowed: {', '.join(self.VALID_MODES)}"
            )
        if fixed_pct <= 0:
            raise ValueError(f"fixed_pct must be > 0, got {fixed_pct}")
        if buffer_pct < 0:
            raise ValueError(f"buffer_pct cannot be negative, got {buffer_pct}")

        self.mode = mode
        self.fixed_pct = fixed_pct
        self.buffer_pct = buffer_pct
        self.reset()

    def reset(self) -> None:
        """
        Reset manager state.

        Call after a position is closed to prepare for the next entry.
        """
        self.direction: Optional[PositionDirection] = None
        self.entry_price: Optional[float] = None
        self.stop_price: Optional[float] = None
        self.active = False

    def on_entry(
        self,
        direction: PositionDirection,
        entry_price: float,
        *,
        last_swing_high: Optional[float] = None,
        last_swing_low: Optional[float] = None,
        signal_bar: Optional[Bar] = None,
    ) -> float:
        """
        Compute and fix stop loss on position entry.

        Args:
            direction: Position direction (PositionDirection.LONG or .SHORT)
            entry_price: Position entry price
            last_swing_high: Last swing high price (used in structural for SHORT)
            last_swing_low: Last swing low price (used in structural for LONG)
            signal_bar: BOS candle t (closed at t, entry at open t+1).
                StopLossManager does not compute BOS — caller responsibility.

        Returns:
            float: Calculated stop loss price

        Raises:
            RuntimeError: If manager is already active (reset() not called)
            ValueError: If required parameters for the chosen mode are missing
            ValueError: If entry_price <= 0
        """
        if self.active:
            raise RuntimeError(
                "StopLossManager already active! Call reset() before a new entry."
            )

        if entry_price <= 0:
            raise ValueError(f"entry_price must be > 0, got {entry_price}")

        self.direction = direction
        self.entry_price = entry_price

        if self.mode == "fixed":
            self.stop_price = self._fixed_sl()

        elif self.mode == "structural":
            if direction == PositionDirection.LONG:
                if not has_level(last_swing_low):
                    raise ValueError("For 'structural' LONG you must provide last_swing_low")
                assert last_swing_low is not None
                self.stop_price = self._structural_sl_long(last_swing_low)
            elif direction == PositionDirection.SHORT:
                if not has_level(last_swing_high):
                    raise ValueError("For 'structural' SHORT you must provide last_swing_high")
                assert last_swing_high is not None
                self.stop_price = self._structural_sl_short(last_swing_high)

        elif self.mode == "bos":
            if signal_bar is None:
                raise ValueError("For 'bos' you must provide signal_bar!")
            # bos: stop is placed beyond the signal candle extreme (t) with no lookahead for future swings.
            self.stop_price = self._bos_sl(signal_bar)

        if self.stop_price is None:
            raise ValueError(f"stop_price not computed (mode={self.mode}, direction={direction})")
        self._validate_stop_price(
            self.stop_price,
            last_swing_high=last_swing_high,
            last_swing_low=last_swing_low,
            signal_bar=signal_bar,
        )
        self.active = True
        return self.stop_price

    def should_exit(self, bar: Bar) -> bool:
        """
        Check if stop loss is hit on the current bar.

        Args:
            bar: Current candle (Bar)

        Returns:
            bool: True if price crosses stop loss, else False
        """
        if not self.active:
            return False

        if self.direction == PositionDirection.LONG:
            return bar.low <= self.stop_price  # type: ignore

        if self.direction == PositionDirection.SHORT:
            return bar.high >= self.stop_price  # type: ignore

        return False

    def _fixed_sl(self) -> float:
        """
        Compute stop loss for fixed mode.

        Returns:
            float: Stop loss price
        """
        if self.direction == PositionDirection.LONG:
            return self.entry_price * (1 - self.fixed_pct)

        return self.entry_price * (1 + self.fixed_pct)

    def _structural_sl_long(self, last_swing_low: float) -> float:
        """
        Compute stop loss for structural LONG (below last swing low).
        """
        buffer = self.entry_price * self.buffer_pct
        return last_swing_low - buffer

    def _structural_sl_short(self, last_swing_high: float) -> float:
        """
        Compute stop loss for structural SHORT (above last swing high).
        """
        buffer = self.entry_price * self.buffer_pct
        return last_swing_high + buffer

    def _bos_sl(self, signal_bar: Bar) -> float:
        """
        Compute stop loss for bos mode (beyond signal candle extreme).

        Args:
            signal_bar: Break of Structure signal candle

        Returns:
            float: Stop loss price with buffer
        """
        buffer = self.entry_price * self.buffer_pct

        if self.direction == PositionDirection.LONG:
            return signal_bar.low - buffer

        return signal_bar.high + buffer

    def _validate_stop_price(
        self,
        stop_price: Optional[float],
        *,
        last_swing_high: Optional[float],
        last_swing_low: Optional[float],
        signal_bar: Optional[Bar],
    ) -> None:
        """
        Unified validation of stop: validity and side relative to entry.
        """
        if self.entry_price is None:
            raise ValueError("entry_price is missing before stop validation")

        if stop_price is None or (isinstance(stop_price, float) and not math.isfinite(stop_price)):
            raise ValueError(
                f"Invalid stop_price (mode={self.mode}, dir={self.direction}, entry={self.entry_price}, stop={stop_price}, "
                f"last_swing_high={last_swing_high}, last_swing_low={last_swing_low})"
            )

        if stop_price <= 0:
            raise ValueError(
                f"stop_price must be > 0 (mode={self.mode}, dir={self.direction}, entry={self.entry_price}, stop={stop_price})"
            )

        if self.direction == PositionDirection.LONG and stop_price >= self.entry_price:
            raise ValueError(
                f"LONG stop must be below entry (mode={self.mode}, entry={self.entry_price}, stop={stop_price}, last_swing_low={last_swing_low})"
            )

        if self.direction == PositionDirection.SHORT and stop_price <= self.entry_price:
            raise ValueError(
                f"SHORT stop must be above entry (mode={self.mode}, entry={self.entry_price}, stop={stop_price}, last_swing_high={last_swing_high})"
            )

In [7]:
from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Callable, Optional, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    from entry_exit import PositionDirection


@dataclass(frozen=True)
class RiskConfig:
    """
    Risk configuration for sizing positions.

    Attributes:
        risk_budget_cash: how much cash you are willing to lose on the trade.
        max_quantity: optional hard cap on quantity (float to allow fractional assets).
        min_risk_per_unit: optional minimal stop distance; if stop is tighter, skip the trade.
        use_buying_power_cap: if True, cap budget by provided buying_power_cash in size_position.
    """
    risk_budget_cash: float
    max_quantity: Optional[float] = None
    min_risk_per_unit: Optional[float] = None
    use_buying_power_cap: bool = False


def size_position(
    *,
    direction: "PositionDirection",
    entry_price: float,
    sl_price: float,
    risk_config: RiskConfig,
    buying_power_cash: Optional[float] = None,
    round_func: Callable[[float], float] = math.floor,
) -> Tuple[Optional[float], Optional[str]]:
    """
    Compute position quantity based on risk budget and stop distance.

    Hard failures -> ValueError (broken data).
    Soft refusals -> (None, reason).
    """
    if entry_price <= 0:
        raise ValueError(f"entry_price must be > 0, got {entry_price}")
    if sl_price <= 0:
        raise ValueError(f"sl_price must be > 0, got {sl_price}")
    if risk_config.risk_budget_cash <= 0:
        raise ValueError(f"risk_budget_cash must be > 0, got {risk_config.risk_budget_cash}")

    risk_per_unit = abs(entry_price - sl_price)
    if risk_per_unit <= 0:
        raise ValueError("risk_per_unit must be > 0 (entry and SL cannot coincide).")

    if risk_config.min_risk_per_unit is not None and risk_per_unit < risk_config.min_risk_per_unit:
        return None, "risk_per_unit below min_risk_per_unit"

    budget = risk_config.risk_budget_cash
    if risk_config.use_buying_power_cap and buying_power_cash is not None:
        budget = min(budget, buying_power_cash)

    if budget <= 0:
        return None, "no budget available"

    raw_qty = budget / risk_per_unit
    qty = round_func(raw_qty)

    if risk_config.max_quantity is not None:
        qty = min(qty, risk_config.max_quantity)

    if qty <= 0:
        return None, "sized quantity is zero after caps"

    return float(qty), None

In [8]:

from __future__ import annotations

import math
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from risk import RiskConfig, size_position

# IMPORTANT:
# If your stop_loss module imports Bar and PositionDirection from this file,
# these names must exist here and keep the same meaning.


class PositionDirection(str, Enum):
    LONG = "LONG"
    SHORT = "SHORT"


class ExitReason(str, Enum):
    SL = "SL"
    TP = "TP"


class TakeProfitMode(str, Enum):
    RR_BASED = "RR_BASED"
    RANGE_BASED = "RANGE_BASED"


class SameBarSlTpRule(str, Enum):
    WORST_CASE = "WORST_CASE"
    OPEN_PROXIMITY = "OPEN_PROXIMITY"
    LOWER_TIMEFRAME = "LOWER_TIMEFRAME"


@dataclass(frozen=True)
class Bar:
    """
    OHLC candlestick bar.
    Volume/time are optional and not required for V1 entry/exit rules.
    """
    open: float
    high: float
    low: float
    close: float
    volume: Optional[float] = None
    time: Optional[str] = None  # Replace with datetime if you prefer.


@dataclass(frozen=True)
class SwingLevels:
    """
    Snapshot of swing levels used by the strategy.

    Names match the V1 document:
      - last_swing_high_price == lastSwingHighPrice
      - last_swing_low_price == lastSwingLowPrice
    """
    last_swing_high_price: Optional[float] = None
    last_swing_low_price: Optional[float] = None


def has_level(level: Optional[float]) -> bool:
    """Level is usable only if it is not None/NaN."""
    return level is not None and not (isinstance(level, float) and math.isnan(level))


def bos_up(close: float, last_high: Optional[float]) -> bool:
    """Break of structure to the upside."""
    return has_level(last_high) and close > last_high  # type: ignore[arg-type]


def bos_down(close: float, last_low: Optional[float]) -> bool:
    """Break of structure to the downside."""
    return has_level(last_low) and close < last_low  # type: ignore[arg-type]


def update_last_swing_levels(
    swing_levels: SwingLevels,
    *,
    highlow_flag: Optional[float],
    level: Optional[float],
) -> SwingLevels:
    """
    Update last swing levels only when a confirmed swing is observed.

    Args:
        highlow_flag: 1 for swing high, -1 for swing low, anything else leaves levels unchanged.
        level: swing price level; ignored if invalid.
    """
    if not has_level(level):
        return swing_levels

    if highlow_flag == 1:
        return SwingLevels(last_swing_high_price=level, last_swing_low_price=swing_levels.last_swing_low_price)
    if highlow_flag == -1:
        return SwingLevels(last_swing_high_price=swing_levels.last_swing_high_price, last_swing_low_price=level)
    return swing_levels


@dataclass(frozen=True)
class BosSignal:
    """
    BOS signal detected on the CLOSE of the signal candle (t).

    Execution in V1 is deterministic:
      - signal is confirmed on Close[t]
      - entry is executed on Open[t+1]
    """
    direction: PositionDirection
    signal_candle_index: int  # t


@dataclass(frozen=True)
class TradePlan:
    """
    Trade plan created at entry time (V1 fixes SL/TP at entry).

    Contains:
      - signal_candle_index: t (where BOS was detected on close)
      - entry_candle_index: t+1 (where we execute on open)
      - entry_price, sl_price, tp_price, quantity
    """
    direction: PositionDirection
    signal_candle_index: int
    entry_candle_index: int
    entry_price: float
    sl_price: float
    tp_price: float
    quantity: float


@dataclass(frozen=True)
class TradeExit:
    """
    Exit event for a trade: price and reason (SL or TP).
    """
    exit_price: float
    exit_reason: ExitReason


def detect_bos_signal(*, bars: list[Bar], t: int, swing_levels: SwingLevels) -> Optional[BosSignal]:
    """
    V1 BOS definition:

      - BOS Long:  Close[t] > lastSwingHighPrice
      - BOS Short: Close[t] < lastSwingLowPrice

    Signal is evaluated on the CLOSE of bar t.
    Entry requires bar t+1 to exist (executed on Open[t+1]).
    """
    if t < 0 or t >= len(bars):
        raise IndexError("Bar index out of range.")

    # We need the next bar for execution on Open[t+1].
    if t + 1 >= len(bars):
        return None

    close_t = bars[t].close

    if not has_level(swing_levels.last_swing_high_price):
        assert not bos_up(close_t, swing_levels.last_swing_high_price)
    if not has_level(swing_levels.last_swing_low_price):
        assert not bos_down(close_t, swing_levels.last_swing_low_price)

    if bos_up(close_t, swing_levels.last_swing_high_price):
        return BosSignal(direction=PositionDirection.LONG, signal_candle_index=t)

    if bos_down(close_t, swing_levels.last_swing_low_price):
        return BosSignal(direction=PositionDirection.SHORT, signal_candle_index=t)

    return None


def calculate_take_profit_price(
    *,
    direction: PositionDirection,
    tp_mode: TakeProfitMode,
    entry_price: float,
    sl_price: float,
    tp_mult: float,
    swing_levels: SwingLevels,
) -> float:
    """
    V1 Take Profit modes:

    1) RR_BASED:
        R = abs(Entry - SL)
        Long:  TP = Entry + k * R
        Short: TP = Entry - k * R

    2) RANGE_BASED:
        range = lastSwingHighPrice - lastSwingLowPrice
        Long:  TP = Entry + range
        Short: TP = Entry - range
    """
    if tp_mode == TakeProfitMode.RR_BASED:
        r = abs(entry_price - sl_price)
        if r <= 0:
            raise ValueError("RR_BASED: invalid R (entry_price must differ from sl_price).")
        if tp_mult <= 0:
            raise ValueError("RR_BASED: tp_mult must be > 0.")

        if direction == PositionDirection.LONG:
            return entry_price + tp_mult * r
        return entry_price - tp_mult * r

    if tp_mode == TakeProfitMode.RANGE_BASED:
        hi = swing_levels.last_swing_high_price
        lo = swing_levels.last_swing_low_price
        if not (has_level(hi) and has_level(lo)):
            raise ValueError("RANGE_BASED: requires both last swing high and last swing low.")
        assert hi is not None and lo is not None
        rng = hi - lo
        if rng <= 0:
            raise ValueError("RANGE_BASED: invalid range (swing high must be > swing low).")

        if direction == PositionDirection.LONG:
            return entry_price + rng
        return entry_price - rng

    raise ValueError(f"Unsupported tp_mode: {tp_mode}")


def plan_trade_from_signal(
    *,
    bars: list[Bar],
    bos_signal: BosSignal,
    swing_levels: SwingLevels,
    stop_loss_manager,
    tp_mode: TakeProfitMode,
    tp_mult: float,
    risk_config: RiskConfig,
    buying_power_cash: Optional[float] = None,
    position_sizer=size_position,
) -> TradePlan:
    """
    WHERE ENTRY HAPPENS (V1):

      - Signal candle index = t (BOS confirmed on Close[t])
      - Entry candle index  = t+1
      - Entry price         = Open[t+1]

    This function:
      1) Takes entry_price from Open[t+1]
      2) Fixes SL using your StopLossManager.on_entry(...)
      3) Calculates position size (risk-based)
      4) Calculates TP (RR-based or Range-based)
      5) Returns a TradePlan with entry/sl/tp/qty fixed
    """
    t = bos_signal.signal_candle_index
    entry_candle_index = t + 1
    if entry_candle_index >= len(bars):
        raise ValueError("Cannot plan entry: next candle (t+1) does not exist.")

    entry_price = bars[entry_candle_index].open

    # Stop Loss is fixed at entry using your stop-loss module.
    sl_price = stop_loss_manager.on_entry(
        direction=bos_signal.direction,
        entry_price=entry_price,
        last_swing_high=swing_levels.last_swing_high_price,
        last_swing_low=swing_levels.last_swing_low_price,
        signal_bar=bars[t],
    )

    qty, refuse_reason = position_sizer(
        direction=bos_signal.direction,
        entry_price=entry_price,
        sl_price=sl_price,
        risk_config=risk_config,
        buying_power_cash=buying_power_cash,
    )
    if qty is None or qty <= 0:
        raise ValueError(
            f"Position sizing refused (reason={refuse_reason}, dir={bos_signal.direction}, entry={entry_price}, sl={sl_price})"
        )

    tp_price = calculate_take_profit_price(
        direction=bos_signal.direction,
        tp_mode=tp_mode,
        entry_price=entry_price,
        sl_price=sl_price,
        tp_mult=tp_mult,
        swing_levels=swing_levels,
    )

    return TradePlan(
        direction=bos_signal.direction,
        signal_candle_index=t,
        entry_candle_index=entry_candle_index,
        entry_price=entry_price,
        sl_price=sl_price,
        tp_price=tp_price,
        quantity=qty,
    )


def check_exit_rules(
    *,
    bar: Bar,
    direction: PositionDirection,
    sl_price: float,
    tp_price: float,
    same_bar_rule: SameBarSlTpRule,
) -> Optional[TradeExit]:
    """
    V1 exit rules:

    LONG:
      - SL hit if Low <= SL
      - TP hit if High >= TP

    SHORT:
      - SL hit if High >= SL
      - TP hit if Low <= TP

    If both SL and TP are hit within the same bar (OHLC-only ambiguity),
    we apply a deterministic tie-breaking rule:

      - WORST_CASE: assume SL first
      - OPEN_PROXIMITY: assume whichever level is closer to bar.open is hit first
      - LOWER_TIMEFRAME: not implemented in this module
    """
    if direction == PositionDirection.LONG:
        sl_hit = bar.low <= sl_price
        tp_hit = bar.high >= tp_price
    else:
        sl_hit = bar.high >= sl_price
        tp_hit = bar.low <= tp_price

    if not sl_hit and not tp_hit:
        return None

    if sl_hit and not tp_hit:
        return TradeExit(exit_price=sl_price, exit_reason=ExitReason.SL)

    if tp_hit and not sl_hit:
        return TradeExit(exit_price=tp_price, exit_reason=ExitReason.TP)

    # Both hit in the same bar
    if same_bar_rule == SameBarSlTpRule.WORST_CASE:
        return TradeExit(exit_price=sl_price, exit_reason=ExitReason.SL)

    if same_bar_rule == SameBarSlTpRule.OPEN_PROXIMITY:
        sl_dist = abs(bar.open - sl_price)
        tp_dist = abs(bar.open - tp_price)
        if sl_dist <= tp_dist:
            return TradeExit(exit_price=sl_price, exit_reason=ExitReason.SL)
        return TradeExit(exit_price=tp_price, exit_reason=ExitReason.TP)

    if same_bar_rule == SameBarSlTpRule.LOWER_TIMEFRAME:
        raise NotImplementedError(
            "LOWER_TIMEFRAME requires lower timeframe data and must be handled in the backtest engine."
        )

    raise ValueError(f"Unsupported same_bar_rule: {same_bar_rule}")


In [15]:
# QuantConnect Research Notebook
# BOS Breakout Strategy (V1) — Research Backtester + Grid Search

from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any, List, Tuple
import numpy as np
import pandas as pd

# -----------------------------
# 1) Enums / Config
# -----------------------------

class PositionDirection(str, Enum):
    LONG = "LONG"
    SHORT = "SHORT"

class ExitReason(str, Enum):
    SL = "SL"
    TP = "TP"

class TakeProfitMode(str, Enum):
    RR_BASED = "RR_BASED"
    RANGE_BASED = "RANGE_BASED"

class SameBarSlTpRule(str, Enum):
    WORST_CASE = "WORST_CASE"
    OPEN_PROXIMITY = "OPEN_PROXIMITY"

@dataclass(frozen=True)
class RiskConfig:
    # Research: simplest risk sizing (risk budget in cash)
    risk_budget_cash: float
    max_quantity: Optional[float] = None
    min_risk_per_unit: Optional[float] = None

@dataclass(frozen=True)
class StrategyConfig:
    sl_mode: str                     # "fixed" | "structural" | "bos"
    tp_mode: TakeProfitMode          # RR_BASED | RANGE_BASED
    fixed_pct: float                 # used if sl_mode == "fixed"
    buffer_pct: float                # used if sl_mode in {"structural","bos"}
    tp_mult: float                   # used if tp_mode == RR_BASED
    same_bar_rule: SameBarSlTpRule
    cooldown_bars: int = 0           # optional; 0 disables
    max_trades_per_day: Optional[int] = None  # optional; None disables

# -----------------------------
# 2) Helpers
# -----------------------------

def _has_level(x) -> bool:
    return x is not None and not (isinstance(x, float) and np.isnan(x))

def size_position(entry_price: float, sl_price: float, risk: RiskConfig) -> Tuple[Optional[float], Optional[str]]:
    if entry_price <= 0 or sl_price <= 0:
        raise ValueError("entry_price and sl_price must be > 0")
    risk_per_unit = abs(entry_price - sl_price)
    if risk_per_unit <= 0:
        return None, "risk_per_unit <= 0"
    if risk.min_risk_per_unit is not None and risk_per_unit < risk.min_risk_per_unit:
        return None, "risk_per_unit below min_risk_per_unit"
    qty = np.floor(risk.risk_budget_cash / risk_per_unit)
    if risk.max_quantity is not None:
        qty = min(qty, risk.max_quantity)
    if qty <= 0:
        return None, "qty <= 0 after sizing"
    return float(qty), None

def compute_sl(
    *,
    sl_mode: str,
    direction: PositionDirection,
    entry_price: float,
    fixed_pct: float,
    buffer_pct: float,
    last_swing_high: Optional[float],
    last_swing_low: Optional[float],
    signal_high: float,
    signal_low: float
) -> Optional[float]:
    """
    Returns:
      - float SL price if SL can be computed and is valid
      - None if SL cannot be computed for the current bar/config (soft-skip)
    """
    if entry_price <= 0:
        return None

    if sl_mode == "fixed":
        if fixed_pct <= 0:
            return None
        sl = entry_price * (1 - fixed_pct) if direction == PositionDirection.LONG else entry_price * (1 + fixed_pct)

    elif sl_mode == "structural":
        buffer = entry_price * buffer_pct
        if direction == PositionDirection.LONG:
            if not _has_level(last_swing_low):
                return None
            sl = float(last_swing_low) - buffer
        else:
            if not _has_level(last_swing_high):
                return None
            sl = float(last_swing_high) + buffer

    elif sl_mode == "bos":
        buffer = entry_price * buffer_pct
        sl = float(signal_low) - buffer if direction == PositionDirection.LONG else float(signal_high) + buffer

    else:
        raise ValueError(f"Unknown sl_mode: {sl_mode}")

    # Final validity checks (soft)
    if not np.isfinite(sl) or sl <= 0:
        return None

    # Side checks (soft)
    if direction == PositionDirection.LONG and sl >= entry_price:
        return None
    if direction == PositionDirection.SHORT and sl <= entry_price:
        return None

    return float(sl)

def compute_tp(
    *,
    tp_mode: TakeProfitMode,
    direction: PositionDirection,
    entry_price: float,
    sl_price: float,
    tp_mult: float,
    last_swing_high: Optional[float],
    last_swing_low: Optional[float],
) -> Optional[float]:
    """
    Returns:
      - float TP price if TP can be computed
      - None if TP is not computable for the current bar/config (soft-skip)
    """

    # Basic sanity
    if entry_price <= 0 or sl_price <= 0:
        return None

    if tp_mode == TakeProfitMode.RR_BASED:
        r = abs(entry_price - sl_price)
        if r <= 0 or tp_mult <= 0:
            return None

        if direction == PositionDirection.LONG:
            return entry_price + tp_mult * r
        else:
            return entry_price - tp_mult * r

    if tp_mode == TakeProfitMode.RANGE_BASED:
        # Soft requirements: if swings missing or invalid range, skip (do not raise)
        if not (_has_level(last_swing_high) and _has_level(last_swing_low)):
            return None

        rng = float(last_swing_high) - float(last_swing_low)
        if rng <= 0:
            return None

        if direction == PositionDirection.LONG:
            return entry_price + rng
        else:
            return entry_price - rng

    # Unknown mode is a real programming error: keep it loud
    raise ValueError(f"Unknown tp_mode: {tp_mode}")

def resolve_same_bar_exit(
    *,
    direction: PositionDirection,
    bar_open: float,
    sl_price: float,
    tp_price: float,
    same_bar_rule: SameBarSlTpRule
) -> Tuple[float, ExitReason]:
    if same_bar_rule == SameBarSlTpRule.WORST_CASE:
        return sl_price, ExitReason.SL

    if same_bar_rule == SameBarSlTpRule.OPEN_PROXIMITY:
        sl_dist = abs(bar_open - sl_price)
        tp_dist = abs(bar_open - tp_price)
        if sl_dist <= tp_dist:
            return sl_price, ExitReason.SL
        return tp_price, ExitReason.TP

    raise ValueError(f"Unsupported same_bar_rule: {same_bar_rule}")

# -----------------------------
# 3) Backtest engine (single config)
# -----------------------------

def backtest_bos_breakout(
    df: pd.DataFrame,
    swings: pd.DataFrame,
    config: StrategyConfig,
    risk: RiskConfig,
) -> Dict[str, Any]:
    """
    df: index = datetime, columns = open, high, low, close
    swings: index aligned with df, columns: HighLow (1/-1), Level (price)
    """
    # Ensure alignment
    df = df.copy()
    df.columns = [c.lower() for c in df.columns]
    swings = swings.reindex(df.index)

    n = len(df)
    if n < 5:
        return {"trades": [], "metrics": {"num_trades": 0}}

    # State
    in_pos = False
    direction = None
    entry_idx = None
    entry_price = None
    sl_price = None
    tp_price = None
    qty = None
    signal_idx = None

    last_swing_high = None
    last_swing_low = None

    cooldown_until = -1
    trades_today = 0
    current_day = None

    trades: List[Dict[str, Any]] = []

    # iterate over bars; BOS uses Close[t], entry at Open[t+1]
    for t in range(n - 1):  # need t+1
        ts = df.index[t]
        day = ts.date()

        # daily counter
        if current_day is None or day != current_day:
            current_day = day
            trades_today = 0

        # update swings only when confirmed swing at t
        hl = swings.iloc[t]["HighLow"] if "HighLow" in swings.columns else np.nan
        lvl = swings.iloc[t]["Level"] if "Level" in swings.columns else np.nan

        if _has_level(lvl):
            if hl == 1:
                last_swing_high = float(lvl)
            elif hl == -1:
                last_swing_low = float(lvl)

        # if in position: check exit on current bar t
        if in_pos:
            o = float(df.iloc[t]["open"])
            h = float(df.iloc[t]["high"])
            l = float(df.iloc[t]["low"])

            if direction == PositionDirection.LONG:
                sl_hit = l <= sl_price
                tp_hit = h >= tp_price
            else:
                sl_hit = h >= sl_price
                tp_hit = l <= tp_price

            if sl_hit or tp_hit:
                if sl_hit and tp_hit:
                    exit_price, exit_reason = resolve_same_bar_exit(
                        direction=direction, bar_open=o, sl_price=sl_price, tp_price=tp_price,
                        same_bar_rule=config.same_bar_rule
                    )
                elif sl_hit:
                    exit_price, exit_reason = sl_price, ExitReason.SL
                else:
                    exit_price, exit_reason = tp_price, ExitReason.TP

                # PnL
                if direction == PositionDirection.LONG:
                    pnl = (exit_price - entry_price) * qty
                else:
                    pnl = (entry_price - exit_price) * qty

                r = abs(entry_price - sl_price)
                r_mult = ( (exit_price - entry_price)/r ) if direction == PositionDirection.LONG else ( (entry_price - exit_price)/r )

                trades.append({
                    "entry_time": df.index[entry_idx],
                    "exit_time": df.index[t],
                    "direction": direction.value,
                    "entry_price": entry_price,
                    "sl_price": sl_price,
                    "tp_price": tp_price,
                    "exit_price": float(exit_price),
                    "exit_reason": exit_reason.value,
                    "qty": float(qty),
                    "pnl": float(pnl),
                    "r_mult": float(r_mult),
                    "signal_index": int(signal_idx),
                    "entry_index": int(entry_idx),
                    "exit_index": int(t),
                    "last_swing_high_on_entry": None if last_swing_high is None else float(last_swing_high),
                    "last_swing_low_on_entry": None if last_swing_low is None else float(last_swing_low),
                })

                # reset position
                in_pos = False
                direction = None
                entry_idx = None
                entry_price = None
                sl_price = None
                tp_price = None
                qty = None
                signal_idx = None

                # cooldown
                if config.cooldown_bars and config.cooldown_bars > 0:
                    cooldown_until = t + config.cooldown_bars

                continue  # after exit, do not enter on same bar

        # if flat: check entry signal on t close
        if (not in_pos) and (t >= cooldown_until):
            if config.max_trades_per_day is not None and trades_today >= config.max_trades_per_day:
                continue

            close_t = float(df.iloc[t]["close"])

            bos_long = _has_level(last_swing_high) and close_t > float(last_swing_high)
            bos_short = _has_level(last_swing_low) and close_t < float(last_swing_low)

            if bos_long or bos_short:
                dirn = PositionDirection.LONG if bos_long else PositionDirection.SHORT

                # execute at Open[t+1]
                entry_i = t + 1
                entry_p = float(df.iloc[entry_i]["open"])

                # signal candle extremes are from bar t
                signal_high = float(df.iloc[t]["high"])
                signal_low = float(df.iloc[t]["low"])

                try:
                    sl = compute_sl(
                        sl_mode=config.sl_mode,
                        direction=dirn,
                        entry_price=entry_p,
                        fixed_pct=config.fixed_pct,
                        buffer_pct=config.buffer_pct,
                        last_swing_high=last_swing_high,
                        last_swing_low=last_swing_low,
                        signal_high=signal_high,
                        signal_low=signal_low,
                    )

                    if sl is None:
                        continue

                    # sanity: side check (soft refusal)
                    if dirn == PositionDirection.LONG and sl >= entry_p:
                        continue
                    if dirn == PositionDirection.SHORT and sl <= entry_p:
                        continue

                    q, reason = size_position(entry_p, sl, risk)
                    if q is None:
                        continue

                    tp = compute_tp(
                        tp_mode=config.tp_mode,
                        direction=dirn,
                        entry_price=entry_p,
                        sl_price=sl,
                        tp_mult=config.tp_mult,
                        last_swing_high=last_swing_high,
                        last_swing_low=last_swing_low,
                        )

                    # If TP is not computable for this bar/config (e.g., RANGE_BASED invalid range),
                    # skip the trade (soft refusal, not an error).
                    if tp is None:
                        continue


                except Exception:
                    # invalid setup for this bar/config (e.g., missing swings for range-based)
                    continue

                # open position
                in_pos = True
                direction = dirn
                entry_idx = entry_i
                entry_price = entry_p
                sl_price = float(sl)
                tp_price = float(tp)
                qty = float(q)
                signal_idx = t
                trades_today += 1

    # metrics
    trades_df = pd.DataFrame(trades)
    metrics = compute_metrics(trades_df)

    return {"trades": trades_df, "metrics": metrics}

# -----------------------------
# 4) Metrics + stability slices
# -----------------------------

def compute_metrics(trades_df: pd.DataFrame) -> Dict[str, Any]:
    if trades_df is None or len(trades_df) == 0:
        return {
            "num_trades": 0, "total_pnl": 0.0, "winrate": np.nan,
            "avg_r": np.nan, "profit_factor": np.nan, "max_dd_pnl": np.nan
        }

    pnl = trades_df["pnl"].values
    wins = pnl[pnl > 0]
    losses = pnl[pnl < 0]

    total_pnl = float(np.sum(pnl))
    winrate = float(np.mean(pnl > 0))
    avg_r = float(np.mean(trades_df["r_mult"].values))

    pf = np.nan
    if len(losses) > 0:
        pf = float(np.sum(wins) / abs(np.sum(losses))) if np.sum(wins) != 0 else 0.0

    # drawdown on cumulative pnl (simple)
    cum = np.cumsum(pnl)
    peak = np.maximum.accumulate(cum)
    dd = cum - peak
    max_dd = float(np.min(dd)) if len(dd) else 0.0

    return {
        "num_trades": int(len(trades_df)),
        "total_pnl": total_pnl,
        "winrate": winrate,
        "avg_r": avg_r,
        "profit_factor": pf,
        "max_dd_pnl": max_dd,
    }

def evaluate_by_windows(
    df: pd.DataFrame,
    swings: pd.DataFrame,
    config: StrategyConfig,
    risk: RiskConfig,
    windows: List[Tuple[pd.Timestamp, pd.Timestamp]]
) -> Dict[str, Any]:
    window_metrics = []
    for (start, end) in windows:
        mask = (df.index >= start) & (df.index < end)
        if mask.sum() < 10:
            window_metrics.append({"start": start, "end": end, "num_trades": 0, "total_pnl": 0.0})
            continue
        res = backtest_bos_breakout(df.loc[mask], swings.loc[mask], config, risk)
        m = res["metrics"]
        window_metrics.append({"start": start, "end": end, **m})

    wm = pd.DataFrame(window_metrics)
    # robustness: worst-case pnl and median pnl across windows
    robust = {
        "windows_worst_total_pnl": float(wm["total_pnl"].min()),
        "windows_median_total_pnl": float(wm["total_pnl"].median()),
        "windows_min_trades": int(wm["num_trades"].min()),
    }
    return {"per_window": wm, "robust": robust}

# -----------------------------
# 5) Grid search runner
# -----------------------------

def grid_search(
    df: pd.DataFrame,
    swings: pd.DataFrame,
    risk: RiskConfig,
    configs: List[StrategyConfig],
    windows: Optional[List[Tuple[pd.Timestamp, pd.Timestamp]]] = None
) -> pd.DataFrame:
    rows = []
    for cfg in configs:
        res = backtest_bos_breakout(df, swings, cfg, risk)
        m = res["metrics"]

        row = {
            "sl_mode": cfg.sl_mode,
            "tp_mode": cfg.tp_mode.value,
            "fixed_pct": cfg.fixed_pct,
            "buffer_pct": cfg.buffer_pct,
            "tp_mult": cfg.tp_mult,
            "same_bar_rule": cfg.same_bar_rule.value,
            "cooldown_bars": cfg.cooldown_bars,
            "max_trades_per_day": cfg.max_trades_per_day,
            **m
        }

        if windows is not None:
            w = evaluate_by_windows(df, swings, cfg, risk, windows)
            row.update(w["robust"])
        rows.append(row)

    out = pd.DataFrame(rows)

    # Example ranking: robustness first, then overall pnl
    sort_cols = []
    if windows is not None:
        sort_cols += ["windows_worst_total_pnl", "windows_median_total_pnl"]
    sort_cols += ["total_pnl", "max_dd_pnl"]
    out = out.sort_values(sort_cols, ascending=[False]*len(sort_cols)).reset_index(drop=True)
    return out

# -----------------------------
# 6) QC data load template
# -----------------------------
# Example usage in QC Research:
#
# from QuantConnect.Research import QuantBook
# qb = QuantBook()
# symbol = qb.AddCrypto("BTCUSD", Resolution.Hour).Symbol   # or AddEquity(...)
# hist = qb.History(symbol, start, end, Resolution.Hour)
# df = hist.loc[symbol].copy()
# df = df.rename(columns={"open":"open","high":"high","low":"low","close":"close"})[["open","high","low","close"]]
#
# --- Swing detection ---
# Use your existing swing detector here.
#
# swings = swing_highs_lows_online(df.rename(columns={"open":"open","high":"high","low":"low","close":"close"}) , ...)
#
# Then build configs list and run grid_search.



In [16]:
from QuantConnect.Research import QuantBook
from datetime import datetime

qb = QuantBook()

symbol = qb.AddCrypto("BTCUSD", Resolution.Hour).Symbol
start = datetime(2021, 1, 1)
end   = datetime(2024, 12, 31)

hist = qb.History(symbol, start, end, Resolution.Hour)

# QC history обычно multiindex: (symbol, time)
df = hist.loc[symbol].copy()
df = df[["open","high","low","close"]].dropna()
df.index = pd.to_datetime(df.index)

# 1) swings: подключаешь твою swing_highs_lows_online
swings = swing_highs_lows_online(
    df.rename(columns={"close":"close","high":"high","low":"low"}),
    N_candidates=[5,10,20],
    N_confirmation=3,
    min_move_threshold=0.0,
    min_bars_between_swings=3
)

# 2) risk + configs grid
risk = RiskConfig(risk_budget_cash=100, max_quantity=None, min_risk_per_unit=None)

sl_modes = ["fixed", "structural", "bos"]
tp_modes = [TakeProfitMode.RR_BASED, TakeProfitMode.RANGE_BASED]
fixed_grid  = [0.005, 0.01]         # 0.5%, 1%
buffer_grid = [0.0, 0.001, 0.002]   # 0%, 0.1%, 0.2%
tp_mult_grid = [1.0, 1.5, 2.0]
same_bar = SameBarSlTpRule.WORST_CASE

configs = []
for sl in sl_modes:
    for tp in tp_modes:
        for fixed_pct in fixed_grid:
            for buffer_pct in buffer_grid:
                for tp_mult in tp_mult_grid:
                    # убрать неактуальные параметры по режиму:
                    if sl != "fixed" and fixed_pct != fixed_grid[0]:
                        continue
                    if tp == TakeProfitMode.RANGE_BASED and tp_mult != tp_mult_grid[0]:
                        continue

                    configs.append(StrategyConfig(
                        sl_mode=sl,
                        tp_mode=tp,
                        fixed_pct=fixed_pct,
                        buffer_pct=buffer_pct,
                        tp_mult=tp_mult,
                        same_bar_rule=same_bar,
                        cooldown_bars=0,
                        max_trades_per_day=None
                    ))

# 3) windows (regime slices)
windows = [
    (pd.Timestamp("2021-01-01"), pd.Timestamp("2022-01-01")),
    (pd.Timestamp("2022-01-01"), pd.Timestamp("2023-01-01")),
    (pd.Timestamp("2023-01-01"), pd.Timestamp("2024-01-01")),
    (pd.Timestamp("2024-01-01"), pd.Timestamp("2025-01-01")),
]

results = grid_search(df, swings, risk, configs, windows=windows)
results.head(20)


Unnamed: 0,sl_mode,tp_mode,fixed_pct,buffer_pct,tp_mult,same_bar_rule,cooldown_bars,max_trades_per_day,num_trades,total_pnl,winrate,avg_r,profit_factor,max_dd_pnl,windows_worst_total_pnl,windows_median_total_pnl,windows_min_trades
0,fixed,RR_BASED,0.01,0.0,1.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
1,fixed,RR_BASED,0.01,0.0,1.5,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
2,fixed,RR_BASED,0.01,0.0,2.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
3,fixed,RR_BASED,0.01,0.001,1.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
4,fixed,RR_BASED,0.01,0.001,1.5,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
5,fixed,RR_BASED,0.01,0.001,2.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
6,fixed,RR_BASED,0.01,0.002,1.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
7,fixed,RR_BASED,0.01,0.002,1.5,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
8,fixed,RR_BASED,0.01,0.002,2.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
9,fixed,RANGE_BASED,0.01,0.0,1.0,WORST_CASE,0,,0,0.0,,,,,0.0,0.0,0
