In [None]:
# What needs to be done
# 1) Load OHLC data from a local CSV into a pandas DataFrame.
# 2) Ensure column names and dtypes match the SR zone contract.
# 3) Convert price data into a NumPy-backed structure when needed,
#    while still keeping pandas for indexing and rolling ops.
#
# Brute-force approach (not recommended)
# - Read CSV directly into NumPy with np.loadtxt
# - Lose column names, datetime index, and rolling-window convenience
#
# Edge cases
# - CSV missing required OHLC columns
# - Datetime not parsed correctly
# - NaNs or non-float values
# - Wrong column order
#
# Optimal approach
# - Use pandas for ingestion + validation
# - Convert to NumPy *only* where it adds value
# - Keep pandas as the "source of truth"


import numpy as np
import pandas as pd


REQUIRED_COLUMNS = ["Open", "High", "Low", "Close"]


def load_prices_from_csv(
  path: str,
) -> pd.DataFrame:
  """
  Load OHLC data from CSV into a clean pandas DataFrame.

  Time complexity: O(N)
  Space complexity: O(N)
  """

  df = pd.read_csv(
    path,
    parse_dates=["Date"],
  )

  # Enforce datetime index
  df = df.set_index("Date")

  # Validate required columns
  missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
  if missing:
    raise ValueError(f"Missing required columns: {missing}")

  # Keep only OHLC columns, in fixed order
  df = df[REQUIRED_COLUMNS]

  # Enforce float dtype
  df = df.astype(float)

  # Drop rows with missing data
  df = df.dropna()

  return df


def prices_to_numpy(
  df: pd.DataFrame,
) -> np.ndarray:
  """
  Convert OHLC pandas DataFrame to NumPy array.

  Output shape:
    (N, 4) where columns are [Open, High, Low, Close]

  Time complexity: O(N)
  Space complexity: O(N)
  """

  return df.values


def prices_to_structured_numpy(
  df: pd.DataFrame,
) -> np.ndarray:
  """
  Convert OHLC DataFrame to a structured NumPy array.

  Useful if you want named fields without pandas.
  """

  return np.array(
    list(df.itertuples(index=False, name=None)),
    dtype=[
      ("Open", "f8"),
      ("High", "f8"),
      ("Low", "f8"),
      ("Close", "f8"),
    ],
  )


# Example usage
if __name__ == "__main__":
  df = load_prices_from_csv("data/spy_hourly.csv")

  prices_np = prices_to_numpy(df)
  prices_struct = prices_to_structured_numpy(df)

  print("Pandas DF:")
  print(df.head())

  print("\nNumPy array shape:", prices_np.shape)
  print(prices_np[:3])

  print("\nStructured NumPy array:")
  print(prices_struct[:3])


In [None]:
# What needs to be done
# 1) Find swing highs/lows ("pivots") from OHLC data.
# 2) Turn pivot prices into support/resistance "zones" by clustering
#    nearby prices (because levels are fuzzy, not exact).
# 3) Score zones (touches + recency), and return a compact set of
#    strongest zones you can use in a backtester.
#
# Brute-force approach (for intuition)
# - For each bar, look back/forward W bars and declare a pivot if it's
#   the max/min in that window (O(N*W)).
# - Put every pivot price into a list, then compare each pivot to every
#   other pivot and group if within some tolerance (O(P^2)).
# - This works but gets slow and messy as P grows.
#
# Edge cases
# - Flat regions (many equal highs/lows) -> duplicates; handle ties.
# - Very low volatility -> tolerance too small; clamp minimum tolerance.
# - Gaps / split-adjusted data -> verify data is adjusted/consistent.
# - Early bars (not enough window) -> pivots undefined.
# - If data has missing candles -> drop/forward-fill carefully.
#
# Optimal-ish approach below
# - Pivot detection: O(N) using rolling max/min (with pandas).
# - Clustering: sort pivots then single pass merge by tolerance O(P log P).
# - Scoring: O(P) per zone aggregation.
#
# Notes
# - This produces "zones" as [zone_low, zone_high] with a "level"
#   (center) and metadata. Zones are stable and easy to backtest.
# - Uses ATR to scale tolerance automatically to volatility regime.

from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict

import numpy as np
import pandas as pd


@dataclass
class SRZone:
  kind: str                  # "support" or "resistance"
  level: float               # center price of the zone
  low: float                 # zone lower bound
  high: float                # zone upper bound
  touches: int               # how many pivots contributed
  last_touch_idx: int        # index (row number) of last touch pivot
  score: float               # computed strength score

df = load_prices_from_csv("data/spy_hourly.csv")

def compute_atr(
  df: pd.DataFrame,
  period: int = 14,
) -> pd.Series:
  """
  ATR (SMA of True Range).

  Time:  O(N)
  Space: O(N)
  """
  high = df["High"].astype(float)
  low = df["Low"].astype(float)
  close = df["Close"].astype(float)

  prev_close = close.shift(1)

  tr1 = (high - low).abs()
  tr2 = (high - prev_close).abs()
  tr3 = (low - prev_close).abs()

  tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)

  atr = tr.rolling(period, min_periods=period).mean()
  return atr


def find_pivots(
  df: pd.DataFrame,
  left: int = 5,
  right: int = 5,
  high_col: str = "High",
  low_col: str = "Low",
) -> Tuple[pd.Series, pd.Series]:
  """
  Pivot highs/lows using a symmetric window:
  - pivot high at i if High[i] is the max in [i-left, i+right]
  - pivot low  at i if Low[i]  is the min in [i-left, i+right]

  Complexity:
  - Rolling max/min: ~O(N) (pandas optimized)
  """
  high = df[high_col].astype(float)
  low = df[low_col].astype(float)

  window = left + right + 1

  roll_max = high.rolling(window, center=True, min_periods=window).max()
  roll_min = low.rolling(window, center=True, min_periods=window).min()

  pivot_high = (high == roll_max)
  pivot_low = (low == roll_min)

  # Convert booleans to price series (NaN where not a pivot)
  pivot_high_price = high.where(pivot_high, np.nan)
  pivot_low_price = low.where(pivot_low, np.nan)

  return pivot_high_price, pivot_low_price


def _cluster_prices_into_zones(
  prices: np.ndarray,
  idxs: np.ndarray,
  tolerance: float,
) -> List[Tuple[float, float, int, int]]:
  """
  Cluster sorted pivot prices into contiguous groups if gap <= tolerance.

  Returns list of tuples:
  (group_low, group_high, touches, last_touch_idx)

  Complexity:
  - Sort: O(P log P)
  - Single pass merge: O(P)
  """
  if prices.size == 0:
    return []

  order = np.argsort(prices)
  prices_sorted = prices[order]
  idxs_sorted = idxs[order]

  groups: List[Tuple[float, float, int, int]] = []

  g_low = prices_sorted[0]
  g_high = prices_sorted[0]
  touches = 1
  last_touch_idx = int(idxs_sorted[0])

  for p, i in zip(prices_sorted[1:], idxs_sorted[1:]):
    # O(1)
    if p <= g_high + tolerance:
      # Merge into current group
      g_high = max(g_high, p)
      g_low = min(g_low, p)
      touches += 1
      last_touch_idx = max(last_touch_idx, int(i))
    else:
      # Close group, start new
      groups.append((float(g_low), float(g_high), touches, last_touch_idx))
      g_low = p
      g_high = p
      touches = 1
      last_touch_idx = int(i)

  groups.append((float(g_low), float(g_high), touches, last_touch_idx))
  return groups


def build_sr_zones(
  df: pd.DataFrame,
  left: int = 5,
  right: int = 5,
  atr_period: int = 14,
  zone_width_atr: float = 0.30,
  min_zone_width: float = 0.05,
  max_zones_each: int = 8,
  high_col: str = "High",
  low_col: str = "Low",
  close_col: str = "Close",
) -> Dict[str, List[SRZone]]:
  """
  Build support/resistance zones from pivot lows/highs.

  Tolerance / zone width:
  - Use ATR * zone_width_atr as the merge tolerance (vol-adjusted).
  - Also enforce a minimum width to avoid tiny tolerance in quiet markets.

  Scoring:
  - score = touches * recency_weight
  - recency_weight decays with how long since last touch.

  Complexity:
  - ATR: O(N)
  - Pivots: ~O(N)
  - Clustering: O(P log P)
  """
  df = df.copy()

  atr = compute_atr(
    df,
    period=atr_period,
  )

  # Use the most recent ATR as a scaling value
  recent_atr = float(atr.dropna().iloc[-1]) if atr.dropna().size else 0.0
  tolerance = max(min_zone_width, recent_atr * zone_width_atr)

  pivot_high_price, pivot_low_price = find_pivots(
    df,
    left=left,
    right=right,
    high_col=high_col,
    low_col=low_col,
  )

  # Extract pivot arrays (prices + indices)
  ph = pivot_high_price.dropna()
  pl = pivot_low_price.dropna()

  ph_prices = ph.values.astype(float)
  ph_idxs = ph.index.to_numpy()

  pl_prices = pl.values.astype(float)
  pl_idxs = pl.index.to_numpy()

  # If index is datetime, we still need an integer "position" for scoring.
  # We'll map index -> integer row position.
  idx_to_pos = {idx: pos for pos, idx in enumerate(df.index)}
  ph_pos = np.array([idx_to_pos[i] for i in ph_idxs], dtype=int)
  pl_pos = np.array([idx_to_pos[i] for i in pl_idxs], dtype=int)

  res_groups = _cluster_prices_into_zones(
    prices=ph_prices,
    idxs=ph_pos,
    tolerance=tolerance,
  )
  sup_groups = _cluster_prices_into_zones(
    prices=pl_prices,
    idxs=pl_pos,
    tolerance=tolerance,
  )

  n = len(df)

  def make_zones(groups: List[Tuple[float, float, int, int]], kind: str):
    zones: List[SRZone] = []
    for g_low, g_high, touches, last_touch_pos in groups:
      level = (g_low + g_high) / 2.0

      # Optional: widen to a consistent band so zones are not razor thin.
      # O(1)
      half_width = max((g_high - g_low) / 2.0, tolerance / 2.0)
      low = level - half_width
      high = level + half_width

      # Recency: newer touches matter more. O(1)
      recency_weight = 1.0 / (1+touches / 50.0)

      score = float(touches) * float(recency_weight)

      zones.append(
        SRZone(
          kind=kind,
          level=float(level),
          low=float(low),
          high=float(high),
          touches=int(touches),
          last_touch_idx=int(last_touch_pos),
          score=float(score),
        )
      )

    # Keep strongest zones first
    zones.sort(key=lambda z: z.score, reverse=True)
    return zones[:max_zones_each]

  support_zones = make_zones(sup_groups, "support")
  resistance_zones = make_zones(res_groups, "resistance")

  return {"support": support_zones, "resistance": resistance_zones}


def nearest_zone(
  price: float,
  zones: List[SRZone],
) -> Optional[SRZone]:
  """
  Find the closest zone by distance to its center (level).
  Complexity: O(Z)
  """
  if not zones:
    return None

  best = min(zones, key=lambda z: abs(price - z.level))
  return best


# Example usage:
# df must have columns: Open, High, Low, Close (standard OHLC)
#
zones = build_sr_zones(
  df,
  left=5,
  right=5,
  atr_period=14,
  zone_width_atr=0.30,
  max_zones_each=8,
)

print("Support zones:")
for z in zones["support"]:
  print(z)

print("Resistance zones:")
for z in zones["resistance"]:
  print(z)


In [None]:
# What this script does
# 1) Draw candlesticks where:
#    - X-axis is time
#    - Width reflects time delta between bars
#    - Wick = High/Low
#    - Body = Open/Close
# 2) Overlay support/resistance zones as horizontal bands
#
# Time complexity:
# - O(N) candles
# - O(Z) zones
#
# Space complexity:
# - O(1) additional


from typing import Dict, List
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


zones = build_sr_zones(
df,
left=5,
right=5,
atr_period=14,
zone_width_atr=0.30,
max_zones_each=8,
)

df = load_prices_from_csv("data/spy_hourly.csv")

def plot_candles_with_sr_zones(
  df: pd.DataFrame,
  zones: Dict[str, List[object]],
  title: str = "Candlesticks with Support / Resistance Zones",
  max_support: int = 6,
  max_resistance: int = 6,
  zone_alpha: float = 0.12,
) -> None:

  # --- Safety checks ---
  required = {"Open", "High", "Low", "Close"}
  if not required.issubset(df.columns):
    raise ValueError(
      f"df must contain columns {required}"
    )

  if not pd.api.types.is_datetime64_any_dtype(df.index):
    raise ValueError("df index must be datetime")

  df = df.sort_index()

  times = df.index.to_numpy()
  opens = df["Open"].to_numpy()
  highs = df["High"].to_numpy()
  lows = df["Low"].to_numpy()
  closes = df["Close"].to_numpy()

  # --- Compute candle widths from time deltas ---
  # width[i] = time[i+1] - time[i]
  # last candle uses same width as previous
  deltas = np.diff(times).astype("timedelta64[s]").astype(float)
  if len(deltas) == 0:
    raise ValueError("Not enough data to plot candles")

  widths = np.append(deltas, deltas[-1])

  # Convert seconds to matplotlib date units (days)
  widths = widths / (24 * 60 * 60)

  fig, ax = plt.subplots(figsize=(14, 7))

  # --- Draw candlesticks ---
  for i in range(len(df)):
    t = times[i]
    o, h, l, c = opens[i], highs[i], lows[i], closes[i]

    color = "green" if c >= o else "red"

    # Wick (high-low)
    ax.vlines(
      t,
      l,
      h,
      color=color,
      linewidth=1,
    )

    # Body
    body_low = min(o, c)
    body_height = abs(c - o)

    ax.bar(
      t,
      body_height,
      bottom=body_low,
      width=widths[i],
      color=color,
      align="center",
      linewidth=0,
    )

  # --- Overlay SR zones ---
  
  support = (zones.get("support") or [])[:max_support]
  resistance = (zones.get("resistance") or [])[:max_resistance]

  for z in support:
    ax.axhspan(
      z.low,
      z.high,
      alpha=zone_alpha,
    )

  for z in resistance:
    ax.axhspan(
      z.low,
      z.high,
      alpha=zone_alpha,
    )

  # Remove duplicate legend entries
  handles, labels = ax.get_legend_handles_labels()
  by_label = dict(zip(labels, handles))
  ax.legend(by_label.values(), by_label.keys())

  ax.set_title(title)
  ax.set_xlabel("Time")
  ax.set_ylabel("Price")
  ax.grid(True, alpha=0.25)
  plt.tight_layout()
  plt.show()
  
  
if __name__ == "__main__":
  plot_candles_with_sr_zones(df, zones)

In [54]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, List, Optional, Sequence

import numpy as np
import pandas as pd


# -------------------------
# Trade record
# -------------------------

@dataclass
class Trade:
  entry_time: pd.Timestamp
  exit_time: pd.Timestamp
  zone_low: float
  zone_high: float
  entry: float
  stop: float
  tp: float
  exit_price: float
  outcome: str           # "tp" or "stop"
  r_multiple: float
  pnl_per_share: float



df_h = load_prices_from_csv("data/spy_hourly.csv")

zones_dict = build_sr_zones(
df_h,
left=5,
right=5,
atr_period=14,
zone_width_atr=0.30,
max_zones_each=8,
)
zones = zones_dict["support"] + zones_dict["resistance"]
df = load_prices_from_csv("data/spy_5min.csv")

# -------------------------
# Zone helpers (object or dict)
# -------------------------

def _z_low(z: Any) -> float:
  # O(1)

  if isinstance(z, dict):
    return float(z["low"])
  return float(getattr(z, "low"))


def _z_high(z: Any) -> float:
  # O(1)
  if isinstance(z, dict):
    return float(z["high"])
  return float(getattr(z, "high"))


def pick_zone_for_signal(
  bar_low: float,
  bar_close: float,
  zones: Sequence[Any],
) -> Optional[Any]:
  """
  Entry conditions (your spec):
  - z.low <= bar_low <= z.high
  - bar_close > z.high

  Multiple matches -> choose highest z.high.

  Time:  O(Z)
  Space: O(1)
  """
  best = None
  best_high = -np.inf
  for z in zones:
    zl = _z_low(z)
    zh = _z_high(z)

    if not (zl <= bar_low <= zh):
      continue

    if not (bar_close > zh):
      continue

    if zh > best_high:
      best_high = zh
      best = z

  return best


# -------------------------
# Backtester
# -------------------------

def backtest_zone_reclaim_5m(
  df: pd.DataFrame,
  zones: Sequence[Any],
  atr_period: int = 14,
  tp_r_multiple: float = 1.5,
  stop_atr_frac: float = 0.5,         # ATR/2
  conservative_same_bar_rule: bool = True,
) -> pd.DataFrame:
  """
  See header comments for rules.

  Time:  O(N * Z)
  Space: O(T)
  """

  required = {"Open", "High", "Low", "Close"}
  if not required.issubset(df.columns):
    raise ValueError(
      f"df must contain columns {required}"
    )

  df = df.copy()

  # Ensure datetime index
  if "Date" in df.columns:
    df["Date"] = pd.to_datetime(df["Date"])
    df = df.set_index("Date")

  df = df.sort_index()

  # ATR column
  # O(N)
  df["ATR"] = compute_atr(df, period=atr_period)

  trades: List[Trade] = []

  # Single-position state
  in_trade = False
  entry_time = None
  entry = stop = tp = None
  zone_low = zone_high = None

  for t, row in df.iterrows():
    o = float(row["Open"])
    h = float(row["High"])
    l = float(row["Low"])
    c = float(row["Close"])
    a = row["ATR"]

    # Skip until ATR exists
    # O(1)
    if pd.isna(a):
      continue

    a = float(a)

    # -------------------------
    # Manage open trade
    # -------------------------
    # O(1)
    if in_trade:
      stop_hit = (l <= float(stop))
      tp_hit = (h >= float(tp))

      if stop_hit and tp_hit:
        if conservative_same_bar_rule:
          exit_price = float(stop)
          outcome = "stop"
        else:
          exit_price = float(tp)
          outcome = "tp"
      elif stop_hit:
        exit_price = float(stop)
        outcome = "stop"
      elif tp_hit:
        exit_price = float(tp)
        outcome = "tp"
      else:
        continue

      risk = float(entry) - float(stop)
      r_mult = (
        (exit_price - float(entry)) / risk
        if risk > 0 else 0.0
      )
      pnl = exit_price - float(entry)

      trades.append(
        Trade(
          entry_time=entry_time,
          exit_time=t,
          zone_low=float(zone_low),
          zone_high=float(zone_high),
          entry=float(entry),
          stop=float(stop),
          tp=float(tp),
          exit_price=float(exit_price),
          outcome=outcome,
          r_multiple=float(r_mult),
          pnl_per_share=float(pnl),
        )
      )

      # Reset
      in_trade = False
      entry_time = None
      entry = stop = tp = None
      zone_low = zone_high = None

      continue

    # -------------------------
    # Find entry signal
    # -------------------------
    # O(Z)
    z = pick_zone_for_signal(
      bar_low=l,
      bar_close=c,
      zones=zones,
    )
    if z is None:
      continue

    z_l = _z_low(z)
    z_h = _z_high(z)

    # Entry fill assumption: signal bar close
    # O(1)
    entry_price = float(c)

    # Stop: ATR/2 below candle low
    stop_price = float(l) - (a * float(stop_atr_frac))

    risk = entry_price - stop_price
    if risk <= 0:
      continue

    tp_price = entry_price + (float(tp_r_multiple) * 2)

    # Enter
    in_trade = True
    entry_time = t
    entry = entry_price
    stop = stop_price
    tp = tp_price
    zone_low = z_l
    zone_high = z_h

  if not trades:
    return pd.DataFrame()

  out = pd.DataFrame([tr.__dict__ for tr in trades])

  out["win"] = (out["outcome"] == "tp").astype(int)
  out["minutes_held"] = (
    (out["exit_time"] - out["entry_time"])
    .dt.total_seconds()
    / 60.0
  )

  return out


def summarize_trades(
  trades: pd.DataFrame,
) -> None:
  if trades.empty:
    print("No trades generated.")
    return

  n = len(trades)
  win_rate = float(trades["win"].mean()) * 100.0
  avg_r = float(trades["r_multiple"].mean())
  total_r = float(trades["r_multiple"].sum())
  avg_hold = float(trades["minutes_held"].mean())

  print(f"Trades: {n}")
  print(f"Win rate: {win_rate:.2f}%")
  print(f"Avg R: {avg_r:.3f}")
  print(f"Total R: {total_r:.3f}")
  print(f"Avg hold (min): {avg_hold:.1f}")
  print("\nOutcomes:")
  print(trades["outcome"].value_counts())

trades = backtest_zone_reclaim_5m(df=df, zones=zones)
summarize_trades(trades)
trades.head()


Trades: 81
Win rate: 30.86%
Avg R: 0.265
Total R: 21.477
Avg hold (min): 519.0

Outcomes:
outcome
stop    56
tp      25
Name: count, dtype: int64


Unnamed: 0,entry_time,exit_time,zone_low,zone_high,entry,stop,tp,exit_price,outcome,r_multiple,pnl_per_share,win,minutes_held
0,2025-11-17 15:50:00+00:00,2025-11-17 16:00:00+00:00,670.148599,670.646383,671.159973,669.943743,674.159973,669.943743,stop,-1.0,-1.21623,0,10.0
1,2025-11-17 16:30:00+00:00,2025-11-17 17:50:00+00:00,670.148599,670.646383,670.840027,669.734351,673.840027,669.734351,stop,-1.0,-1.105676,0,80.0
2,2025-11-17 19:25:00+00:00,2025-11-17 19:35:00+00:00,664.83613,665.333914,665.839478,664.51872,668.839478,664.51872,stop,-1.0,-1.320757,0,10.0
3,2025-11-18 15:05:00+00:00,2025-11-18 16:00:00+00:00,656.756113,657.253897,658.880005,655.910843,661.880005,655.910843,stop,-1.0,-2.969162,0,55.0
4,2025-11-18 17:35:00+00:00,2025-11-18 17:55:00+00:00,658.588602,659.086386,661.149902,658.176886,664.149902,664.149902,tp,1.009076,3.0,1,20.0
