In [25]:
# =============================================================================
# AUDITORÍA FORENSE TOP-50 (Script Blindado para CSV de Excel)
# =============================================================================

import pandas as pd
import numpy as np
import os
from scipy.stats import spearmanr
from tqdm.auto import tqdm
from datetime import datetime, timedelta

# --- CONFIGURACIÓN ---
FILENAME_TOP50 = "bottom 500.csv"   # Tu archivo exacto
SAVE_DIR = "."                  # Directorio de los parquets

# =============================================================================
# 1. FUNCIONES AUXILIARES DE LIMPIEZA DE DATOS (NUEVO)
# =============================================================================

def excel_date_to_datetime(serial):
    """Convierte número serial de Excel (ej. 44938) a fecha real."""
    if isinstance(serial, (int, float)):
        # Excel base date is usually 1899-12-30 for PC
        return datetime(1899, 12, 30) + timedelta(days=serial)
    return pd.to_datetime(serial) # Si ya es texto, lo deja igual

def excel_time_to_string(serial):
    """Convierte fracción de día (ej. 0.916667) a 'HH:MM'."""
    if isinstance(serial, (int, float)):
        total_seconds = int(serial * 24 * 3600)
        hours = total_seconds // 3600
        minutes = (total_seconds % 3600) // 60
        # Corrección de redondeo (ej. 21:59:59 -> 22:00)
        if (total_seconds % 3600) % 60 > 30:
            minutes += 1
            if minutes == 60:
                minutes = 0
                hours += 1
        return f"{hours:02d}:{minutes:02d}"
    return str(serial) # Si ya es texto, devuelve texto

# =============================================================================
# 2. FUNCIONES DE LÓGICA DE AUDITORÍA (REAL / PESIMISTA / OPTIMISTA)
# =============================================================================

def find_entry_tick_mecha(ticks_from_rompi, execution_delay):
    if ticks_from_rompi.empty: return None
    rompimiento_tick = ticks_from_rompi.index[0]
    delay_time = rompimiento_tick + pd.Timedelta(milliseconds=execution_delay)
    tick_activo = ticks_from_rompi[ticks_from_rompi.index <= delay_time]
    return tick_activo.iloc[-1] if not tick_activo.empty else None

def find_entry_tick_cuerpo(ticks_siguiente_vela, execution_delay):
    if ticks_siguiente_vela.empty: return None
    tick_inicio = ticks_siguiente_vela.iloc[0]
    delay_time = tick_inicio.name + pd.Timedelta(milliseconds=execution_delay)
    tick_activo = ticks_siguiente_vela[ticks_siguiente_vela.index <= delay_time]
    return tick_activo.iloc[-1] if not tick_activo.empty else None

def operativa_cuerpo_audit(vela_df, ticks_df, resultado, index, duracion_vela):
    inicio_operativa = index + pd.DateOffset(minutes=duracion_vela)
    velas_post_espera = vela_df.loc[inicio_operativa:]

    if resultado['Direccion_rompimiento'] == 'arriba':
        tp_index = velas_post_espera['high'].ge(resultado['TP']).idxmax() if velas_post_espera['high'].ge(resultado['TP']).any() else None
        sl_index = velas_post_espera['low'].le(resultado['SL']).idxmax() if velas_post_espera['low'].le(resultado['SL']).any() else None
    else:
        tp_index = velas_post_espera['low'].le(resultado['TP']).idxmax() if velas_post_espera['low'].le(resultado['TP']).any() else None
        sl_index = velas_post_espera['high'].ge(resultado['SL']).idxmax() if velas_post_espera['high'].ge(resultado['SL']).any() else None

    tp_val = resultado['TP_Ticks_Param']
    sl_val = -resultado['SL_Ticks_Param']

    if tp_index is None and sl_index is None: return 0, 0, 0
    if tp_index is not None and sl_index is None: return tp_val, tp_val, tp_val
    if sl_index is not None and tp_index is None: return sl_val, sl_val, sl_val

    if tp_index is not None and sl_index is not None:
        if tp_index == sl_index:
            # --- AMBIGÜEDAD INTRA-BARRA ---
            fin_vela = inicio_operativa + pd.DateOffset(minutes=duracion_vela)
            relevant_ticks = ticks_df.loc[inicio_operativa:fin_vela]

            if resultado['Direccion_rompimiento'] == 'arriba':
                t_tp = relevant_ticks[relevant_ticks['average'] >= resultado['TP']].index.min()
                t_sl = relevant_ticks[relevant_ticks['average'] <= resultado['SL']].index.min()
            else:
                t_tp = relevant_ticks[relevant_ticks['average'] <= resultado['TP']].index.min()
                t_sl = relevant_ticks[relevant_ticks['average'] >= resultado['SL']].index.min()

            if pd.isna(t_tp) and pd.isna(t_sl): return 0,0,0
            if pd.isna(t_tp): return sl_val, sl_val, sl_val
            if pd.isna(t_sl): return tp_val, tp_val, tp_val

            real_res = tp_val if t_tp < t_sl else sl_val
            pes_res = sl_val
            opt_res = tp_val
            return real_res, pes_res, opt_res
        else:
            res = tp_val if tp_index < sl_index else sl_val
            return res, res, res
    return 0, 0, 0

def operativa_mecha_audit(resultado, ticks_from_rompi):
    if resultado['Direccion_rompimiento'] == 'arriba':
        t_tp = ticks_from_rompi[ticks_from_rompi >= resultado['TP']].index.min()
        t_sl = ticks_from_rompi[ticks_from_rompi <= resultado['SL']].index.min()
    else:
        t_tp = ticks_from_rompi[ticks_from_rompi <= resultado['TP']].index.min()
        t_sl = ticks_from_rompi[ticks_from_rompi >= resultado['SL']].index.min()

    t_tp = None if pd.isna(t_tp) else t_tp
    t_sl = None if pd.isna(t_sl) else t_sl
    tp_val = resultado['TP_Ticks_Param']
    sl_val = -resultado['SL_Ticks_Param']

    if t_tp is not None and t_sl is not None:
        real_res = tp_val if t_tp < t_sl else sl_val
        return real_res, sl_val, tp_val
    elif t_tp is not None: return tp_val, tp_val, tp_val
    elif t_sl is not None: return sl_val, sl_val, sl_val
    else: return None

# =============================================================================
# 3. CORE: STRATEGY TESTER AUDIT
# =============================================================================

def Estrategy_Tester_Audit(vela_df, ticks_df, fecha_evaluacion, hora_apertura, num_velas, v_espera, tp_ticks, sl_ticks, tipo_vela_rompimiento, duracion_vela, tick_size):
    fecha_hora_inicio = pd.to_datetime(f"{fecha_evaluacion} {hora_apertura}", format='%Y-%m-%d %H:%M')
    try: fecha_hora_inicio = fecha_hora_inicio.tz_localize('Etc/GMT+5')
    except: pass

    vela_rango = vela_df.loc[fecha_hora_inicio:fecha_hora_inicio + pd.DateOffset(minutes=num_velas * duracion_vela)]

    resultado = {
        'High del rango': vela_rango['high'].max() if not vela_rango.empty else "none",
        'Low del rango': vela_rango['low'].min() if not vela_rango.empty else "none",
        'Rompimiento': False,
        'TP_Ticks_Param': tp_ticks,
        'SL_Ticks_Param': sl_ticks
    }

    if vela_rango.empty: return 0, 0, 0

    inicio_espera = fecha_hora_inicio + pd.DateOffset(minutes=num_velas*duracion_vela + duracion_vela)
    vela_espera = vela_df.loc[inicio_espera:inicio_espera + pd.DateOffset(minutes=v_espera*duracion_vela - duracion_vela)]
    execution_delay = 500

    for index, row in vela_espera.iterrows():
        entry_found = False
        entry_price = 0
        direccion = ""

        if row['high'] > resultado['High del rango']:
            direccion = 'arriba'
            if tipo_vela_rompimiento == 1:
                fin_vela_romp = index + pd.DateOffset(minutes=duracion_vela)
                ticks_vela_romp = ticks_df.loc[index:fin_vela_romp]
                romp_idx = ticks_vela_romp[ticks_vela_romp['average'] > resultado['High del rango']].index.min()
                if pd.notna(romp_idx):
                    ticks_from = ticks_vela_romp.loc[romp_idx:]
                    tick_entry = find_entry_tick_mecha(ticks_from, execution_delay)
                    if tick_entry is not None:
                        entry_price = tick_entry['average']
                        entry_found = True
            elif tipo_vela_rompimiento == 0:
                fin_vela_romp = index + pd.DateOffset(minutes=duracion_vela)
                ticks_vela_romp = ticks_df.loc[index:fin_vela_romp]
                if not ticks_vela_romp.empty and ticks_vela_romp.iloc[-1]['average'] > resultado['High del rango']:
                    ultimo_tick = ticks_vela_romp.index[-1]
                    sig_vela_ini = ultimo_tick + pd.Timedelta(minutes=duracion_vela)
                    ticks_sig = ticks_df.loc[sig_vela_ini:sig_vela_ini + pd.Timedelta(minutes=duracion_vela)]
                    tick_entry = find_entry_tick_cuerpo(ticks_sig, execution_delay)
                    if tick_entry is not None:
                        entry_price = tick_entry['average']
                        entry_found = True

        elif row['low'] < resultado['Low del rango']:
            direccion = 'abajo'
            if tipo_vela_rompimiento == 1:
                fin_vela_romp = index + pd.DateOffset(minutes=duracion_vela)
                ticks_vela_romp = ticks_df.loc[index:fin_vela_romp]
                romp_idx = ticks_vela_romp[ticks_vela_romp['average'] < resultado['Low del rango']].index.min()
                if pd.notna(romp_idx):
                    ticks_from = ticks_vela_romp.loc[romp_idx:]
                    tick_entry = find_entry_tick_mecha(ticks_from, execution_delay)
                    if tick_entry is not None:
                        entry_price = tick_entry['average']
                        entry_found = True
            elif tipo_vela_rompimiento == 0:
                fin_vela_romp = index + pd.DateOffset(minutes=duracion_vela)
                ticks_vela_romp = ticks_df.loc[index:fin_vela_romp]
                if not ticks_vela_romp.empty and ticks_vela_romp.iloc[-1]['average'] < resultado['Low del rango']:
                    ultimo_tick = ticks_vela_romp.index[-1]
                    sig_vela_ini = ultimo_tick + pd.Timedelta(minutes=duracion_vela)
                    ticks_sig = ticks_df.loc[sig_vela_ini:sig_vela_ini + pd.Timedelta(minutes=duracion_vela)]
                    tick_entry = find_entry_tick_cuerpo(ticks_sig, execution_delay)
                    if tick_entry is not None:
                        entry_price = tick_entry['average']
                        entry_found = True

        if entry_found:
            resultado.update({
                'Rompimiento': True,
                'Hora rompimiento': index,
                'Precio entrada': entry_price,
                'TP': entry_price + (tp_ticks * tick_size) if direccion == 'arriba' else entry_price - (tp_ticks * tick_size),
                'SL': entry_price - (sl_ticks * tick_size) if direccion == 'arriba' else entry_price + (sl_ticks * tick_size),
                'Direccion_rompimiento': direccion
            })

            if tipo_vela_rompimiento == 1:
                fin_vela_romp = index + pd.DateOffset(minutes=duracion_vela)
                ticks_vela_romp = ticks_df.loc[index:fin_vela_romp]
                if direccion == 'arriba':
                     romp_idx = ticks_vela_romp[ticks_vela_romp['average'] > resultado['High del rango']].index.min()
                else:
                     romp_idx = ticks_vela_romp[ticks_vela_romp['average'] < resultado['Low del rango']].index.min()
                ticks_from = ticks_vela_romp.loc[romp_idx:]
                res_mecha = operativa_mecha_audit(resultado, ticks_from['average'])
                if res_mecha is not None: return res_mecha

            return operativa_cuerpo_audit(vela_df, ticks_df, resultado, index, duracion_vela)

    return 0, 0, 0

# =============================================================================
# 4. LOOP PRINCIPAL
# =============================================================================

def run_audit():
    print("1. Cargando datos...")
    if not os.path.exists("ticks_df.parquet"):
        print("ERROR: Sube ticks_df.parquet y vela_df.parquet a Colab")
        return

    ticks_df = pd.read_parquet("ticks_df.parquet")
    vela_df = pd.read_parquet("vela_df.parquet")
    top50 = pd.read_csv(FILENAME_TOP50)

    print(f"2. Auditando {len(top50)} estrategias...")
    duracion_vela = 5
    tick_size = 0.00001
    audit_data = []

    for idx, row in tqdm(top50.iterrows(), total=len(top50)):
        try:
            # --- LIMPIEZA DE DATOS AUTOMÁTICA ---
            h_ap = excel_time_to_string(row['hora_apertura'])
            f_ini = excel_date_to_datetime(row['fecha_inicio'])
            f_fin = excel_date_to_datetime(row['fecha_fin'])

            v_ran = int(row['v_rango'])
            v_esp = int(row['v_espera'])
            tp_t = row['TP_Ticks']
            sl_t = row['SL_ticks']
            tipo = int(row['Tipo_vela_rompimiento'])

            dates = pd.date_range(f_ini, f_fin, freq='D')
            net_real, net_pes, net_opt = 0, 0, 0

            for d in dates:
                fecha_str = d.strftime('%Y-%m-%d')
                r, p, o = Estrategy_Tester_Audit(
                    vela_df, ticks_df, fecha_str, h_ap, v_ran, v_esp, tp_t, sl_t, tipo, duracion_vela, tick_size
                )
                net_real += r
                net_pes += p
                net_opt += o

            audit_data.append({
                'ID': idx,
                'Net_Real_Audit': net_real,
                'Net_Pesimista': net_pes,
                'Net_Optimista': net_opt,
                'Net_Original_Excel': row['Ticks_totales_estrategia']
            })

        except Exception as e:
            print(f"Error en fila {idx}: {e}")
            continue

    df_audit = pd.DataFrame(audit_data)

    # Calcular Métricas
    df_audit['Divergencia'] = df_audit['Net_Real_Audit'] != df_audit['Net_Optimista']
    pct_divergencia = df_audit['Divergencia'].mean() * 100
    mean_error = (df_audit['Net_Optimista'] - df_audit['Net_Real_Audit']).mean()
    rho, _ = spearmanr(df_audit['Net_Real_Audit'], df_audit['Net_Optimista'])

    print("\n" + "="*60)
    print("INFORME DE AUDITORÍA FORENSE (Bottom 500)")
    print("="*60)
    print(f"1. Porcentaje de Estrategias con Divergencia: {pct_divergencia:.2f}%")
    print(f"2. Sobreestimación Media (Optimista - Real):  {mean_error:.2f} ticks")
    print(f"3. Robustez del Ranking (Spearman rho):       {rho:.4f}")
    print("="*60)

    df_audit.to_csv("Detalle_Auditoria_Top50.csv")

if __name__ == "__main__":
    run_audit()

1. Cargando datos...
2. Auditando 500 estrategias...


  0%|          | 0/500 [00:00<?, ?it/s]


INFORME DE AUDITORÍA FORENSE (Bottom 500)
1. Porcentaje de Estrategias con Divergencia: 1.40%
2. Sobreestimación Media (Optimista - Real):  1.68 ticks
3. Robustez del Ranking (Spearman rho):       0.9930


Primero, podemos intentar leer el esquema del archivo usando `pyarrow`. Si esto falla, el archivo no es un Parquet válido o está dañado.

In [19]:
# =============================================================================
# PRUEBA DE ESTRÉS EXTREMA ("KAMIKAZE")
# =============================================================================
def run_hyper_stress_test():
    print("\n⚠️ INICIANDO PRUEBA KAMIKAZE (TP=3 ticks, SL=3 ticks)...")

    # 1. Cargar datos
    if not os.path.exists("ticks_df.parquet"):
        print("Error: Faltan archivos .parquet")
        return
    ticks_df = pd.read_parquet("ticks_df.parquet")
    vela_df = pd.read_parquet("vela_df.parquet")

    # 2. Estrategia diseñada para colisionar
    # Usamos TP/SL de 3 ticks (0.00003).
    # Cualquier vela normal de 1 pip tocará ambos lados.
    stress_strategy = pd.DataFrame([{
        'hora_apertura': "09:00", # Hora NY (Volátil)
        'v_rango': 1,
        'v_espera': 1,
        'TP_Ticks': 3,   # <--- 0.3 Pips (Minúsculo)
        'SL_ticks': 3,   # <--- 0.3 Pips (Minúsculo)
        'Tipo_vela_rompimiento': 0, # Cuerpo
        'fecha_inicio': "2024-01-08", # Semana laboral completa
        'fecha_fin': "2024-01-12",
        'Ticks_totales_estrategia': 0
    }])

    print("Auditando estrategia microscópica...")
    duracion_vela = 5
    tick_size = 0.00001

    row = stress_strategy.iloc[0]
    dates = pd.date_range(row['fecha_inicio'], row['fecha_fin'], freq='D')

    ambiguedad_count = 0
    total_trades = 0

    # Debug: Ver si hay datos
    print(f"Verificando datos para {row['fecha_inicio']}...")
    try:
        sample_ticks = ticks_df.loc[row['fecha_inicio']]
        print(f"Ticks disponibles para el primer día: {len(sample_ticks)}")
    except:
        print("ADVERTENCIA: No parece haber ticks para esta fecha.")

    for d in dates:
        fecha_str = d.strftime('%Y-%m-%d')
        r, p, o = Estrategy_Tester_Audit(
            vela_df, ticks_df, fecha_str,
            row['hora_apertura'], int(row['v_rango']), int(row['v_espera']),
            int(row['TP_Ticks']), int(row['SL_ticks']), int(row['Tipo_vela_rompimiento']),
            duracion_vela, tick_size
        )

        # Detectamos si hubo trade
        if r != 0 or p != 0 or (r==0 and p==0 and o==0 and 'TP_Ticks_Param' in str(r)):
             # Nota: si el resultado es 0 puede ser break-even o no trade,
             # pero aquí asumimos que TP 3 ticks siempre gana o pierde algo.
             pass

        # Contamos divergencia real
        if o != p: # Si Optimista es distinto de Pesimista
            ambiguedad_count += 1
            # Imprimir el primer caso encontrado para verificar
            if ambiguedad_count == 1:
                print(f"  -> ¡Primera Ambigüedad detectada el {fecha_str}!")
                print(f"     Real: {r}, Pesimista: {p}, Optimista: {o}")

        # Estimación simple de trades (si hubo retorno distinto al pnl esperado o no)
        # En esta prueba asumimos que hubo actividad si contamos divergencia

    print("\n" + "="*40)
    print("RESULTADO PRUEBA KAMIKAZE")
    print("="*40)
    print(f"Eventos de Ambigüedad Detectados: {ambiguedad_count}")

    if ambiguedad_count > 0:
        print("✅ ÉXITO: El detector funciona.")
        print("   (Si detecta esto, entonces el 0.00% del Top-50 es REAL).")
    else:
        print("❌ FALLO: Sigue sin detectar nada. (Revisar carga de datos).")
    print("="*40)

if __name__ == "__main__":
    run_hyper_stress_test()


⚠️ INICIANDO PRUEBA KAMIKAZE (TP=3 ticks, SL=3 ticks)...
Auditando estrategia microscópica...
Verificando datos para 2024-01-08...
Ticks disponibles para el primer día: 107199
  -> ¡Primera Ambigüedad detectada el 2024-01-08!
     Real: -3, Pesimista: -3, Optimista: 3

RESULTADO PRUEBA KAMIKAZE
Eventos de Ambigüedad Detectados: 2
✅ ÉXITO: El detector funciona.
   (Si detecta esto, entonces el 0.00% del Top-50 es REAL).


# Sección nueva