# Barridas de apertura: análisis cuantitativo de reversión

Notebook guiado para detectar barridas de apertura (sweeps), etiquetar los eventos con triple barrier y estudiar su probabilidad de reversión.


## 1. Cargar entorno del motor
- Importa utilidades del motor de backtesting (data loader, analytics, utils).
- Carga el CSV bruto del usuario con columnas estándar: `timestamp`, `open`, `high`, `low`, `close`, `volume`.


In [None]:
from pathlib import Path
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from src.data.feeds import NPZOHLCVFeed
from src.analytics.metrics import equity_curve_metrics
from src.utils.risk import compute_position_size

# Show up to 50 columns when a DataFrame is displayed.
pd.set_option('display.max_columns', 50)
# Use seaborn's "whitegrid" style for the plots drawn below.
sns.set(style='whitegrid')


In [None]:
# Path to the user's intraday CSV
csv_path = Path('data/user_intraday.csv')

# Load the dataset.
# Column names are lower-cased BEFORE the timestamp column is parsed: asking
# read_csv to parse a 'timestamp' column up front fails when the header is
# spelled e.g. 'Timestamp', which is exactly the case the rename is meant to
# absorb.
df_raw = pd.read_csv(csv_path)
df_raw = df_raw.rename(columns=str.lower)
df_raw['timestamp'] = pd.to_datetime(df_raw['timestamp'])
# Chronological order with the timestamp as index.
df_raw = df_raw.sort_values('timestamp').set_index('timestamp')
df_raw.head()


## 2. Construcción del dataset intradía
- Opcional: resamplear a una granularidad homogénea (ej. 1m).
- Añadir métricas: ATR(14), volumen relativo (RVOL), rango, true range (TR) y retornos pasados a 5 y 10 barras (los retornos futuros se calculan en la sección 5).


In [None]:
# Preprocessing parameters
resample_rule = '1min'  # Set to None when the data is already on a 1-minute grid
atr_window = 14

df = df_raw.copy()
if resample_rule:
    # One aggregation per OHLCV field.
    ohlc = dict(open='first', high='max', low='min', close='last', volume='sum')
    # Discard every resampled bar that has any missing field.
    df = df.resample(resample_rule).apply(ohlc).dropna(how='any')

# Basic metrics
prev_close = df['close'].shift(1)
df['rango'] = df['high'] - df['low']
# True range: the largest of the three candidate distances for each bar.
components = pd.concat(
    [df['high'] - prev_close, prev_close - df['low'], df['rango']],
    axis=1,
)
df['tr'] = components.max(axis=1)
# ATR and average volume are simple rolling means over the same window.
df['atr'] = df['tr'].rolling(atr_window).mean()
df['volumen_medio'] = df['volume'].rolling(atr_window).mean()
df['rvol'] = df['volume'] / df['volumen_medio']
# Trailing percentage returns over the last 5 and 10 bars.
for lookback in (5, 10):
    df[f'return_{lookback}m'] = df['close'].pct_change(periods=lookback) * 100
# Drop the warm-up rows where any rolling/lagged metric is still undefined.
df = df.dropna()
df.head()


## 3. Detección de eventos de posible barrida
Condiciones parametrizables: caída brusca, rango amplio vs ATR y volumen inusualmente alto.


In [None]:
def detect_sweep_events(
    data: pd.DataFrame,
    x_pct: float = 0.5,
    k: float = 1.5,
    rvol_percentile: float = 0.8,
) -> pd.DataFrame:
    """Return the bars of ``data`` that match the sweep pattern.

    A bar qualifies when all three conditions hold:

    * ``return_5m`` is below ``-x_pct``;
    * ``rango`` is greater than ``k`` times ``atr``;
    * ``rvol`` is above the ``rvol_percentile`` quantile of ``data['rvol']``.

    Args:
        data: Frame with ``return_5m``, ``rango``, ``atr`` and ``rvol`` columns.
        x_pct: Size of the drop required, in the same units as ``return_5m``.
        k: Multiple of ``atr`` that the bar range must exceed.
        rvol_percentile: Quantile (0-1) of ``rvol`` used as the volume cutoff.

    Returns:
        A copy of the matching rows with two extra columns:
        ``rvol_threshold`` (the cutoff that was applied) and ``sweep_params``
        (a dict with the parameters used, one dict per row).
    """
    threshold = data['rvol'].quantile(rvol_percentile)
    is_sweep = (
        (data['return_5m'] < -x_pct)
        & (data['rango'] > k * data['atr'])
        & (data['rvol'] > threshold)
    )
    events = data.loc[is_sweep].copy()
    events['rvol_threshold'] = threshold

    # A bare dict is not broadcast like a scalar when assigned to a column:
    # pandas treats it as a key -> value mapping (or a length-3 list-like), so
    # the parameters would never end up stored per row. Build an explicit
    # object Series holding one independent dict for every event instead.
    params = {'x_pct': x_pct, 'k': k, 'rvol_percentile': rvol_percentile}
    events['sweep_params'] = pd.Series(
        [dict(params) for _ in range(len(events))],
        index=events.index,
        dtype=object,
    )
    return events

# Run the detector, overriding the default RVOL cutoff with the 0.85 quantile.
events = detect_sweep_events(df, x_pct=0.5, k=1.5, rvol_percentile=0.85)
events.head()


## 4. Etiquetado avanzado (triple barrier)
Aplicamos la metodología de López de Prado con TP/SL en función del ATR y un horizonte máximo de 1h.


In [None]:
from typing import Dict, Any

def apply_triple_barrier(
    data: pd.DataFrame,
    events: pd.DataFrame,
    tp_mult: float = 1.5,
    sl_mult: float = 1.0,
    max_horizon: str = '60min',
) -> pd.DataFrame:
    """Label each event with the triple-barrier outcome of a long entry.

    The entry price is the event bar's ``close``. The take-profit sits
    ``tp_mult * atr`` above it and the stop-loss ``sl_mult * atr`` below it.
    Bars strictly after the event, up to and including ``ts + max_horizon``,
    are scanned for the first barrier touched.

    Args:
        data: Price frame with ``high``, ``low`` and ``close`` columns, indexed
            by timestamp in ascending order.
        events: Rows to label; must provide ``close`` and ``atr`` and share
            the timestamp index of ``data``.
        tp_mult: ATR multiple for the take-profit barrier.
        sl_mult: ATR multiple for the stop-loss barrier.
        max_horizon: Maximum holding period, as a ``pd.Timedelta`` string.

    Returns:
        A copy of ``events`` with these columns appended: ``label`` (1 = TP,
        -1 = SL, 0 = horizon expired), ``entry_price``, ``exit_price``,
        ``exit_time``, ``tp``, ``sl``, ``horizon``, ``holding_minutes`` and
        ``pnl_pct``. An empty ``events`` yields an empty frame that still
        carries those columns.
    """
    horizon_delta = pd.Timedelta(max_horizon)
    outcome_columns = [
        'label',
        'entry_price',
        'exit_price',
        'exit_time',
        'tp',
        'sl',
        'horizon',
        'holding_minutes',
        'pnl_pct',
    ]
    outcomes = []

    for ts, row in events.iterrows():
        entry = float(row['close'])
        atr = float(row['atr'])
        tp = entry + tp_mult * atr
        sl = entry - sl_mult * atr

        # The entry is taken at the event bar's close, so that bar's own
        # high/low happened BEFORE the entry and must not trigger a barrier.
        # Only bars strictly after the event are scanned.
        window = data.loc[ts : ts + horizon_delta]
        future = window[window.index > ts]

        tp_times = future.index[(future['high'] >= tp).to_numpy()]
        sl_times = future.index[(future['low'] <= sl).to_numpy()]
        tp_time = tp_times[0] if len(tp_times) else None
        sl_time = sl_times[0] if len(sl_times) else None

        # NOTE(review): when one bar touches both barriers the TP wins, which
        # is the optimistic reading; OHLC bars cannot tell which came first.
        if tp_time is not None and (sl_time is None or tp_time <= sl_time):
            label, exit_time, exit_price = 1, tp_time, tp
        elif sl_time is not None:
            label, exit_time, exit_price = -1, sl_time, sl
        elif not future.empty:
            # Horizon expired: exit at the close of the last bar in the window.
            label, exit_time, exit_price = 0, future.index[-1], float(future['close'].iloc[-1])
        else:
            # No bar after the event (e.g. it is the last row): flat exit.
            label, exit_time, exit_price = 0, ts, entry

        outcomes.append(
            {
                'label': label,
                'entry_price': entry,
                'exit_price': exit_price,
                'exit_time': exit_time,
                'tp': tp,
                'sl': sl,
                'horizon': max_horizon,
                'holding_minutes': (exit_time - ts).total_seconds() / 60,
                'pnl_pct': (exit_price / entry - 1) * 100,
            }
        )

    # Passing ``columns`` explicitly keeps the schema stable when there are no
    # events, so downstream cells can still reference these columns.
    outcome_frame = pd.DataFrame(outcomes, index=events.index, columns=outcome_columns)
    return pd.concat([events.copy(), outcome_frame], axis=1)

# Label the detected events: TP at 1.5x ATR, SL at 1x ATR, 60-minute horizon.
labeled_events = apply_triple_barrier(df, events, tp_mult=1.5, sl_mult=1.0, max_horizon='60min')
labeled_events.head()


## 5. Estadística de probabilidad de reversal
Calculamos la probabilidad de alcanzar TP, distribución de retornos futuros y comparación por volumen.


In [None]:
def forward_returns(data: pd.DataFrame, horizons=(5, 10, 15, 30, 60)) -> pd.DataFrame:
    """Build forward percentage returns of ``close`` for each horizon.

    For every ``h`` in ``horizons`` the column ``return_{h}m_fwd`` holds the
    ``h``-bar percentage change shifted back by ``h`` rows, so each row carries
    the return realised over the following ``h`` bars. The last ``h`` rows of
    that column are NaN.

    Returns:
        A frame indexed like ``data`` with one column per horizon.
    """
    close = data['close']
    columns = {
        f'return_{h}m_fwd': close.pct_change(periods=h).shift(-h) * 100
        for h in horizons
    }
    return pd.DataFrame(columns, index=data.index)

def _tp_rate(frame):
    """Share of rows labelled 1 (take-profit), or NaN for an empty frame."""
    if frame.empty:
        return np.nan
    return (frame['label'] == 1).mean()


# Attach the forward returns to every labelled event.
fwd = forward_returns(df)
labeled_events = labeled_events.join(fwd, how='left')

p_tp = _tp_rate(labeled_events)

# Descriptive statistics of the forward-return columns, when any exist.
ret_columns = [
    name
    for name in labeled_events.columns
    if name.startswith('return_') and name.endswith('m_fwd')
]
if ret_columns:
    ret_stats = labeled_events[ret_columns].describe()
else:
    ret_stats = pd.DataFrame()

# Split the events at the 0.9 quantile of RVOL measured over the whole dataset.
high_rvol_cut = df['rvol'].quantile(0.9)
above_cut = labeled_events['rvol'] > high_rvol_cut
hi_volume_events = labeled_events[above_cut]
normal_volume_events = labeled_events[labeled_events['rvol'] <= high_rvol_cut]

p_tp_high = _tp_rate(hi_volume_events)
p_tp_normal = _tp_rate(normal_volume_events)

summary = dict(
    p_tp_global=p_tp,
    p_tp_high_rvol=p_tp_high,
    p_tp_normal=p_tp_normal,
    n_events=len(labeled_events),
    n_high_rvol=len(hi_volume_events),
    n_normal=len(normal_volume_events),
)
summary


In [None]:
# Simulated equity curve: go long after each sweep and exit at the
# triple-barrier outcome.
initial_capital = 1_000_000

# Compound in the order the trades are CLOSED. Compounding in entry order and
# sorting by exit time afterwards would reorder already-compounded values, so
# the curve (and its final value) would be wrong whenever a later entry exits
# before an earlier one. A stable sort keeps entry order among equal exits.
closed_trades = labeled_events.sort_values('exit_time', kind='stable')

# NOTE(review): every trade is applied to the full running equity, even when
# holding periods overlap -- confirm this sizing assumption is intended.
equity = [initial_capital]
eq_index = []
for _, trade in closed_trades.iterrows():
    eq_index.append(trade['exit_time'])
    equity.append(equity[-1] * (1 + trade['pnl_pct'] / 100))

# Drop the seed value: the series holds the equity after each closed trade.
equity_series = pd.Series(equity[1:], index=pd.to_datetime(eq_index))
eq_metrics = equity_curve_metrics(equity_series) if not equity_series.empty else {}
eq_metrics


## 6. Visualización
Incluimos ejemplos de barridas detectadas, heatmaps y distribución de outcomes.


In [None]:
def plot_sweep_example(data: pd.DataFrame, events: pd.DataFrame, window: int = 60):
    """Plot the close price around the first event in ``events``.

    The chart spans ``window`` minutes on each side of the event timestamp and
    marks the event with a dashed red vertical line. When ``events`` is empty
    a message is printed and nothing is drawn.
    """
    if events.empty:
        print('No hay eventos para mostrar')
        return

    event_ts = events.index[0]
    half_span = pd.Timedelta(minutes=window)
    around_event = data.loc[event_ts - half_span:event_ts + half_span]

    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(around_event.index, around_event['close'], label='Close')
    ax.axvline(event_ts, color='red', linestyle='--', label='Evento')
    ax.set_title('Ejemplo de barrida detectada')
    ax.legend()
    plt.show()

# Draw the first labelled event in its price context.
plot_sweep_example(df, labeled_events)


In [None]:
# Heatmap: mean trade return by RVOL bucket and TR/ATR bucket
if not labeled_events.empty:
    labeled_events['tr_atr_ratio'] = labeled_events['tr'].div(labeled_events['atr'])
    # Quantile-bin both drivers into (at most) five buckets each.
    for source_col, bin_col in (('rvol', 'rvol_bin'), ('tr_atr_ratio', 'tr_bin')):
        labeled_events[bin_col] = pd.qcut(labeled_events[source_col], q=5, duplicates='drop')
    pivot = labeled_events.pivot_table(
        index='rvol_bin',
        columns='tr_bin',
        values='pnl_pct',
        aggfunc='mean',
    )
    fig, ax = plt.subplots(figsize=(8, 5))
    sns.heatmap(pivot, annot=True, fmt='.2f', cmap='coolwarm', ax=ax)
    ax.set_title('Retorno medio (%) por RVOL y TR/ATR')
    plt.show()


In [None]:
# Distribution of the triple-barrier labels
if not labeled_events.empty:
    fig, ax = plt.subplots(figsize=(6, 4))
    sns.countplot(data=labeled_events, x='label', ax=ax)
    ax.set_title('Distribución de etiquetas triple barrier')
    plt.show()


## 7. Conclusión automática
Resumen textual de si existe edge, mejores condiciones y relación con volumen.


In [None]:
def auto_summary(stats: dict, eq_metrics: dict) -> str:
    """Render the headline statistics as a newline-separated text report.

    Args:
        stats: Mapping read for ``p_tp_global``, ``n_events``,
            ``p_tp_high_rvol`` and ``p_tp_normal``; missing probabilities are
            treated as NaN and a missing event count as 0.
        eq_metrics: Equity-curve metrics; only ``sharpe_ratio`` is read. An
            empty mapping is reported as not available.

    Returns:
        The report, one statement per line.
    """
    tp_prob = stats.get('p_tp_global', np.nan)
    if np.isnan(tp_prob):
        report = ['Probabilidad de TP: n/d']
    else:
        report = [f"Probabilidad de TP: {tp_prob:.2%}"]

    if not eq_metrics:
        report.append('Sharpe simulado 1h: n/d')
    else:
        sharpe = eq_metrics.get('sharpe_ratio', float('nan'))
        report.append(f"Sharpe simulado 1h: {sharpe:.2f}")

    n_events = stats.get('n_events', 0)
    report.append(f"Eventos analizados: {n_events}")

    # Volume-conditioned probabilities are listed only when they are defined.
    tp_high = stats.get('p_tp_high_rvol', np.nan)
    tp_normal = stats.get('p_tp_normal', np.nan)
    if not np.isnan(tp_high):
        report.append(f"Más edge en rvol alto? p(TP|rvol>p90) = {tp_high:.2%}")
    if not np.isnan(tp_normal):
        report.append(f"p(TP|rvol<=p90) = {tp_normal:.2%}")

    report.append('Ventana óptima: ajustar x_pct/k según mayor retorno medio en heatmap.')
    return "\n".join(report)

# Print the text report built from the summary statistics and equity metrics.
print(auto_summary(summary, eq_metrics))


### Checklist de métricas calculadas
- Probabilidad de reversal: `p_tp_global` y condicionada por volumen.
- Expectativas condicionadas: estadísticos de `return_{5,10,15,30,60}m_fwd`.
- Sharpe simulado de estrategia de entrada inmediata post-shock.
- Autocorrelaciones pre/post evento se pueden añadir con `df['close'].pct_change().autocorr(lag)`.
- Distribución de volumen y colas revisada con los percentiles de `rvol`.
