In [21]:
import pandas as pd
import numpy as np
import glob
import os
import sys

# Verificamos d√≥nde estamos
current_dir = os.getcwd()
print(f"Directorio actual: {current_dir}")

# Intentamos a√±adir la carpeta superior al path
parent_dir = os.path.abspath(os.path.join(current_dir, '..'))
sys.path.append(parent_dir)

# Comprobaci√≥n de seguridad: Si 'src' est√° en la misma carpeta, a√±adimos la actual tambi√©n
sys.path.append(current_dir)

try:
    from src.config import INVALID_PRICES, VALID_STATUS_CODES, VENUES
    print("‚úÖ Importaci√≥n exitosa desde src.config")
except ModuleNotFoundError:
    print("‚ùå Fallo")

DATA_PATH = r"../data/DATA_BIG" 

Directorio actual: c:\Users\danie\Desktop\MIAX - BME\Bloque 2 - Finanzas\Renta Variable\TAREA 3\Practica Renta Variable_DanielGarciaLopez_77856928L\notebooks
‚úÖ Importaci√≥n exitosa desde src.config


### Ingesta, Limpieza y Sincronizaci√≥n de Microestructura

En este bloque se implementa la l√≥gica cr√≠tica para procesar los datos crudos del proveedor. Se definen dos funciones encargadas de transformar los archivos CSV en informaci√≥n operativa fiable:

* **`load_and_clean_venue_data`**: Procesa un mercado individual aplicando tres filtros de calidad basados en las especificaciones del proveedor:
    * **Filtrado de "Magic Numbers":** Elimina precios sint√©ticos (como `999,999.999`) que representan √≥rdenes de mercado o estados no operables.
    * **Sincronizaci√≥n de Estados (STS):** Utiliza `pd.merge_asof` (con direcci√≥n *backward*) para cruzar as√≠ncronamente las cotizaciones (QTE) con los estados del mercado (STS). Esto permite asignar a cada tick su estado de mercado vigente exacto.
    * **Filtro de "Continuous Trading":** Descarta cualquier cotizaci√≥n que ocurra durante subastas, paradas o cierres, manteniendo solo aquellas v√°lidas para arbitraje inmediato (seg√∫n los c√≥digos v√°lidos definidos en la documentaci√≥n).

*  **`load_all_venues_for_isin`**: Funci√≥n que itera sobre todos los mercados disponibles (BME, AQUIS, CBOE, TURQUOISE), ejecuta la limpieza anterior y consolida los resultados en un √∫nico DataFrame vertical (*Long Format*) ordenado temporalmente.

La estructrua del dataframe esperado tras este paso debe ser algo asi:
| epoch (Tiempo) | venue | px_bid_0 | qty_bid_0 | px_ask_0 | qty_ask_0 | market_trading_status |
|----------------|-------|----------|-----------|----------|-----------|------------------------|
| 170000000001   | BME   | 10.50    | 500       | 10.52    | 200       | Open                   |
| 170000000002   | AQUIS | 10.49    | 1000      | 10.53    | 600       | Open                   |
| 170000000005   | BME   | 10.51    | 300       | 10.52    | 150       | Open                   |
| 170000000008   | CBOE  | 10.50    | 2000      | 10.51    | 500       | Open                   |


In [24]:
# CELDA 2 (DEFINITIVA V3): CARGA INTELIGENTE (Auto-detecci√≥n de separador)

# Mapping de nombres
MIC_MAPPING = {
    "BME": "XMAD",
    "AQUIS": "AQEU",
    "CBOE": "CEUX",      
    "TURQUOISE": "TQEX"  
}

def load_and_clean_venue_data(data_path, date, isin, venue_name):
    """
    Carga datos gestionando autom√°ticamente separadores (; o ,) y errores de columnas.
    """
    file_mic = MIC_MAPPING.get(venue_name, venue_name)
    
    # Construcci√≥n de rutas (Estrategia doble: carpeta anidada o plana)
    path_v1 = os.path.join(data_path, f"{venue_name}_{date}") # DATA/VENUE_DATE/
    path_v2 = data_path                                        # DATA/
    
    qte_name = f"QTE_{date}_{isin}_*_{file_mic}_*.csv.gz"
    sts_name = f"STS_{date}_{isin}_*_{file_mic}_*.csv.gz"
    
    # B√∫squeda de archivos
    qte_files = glob.glob(os.path.join(path_v1, qte_name)) or glob.glob(os.path.join(path_v2, qte_name))
    sts_files = glob.glob(os.path.join(path_v1, sts_name)) or glob.glob(os.path.join(path_v2, sts_name))
    
    if not qte_files or not sts_files:
        return None

    # --- LECTURA ROBUSTA (Aqu√≠ estaba el fallo) ---
    try:
        # Intentamos leer primero con coma (est√°ndar)
        df_qte = pd.read_csv(qte_files[0], compression='gzip', sep=',')
        
        # Si leemos y solo hay 1 columna, es sospechoso -> Probamos con punto y coma
        if len(df_qte.columns) < 2:
            df_qte = pd.read_csv(qte_files[0], compression='gzip', sep=';')
            
        # Hacemos lo mismo para el STS
        df_sts = pd.read_csv(sts_files[0], compression='gzip', sep=',')
        if len(df_sts.columns) < 2:
            df_sts = pd.read_csv(sts_files[0], compression='gzip', sep=';')
            
        # Limpieza de nombres de columnas (quita espacios extra tipo " epoch")
        df_qte.columns = df_qte.columns.str.strip()
        df_sts.columns = df_sts.columns.str.strip()
        
        # VERIFICACI√ìN: Si la columna clave no est√°, lanzamos error informativo
        if 'px_bid_0' not in df_qte.columns:
            raise ValueError(f"Columnas encontradas: {list(df_qte.columns)}. Falta 'px_bid_0'.")

    except Exception as e:
        print(f"  ‚ö†Ô∏è Error leyendo archivo de {venue_name}: {e}")
        return None
    
    # --- LIMPIEZA 1: MAGIC NUMBERS ---
    for price in INVALID_PRICES:
        if 'px_bid_0' in df_qte.columns and 'px_ask_0' in df_qte.columns:
            mask_invalid = np.isclose(df_qte['px_bid_0'], price) | np.isclose(df_qte['px_ask_0'], price)
            df_qte = df_qte[~mask_invalid]
    
    # --- LIMPIEZA 2: MARKET STATUS ---
    df_qte = df_qte.sort_values('epoch')
    df_sts = df_sts.sort_values('epoch')
    
    df_merged = pd.merge_asof(
        df_qte, 
        df_sts[['epoch', 'market_trading_status']], 
        on='epoch', 
        direction='backward'
    )
    
    if venue_name not in VALID_STATUS_CODES:
        valid_codes = []
    else:
        valid_codes = VALID_STATUS_CODES[venue_name]
    
    df_clean = df_merged[df_merged['market_trading_status'].isin(valid_codes)].copy()
    df_clean['venue'] = venue_name 
    
    cols_to_keep = ['epoch', 'venue', 'px_bid_0', 'qty_bid_0', 'px_ask_0', 'qty_ask_0', 'market_trading_status']
    # Aseguramos que existan las columnas antes de filtrar
    existing_cols = [c for c in cols_to_keep if c in df_clean.columns]
    return df_clean[existing_cols]

def load_all_venues_for_isin(data_path, date, isin):
    all_data = []
    print(f"Iniciando carga para {isin} en {date}...")
    print(f"Ruta base: {os.path.abspath(data_path)}")
    
    for venue in VENUES:
        df_venue = load_and_clean_venue_data(data_path, date, isin, venue)
        
        if df_venue is not None and not df_venue.empty:
            all_data.append(df_venue)
            print(f"  ‚úÖ {venue}: Cargados {len(df_venue)} ticks.")
        else:
            print(f"  ‚ùå {venue}: No se encontraron datos v√°lidos.")
            
    if not all_data:
        return pd.DataFrame()
        
    full_df = pd.concat(all_data, ignore_index=True)
    full_df = full_df.sort_values('epoch')
    return full_df

### Validaci√≥n de Ingesta 

Antes de proceder a la construcci√≥n del *Consolidated Tape*, es fundamental validar que el proceso ETL (Extracci√≥n, Transformaci√≥n y Carga) funciona correctamente sobre una muestra controlada.

En este bloque ejecutamos una prueba unitaria con un activo (ISIN), en este caso GRIFOLS y fecha espec√≠ficos para verificar:

1.  **Integridad de los datos:** Confirmar que la funci√≥n de carga cruza correctamente los ficheros de precios (QTE) y estados (STS) sin generar errores de ejecuci√≥n.
2.  **Fragmentaci√≥n visible:** Comprobar que, efectivamente, recuperamos datos de m√∫ltiples centros de negociaci√≥n (*Venues*) para el mismo activo, condici√≥n necesaria para que exista arbitraje.
3.  **Volumen de datos:** Verificar que los filtros de calidad (como la eliminaci√≥n de *Magic Numbers*) no est√°n descartando la totalidad de la muestra y que obtenemos un n√∫mero razonable de ticks.

In [33]:
# CELDA 3: PRUEBA DE CARGA

# Configura aqu√≠ un caso real que tengas en tus carpetas
TEST_DATE = "2025-11-07" 
TEST_ISIN = "ES0171996087" 

try:
    df_ticks = load_all_venues_for_isin(DATA_PATH, TEST_DATE, TEST_ISIN)
    
    if not df_ticks.empty:
        print("\n¬°√âXITO! Datos cargados y limpios:")
        print(df_ticks.head())
        print(f"\nTotal de ticks procesados: {len(df_ticks)}")
        print("Venues encontrados:", df_ticks['venue'].unique())
    else:
        print("\nNo se encontraron datos. Verifica PATH, FECHA e ISIN.")
except Exception as e:
    print(f"Error durante la ejecuci√≥n: {e}")

Iniciando carga para ES0171996087 en 2025-11-07...
Ruta base: c:\Users\danie\Desktop\MIAX - BME\Bloque 2 - Finanzas\Renta Variable\TAREA 3\Practica Renta Variable_DanielGarciaLopez_77856928L\data\DATA_BIG
  ‚úÖ BME: Cargados 44144 ticks.
  ‚úÖ AQUIS: Cargados 17481 ticks.
  ‚úÖ CBOE: Cargados 20912 ticks.
  ‚úÖ TURQUOISE: Cargados 7435 ticks.

¬°√âXITO! Datos cargados y limpios:
                  epoch      venue  px_bid_0  qty_bid_0  px_ask_0  qty_ask_0  \
44144  1762502417476028      AQUIS    10.275      621.0       NaN        NaN   
44145  1762502417476039      AQUIS    10.275      621.0    10.485      621.0   
82537  1762502417500397  TURQUOISE    10.300        5.0       NaN        NaN   
82538  1762502417500693  TURQUOISE    10.330      311.0       NaN        NaN   
82539  1762502417500708  TURQUOISE    10.330      311.0    10.430      311.0   

       market_trading_status  
44144              5308427.0  
44145              5308427.0  
82537              7608181.0  
82538        

### Construcci√≥n del "Virtual Consolidated Tape"

Para detectar arbitraje, es necesario comparar los precios de todos los centros de negociaci√≥n (*Venues*) en el mismo instante exacto. Sin embargo, los datos originales llegan como eventos as√≠ncronos (una fila por cada actualizaci√≥n de un solo mercado).

En este bloque transformamos los datos al formato necesario para el an√°lisis:

1.  **Pivotaje (Wide Format):** Reestructuramos el DataFrame para que el √≠ndice sea el tiempo (`epoch`) y las columnas representen los precios (*Bid/Ask*) y vol√∫menes de cada mercado simult√°neamente.
2.  **Alineaci√≥n Temporal (`Forward Fill`):** Dado que los mercados no se actualizan al un√≠sono, utilizamos la t√©cnica de `ffill`. Esto asume que el √∫ltimo precio conocido de un mercado sigue vigente hasta que llega una nueva actualizaci√≥n, simulando la visi√≥n persistente que tendr√≠a un *Smart Order Router* (SOR) en tiempo real.

| Tiempo | Evento Real              | ¬øQu√© ve Pandas sin ffill?      | ¬øQu√© ve Pandas CON ffill?        |
|--------|---------------------------|----------------------------------|-----------------------------------|
| T=1    | BME cambia a 10‚Ç¨          | BME=10, CBOE=NaN                 | BME=10, CBOE=9.98 (del pasado)    |
| T=2    | (Nadie hace nada)         | BME=NaN, CBOE=NaN                | BME=10, CBOE=9.98                 |
| T=3    | CBOE cambia a 9.99‚Ç¨       | BME=NaN, CBOE=9.99               | BME=10, CBOE=9.99                 |

3.  **Limpieza:** Se eliminan los instantes iniciales donde no todos los mercados han cotizado a√∫n, garantizando que siempre comparamos precios completos.

| Tiempo | Evento Real              | Estado del Tape (Lo que ve Python)                   | Acci√≥n de Limpieza (dropna)                   |
|--------|---------------------------|-------------------------------------------------------|------------------------------------------------|
| T=1    | BME abre a 10.00‚Ç¨         | BME=10.00, AQUIS=NaN, CBOE=NaN                        | üóëÔ∏è BORRAR (Faltan 2 mercados)                  |
| T=2    | AQUIS abre a 10.01‚Ç¨       | BME=10.00, AQUIS=10.01, CBOE=NaN                      | üóëÔ∏è BORRAR (Falta CBOE)                         |
| T=3    | (Silencio)                | BME=10.00, AQUIS=10.01, CBOE=NaN                      | üóëÔ∏è BORRAR (Sigue faltando CBOE)               |
| T=4    | CBOE abre a 9.99‚Ç¨         | BME=10.00, AQUIS=10.01, CBOE=9.99                     | ‚úÖ MANTENER (¬°Ya est√°n todos!)                 |
| T=5    | BME sube a 10.02‚Ç¨         | BME=10.02, AQUIS=10.01, CBOE=9.99                     | ‚úÖ MANTENER                                    |


In [34]:
def build_consolidated_tape(df_all_venues):

    if df_all_venues.empty:
        return pd.DataFrame()
    
    # 1. Pivotar: Convertimos VENUES en COLUMNAS
    # Usamos 'last' por si hay m√∫ltiples actualizaciones en el mismo microsegundo exacto
    tape = df_all_venues.pivot_table(
        index='epoch', 
        columns='venue', 
        values=['px_bid_0', 'px_ask_0', 'qty_bid_0', 'qty_ask_0'],
        aggfunc='last' 
    )
    
    # 2. Alineaci√≥n temporal (ffill)
    tape = tape.ffill()
    
    # 3. Limpieza 
    tape = tape.dropna()
    
    # 4. Aplanar nombres de columnas 
    # Ejemplo transformaci√≥n: ('px_bid_0', 'BME') -> 'BME_px_bid_0'
    tape.columns = [f"{col[1]}_{col[0]}" for col in tape.columns]
    
    return tape

# --- EJECUCI√ìN ---
try:
    print("Construyendo el Consolidated Tape...")
    
    # Usamos el df_ticks que cargaste exitosamente en el paso anterior
    consolidated_tape = build_consolidated_tape(df_ticks)
    
    print(f"Dimensiones: {consolidated_tape.shape}")
    print("Primeras 5 filas (Precios alineados de todos los mercados):")
    display(consolidated_tape.head()) 
    
except Exception as e:
    print(f"Error creando el tape: {e}")

Construyendo el Consolidated Tape...
Dimensiones: (86744, 16)
Primeras 5 filas (Precios alineados de todos los mercados):


Unnamed: 0_level_0,AQUIS_px_ask_0,BME_px_ask_0,CBOE_px_ask_0,TURQUOISE_px_ask_0,AQUIS_px_bid_0,BME_px_bid_0,CBOE_px_bid_0,TURQUOISE_px_bid_0,AQUIS_qty_ask_0,BME_qty_ask_0,CBOE_qty_ask_0,TURQUOISE_qty_ask_0,AQUIS_qty_bid_0,BME_qty_bid_0,CBOE_qty_bid_0,TURQUOISE_qty_bid_0
epoch,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
1762502419492911,10.47,10.395,10.425,10.43,10.37,10.365,10.37,10.37,621.0,422.0,205.0,311.0,286.0,346.0,116.0,46.0
1762502419513599,10.47,10.395,10.425,10.43,10.37,10.365,10.37,10.37,621.0,210.0,205.0,311.0,286.0,346.0,116.0,46.0
1762502419517827,10.47,10.395,10.425,10.43,10.37,10.365,10.37,10.37,621.0,210.0,205.0,311.0,286.0,346.0,116.0,46.0
1762502419517828,10.47,10.395,10.425,10.43,10.37,10.365,10.37,10.37,621.0,210.0,205.0,311.0,286.0,346.0,116.0,46.0
1762502419517831,10.47,10.395,10.425,10.43,10.37,10.365,10.37,10.37,621.0,210.0,205.0,311.0,286.0,346.0,116.0,46.0


### 5. Motor de Detecci√≥n de Oportunidades (Signal Generation)

Una vez construido el *Consolidated Tape*, procedemos a identificar instantes donde se viola la eficiencia del mercado, es decir, cuando el precio de compra en un mercado supera al precio de venta en otro.

Se implementan las siguientes reglas de negocio:

1.  **Best Bid/Offer Global:** Para cada *epoch*, calculamos el precio m√°ximo de compra (*Max Bid*) y el precio m√≠nimo de venta (*Min Ask*) disponibles entre todos los *venues*.
2.  **C√°lculo del Spread:** Definimos el beneficio potencial bruto por acci√≥n como `Spread = Max_Bid - Min_Ask`. Un spread positivo indica una oportunidad de arbitraje.
3.  **Detecci√≥n de Flancos (*Rising Edge*):** Para evitar contar la misma oportunidad m√∫ltiples veces mientras persiste, aplicamos un filtro de "flanco de subida". Solo generamos una se√±al de entrada en el instante exacto en que la condici√≥n de arbitraje pasa de `False` a `True`.
4.  **C√°lculo de Beneficio Te√≥rico:** Estimamos el P&L (Ganancia y P√©rdida) asumiendo ejecuci√≥n inmediata (Latencia 0) y tomando el volumen m√≠nimo disponible entre la orden de compra y la de venta (`min(Qty_Bid, Qty_Ask)`).

In [None]:
def get_best_prices_and_volumes(tape):
    # Creamos dos listas con los nombres de las columnas
    bid_cols = [c for c in tape.columns if 'px_bid_0' in c]
    ask_cols = [c for c in tape.columns if 'px_ask_0' in c]
    
    # Calculamos Max Bid y Min Ask, fila a fila
    tape['best_bid_price'] = tape[bid_cols].max(axis=1)
    tape['best_ask_price'] = tape[ask_cols].min(axis=1)
    
    # Identificamos qu√© mercado tiene ese precio (ej: 'BME_px_bid_0')
    tape['best_bid_col'] = tape[bid_cols].idxmax(axis=1)
    tape['best_ask_col'] = tape[ask_cols].idxmin(axis=1)
    
    # Extraer el nombre limpio del venue (en lugar de darnos 'BME_px_bid_0', nos da 'BME')
    tape['best_bid_venue'] = tape['best_bid_col'].str.split('_').str[0]
    tape['best_ask_venue'] = tape['best_ask_col'].str.split('_').str[0]
    
    return tape

def find_arbitrage_opportunities(tape):
    # Enriquecer con mejores precios
    tape = get_best_prices_and_volumes(tape)
    
    # Calcular Spread
    tape['spread'] = tape['best_bid_price'] - tape['best_ask_price']
    
    # Si el spread es positivo se muestra como una oportunidad de arbitraje
    tape['is_arbitrage'] = tape['spread'] > 0
    
    # En esta linea marcamos que entramos si la oportunidad de arbitraje ahora es TRUE y antes era FALSE
    tape['entry_signal'] = tape['is_arbitrage'] & (~tape['is_arbitrage'].shift(1).fillna(False))
    
    # Filtrar solo los momentos de entrada
    opportunities = tape[tape['entry_signal']].copy()
    
    if opportunities.empty:
        return opportunities

    # C√°lculo del Beneficio Te√≥rico en Latencia 0
    
    # Preparamos el nombre de la columna de volumen. Si el mejor precio estaba en BME_px_bid_0, sabemos que el volumen estar√° en BME_qty_bid_0
    opportunities['bid_qty_col'] = opportunities['best_bid_col'].str.replace('px', 'qty')
    opportunities['ask_qty_col'] = opportunities['best_ask_col'].str.replace('px', 'qty')
    
    # --- EXTRACCI√ìN SEGURA DE VOL√öMENES (Correcci√≥n del error) ---
    # Usamos listas por comprensi√≥n, que es m√°s seguro que apply en este contexto
    qty_bids = []
    qty_asks = []
    
    # Iteramos fila por fila sobre las oportunidades
    for idx, row in opportunities.iterrows():
        # Coge el nombre de la fila row
        qty_bids.append(row[row['bid_qty_col']])
        qty_asks.append(row[row['ask_qty_col']])
    
    opportunities['qty_bid'] = qty_bids
    opportunities['qty_ask'] = qty_asks
    # -------------------------------------------------------------
    
    # Cantidad ejecutable es el m√≠nimo de las dos puntas. Compro el valor menor que est√© en vente
    opportunities['qty_tradeable'] = opportunities[['qty_bid', 'qty_ask']].min(axis=1)
    
    # Beneficio = Spread * Cantidad
    opportunities['profit_theoretical'] = opportunities['spread'] * opportunities['qty_tradeable']
    
    return opportunities

# --- EJECUCI√ìN ---
try:
    print("Buscando oportunidades de arbitraje...")
    opps = find_arbitrage_opportunities(consolidated_tape)
    
    print(f"\n‚úÖ ¬°AN√ÅLISIS TERMINADO!")
    print(f"Oportunidades encontradas: {len(opps)}")
    
    if not opps.empty:
        total_theo = opps['profit_theoretical'].sum()
        print(f"Beneficio Total Te√≥rico (Latencia 0): {total_theo:.2f} ‚Ç¨")
        print("\nEjemplo de operaciones detectadas:")
        cols_show = ['best_bid_venue', 'best_ask_venue', 'spread', 'qty_tradeable', 'profit_theoretical']
        display(opps[cols_show].head())
    else:
        print("‚ö†Ô∏è No se encontraron oportunidades. El mercado parece eficiente.")
        
except Exception as e:
    print(f"Error en detecci√≥n: {e}")

Buscando oportunidades de arbitraje...

‚úÖ ¬°AN√ÅLISIS TERMINADO!
Oportunidades encontradas: 25
Beneficio Total Te√≥rico (Latencia 0): 23.45 ‚Ç¨

Ejemplo de operaciones detectadas:


  tape['entry_signal'] = tape['is_arbitrage'] & (~tape['is_arbitrage'].shift(1).fillna(False))


Unnamed: 0_level_0,best_bid_venue,best_ask_venue,spread,qty_tradeable,profit_theoretical
epoch,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1762502420565157,CBOE,BME,0.01,5.0,0.05
1762503744628844,BME,CBOE,0.005,315.0,1.575
1762504398062141,CBOE,BME,0.005,315.0,1.575
1762504470336957,BME,CBOE,0.005,315.0,1.575
1762504517631351,BME,CBOE,0.005,315.0,1.575
