# Notebook 01 — Visual Debug Consolidation Detector (SMA20/SMA200 Trend Continuation)

Tujuan:
- Load data **XAUUSD M1** dari CSV (UTC).
- Resample ke **M1 / M5 / M15** (right-closed).
- Hitung **SMA20, SMA200, ATR14**.
- Deteksi **Strong Trend**.
- Deteksi **Consolidation**: **Rectangle** dan **Flag** secara deterministic.
- Visual debug: plot OHLC proxy + overlay box consolidation + titik breakout.

> Fokus notebook: validasi detector & overlay, **bukan** backtest profit.

In [None]:
# === Imports ===
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

plt.rcParams["figure.figsize"] = (14, 6)

In [None]:
# === User-provided loader ===
def load_ohlcv(path: str) -> pd.DataFrame:
    """
    Read an OHLCV CSV export into a timestamp-indexed DataFrame.

    The first line of the file is skipped (it is treated as a label row) and
    the remaining lines are read positionally as:
    date, time, open, high, low, close, volume.
    The date and time columns are joined with a space and parsed with the
    format ``%Y.%m.%d %H:%M``; the result is interpreted as UTC.

    Returns a frame sorted ascending by time, indexed by the tz-aware
    ``timestamp`` column, with columns open, high, low, close, volume.
    """
    column_names = ['date', 'time', 'open', 'high', 'low', 'close', 'volume']
    raw = pd.read_csv(
        path,
        header=None,
        names=column_names,
        dtype={'date': str, 'time': str},  # keep as text so they can be concatenated
        skiprows=1,
    )

    stamp_text = raw['date'] + ' ' + raw['time']
    raw['timestamp'] = pd.to_datetime(stamp_text, format='%Y.%m.%d %H:%M', utc=True)

    return (
        raw.drop(columns=['date', 'time'])
        .sort_values('timestamp')
        .reset_index(drop=True)
        .set_index('timestamp')
    )

In [None]:
# === Resample helper (right-closed, label=right) ===
def resample_ohlcv(df_m1: pd.DataFrame, tf: str) -> pd.DataFrame:
    """
    Aggregate M1 bars into bars of frequency ``tf``.

    tf: a pandas frequency string such as '1min', '5min', '15min'.
        NOTE(review): the legacy 'T' aliases ('1T', '5T', '15T') are
        deprecated in recent pandas releases; prefer the 'min' spelling.

    Bins are right-closed and right-labelled, so a bar stamped 00:05 holds the
    rows with timestamps in (00:00, 00:05]. Bins in which any aggregated
    column is NaN (e.g. bins with no source rows) are dropped.

    NOTE(review): this is only free of look-ahead if the M1 timestamps mark
    bar-close times; if the source stamps bar-open times, confirm the
    alignment is what you expect.
    """
    bins = df_m1.resample(tf, closed="right", label="right")
    bars = bins.agg(
        {
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "volume": "sum",
        }
    )
    return bars.dropna()

In [None]:
# === Indicators ===
def sma(series: pd.Series, n: int) -> pd.Series:
    """
    Simple moving average over the trailing ``n`` observations.

    The result is NaN until a full window of ``n`` values is available.
    """
    window = series.rolling(window=n, min_periods=n)
    return window.mean()

def atr(df: pd.DataFrame, n: int = 14) -> pd.Series:
    """
    Average True Range as a plain rolling mean of the True Range (debug use).

    True Range per bar is the largest of: high - low, |high - previous close|,
    |low - previous close|. The row-wise max skips NaN, so the first bar
    (which has no previous close) falls back to high - low.

    Reads the columns high, low, close. The result is NaN until ``n`` True
    Range values are available.
    """
    prior_close = df["close"].shift(1)
    candidates = pd.DataFrame(
        {
            "bar_range": df["high"] - df["low"],
            "gap_from_high": (df["high"] - prior_close).abs(),
            "gap_from_low": (df["low"] - prior_close).abs(),
        }
    )
    true_range = candidates.max(axis=1)
    return true_range.rolling(n, min_periods=n).mean()

In [None]:
# === Strong Trend Detector (Spec v1.0) ===
def compute_trend_flags(df: pd.DataFrame, L: int = 20, slope_min_atr_mult: float = 0.1) -> pd.DataFrame:
    """
    Return a copy of ``df`` with trend indicators and strong-trend flags.

    Reads the columns high, low, close.

    Adds, in this order:
      - SMA20, SMA200, ATR14
      - slope_200, slope_20: per-bar change of each SMA measured over L bars
      - bull_trend / bear_trend: SMA20 above / below SMA200
      - strong_slope: |slope_200| >= slope_min_atr_mult * ATR14
      - strong_bull / strong_bear

    strong_bull requires bull_trend, strong_slope, close above SMA20, the
    lowest low of the last L bars above SMA200, and both slopes sharing a
    sign. strong_bear is the mirror image (close below SMA20, highest high of
    the last L bars below SMA200).
    """
    out = df.copy()
    close = out["close"]

    out["SMA20"] = sma(close, 20)
    out["SMA200"] = sma(close, 200)
    out["ATR14"] = atr(out, 14)

    # Per-bar slope: total change over L bars divided by L.
    for slope_col, ma_col in (("slope_200", "SMA200"), ("slope_20", "SMA20")):
        moving_avg = out[ma_col]
        out[slope_col] = (moving_avg - moving_avg.shift(L)) / L

    fast = out["SMA20"]
    slow = out["SMA200"]
    out["bull_trend"] = fast > slow
    out["bear_trend"] = fast < slow

    out["strong_slope"] = out["slope_200"].abs() >= slope_min_atr_mult * out["ATR14"]

    lowest_low = out["low"].rolling(L, min_periods=L).min()
    highest_high = out["high"].rolling(L, min_periods=L).max()
    # NaN slopes compare unequal, so warm-up bars never count as agreeing.
    slopes_agree = np.sign(out["slope_20"]) == np.sign(out["slope_200"])

    out["strong_bull"] = (
        out["bull_trend"]
        & out["strong_slope"]
        & (close > fast)
        & (lowest_low > slow)
        & slopes_agree
    )
    out["strong_bear"] = (
        out["bear_trend"]
        & out["strong_slope"]
        & (close < fast)
        & (highest_high < slow)
        & slopes_agree
    )
    return out

In [None]:
# === Rectangle Consolidation Detector (Spec v1.0) ===
def detect_rectangle(df: pd.DataFrame, W: int = 10, k1: float = 0.8, k2: float = 0.3) -> pd.DataFrame:
    """
    Deterministic rectangle-consolidation detector.

    The box is measured on the W bars before bar t, i.e. the window [t-W, t);
    bar t itself is excluded. The thresholds use ATR14 at bar t.

    Reads the columns high, low, close, ATR14, slope_20, slope_200.

    Adds:
      - rect_active: True at t when the box height is at most k1 * ATR14, the
        standard deviation of the window's closes is at most k2 * ATR14, the
        closes lie inside the box, and slope_20 / slope_200 share a sign
      - rect_high, rect_low, rect_size: box top, bottom and height
    """
    out = df.copy()

    def _prior_window(column: str):
        # Shift by one so the rolling window stops at bar t-1.
        return out[column].shift(1).rolling(W, min_periods=W)

    prior_closes = _prior_window("close")
    box_top = _prior_window("high").max()
    box_bottom = _prior_window("low").min()
    box_height = box_top - box_bottom

    atr_now = out["ATR14"]
    narrow_box = box_height <= (k1 * atr_now)
    quiet_closes = prior_closes.std() <= (k2 * atr_now)
    # For well-formed OHLC bars (low <= close <= high) this holds trivially.
    closes_inside = (prior_closes.min() >= box_bottom) & (prior_closes.max() <= box_top)
    slopes_agree = np.sign(out["slope_20"]) == np.sign(out["slope_200"])

    out["rect_active"] = narrow_box & quiet_closes & closes_inside & slopes_agree
    out["rect_high"] = box_top
    out["rect_low"] = box_bottom
    out["rect_size"] = box_height
    return out

In [None]:
# === Flag Consolidation Detector (Spec v1.0) ===
def _rolling_slope(series: pd.Series, win: int) -> pd.Series:
    """
    Rolling least-squares slope of ``series`` against the bar position 0..win-1.

    The window ends at the current row of ``series``; callers that need a
    window ending at t-1 pass an already-shifted series. The result is NaN
    until ``win`` non-NaN values are available.
    """
    positions = np.arange(win)
    # The centred x-values and their sum of squares are the same for every window.
    centered = positions - positions.mean()
    denom = (centered ** 2).sum()

    def _window_slope(values: np.ndarray) -> float:
        if np.isnan(values).any():
            return np.nan
        return (centered * (values - values.mean())).sum() / denom

    return series.rolling(win, min_periods=win).apply(_window_slope, raw=True)

def detect_flag(
    df: pd.DataFrame,
    impulse_k3: float = 1.5,
    impulse_M: int = 10,
    pullback_k4: float = 0.5,
    pullback_W: int = 8
) -> pd.DataFrame:
    """
    Deterministic bull/bear flag detector.

    Both measurement windows end at bar t-1 (bar t is excluded):
      - impulse: the last impulse_M bars
      - pullback: the last pullback_W bars

    Reads the columns high, low, close, ATR14, SMA200, strong_bull,
    strong_bear.

    A bull flag at t requires strong_bull, an impulse range of at least
    impulse_k3 * ATR14, a pullback range of at most pullback_k4 * impulse
    range, a rising impulse slope, a falling pullback slope, the pullback low
    above SMA200 and the pullback high below the impulse high. The bear flag
    mirrors every condition.

    Adds:
      - bull_flag_active, bear_flag_active
      - flag_high, flag_low: high / low of the pullback window

    NOTE(review): because both windows end at t-1, the pullback window
    overlaps the tail of the impulse window rather than following it; confirm
    this matches the intended spec.
    """
    out = df.copy()

    # Shift once so that every window below stops at bar t-1.
    prior_high = out["high"].shift(1)
    prior_low = out["low"].shift(1)
    prior_close = out["close"].shift(1)

    def _extremes(span: int):
        # Highest high and lowest low of the ``span`` bars before t.
        top = prior_high.rolling(span, min_periods=span).max()
        bottom = prior_low.rolling(span, min_periods=span).min()
        return top, bottom

    imp_top, imp_bottom = _extremes(impulse_M)
    pb_top, pb_bottom = _extremes(pullback_W)
    imp_span = imp_top - imp_bottom
    pb_span = pb_top - pb_bottom

    imp_trend = _rolling_slope(prior_close, impulse_M)
    pb_trend = _rolling_slope(prior_close, pullback_W)

    # Size conditions shared by both directions.
    big_impulse = imp_span >= impulse_k3 * out["ATR14"]
    shallow_pullback = pb_span <= pullback_k4 * imp_span
    long_ma = out["SMA200"]

    out["bull_flag_active"] = (
        out["strong_bull"]
        & big_impulse
        & shallow_pullback
        & (imp_trend > 0)
        & (pb_trend < 0)
        & (pb_bottom > long_ma)
        & (pb_top < imp_top)
    )
    out["bear_flag_active"] = (
        out["strong_bear"]
        & big_impulse
        & shallow_pullback
        & (imp_trend < 0)
        & (pb_trend > 0)
        & (pb_top < long_ma)
        & (pb_bottom > imp_bottom)
    )

    out["flag_high"] = pb_top
    out["flag_low"] = pb_bottom
    return out

In [None]:
# === Breakout markers (for visual debug) ===
def mark_breakouts(df: pd.DataFrame) -> pd.DataFrame:
    """
    Choose the active consolidation bounds per bar and mark breakouts.

    Reads the columns close, rect_active, rect_high, rect_low,
    bull_flag_active, bear_flag_active, flag_high, flag_low.

    Bounds selection at bar t:
      - rectangle bounds when rect_active
      - otherwise flag bounds when either flag is active
      - otherwise NaN

    Adds:
      - cons_high, cons_low: the selected bounds
      - cons_type: "RECT", "FLAG_BULL", "FLAG_BEAR" or "" (none)
      - bull_breakout: close[t] > cons_high
      - bear_breakout: close[t] < cons_low
    """
    out = df.copy()

    in_rect = out["rect_active"]
    bull_flag = out["bull_flag_active"]
    # A rectangle takes precedence over a flag on the same bar.
    flag_only = (~in_rect) & (bull_flag | out["bear_flag_active"])

    out["cons_high"] = np.where(
        in_rect, out["rect_high"], np.where(flag_only, out["flag_high"], np.nan)
    )
    out["cons_low"] = np.where(
        in_rect, out["rect_low"], np.where(flag_only, out["flag_low"], np.nan)
    )
    flag_label = np.where(bull_flag, "FLAG_BULL", "FLAG_BEAR")
    out["cons_type"] = np.where(in_rect, "RECT", np.where(flag_only, flag_label, ""))

    upper = out["cons_high"]
    lower = out["cons_low"]
    out["bull_breakout"] = upper.notna() & (out["close"] > upper)
    out["bear_breakout"] = lower.notna() & (out["close"] < lower)
    return out

In [None]:
# === Plotting helper (candlestick-like proxy) ===
def plot_debug(df: pd.DataFrame, start: str, end: str, title: str = ""):
    """
    Draw a candlestick-like view of the slice ``df.loc[start:end]``.

    The figure shows high-low wicks with open/close ticks, SMA20 and SMA200,
    the consolidation bounds, breakout markers, and a text label on each bar
    where the consolidation type changes to a non-empty value. The x-axis is
    the bar position inside the slice, not the timestamp.

    Reads the columns open, high, low, close, SMA20, SMA200, cons_high,
    cons_low, cons_type, bull_breakout, bear_breakout.

    Raises ValueError when the selected slice is empty.
    """
    window = df.loc[start:end].copy()
    if window.empty:
        raise ValueError("Selected window empty. Check START/END.")

    pos = np.arange(len(window))
    fig, ax = plt.subplots(figsize=(16, 7))

    # Wick spans low..high; the open tick points left, the close tick right.
    ax.vlines(pos, window["low"].values, window["high"].values, linewidth=0.7)
    ax.hlines(window["open"].values, pos - 0.2, pos, linewidth=1.2)
    ax.hlines(window["close"].values, pos, pos + 0.2, linewidth=1.2)

    # Moving averages first, then consolidation bounds.
    line_specs = (
        ("SMA20", 1.2, "SMA20"),
        ("SMA200", 1.2, "SMA200"),
        ("cons_high", 1.0, "Cons High"),
        ("cons_low", 1.0, "Cons Low"),
    )
    for column, width, legend_name in line_specs:
        ax.plot(pos, window[column].values, linewidth=width, label=legend_name)

    # Breakout markers sit at the close of the breakout bar.
    closes = window["close"]
    marker_specs = (
        ("bull_breakout", "^", "Bull Breakout"),
        ("bear_breakout", "v", "Bear Breakout"),
    )
    for flag_column, glyph, legend_name in marker_specs:
        hits = np.flatnonzero(window[flag_column].values)
        ax.scatter(hits, closes.iloc[hits].values, marker=glyph, s=60, label=legend_name)

    # Label only the bar where a new non-empty consolidation type begins.
    previous = None
    bar_highs = window["high"]
    for i, kind in enumerate(window["cons_type"].values):
        if kind != "" and kind != previous:
            ax.text(i, bar_highs.iloc[i], kind, fontsize=8, rotation=90, va="bottom")
        previous = kind

    ax.set_title(title)
    ax.set_xlabel("Bar index (local window)")
    ax.set_ylabel("Price")
    ax.legend()
    ax.grid(True)
    plt.show()

## Run: Load → Resample → Compute → Detect → Plot

1) Set `CSV_PATH` sesuai lokasi file.  
2) Set `TF` (M1/M5/M15).  
3) Pilih `START` & `END` window untuk debug.

In [None]:
# === CONFIG ===
# Module-level settings read by the pipeline and plot cells.
CSV_PATH = "XAUUSD_M1.csv"  # path to the M1 CSV; change as needed (absolute paths are fine)
TF = "5min"                 # "1min" | "5min" | "15min"; "1min" skips resampling

# Debug window passed to plot_debug (tz-aware, UTC offset included)
START = "2025-01-02 00:00:00+00:00"
END   = "2025-01-02 06:00:00+00:00"

# Trend params
# L: lookback in bars for the SMA slopes and the rolling low/high checks
L = 20
# Minimum |slope_200| expressed as a multiple of ATR14
slope_min_atr_mult = 0.1

# Rectangle params
# W_rect: number of bars (ending at t-1) that form the box
W_rect = 10
# k1: maximum box height as a multiple of ATR14
k1 = 0.8
# k2: maximum standard deviation of closes as a multiple of ATR14
k2 = 0.3

# Flag params
# impulse_k3: minimum impulse range as a multiple of ATR14
impulse_k3 = 1.5
# impulse_M: number of bars in the impulse window
impulse_M = 10
# pullback_k4: maximum pullback range as a fraction of the impulse range
pullback_k4 = 0.5
# pullback_W: number of bars in the pullback window
pullback_W = 8

In [None]:
# === PIPELINE ===
# Load the M1 bars, move to the configured timeframe (M1 is used as-is),
# then run trend flags -> rectangle -> flag -> breakout marking in order.
df_m1 = load_ohlcv(CSV_PATH)
df_tf = (
    (df_m1 if TF == "1min" else resample_ohlcv(df_m1, TF))
    .pipe(compute_trend_flags, L=L, slope_min_atr_mult=slope_min_atr_mult)
    .pipe(detect_rectangle, W=W_rect, k1=k1, k2=k2)
    .pipe(
        detect_flag,
        impulse_k3=impulse_k3,
        impulse_M=impulse_M,
        pullback_k4=pullback_k4,
        pullback_W=pullback_W,
    )
    .pipe(mark_breakouts)
)

# Quick sanity counts for each detector stage.
print("Rows:", len(df_tf))
print("Strong bull:", int(df_tf["strong_bull"].sum()), "Strong bear:", int(df_tf["strong_bear"].sum()))
print("Rectangle active:", int(df_tf["rect_active"].sum()))
print("Bull flag active:", int(df_tf["bull_flag_active"].sum()), "Bear flag active:", int(df_tf["bear_flag_active"].sum()))
print("Bull breakouts:", int(df_tf["bull_breakout"].sum()), "Bear breakouts:", int(df_tf["bear_breakout"].sum()))

In [None]:
# === PLOT DEBUG WINDOW ===
# Render the START..END slice of the detector output for visual inspection.
plot_debug(df_tf, START, END, title=f"Debug Consolidation Detector | TF={TF} | {START} → {END}")

## Tips Debug cepat
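- Terlalu sedikit setup: naikkan `k1/k2` (rectangle) atau `pullback_k4` (flag), atau turunkan `W_rect` / `pullback_W` — window yang lebih panjang membuat range lebih lebar sehingga syarat makin sulit terpenuhi.
- Terlalu banyak: turunkan `k1/k2`, atau naikkan `slope_min_atr_mult` (hanya memengaruhi flag, karena rectangle tidak mensyaratkan strong trend).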
- Jangan optimasi parameter dulu; pastikan **definisi** sesuai ekspektasi visual.