In [None]:
import datetime
import pandas as pd
from scipy.stats import binomtest
import numpy as np
import os
import matplotlib.pyplot as plt
from binance_historical_data import BinanceDataDumper
from itertools import product
from joblib import Parallel, delayed
from contextlib import redirect_stdout, redirect_stderr


# Download data functions

In [None]:

def download_data_from_binance(symbol, temporalidad, initial, end):
    """Request spot kline dumps for ``symbol`` in month-sized chunks.

    A ``BinanceDataDumper`` configured with ``path_dir_where_to_dump="data"``
    is created and ``dump_data`` is called once per calendar month touched by
    the ``initial``..``end`` range. The last request is clamped to ``end`` so
    nothing beyond the requested range is asked for.

    Args:
        symbol: Ticker, passed to the dumper as a one-element list.
        temporalidad: Value forwarded as the dumper's ``data_frequency``.
        initial: Sequence whose first three items are year, month, day of the
            first day to request.
        end: Sequence whose first three items are year, month, day of the last
            day to request (inclusive).

    Returns:
        None.
    """
    dumper = BinanceDataDumper(
        path_dir_where_to_dump="data",
        asset_class="spot",
        data_type="klines",
        data_frequency=temporalidad
    )

    current = datetime.date(initial[0], initial[1], initial[2])
    # Keep the parsed end date in its own name instead of rebinding the argument.
    end_date = datetime.date(end[0], end[1], end[2])
    one_day = datetime.timedelta(days=1)

    while current <= end_date:
        # Day 1 + 32 days always lands in the following month; snapping back to
        # day 1 yields the first day of that month.
        next_month = (current.replace(day=1) + datetime.timedelta(days=32)).replace(day=1)
        # Never request past the caller's end date.
        period_end = min(next_month - one_day, end_date)
        print(f"Bajando {current} a {period_end}")
        dumper.dump_data(
            tickers=[symbol],
            date_start=current,
            date_end=period_end
        )
        current = next_month


In [None]:
import datetime, os, sys, logging
from contextlib import contextmanager
from binance_historical_data import BinanceDataDumper

def _flush_std_streams():
    """Flush Python-level stdout/stderr buffers, ignoring missing or closed streams."""
    for stream in (sys.stdout, sys.stderr):
        if stream is None:
            continue
        try:
            stream.flush()
        except (OSError, ValueError):
            # A closed or detached stream has nothing left to flush.
            pass


@contextmanager
def squelch_all_output():
    """Context manager that silences logging, tqdm and file descriptors 1 and 2.

    While the block runs:
      * ``logging.disable(logging.CRITICAL)`` is in effect,
      * the ``TQDM_DISABLE`` environment variable is set to ``"1"``,
      * file descriptors 1 and 2 point at ``os.devnull``.

    On exit (normal or via exception) the descriptors, the environment
    variable and the logging "disable" threshold are put back to what they
    were on entry.
    """
    # logging.disable() stores its threshold on the manager. That is the value
    # to restore; the root logger's own level is a different setting, and
    # passing it back to logging.disable() would leave logging muted.
    prev_disable = logging.root.manager.disable
    old_tqdm = os.environ.get("TQDM_DISABLE")

    saved_fds = {}   # target fd -> duplicate of the original descriptor
    devnull = None

    logging.disable(logging.CRITICAL)
    os.environ["TQDM_DISABLE"] = "1"
    try:
        # Emit anything already buffered before the descriptors are swapped.
        _flush_std_streams()
        devnull = os.open(os.devnull, os.O_WRONLY)
        for fd in (1, 2):
            saved_fds[fd] = os.dup(fd)
            os.dup2(devnull, fd)
        yield
    finally:
        # Push text buffered during the block into devnull so it cannot show
        # up after the original descriptors are restored.
        _flush_std_streams()
        for fd, backup in saved_fds.items():
            os.dup2(backup, fd)
            os.close(backup)
        if devnull is not None:
            os.close(devnull)
        if old_tqdm is None:
            os.environ.pop("TQDM_DISABLE", None)
        else:
            os.environ["TQDM_DISABLE"] = old_tqdm
        logging.disable(prev_disable)

def download_data_from_binance(symbol, temporalidad, initial, end, quiet=True):
    """Request spot kline dumps month by month, unless the target folder exists.

    A ``BinanceDataDumper`` is built first. If
    ``data/spot/daily/klines/<symbol>/<temporalidad>`` already exists the
    function prints a skip notice and returns without requesting anything.
    Otherwise ``dump_data`` is called once per calendar month touched by the
    range, with the last request clamped to ``end``.

    Args:
        symbol: Ticker, passed to the dumper as a one-element list.
        temporalidad: Value forwarded as the dumper's ``data_frequency`` and
            used as the last component of the expected folder.
        initial: ``(year, month, day)`` of the first day to request.
        end: ``(year, month, day)`` of the last day to request (inclusive).
        quiet: When true, each ``dump_data`` call runs inside
            ``squelch_all_output()``.

    Returns:
        None.
    """
    dumper = BinanceDataDumper(
        path_dir_where_to_dump="data",
        asset_class="spot",
        data_type="klines",
        data_frequency=temporalidad,
    )

    # Folder this function treats as proof that the data is already present.
    final_path = f"data/spot/daily/klines/{symbol}/{temporalidad}"
    already_there = os.path.exists(final_path)
    if already_there:
        print(f"[SKIP] Ya existe {final_path}, no se descarga nada.")
        return

    def _dump(start, stop):
        dumper.dump_data(tickers=[symbol], date_start=start, date_end=stop)

    current = datetime.date(*initial)
    end_dt = datetime.date(*end)
    one_day = datetime.timedelta(days=1)

    while not current > end_dt:
        # First day of the month after `current`.
        month_start = current.replace(day=1)
        next_month = (month_start + datetime.timedelta(days=32)).replace(day=1)
        period_end = min(next_month - one_day, end_dt)

        print(f"[INFO] Bajando {symbol} ({temporalidad}) desde {current} hasta {period_end}...")
        if not quiet:
            _dump(current, period_end)
        else:
            with squelch_all_output():
                _dump(current, period_end)

        print(f"[OK] {symbol} {temporalidad} {period_end} listo.")
        current = next_month


In [None]:
def parse_binance_files(folder_path, expected_timeframe_seconds=None):
    """Load every ``.csv`` kline file in a folder into one enriched DataFrame.

    Each file is read without a header row using the 12 kline column names
    below. ``open_time``/``close_time`` are interpreted first as milliseconds
    and then as microseconds; the first unit that converts without error and
    (when ``expected_timeframe_seconds`` is given) whose most common spacing
    between consecutive rows equals that value is kept. Files for which
    neither unit qualifies are reported and skipped.

    The surviving files are concatenated and sorted chronologically by the
    parsed ``date`` column, then the derived columns are added:
    ``hora``, ``ema20``, ``tendency``, ``prev_tendency``, ``type``, ``size``
    and ``engulf``.

    Args:
        folder_path: Directory containing the CSV files.
        expected_timeframe_seconds: Expected candle spacing in seconds, or
            ``None`` to accept the first unit that converts.

    Returns:
        pandas.DataFrame with one row per candle, or an empty DataFrame when
        no file could be loaded.
    """
    column_names = [
        'open_time', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_asset_volume', 'number_of_trades',
        'taker_buy_volume_base', 'taker_buy_volume_quote', 'ignore'
    ]

    float_cols = ['open', 'high', 'low', 'close', 'volume',
                'quote_asset_volume', 'taker_buy_volume_base', 'taker_buy_volume_quote']

    dfs = []
    # Sorted listing keeps the load order independent of the filesystem.
    files = sorted(
        os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(".csv")
    )

    for f in files:
        df = pd.read_csv(f, names=column_names)
        for unit in ['ms', 'us']:
            try:
                # Convert into temporaries so a rejected unit never leaves the
                # frame with half-converted columns for the next attempt.
                open_dt = pd.to_datetime(df['open_time'], unit=unit)
                close_dt = pd.to_datetime(df['close_time'], unit=unit)
                delta = open_dt.diff().dt.total_seconds().dropna()
                mode_delta = delta.mode().iloc[0]
            except Exception:
                # Best effort: any failure (out-of-range timestamps, a file
                # with fewer than two rows, ...) just means "try the next unit".
                continue

            if expected_timeframe_seconds is None or mode_delta == expected_timeframe_seconds:
                df['date'] = open_dt
                df['close_time'] = close_dt
                break
        else:
            print(f"Archivo descartado por timeframe: {f}")
            continue

        df[float_cols] = df[float_cols].astype(float)
        df = df.drop(columns='ignore')
        dfs.append(df)

    if not dfs:
        print("No se cargaron archivos.")
        return pd.DataFrame()

    # Sort on the parsed timestamp: raw open_time integers are not comparable
    # between files stored in different epoch units.
    df = pd.concat(dfs).sort_values('date', kind='stable').reset_index(drop=True)

    # Time-of-day column.
    df['hora'] = df['date'].dt.time

    # Trend: +1 when the close is above its 20-span EMA, otherwise -1.
    df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
    df['tendency'] = np.where(df['close'] > df['ema20'], 1, -1)
    df['prev_tendency'] = df['tendency'].shift(1)

    # Candle colour and body size.
    df['type'] = np.where(df['close'] > df['open'], 'up', 'dw')
    df['size'] = abs(df['close'] - df['open'])

    # Engulfing detection on the two candles *before* each row
    # (o1/c1 = previous candle, o2/c2 = the one before it).
    o1 = df['open'].shift(1)
    c1 = df['close'].shift(1)
    o2 = df['open'].shift(2)
    c2 = df['close'].shift(2)

    engulf_up = (
        (c1 > o1) & (c2 < o2) &
        (o1 < c2) & (c1 > o2)
    )

    engulf_dw = (
        (c1 < o1) & (c2 > o2) &
        (o1 > c2) & (c1 < o2)
    )

    df['engulf'] = 0
    df.loc[engulf_up, 'engulf'] = 1
    df.loc[engulf_dw, 'engulf'] = -1

    return df


# N-order matrix

In [None]:
# Función corregida para usar la vela siguiente como target
def order_n_matrix(df,
                    order=1,
                    time_filter=None,
                    use_engulf=False,
                    vis=False):
    """Build the order-``order`` transition table from candle patterns to the next candle.

    For every row the pattern is the ``type`` of the ``order`` preceding rows
    (oldest first, joined with ``_``) and the target is the ``type`` of the
    following row. Rows are grouped by pattern, the row's ``tendency`` and,
    optionally, its ``engulf`` flag; the target values become the columns.

    Note: the ``prev_<i>`` and ``next_type`` helper columns are written onto
    the DataFrame passed in.

    Args:
        df: Candle frame with at least ``type`` and ``tendency`` columns
            (plus ``hora`` when ``time_filter`` is used and ``engulf`` when
            ``use_engulf`` is true).
        order: Number of preceding candles that make up the pattern.
        time_filter: Optional dict with ``"open"`` and ``"close"`` entries,
            each an ``(hour, minute)`` pair; only rows whose ``hora`` lies in
            that inclusive window are counted.
        use_engulf: Add ``engulf`` as an extra grouping level.
        vis: Print both tables.

    Returns:
        ``(transition_matrix, transition_counts, df)`` where the counts are
        occurrences per group/target, the matrix is the row-normalised counts
        (joined with the output of ``compute_tests`` when that function exists
        in the module globals) and ``df`` is the filtered working frame.
    """
    # Past candles that make up the pattern.
    for i in range(1, order + 1):
        df[f'prev_{i}'] = df['type'].shift(i)

    # Target: the candle that follows the current row.
    df['next_type'] = df['type'].shift(-1)

    # Oldest candle first so the pattern reads left to right in time.
    pattern_cols = [f'prev_{i}' for i in range(order, 0, -1)]
    df = df[df[f'prev_{order}'].notna()].reset_index(drop=True)
    df['candle_pattern'] = df[pattern_cols].agg('_'.join, axis=1)

    # Optional time-of-day window (inclusive on both ends).
    if time_filter:
        h_open, m_open = time_filter["open"]
        h_close, m_close = time_filter["close"]
        df = df[
            (df['hora'] >= datetime.time(h_open, m_open)) &
            (df['hora'] <= datetime.time(h_close, m_close))
        ].copy()

    row_keys = ['candle_pattern', 'tendency']
    if use_engulf:
        row_keys.append('engulf')

    # Pivot the target by name: with the extra 'engulf' key a positional
    # unstack() would turn the engulf flag into the columns instead.
    transition_counts = (
        df.groupby(row_keys + ['next_type'])
        .size()
        .unstack('next_type', fill_value=0)
    )
    transition_matrix = transition_counts.div(transition_counts.sum(axis=1), axis=0)

    if 'compute_tests' in globals():
        tests = transition_counts.apply(compute_tests, axis=1)
        transition_matrix = transition_matrix.join(tests)

    if vis:
        print("Matriz de transición (probabilidades):")
        print(transition_matrix.round(4))
        print("\nMatriz de transición (ocurrencias):")
        print(transition_counts.round(4))

    return transition_matrix, transition_counts, df


In [None]:

# Calcular los tests y agregarlos como columnas
def compute_tests(row):
    """Two-sided binomial test of the 'up' count against a 50% rate.

    Args:
        row: Mapping-like object (anything with ``.get``) holding the ``'up'``
            and ``'dw'`` counts; a missing key counts as 0.

    Returns:
        pandas.Series with a single entry ``binom_p``: the p-value, or NaN
        when the row has no observations.
    """
    n_up = row.get('up', 0)
    n_obs = n_up + row.get('dw', 0)

    if n_obs > 0:
        # Is the number of 'up' outcomes significantly different from 50%?
        p_value = binomtest(n_up, n=n_obs, p=0.5, alternative='two-sided').pvalue
    else:
        p_value = np.nan

    return pd.Series({ 'binom_p': p_value})

# Estrategia

In [None]:
def compute_expansions(df, pattern):
    """Summarise how far price moves for and against a pattern's direction.

    For every row ``i`` whose preceding ``ORDER`` candle types (joined with
    ``_``) equal ``pattern[0]`` and whose previous row's ``tendency`` equals
    ``pattern[1]``, two distances from the previous close are collected:

    * favorable: to the ``high`` of row ``i`` for an ``"up"`` pattern, to the
      ``low`` otherwise;
    * adversarial: to the opposite extreme.

    The last row of ``df`` is never evaluated.

    Args:
        df: Candle frame indexed 0..n-1 with ``type``, ``tendency``,
            ``close``, ``high`` and ``low`` columns.
        pattern: Sequence ``(candle_pattern, tendency, direction, ...)``.

    Returns:
        dict with ``"favorable"`` and ``"adversarial"`` entries, each a dict
        of ``mean``, ``median``, ``p90`` and ``p95`` rounded to 2 decimals.
        The ``p95`` entries hold twice the 95th percentile.
    """
    orden = ORDER

    if pattern[2] == "up":
        favorable_col, adversarial_col = "high", "low"
    else:
        favorable_col, adversarial_col = "low", "high"

    favorable = []
    adversarial = []

    for i in range(orden, len(df) - 1):
        tendency = df.loc[i - 1, "tendency"]
        # .loc slicing is label based and inclusive: rows i-orden .. i-1.
        candle_pattern = "_".join(df.loc[i - orden : i - 1, "type"].values.tolist())
        if candle_pattern == pattern[0] and tendency == pattern[1]:
            close_prev = df.loc[i - 1, "close"]
            favorable.append(abs(df.loc[i, favorable_col] - close_prev))
            adversarial.append(abs(df.loc[i, adversarial_col] - close_prev))

    def _summary(values):
        # NOTE(review): "p95" is stored doubled, as in the original code;
        # confirm the 2x factor is intended.
        return {
            "mean": round(np.mean(values), 2),
            "median": round(np.median(values), 2),
            "p90": round(np.percentile(values, 90), 2),
            "p95": 2 * round(np.percentile(values, 95), 2),
        }

    # Each side is summarised from its own sample; the adversarial p95 was
    # previously taken from the favorable distances.
    return {
        "favorable": _summary(np.array(favorable)),
        "adversarial": _summary(np.array(adversarial)),
    }


In [None]:
def plot_conditional_equity(df_equity, full_pattern):
    """Plot an equity curve only when it passes three quality checks.

    The curve passes when the slope of a straight-line fit over the trade
    sequence is positive, more than half of the capital values are above
    zero, and the final capital exceeds twice the absolute value of the
    lowest capital.

    Args:
        df_equity: Frame with ``capital`` and ``entry_time`` columns.
        full_pattern: Sequence whose first three items are used in the title
            (pattern, tendency, direction).

    Returns:
        ``[full_pattern[:-1]]`` when the curve was plotted, otherwise ``[]``.
    """
    capital = df_equity["capital"]
    steps = np.arange(capital.size, dtype=float)
    trend = float(np.polyfit(steps, capital, 1)[0])
    share_positive = (capital > 0).mean()
    last_capital = float(capital.iloc[-1])

    accepted = (
        trend > 0
        and share_positive > 0.5
        and last_capital > 2*abs(min(capital))
    )
    if not accepted:
        return []

    plt.figure(figsize=(16, 8))
    plt.plot(df_equity["entry_time"], capital, label="Equity")
    plt.xlabel("Fecha")
    plt.ylabel("Capital")
    plt.title(f"Curva de Equity para patrón {full_pattern[0]}, tendencia {full_pattern[1]}, y dirección {full_pattern[2]}")
    plt.grid(True)
    plt.legend()
    plt.show()

    return [full_pattern[:-1]]

In [None]:
def plot_conditional_equity(df_equity, full_pattern, exp=None):
    """Plot an equity curve only when it passes three quality checks.

    The curve passes when the slope of a straight-line fit over the trade
    sequence is positive, more than half of the capital values are above
    zero, and the final capital exceeds twice the absolute value of the
    lowest capital.

    Args:
        df_equity: Frame with ``capital`` and ``entry_time`` columns.
        full_pattern: Sequence ``(pattern, tendency, direction, expansions)``;
            when ``exp`` is given, ``expansions["adversarial"][exp]`` is read.
        exp: Optional expansion label. It is appended to the plot title and
            selects which adversarial expansion value is returned.

    Returns:
        ``[]`` when the curve is rejected. Otherwise a one-element list:
        ``full_pattern[:-1]`` extended with the selected adversarial value
        when ``exp`` is given, or just ``full_pattern[:-1]`` when ``exp`` is
        ``None``.
    """
    y = df_equity["capital"]
    x = np.arange(y.size, dtype=float)
    slope = float(np.polyfit(x, y, 1)[0])
    ratio_above = (y > 0).mean()
    capital_final = float(y.iloc[-1])

    valid_patterns = []
    if slope > 0 and ratio_above > 0.5 and capital_final > 2*abs(min(y)) :
        plt.figure(figsize=(16, 8))
        plt.plot(df_equity["entry_time"], y, label="Equity")
        plt.xlabel("Fecha")
        plt.ylabel("Capital")
        titulo = f"Curva de Equity para patrón {full_pattern[0]}, tendencia {full_pattern[1]}, y dirección {full_pattern[2]}"
        if exp:
            titulo += f" para la expansión {exp}"
        plt.title(titulo)
        plt.grid(True)
        plt.legend()
        plt.show()

        if exp is None:
            # No expansion selected: there is nothing to look up, so report
            # the bare pattern instead of failing on ["adversarial"][None].
            valid_patterns.append(full_pattern[:-1])
        else:
            valid_patterns.append(full_pattern[:-1] + (full_pattern[-1]["adversarial"][exp],))

    return valid_patterns

In [None]:
def run_simulation(df, 
                pattern_with_expansions,
                stake=1.0,
                comission=0.0,
                tp_val=None,
                sl_val=None):
    """Backtest one pattern, optionally with a fixed take-profit and stop-loss.

    A trade is opened at the ``open`` of row ``i`` whenever the ``ORDER``
    preceding candle types (joined with ``_``) equal the pattern and the
    previous row's ``tendency`` equals the pattern's tendency. The last row
    of ``df`` is never traded.

    With both ``tp_val`` and ``sl_val`` set, PnL is expressed in multiples of
    the stop distance: ``-1`` when the stop is touched inside the candle (also
    when both levels are touched), ``tp_val / sl_val`` when only the target
    is touched, otherwise the open-to-close move divided by ``sl_val``.
    Without them, PnL is the raw open-to-close move in the trade direction.

    Args:
        df: Candle frame indexed 0..n-1 with ``type``, ``tendency``, ``date``,
            ``open``, ``high``, ``low`` and ``close`` columns.
        pattern_with_expansions: ``(pattern, tendency, direction, expansions)``;
            the fourth item is not used here.
        stake: Multiplier applied to each trade's PnL.
        comission: Amount subtracted from capital per trade.
        tp_val: Take-profit distance from the entry price, or ``None``.
        sl_val: Stop-loss distance from the entry price, or ``None``.

    Returns:
        pandas.DataFrame with one row per trade (``index``, ``entry_time``,
        ``entry_price``, ``exit_price``, ``pnl``, ``capital``). ``exit_price``
        is the stop or target level when one was hit, else the candle close.
        Summary metrics are stored in ``.attrs["metrics"]``.
    """
    orden = ORDER
    pattern, tendency, direction_str, _ = pattern_with_expansions
    direction = 1 if direction_str == "up" else -1
    use_levels = tp_val is not None and sl_val is not None

    capital = 0.0
    results = []
    for i in range(orden, len(df) - 1):
        window = "_".join(df.loc[i - orden:i - 1, "type"].values.tolist())
        if not (window == pattern and df.loc[i - 1, "tendency"] == tendency):
            continue

        entry_time = df.loc[i, "date"]
        row = df.iloc[i]
        entry_price = row["open"]
        high = row["high"]
        low = row["low"]
        exit_price = row["close"]
        move = (exit_price - entry_price) * direction

        if use_levels:
            if direction == 1:
                sl_triggered = low <= entry_price - sl_val
                tp_triggered = high >= entry_price + tp_val
            else:
                sl_triggered = high >= entry_price + sl_val
                tp_triggered = low <= entry_price - tp_val

            if sl_triggered:
                # Also covers "both levels touched": intrabar order is
                # unknown, so the trade is booked as a loss.
                pnl = -1.0
                exit_price = entry_price - direction * sl_val
            elif tp_triggered:
                pnl = tp_val / sl_val
                exit_price = entry_price + direction * tp_val
            else:
                pnl = move / sl_val
        else:
            pnl = move

        capital += pnl * stake - comission

        results.append({
            "index": i,
            "entry_time": entry_time,
            "entry_price": entry_price,
            "exit_price": exit_price,
            "pnl": pnl,
            "capital": capital,
        })

    df_results = pd.DataFrame(results)
    n_trades = len(df_results)
    n_days = df_results["entry_time"].dt.date.nunique() if not df_results.empty else 0
    avg_trades_per_day = n_trades / n_days if n_days > 0 else 0.0
    mdd_capital = compute_max_drawdown(df_results["capital"]) if not df_results.empty else {"mdd": 0.0, "mdd_pct": 0.0}

    df_results.attrs["metrics"] = {
        "mdd_capital": mdd_capital,
        "n_trades": int(n_trades),
        "avg_trades_per_day": float(avg_trades_per_day),
        "final_capital": float(capital),
    }

    return df_results


In [None]:
def run_simulation_sl_only(df, 
                pattern_with_expansions,
                stake=1.0,
                comission=0.0,
                sl_val=None):
    """Backtest one pattern with an optional fixed stop-loss and no target.

    A trade is opened at the ``open`` of row ``i`` whenever the ``ORDER``
    preceding candle types (joined with ``_``) equal the pattern and the
    previous row's ``tendency`` equals the pattern's tendency. The last row
    of ``df`` is never traded.

    With ``sl_val`` set, PnL is in multiples of the stop distance: ``-1`` when
    the stop is touched inside the candle, otherwise the open-to-close move
    divided by ``sl_val``. Without it, PnL is the raw open-to-close move in
    the trade direction.

    Args:
        df: Candle frame indexed 0..n-1 with ``type``, ``tendency``, ``date``,
            ``open``, ``high``, ``low`` and ``close`` columns.
        pattern_with_expansions: ``(pattern, tendency, direction, expansions)``;
            the fourth item is not used here.
        stake: Multiplier applied to each trade's PnL.
        comission: Amount subtracted from capital per trade.
        sl_val: Stop-loss distance from the entry price, or ``None``.

    Returns:
        pandas.DataFrame with one row per trade (``index``, ``entry_time``,
        ``entry_price``, ``exit_price``, ``pnl``, ``capital``). ``exit_price``
        is the stop level when the stop was hit, else the candle close.
        Summary metrics are stored in ``.attrs["metrics"]``.
    """
    orden = ORDER
    pattern, tendency, direction_str, _ = pattern_with_expansions
    direction = 1 if direction_str == "up" else -1

    capital = 0.0
    results = []
    for i in range(orden, len(df) - 1):
        window = "_".join(df.loc[i - orden:i - 1, "type"].values.tolist())
        if not (window == pattern and df.loc[i - 1, "tendency"] == tendency):
            continue

        entry_time = df.loc[i, "date"]
        row = df.iloc[i]
        entry_price = row["open"]
        high = row["high"]
        low = row["low"]
        exit_price = row["close"]
        move = (exit_price - entry_price) * direction

        if sl_val is not None:
            if direction == 1:
                sl_triggered = low <= entry_price - sl_val
            else:
                sl_triggered = high >= entry_price + sl_val

            if sl_triggered:
                pnl = -1.0
                # The trade left at the stop, not at the candle close.
                exit_price = entry_price - direction * sl_val
            else:
                pnl = move / sl_val
        else:
            pnl = move

        capital += pnl * stake - comission

        results.append({
            "index": i,
            "entry_time": entry_time,
            "entry_price": entry_price,
            "exit_price": exit_price,
            "pnl": pnl,
            "capital": capital,
        })

    df_results = pd.DataFrame(results)
    n_trades = len(df_results)
    n_days = df_results["entry_time"].dt.date.nunique() if not df_results.empty else 0
    avg_trades_per_day = n_trades / n_days if n_days > 0 else 0.0
    mdd_capital = compute_max_drawdown(df_results["capital"]) if not df_results.empty else {"mdd": 0.0, "mdd_pct": 0.0}

    df_results.attrs["metrics"] = {
        "mdd_capital": mdd_capital,
        "n_trades": int(n_trades),
        "avg_trades_per_day": float(avg_trades_per_day),
        "final_capital": float(capital),
    }

    return df_results


In [None]:

def simulate_continuous_strategies(df,
                                    pattern_with_expansions,
                                    stake=1.0,
                                    comission=0.0,
                                    vis=True,
                                    use_tp_sl=True):
    """Run ``run_simulation`` for every TP/SL combination of a pattern.

    Args:
        df: Candle frame forwarded to ``run_simulation``.
        pattern_with_expansions: ``(pattern, tendency, direction, expansions)``
            where ``expansions`` has ``"favorable"`` and ``"adversarial"``
            dicts mapping a label to a distance.
        stake: Forwarded to ``run_simulation``.
        comission: Forwarded to ``run_simulation``.
        vis: When true (and ``use_tp_sl`` is true), draw all equity curves in
            one figure.
        use_tp_sl: When true, simulate each (favorable, adversarial) pair as
            (take-profit, stop-loss); when false, run a single simulation
            without levels.

    Returns:
        With ``use_tp_sl`` true, a dict keyed by ``(tp_label, sl_label)`` whose
        values are the result frames. Otherwise the single result frame.
    """
    if not use_tp_sl:
        # Single run without TP/SL; arguments are passed by keyword so they
        # line up with run_simulation's signature.
        return run_simulation(df, pattern_with_expansions, stake=stake, comission=comission)

    expansion_data = pattern_with_expansions[-1]
    results_by_params = {}
    if vis:
        plt.figure(figsize=(12, 6))
    for tp_label, tp_val in expansion_data["favorable"].items():
        for sl_label, sl_val in expansion_data["adversarial"].items():
            key = (tp_label, sl_label)
            # stake and comission are forwarded so the caller's values take
            # effect in this branch as well.
            df_res = run_simulation(df,
                                    pattern_with_expansions,
                                    stake=stake,
                                    comission=comission,
                                    tp_val=tp_val,
                                    sl_val=sl_val)
            results_by_params[key] = df_res
            if vis:
                plt.plot(df_res["entry_time"], df_res["capital"], label=f'TP:{tp_label}, SL:{sl_label}')
    if vis:
        plt.xlabel("Fecha")
        plt.ylabel("Capital")
        plt.title("Curvas de Equity")
        plt.legend()
        plt.grid(True)
        plt.show()
    return results_by_params


In [None]:
def simulate_continuous_strategy_no_expansion(df, pattern_with_expansions, stake=1.0, comission=0.0, vis=True):
    """Backtest a pattern trading open-to-close on every matching candle.

    A trade is taken on row ``i`` when the ``ORDER`` preceding candle types
    (joined with ``_``) equal the pattern and the previous row's ``tendency``
    equals the pattern's tendency. PnL is ``stake`` times the open-to-close
    move in the trade direction; ``comission`` is subtracted from capital per
    trade. The last row of ``df`` is never traded.

    Args:
        df: Candle frame indexed 0..n-1 with ``type``, ``tendency``, ``date``,
            ``open`` and ``close`` columns.
        pattern_with_expansions: Sequence whose first three items are
            ``(pattern, tendency, direction)``.
        stake: Multiplier applied to the price move.
        comission: Amount subtracted from capital per trade.
        vis: Print trade count, average trades per day and final capital.

    Returns:
        pandas.DataFrame with one row per trade; summary metrics are stored
        in ``.attrs["metrics"]``.
    """
    orden = ORDER
    wanted_pattern, wanted_tendency, direction_str = pattern_with_expansions[:3]
    sign = 1 if direction_str == "up" else -1

    equity = 0.0
    trades = []

    for i in range(orden, len(df) - 1):
        prev_tendency = df.loc[i - 1, "tendency"]
        # Label-based inclusive slice: rows i-orden .. i-1.
        seen_pattern = "_".join(df.loc[i - orden:i - 1, "type"].values.tolist())
        if not (seen_pattern == wanted_pattern and prev_tendency == wanted_tendency):
            continue

        opened_at = df.loc[i, "date"]
        candle = df.iloc[i]
        price_in = candle["open"]
        price_out = candle["close"]

        trade_pnl = stake * ((price_out - price_in) * sign)
        equity += trade_pnl - comission

        trades.append(
            {
                "index": i,
                "entry_time": opened_at,
                "entry_price": price_in,
                "exit_price": price_out,
                "pnl": trade_pnl,
                "capital": equity,
            }
        )

    df_results = pd.DataFrame(trades)

    n_trades = len(df_results)
    if df_results.empty:
        n_days = 0
    else:
        n_days = df_results["entry_time"].dt.date.nunique()
    avg_trades_per_day = n_trades / n_days if n_days > 0 else 0.0

    if vis:
        print(f"Operaciones totales: {n_trades}")
        print(f"Promedio por día: {avg_trades_per_day:.2f}")
        print(f"Capital final: {equity:.2f}")

    # An empty result frame has no "capital" column to measure.
    if "capital" not in df_results.columns:
        mdd_capital = {"mdd": 0.0, "mdd_pct": 0.0}
    else:
        mdd_capital = compute_max_drawdown(df_results["capital"])

    df_results.attrs["metrics"] = {
        "mdd_capital": mdd_capital,
        "n_trades": int(n_trades),
        "avg_trades_per_day": float(avg_trades_per_day),
        "final_capital": float(equity),
    }
    return df_results


In [None]:
def simulate_continuous_strategy_full(df, pattern_with_expansions, stake=1.0, comission=0.0, vis=True, use_tp_sl=True):
    """Backtest a pattern for every TP/SL combination, or once without levels.

    A trade is opened at the ``open`` of row ``i`` whenever the ``ORDER``
    preceding candle types (joined with ``_``) equal the pattern and the
    previous row's ``tendency`` equals the pattern's tendency. The last row
    of ``df`` is never traded.

    Args:
        df: Candle frame indexed 0..n-1 with ``type``, ``tendency``, ``date``,
            ``open``, ``high``, ``low`` and ``close`` columns.
        pattern_with_expansions: ``(pattern, tendency, direction, expansions)``
            where ``expansions`` has ``"favorable"`` and ``"adversarial"``
            dicts mapping a label to a distance.
        stake: Multiplier applied to each trade's PnL.
        comission: Amount subtracted from capital per trade.
        vis: When true (and ``use_tp_sl`` is true), draw all equity curves in
            one figure.
        use_tp_sl: When true, simulate each (favorable, adversarial) pair as
            (take-profit, stop-loss); when false, run one simulation whose PnL
            is the raw open-to-close move.

    Returns:
        With ``use_tp_sl`` true, a dict keyed by ``(tp_label, sl_label)`` whose
        values are per-trade result frames. Otherwise a single result frame.
        Each frame carries summary metrics in ``.attrs["metrics"]`` and its
        ``exit_price`` is the stop/target level when one was hit, else the
        candle close.
    """
    orden = ORDER
    pattern, tendency, direction_str, expansion_data = pattern_with_expansions
    direction = 1 if direction_str == "up" else -1

    def _simulate(tp_val=None, sl_val=None):
        """Run one pass over ``df`` with the given levels (both or neither)."""
        use_levels = tp_val is not None and sl_val is not None
        capital = 0.0
        results = []

        for i in range(orden, len(df) - 1):
            window = "_".join(df.loc[i - orden:i - 1, "type"].values.tolist())
            if not (window == pattern and df.loc[i - 1, "tendency"] == tendency):
                continue

            entry_time = df.loc[i, "date"]
            row = df.iloc[i]
            entry_price = row["open"]
            high = row["high"]
            low = row["low"]
            exit_price = row["close"]
            move = (exit_price - entry_price) * direction

            if use_levels:
                if direction == 1:
                    sl_triggered = low <= entry_price - sl_val
                    tp_triggered = high >= entry_price + tp_val
                else:
                    sl_triggered = high >= entry_price + sl_val
                    tp_triggered = low <= entry_price - tp_val

                if sl_triggered:
                    # Also covers "both levels touched": intrabar order is
                    # unknown, so the trade is booked as a loss.
                    pnl = -1.0
                    exit_price = entry_price - direction * sl_val
                elif tp_triggered:
                    pnl = tp_val / sl_val
                    exit_price = entry_price + direction * tp_val
                else:
                    pnl = move / sl_val
            else:
                pnl = move

            capital += pnl * stake - comission

            results.append({
                "index": i,
                "entry_time": entry_time,
                "entry_price": entry_price,
                "exit_price": exit_price,
                "pnl": pnl,
                "capital": capital,
            })

        df_results = pd.DataFrame(results)
        n_trades = len(df_results)
        n_days = df_results["entry_time"].dt.date.nunique() if not df_results.empty else 0
        avg_trades_per_day = n_trades / n_days if n_days > 0 else 0.0

        mdd_capital = compute_max_drawdown(df_results["capital"]) if not df_results.empty else {"mdd": 0.0, "mdd_pct": 0.0}
        df_results.attrs["metrics"] = {
            "mdd_capital": mdd_capital,
            "n_trades": int(n_trades),
            "avg_trades_per_day": float(avg_trades_per_day),
            "final_capital": float(capital),
        }

        return df_results

    if not use_tp_sl:
        return _simulate()

    results_by_params = {}
    if vis:
        plt.figure(figsize=(12, 6))
    for tp_label, tp_val in expansion_data["favorable"].items():
        for sl_label, sl_val in expansion_data["adversarial"].items():
            key = (tp_label, sl_label)
            df_res = _simulate(tp_val=tp_val, sl_val=sl_val)
            results_by_params[key] = df_res
            if vis:
                plt.plot(df_res["entry_time"], df_res["capital"], label=f'TP:{tp_label}, SL:{sl_label}')
    if vis:
        plt.xlabel("Fecha")
        plt.ylabel("Capital")
        plt.title("Curvas de Equity")
        plt.legend()
        plt.grid(True)
        plt.show()
    return results_by_params


In [None]:
def compute_max_drawdown(series):
    """Largest drop of a series below its running maximum.

    Args:
        series: Values convertible to a float ``pandas.Series``.

    Returns:
        dict with ``"mdd"`` (largest peak-minus-value gap) and ``"mdd_pct"``
        (that gap divided by the running peak at the point where it occurs;
        0.0 when that peak is 0). Both are 0.0 for empty input.
    """
    values = pd.Series(series, dtype=float)
    if values.empty:
        return {"mdd": 0.0, "mdd_pct": 0.0}

    running_peak = values.cummax()
    drawdown = running_peak - values
    mdd = float(drawdown.max())
    worst_pos = int(np.nanargmax(drawdown.values))

    # Without any drawdown the first peak is used as the reference.
    reference_pos = worst_pos if mdd > 0 else 0
    peak_at_mdd = float(running_peak.iloc[reference_pos])

    if peak_at_mdd != 0:
        mdd_pct = float(mdd / peak_at_mdd)
    else:
        mdd_pct = 0.0
    return {"mdd": mdd, "mdd_pct": mdd_pct}


In [None]:
def iterate_over_patterns(df, 
                          relevant_transition_matrix, 
                          max_adverse_move,
                          minimum_range_movement,
                          stake=1,
                          vis=False):
    """Simulate every pattern of a transition matrix and keep the survivors.

    For each ``(pattern, tendency)`` row of the matrix,
    ``simulate_continuous_strategy`` is called; a simulation that raises is
    skipped. A result survives when its capital curve has at least two finite
    points, the slope of a straight-line fit is positive, more than half of
    the points are above zero and the final capital is positive.

    Args:
        df: Candle frame forwarded to the simulation.
        relevant_transition_matrix: Frame indexed by ``(pattern, tendency)``
            with ``"up"`` / ``"dw"`` columns.
        max_adverse_move: Forwarded to the simulation and echoed in the output.
        minimum_range_movement: Forwarded to the simulation and echoed in the
            output.
        stake: Forwarded to the simulation.
        vis: Forwarded to the simulation.

    Returns:
        list of ``[pattern, tendency, expected_direction, max_adverse_move,
        minimum_range_movement, results, mdd_capital]`` entries.
    """
    survivors = []

    for key, probs in relevant_transition_matrix.iterrows():
        pattern, tendency = key

        try:
            resultados = simulate_continuous_strategy(
                df,
                relevant_transition_matrix,
                pattern,
                tendency,
                max_adverse_move,
                minimum_range_movement,
                stake,
                vis
            )
        except Exception:
            # A failing simulation simply drops this pattern.
            continue

        # Work on a clean, finite capital vector.
        equity = np.asarray(resultados["capital"], dtype=float)
        if equity.size < 2 or not np.all(np.isfinite(equity)):
            continue

        steps = np.arange(equity.size, dtype=float)

        # Any numerical failure in the fit (e.g. SVD not converging) also
        # drops the pattern.
        try:
            slope = float(np.polyfit(steps, equity, 1)[0])
        except Exception:
            continue

        if not np.isfinite(slope):
            continue

        # Share of points above 0 (the base capital) and where the curve ends.
        share_above = (equity > 0).mean()
        last_capital = float(equity[-1])

        prob_up = float(probs.get("up", np.nan))
        prob_dw = float(probs.get("dw", np.nan))
        expected = "up" if prob_up > prob_dw else "dw"

        keep = slope > 0 and share_above > 0.5 and last_capital > 0
        if not keep:
            continue

        mdd = resultados.attrs.get("metrics", {}).get("mdd_capital", np.nan)
        survivors.append([
            pattern,
            tendency,
            expected,
            max_adverse_move,
            minimum_range_movement,
            resultados,
            mdd,
        ])

    return survivors


# Optimización

In [None]:

def analyze_winners_mae(
    df,
    transition_matrix,
    pattern,
    tendency,
    max_adverse_move=None,
    minimum_range_movement=None,
    stake=1,
    comission=0.00,
    vis=False
):
    """Measure the adverse excursion of the winning trades of one pattern.

    ``simulate_continuous_strategy`` is run for the pattern; for each trade
    the candle at position ``index + 1`` of ``df`` is inspected. The adverse
    excursion is how far that candle went against the side on which it closed
    relative to the entry price (``entry - low`` when it closed above the
    entry, ``high - entry`` when it closed below, 0 otherwise, never
    negative). Percentiles are computed over trades with positive PnL only.

    Args:
        df: Candle frame with ``close``, ``high`` and ``low`` columns.
        transition_matrix: Forwarded to the simulation.
        pattern: Pattern to simulate.
        tendency: Tendency to simulate.
        max_adverse_move: Forwarded to the simulation.
        minimum_range_movement: Forwarded to the simulation.
        stake: Forwarded to the simulation.
        comission: Forwarded to the simulation.
        vis: Plot the equity curve(s) and print the summary.

    Returns:
        ``(winners, summary)``. ``winners`` is a frame with one row per
        winning trade; ``summary`` always has ``count_winners`` and, when
        there are winners, the MAE mean/median/percentiles. When the
        simulation produced no trades the (empty) simulation frame is
        returned in place of ``winners``.
    """
    df_results = simulate_continuous_strategy(
        df=df,
        transition_matrix=transition_matrix,
        pattern=pattern,
        tendency=tendency,
        max_adverse_move=max_adverse_move,
        minimum_range_movement=minimum_range_movement,
        stake=stake,
        vis=False,
        comission=comission
    )
    if df_results.empty:
        if vis:
            print("No hay operaciones para graficar.")
        return df_results, {"count_winners": 0}

    # Plot every equity curve available in the simulation output.
    if vis:
        plt.figure(figsize=(10, 5))
        for col in ("capital", "capital_adjusted"):
            if col in df_results.columns:
                plt.plot(df_results[col].values, linewidth=2, label=col)
        plt.title(f"Evolución del capital - Patrón: {pattern}, Tendencia: {tendency}")
        plt.xlabel("Trade #")
        plt.ylabel("Capital")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    # Adverse excursion per trade.
    rows = []
    for _, r in df_results.iterrows():
        # NOTE(review): the candle inspected is the one *after* the recorded
        # index; confirm this matches simulate_continuous_strategy's convention.
        j = int(r["index"]) + 1
        c = df.iloc[j]
        entry = float(r["entry_price"])
        close_ = float(c["close"])
        high = float(c["high"])
        low = float(c["low"])
        dir_sign = 1 if close_ > entry else (-1 if close_ < entry else 0)
        if dir_sign > 0:
            adverse = max(0.0, entry - low)
        elif dir_sign < 0:
            adverse = max(0.0, high - entry)
        else:
            adverse = 0.0
        rows.append({
            "index_base": int(r["index"]),
            "entry_time": r["entry_time"],
            "entry_price": entry,
            "close": close_,
            "high": high,
            "low": low,
            "pnl": float(r["pnl"]),
            "adverse_excursion": adverse
        })

    ae_df = pd.DataFrame(rows)
    winners = ae_df[ae_df["pnl"] > 0].copy()
    if winners.empty:
        return winners, {"count_winners": 0}

    ae = winners["adverse_excursion"].values
    summary = {
        "count_winners": int(len(winners)),
        "mae_mean": float(np.mean(ae)),
        "mae_median": float(np.median(ae)),
        "mae_p10": float(np.percentile(ae, 10)),
        "mae_p25": float(np.percentile(ae, 25)),
        "mae_p80": float(np.percentile(ae, 80)),
        "mae_p90": float(np.percentile(ae, 90)),
        "mae_p95": float(np.percentile(ae, 95)),
        "suggested_MAM_p90": float(np.percentile(ae, 90)),
        "suggested_MAM_p95": float(np.percentile(ae, 95)),
    }

    if vis:
        print("Ganadoras:", summary["count_winners"])
        print("MAE mean/median:", round(summary["mae_mean"],2), "/", round(summary["mae_median"],2))
        # One value per label entry; P50 is the median.
        print("P10/P25/P50/P80/P90/P95:",
            round(summary["mae_p10"],2), "/",
            round(summary["mae_p25"],2), "/",
            round(summary["mae_median"],2), "/",
            round(summary["mae_p80"],2), "/",
            round(summary["mae_p90"],2), "/",
            round(summary["mae_p95"],2))

    return winners, summary



In [None]:

def build_grid(df, 
            transition_matrix,
            mam_and_mrm,
            n_jobs=-1):
    """Run ``iterate_over_patterns`` for every (mrm, mam) combination in parallel.

    The combinations are the Cartesian product of the module-level
    ``mrm_values`` and ``mam_values``.

    NOTE(review): the ``mam_and_mrm`` argument is not read anywhere in the
    body; the value lists come from globals. Confirm whether the argument was
    meant to supply them.

    Args:
        df: Candle frame forwarded to ``iterate_over_patterns``.
        transition_matrix: Forwarded to ``iterate_over_patterns``.
        mam_and_mrm: Currently unused.
        n_jobs: Forwarded to ``joblib.Parallel``.

    Returns:
        dict mapping each ``(mrm, mam)`` pair to the list returned by
        ``iterate_over_patterns`` for that pair.
    """
    pairs = list(product(mrm_values, mam_values))

    def _evaluate(pair):
        mrm, mam = pair
        return iterate_over_patterns(
            df,
            transition_matrix,
            max_adverse_move=mam,
            minimum_range_movement=mrm,
            vis=False
        )

    runner = Parallel(n_jobs=n_jobs)
    outputs = runner(delayed(_evaluate)(pair) for pair in pairs)
    return dict(zip(pairs, outputs))

In [None]:
def extract_survival_data(grid, pattern_tendency):
    """Pick and plot the best surviving parameter sets for one pattern.

    Every grid entry matching ``pattern_tendency`` becomes a candidate with
    the slope of a straight-line fit of its capital curve and its drawdown.
    Six picks are made (minimum drawdown, maximum slope, best slope/drawdown
    ratio; each over all candidates and over the longest curves only), the
    distinct picks are plotted, and their parameters are returned.

    Args:
        grid: dict whose values are lists of
            ``[pattern, tendency, direction, mam, mrm, capital_curve, dd]``
            entries. ``capital_curve`` may be a DataFrame with a ``capital``
            column, a Series or any array-like; ``dd`` is expected to be a
            dict with an ``"mdd"`` key (anything else counts as unknown).
        pattern_tendency: ``(pattern, tendency)`` pair to select.

    Returns:
        ``{}`` when nothing matches. Otherwise a dict with ``min_dd``,
        ``max_slope``, ``best_slope_over_dd`` and a nested ``longest`` dict
        with the same three keys; each value is a summary dict (or ``None``
        when no candidate has a known drawdown).
    """
    def _safe_round(x, n=2):
        return round(float(x), n) if pd.notna(x) else x

    candidates = []
    for key in grid.keys():
        for entry in grid[key]:
            pattern, tendency, direction, mam, mrm, capital_curve, dd = entry
            if (pattern, tendency) != tuple(pattern_tendency):
                continue

            if isinstance(capital_curve, pd.DataFrame):
                if "capital" in capital_curve.columns:
                    y = capital_curve["capital"].astype(float).values
                else:
                    continue
            elif isinstance(capital_curve, pd.Series):
                y = capital_curve.astype(float).values
            else:
                y = np.asarray(capital_curve, dtype=float)

            if y.size < 2:
                continue

            x = np.arange(y.size, dtype=float)
            slope = float(np.polyfit(x, y, 1)[0])
            dd_val = float(dd.get("mdd", np.nan)) if isinstance(dd, dict) else np.nan

            candidates.append({
                "key": (pattern, tendency, mam, mrm),
                # Stored per candidate so the summary reports this entry's
                # direction, not whatever the loop variable held last.
                "direction": direction,
                "slope": slope,
                "dd": dd_val,
                "curve": y,
                "length": len(y)
            })

    if not candidates:
        return {}

    def _ratio(c):
        # Unknown drawdown ranks last; zero drawdown ranks first.
        if not pd.notna(c["dd"]):
            return -np.inf
        if c["dd"] == 0:
            return np.inf
        return c["slope"] / c["dd"]

    valid_dd = [c for c in candidates if pd.notna(c["dd"])]
    pick_min_dd = min(valid_dd, key=lambda c: c["dd"]) if valid_dd else None
    pick_max_slope = max(candidates, key=lambda c: c["slope"])
    pick_best_ratio = max(candidates, key=_ratio)

    max_len = max(c["length"] for c in candidates)
    longest = [c for c in candidates if c["length"] == max_len]

    pick_longest_min_dd = min([c for c in longest if pd.notna(c["dd"])], key=lambda c: c["dd"], default=None)
    pick_longest_max_slope = max(longest, key=lambda c: c["slope"])
    pick_longest_best_ratio = max(longest, key=_ratio)

    # Plot each distinct pick once.
    picks = [
        ("Min DD", pick_min_dd),
        ("Max Slope", pick_max_slope),
        ("Best Slope/DD", pick_best_ratio),
        ("Longest - Min DD", pick_longest_min_dd),
        ("Longest - Max Slope", pick_longest_max_slope),
        ("Longest - Best Slope/DD", pick_longest_best_ratio)
    ]

    seen = set()
    plt.figure(figsize=(14, 6))
    for label_prefix, c in picks:
        if c is None or c["key"] in seen:
            continue
        seen.add(c["key"])
        p, t, mam, mrm = c["key"]
        slope_r = _safe_round(c["slope"], 2)
        dd_r = "NA" if not pd.notna(c["dd"]) else _safe_round(c["dd"], 2)
        mam_r = mam if mam is None else _safe_round(mam, 2)
        mrm_r = mrm if mrm is None else _safe_round(mrm, 2)
        label = f"{label_prefix}: MAM={mam_r}, MRM={mrm_r}, slope={slope_r}, dd={dd_r}, len={c['length']}"
        plt.plot(c["curve"], linewidth=2, label=label)

    p, t = pattern_tendency
    plt.title(f"Curvas seleccionadas - Patrón: {p}, Tendencia: {t}")
    plt.xlabel("Trade #")
    plt.ylabel("Capital")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    def _pack(c):
        if c is None:
            return None
        p, t, mam, mrm = c["key"]
        return {
            "pattern": p,
            "tendency": t,
            "direction": c["direction"],
            "mam": mam,
            "mrm": mrm,
            "slope": _safe_round(c["slope"], 2),
            "dd": None if not pd.notna(c["dd"]) else _safe_round(c["dd"], 2),
            "ratio": None if (not pd.notna(c["dd"]) or c["dd"] == 0) else _safe_round(c["slope"]/c["dd"], 4),
            "length": c["length"]
        }

    return {
        "min_dd": _pack(pick_min_dd),
        "max_slope": _pack(pick_max_slope),
        "best_slope_over_dd": _pack(pick_best_ratio),
        "longest": {
            "min_dd": _pack(pick_longest_min_dd),
            "max_slope": _pack(pick_longest_max_slope),
            "best_slope_over_dd": _pack(pick_longest_best_ratio)
        }
    }



# Visualization functions

In [None]:
def plot_equity_curve(df_results,
                    pattern=None,
                    tendency=None,
                    capital_cols=['capital']):
    """Plot one or more capital columns of a results frame in a new figure.

    Args:
        df_results: Frame holding the columns to plot; when empty a notice is
            printed and nothing is drawn.
        pattern: Optional pattern shown in the title.
        tendency: Optional tendency shown in the title. The title suffix is
            added only when ``pattern`` is truthy and ``tendency`` is not
            ``None``.
        capital_cols: Column names to draw, one line each.

    Returns:
        None.
    """
    if df_results.empty:
        print("No hay operaciones para graficar.")
        return

    show_context = pattern and tendency is not None
    suffix = f" - Patrón: {pattern}, Tendencia: {tendency}" if show_context else ""

    plt.figure(figsize=(10, 5))
    for column in capital_cols:
        plt.plot(df_results[column], linewidth=2, label=column)

    plt.title("Evolución del capital" + suffix)
    plt.xlabel("Trade #")
    plt.ylabel("Capital")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:
def plot_multiple_equity_curves(df_results,
                                label,
                                pattern,
                                tendency):
    """Add one labelled curve to the current matplotlib figure.

    No figure is created and ``plt.show()`` is not called here, so repeated
    calls draw onto whatever figure is current.

    Args:
        df_results: Data to plot (wrapped in a DataFrame before plotting);
            when empty a notice is printed and nothing is drawn.
        label: Legend label for the curve.
        pattern: Pattern shown in the title.
        tendency: Tendency shown in the title. The title suffix is added only
            when ``pattern`` is truthy and ``tendency`` is not ``None``.

    Returns:
        None.
    """
    if df_results.empty:
        print("No hay operaciones para graficar.")
        return

    plt.plot(pd.DataFrame(df_results), linewidth=2, label=label)

    show_context = pattern and tendency is not None
    suffix = f" - Patrón: {pattern}, Tendencia: {tendency}" if show_context else ""

    plt.title("Evolución del capital" + suffix)
    plt.xlabel("Trade #")
    plt.ylabel("Capital")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
