<a href="https://colab.research.google.com/github/GUNAPILLCO/neural_profit/blob/main/2_obtencion_preparacion_exploracion_datos/2_2_limpieza_normalizaci%C3%B3n_estructuraci%C3%B3n.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 2_1_Limpieza, Normalización y estructuración de series temporales
# Preprocesamiento de Datos del Índice E-mini Nasdaq 100 (MNQ)

## 0. Clonamos el repositorio

LINK DE REPOSITORIO: https://github.com/GUNAPILLCO/neural_profit

In [3]:
#Clonamos el repo
!git clone https://github.com/GUNAPILLCO/neural_profit.git

Cloning into 'neural_profit'...
remote: Enumerating objects: 37, done.[K
remote: Counting objects: 100% (37/37), done.[K
remote: Compressing objects: 100% (35/35), done.[K
remote: Total 37 (delta 0), reused 34 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (37/37), 40.88 MiB | 21.17 MiB/s, done.


## 1. Importación de Librerías

In [4]:
import sys
!{sys.executable} -m pip install -q pandas_market_calendars
print("✅ Librería instalada: pandas_market_calendars")

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/123.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m123.9/123.9 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/200.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.1/200.1 kB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[?25h✅ Librería instalada: pandas_market_calendars


In [5]:
# Utilidades generales
from datetime import datetime, timedelta
import os
import glob
import warnings
warnings.filterwarnings('ignore')

# Manejo y procesamiento de datos
import pandas as pd
from tabulate import tabulate

# Calendario de mercados
import pandas_market_calendars as mcal
import pandas as pd
import requests
from io import StringIO


## 2. Contexto y fuente de datos

Los datos corresponden al contrato MNQ (Micro E-mini Nasdaq 100) descargados desde NinjaTrader con frecuencia de un minuto (formato OHLCV).

- Open: precio de apertura
- High: precio máximo
- Low: precio mínimo
- Close: precio de cierre
- Volume: volumen negociado

Los datos están en la zona horaria UTC.


## 3. Generación de dataset

Dado que los contratos se encuentran almacenados en archivos .txt dentro de la carpeta historicos_mnq, es necesario unificarlos en un único dataset consolidado.

La siguiente función se encarga de leer los archivos .txt, asignar nombres a las columnas correspondientes y establecer la columna datetime como índice temporal del dataframe.

In [7]:
def generar_df ():

    # Ruta a los archivos .txt
    ruta_historicos = '/content/neural_profit/1_historicos_mnq/*.txt'  # Reemplace con su ruta local

    # Lista para almacenar DataFrames individuales
    df_mnq = []

    # Leer todos los archivos .txt
    for archivo in glob.glob(ruta_historicos):
        df = pd.read_csv(
            archivo,
            sep=';',
            header=None,
            names=['datetime', 'open', 'high', 'low', 'close', 'volume'],
            dtype={'open': float, 'high': float, 'low': float, 'close': float, 'volume': int}
        )

        # Convertir columna 'datetime' al formato datetime real
        df['datetime'] = pd.to_datetime(df['datetime'], format='%Y%m%d %H%M%S')

        # Establecer como índice
        df.set_index('datetime', inplace=True)

        df_mnq.append(df)

    # Unir todos los DataFrames
    df_mnq_raw = pd.concat(df_mnq)
    # Ordenar por fecha si es necesario
    df_mnq_raw.sort_index(inplace=True)

    return df_mnq_raw

El siguiente bloque de código verifica si el dataset consolidado ya ha sido generado previamente.

En particular, comprueba la existencia del archivo mnq_raw_data.parquet.

- Si el archivo está presente, se carga directamente en la variable df_mnq.

- En caso contrario, se invoca la función generate_dataset() para generar el dataset a partir de los archivos originales.

In [10]:
def cargar_o_generar_df():

    archivo = '/content/neural_profit/2_obtencion_preparacion_exploracion_datos/mnq_raw_data.parquet'

    if os.path.exists(archivo):
        print("📂 Archivo encontrado en disco. Cargando dataset local...")
        df_mnq_raw = pd.read_parquet(archivo)

    else:
        print("⚠️ Archivo no encontrado en GitHub. Generando dataset desde archivos históricos...")
        df_mnq_raw = generar_df()
        df_mnq_raw.to_parquet(archivo, index=True)
        print("✅ Dataset generado y guardado localmente.")

    return df_mnq_raw

In [11]:
df_mnq_raw = cargar_o_generar_df()

📂 Archivo encontrado en disco. Cargando dataset local...


## 4. Filtrado de días no hábiles y horario bursátil

### 4.1. Filtrado de fines de semana y feriados bursátiles estadounidenses

Es necesario filtrar del conjunto de datos aquellas filas correspondientes a sábados, domingos y feriados bursátiles. Para ello, se utilizará la librería pandas_market_calendars, que permite identificar los días hábiles de operación según el calendario oficial del NASDAQ.

La función implementada filtra un DataFrame con índice de tipo DatetimeIndex, conservando únicamente aquellas filas cuya fecha coincida con un día hábil del mercado. La marca temporal completa (fecha y hora) se mantiene sin modificaciones.

In [13]:
def filtrar_dias_habiles_nasdaq(df):
    # Crear el calendario del mercado NASDAQ
    nasdaq = mcal.get_calendar('NASDAQ')

    # Obtener el rango de fechas del índice del DataFrame
    start_date = df.index.min().date()
    end_date = df.index.max().date()

    # Obtener el cronograma de días hábiles del mercado
    valid_dates = nasdaq.schedule(start_date=start_date, end_date=end_date).index.date

    # Filtrar el DataFrame verificando si la fecha de cada marca temporal está en los días válidos
    df_filtrado = df[df.index.normalize().isin(valid_dates)]

    return df_filtrado

In [14]:
df_mnq = filtrar_dias_habiles_nasdaq (df_mnq_raw)

### 4.2. Filtrado de horario de operación de mercado de New York (09:30 a 16:00) con pre mercado, desde las 07:30

Dado que los timestamps del índice (DatetimeIndex) provienen de archivos .txt sin información de zona horaria, es necesario indicar explícitamente a pandas que dichos valores están en formato UTC.

Una vez establecido el timezone, se procede a convertir los timestamps desde UTC a la hora local del mercado estadounidense (zona US/Eastern), correspondiente a los horarios de operación del NASDAQ/NYSE. Esta conversión se realiza teniendo en cuenta automáticamente los ajustes por horario de verano o invierno.

In [15]:
def configurar_zona_horaria(df, from_tz='UTC', to_tz='America/New_York'):
    """
    Asegura que el índice del DataFrame tenga zona horaria 'from_tz'
    y lo convierte a la zona horaria 'to_tz'.
    """
    if df.index.tz is None:
        # Localiza en from_tz si no tiene zona horaria
        df.index = df.index.tz_localize(from_tz)
    # Convierte a la zona horaria deseada
    df.index = df.index.tz_convert(to_tz)
    return df

In [16]:
df_mnq = configurar_zona_horaria(df_mnq)

La siguiente función selecciona únicamente las muestras que se encuentran dentro del horario regular de operación bursátil del NASDAQ.

Filtra un DataFrame cuyo índice es de tipo DatetimeIndex, conservando solo aquellas filas cuya marca temporal se encuentre entre las 09:30 y 16:00 horas (US/Eastern), correspondientes al horario de negociación estándar en días hábiles de mercado.

Particularmente, decido agregar una hora de pre mercado, desde las 07:30AM.

In [17]:
def filtrar_horas_habiles_nasdaq(df):

    # Filtrar solo las filas dentro de las horas de mercado (de 7:30 AM a 4:00 PM)
    df_filtered = df.between_time('07:30:00', '16:00:00')

    # Retornar el DataFrame filtrado
    return df_filtered

In [18]:
df_mnq = filtrar_horas_habiles_nasdaq(df_mnq)

## 5. Análisis de registros diarios

Es necesario verificar que todos los días del conjunto de datos contengan la misma cantidad de registros y que estos sean consecutivos, es decir, que no falte ningún minuto dentro de cada jornada.

La función analizar_registros_por_dia permite realizar este control sobre un DataFrame con índice de tipo datetime. La función contabiliza la cantidad de registros por día e imprime una tabla resumen que indica cuántos días presentan una determinada cantidad de registros. Esto resulta útil para identificar inconsistencias, como días incompletos o interrupciones en la frecuencia temporal esperada.

In [19]:
def analizar_registros_por_dia(df: pd.DataFrame) -> pd.Series:
    """
    Analiza la cantidad de registros por día en un DataFrame con índice datetime.

    Imprime:
    - Distribución de la cantidad de registros por día.
    - Porcentaje de días con menos registros que el valor más frecuente.

    Retorna:
    - Serie con el conteo de registros por cada día.
    """
    # Contar la cantidad de registros por día
    conteo_diario = df.groupby(df.index.date).size()

    # Calcular la distribución de registros por día
    distribucion = conteo_diario.value_counts().sort_index(ascending=False)
    tabla = [[registros, cantidad_dias] for registros, cantidad_dias in distribucion.items()]

    print("Distribución de cantidad de registros por día:\n")
    print(tabulate(tabla, headers=["Registros por día", "Cantidad de días"], tablefmt="grid"))

    # Determinar el valor más frecuente de registros por día
    registros_dia_completo = conteo_diario.mode().iloc[0]
    print(f"\nCantidad de registros en un día completo: {registros_dia_completo}")

    # Calcular el porcentaje de días incompletos
    total_dias = len(conteo_diario)
    dias_con_menos = (conteo_diario < registros_dia_completo).sum()
    porcentaje = (dias_con_menos / total_dias) * 100

    print(f"Días con menos de {registros_dia_completo} registros: {dias_con_menos} de {total_dias} ({porcentaje:.2f}%)")

    return conteo_diario, registros_dia_completo

In [20]:
resumen, registros_dia_completo = analizar_registros_por_dia(df_mnq)

Distribución de cantidad de registros por día:

+---------------------+--------------------+
|   Registros por día |   Cantidad de días |
|                 511 |               1198 |
+---------------------+--------------------+
|                 510 |                  8 |
+---------------------+--------------------+
|                 509 |                  4 |
+---------------------+--------------------+
|                 508 |                  2 |
+---------------------+--------------------+
|                 504 |                  1 |
+---------------------+--------------------+
|                 503 |                  1 |
+---------------------+--------------------+
|                 502 |                  2 |
+---------------------+--------------------+
|                 500 |                  2 |
+---------------------+--------------------+
|                 499 |                  1 |
+---------------------+--------------------+
|                 458 |                  1 |
+------

Como podemos observar en la tabla, la gran mayoría de días tienen `511` muestras. Y representan más del 95% del total de los datos.


### 5.1. Filtrado de días incompletos

La siguiente función encuentra los indices de las fechas con registros incompletos (menor a 511):

In [21]:
def busqueda_fechas_incompletas(df: pd.DataFrame, gap_minutes: int = 1, registros_esperados: int = registros_dia_completo) -> list:
    """
    Muestra una tabla con los días que tienen menos de los registros esperados o presentan irregularidades temporales.
    Retorna una lista con esas fechas.

    Parámetros:
    - df: DataFrame con índice datetime.
    - gap_minutes: intervalo esperado entre registros consecutivos (en minutos).
    - registros_esperados: cantidad esperada de registros por día.

    Retorna:
    - Lista de fechas (datetime.date) con menos registros de los esperados o problemas temporales.
    """
    df = df.copy()
    df['time_diff'] = df.index.to_series().diff()
    base_time_diff = pd.Timedelta(minutes=gap_minutes)

    conteos = df.groupby(df.index.date).size()
    fechas_problema = []

    for date, group in df.groupby(df.index.date):
        time_diff = group['time_diff'].iloc[1:]
        tiene_irregularidades = (time_diff != base_time_diff).any()
        cantidad = conteos[date]

        if cantidad < registros_esperados or tiene_irregularidades:
            fechas_problema.append((date, cantidad))

    '''
    if fechas_problema:
        print(f"\nDía con menos de {registros_esperados} registros o con problemas temporales:\n")
        print(f"{'+' + '-'*21 + '+' + '-'*20 + '+'}")
        print(f"| {'Fecha'.ljust(19)} | {'Registros'.rjust(18)} |")
        print(f"{'+' + '='*21 + '+' + '='*20 + '+'}")
        for fecha, registros in fechas_problema:
            print(f"| {str(fecha).ljust(19)} | {str(registros).rjust(18)} |")
            print(f"{'+' + '-'*21 + '+' + '-'*20 + '+'}")
    else:
        print("No se encontraron días con irregularidades ni registros incompletos.")
    '''
    # Solo devolver las fechas
    return [fecha for fecha, _ in fechas_problema]

Elimino las fechas con registros incompletos:

In [22]:
# Filtrar eliminando las fechas con problemas
df_mnq = df_mnq[~df_mnq.index.to_series().dt.date.isin(busqueda_fechas_incompletas(df_mnq))]

#Y verifico con:
resumen=analizar_registros_por_dia(df_mnq)

Distribución de cantidad de registros por día:

+---------------------+--------------------+
|   Registros por día |   Cantidad de días |
|                 511 |               1198 |
+---------------------+--------------------+

Cantidad de registros en un día completo: 511
Días con menos de 511 registros: 0 de 1198 (0.00%)


## 6. Verificación de continuidad temporal minuto a minuto

Es necesario verificar que los 451 registros correspondientes a un mismo día estén dispuestos de forma consecutiva, con una separación exacta de un minuto entre cada muestra.

In [23]:
def detectar_gaps(df: pd.DataFrame, gap_minutes: int = 1):
    """
    Verifica si existen saltos mayores al intervalo esperado (por defecto 1 minuto)
    entre registros consecutivos dentro de cada día, en un DataFrame con índice tipo DatetimeIndex.

    Omite el primer registro de cada día.

    Parámetros:
    - df: DataFrame con índice datetime.
    - gap_minutes: tamaño esperado del intervalo en minutos (por defecto 1).

    Retorna:
    - Lista de índices donde se detectaron diferencias mayores al intervalo esperado.
    """
    df = df.copy()
    df['time_diff'] = df.index.to_series().diff()

    base_time_diff = pd.Timedelta(minutes=gap_minutes)
    problem_indices = []

    for date, group in df.groupby(df.index.date):
        time_diff = group['time_diff'].iloc[1:]
        incorrect_indices = time_diff[time_diff != base_time_diff].index
        if len(incorrect_indices) > 0:
            problem_indices.append(incorrect_indices)

    if problem_indices:
        print(f"Se encontraron problemas en {len(problem_indices)} registros con diferencias irregulares.\n")

        # Conteo por fecha
        conteos = df.groupby(df.index.date).size()

        for i in range(len(problem_indices)):
            idx = problem_indices[i][0]
            diff = df.loc[idx, 'time_diff']
            date = idx.date()
            count = conteos[date]
            print(f'\t{idx} -> Diferencia: {diff} | # Registros: {count}')
    else:
        print("No se encontraron problemas, todas las muestras son consecutivas minuto a minuto.")

    return problem_indices

In [24]:
detectar_gaps(df_mnq)

No se encontraron problemas, todas las muestras son consecutivas minuto a minuto.


[]

## 7. Guardado de dataset final



Se guarda un dataset compuesto por 1198 días, cada uno con 511 registros correspondientes a minutos consecutivos.

El conjunto de datos incluye únicamente días hábiles de operación bursátil, y abarca el intervalo horario comprendido entre las 07:30 y las 16:00 horas (US/Eastern).

In [27]:
#Guardamos el dataset
ruta_mnq_intraday = '/content/neural_profit/2_obtencion_preparacion_exploracion_datos/mnq_intraday_data.parquet'
df_mnq.to_parquet(ruta_mnq_intraday, index=True)

In [30]:
#Cambiamos de directorio
%cd neural_profit

/content/neural_profit


In [31]:
token = " " #busca el archivo token en D:\Escritorio\UBA-CEIA\token_neural_profit
!git config --global user.email "gusunapillco@gmail.com"
!git config --global user.name "GUNAPILLCO"
!git add .
!git commit -m "Actualización de datasets desde colab"
!git push https://GUNAPILLCO:{token}@github.com/GUNAPILLCO/neural_profit.git

print("Dataset guardados correctamente")

[main 6f1cbab] Actualización de notebooks y datasets desde colab
 2 files changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 2_obtencion_preparacion_exploracion_datos/mnq_intraday_data.parquet
 create mode 100644 2_obtencion_preparacion_exploracion_datos/mnq_raw_data.parquet
Enumerating objects: 7, done.
Counting objects: 100% (7/7), done.
Delta compression using up to 2 threads
Compressing objects: 100% (5/5), done.
Writing objects: 100% (5/5), 29.94 MiB | 4.55 MiB/s, done.
Total 5 (delta 2), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (2/2), completed with 2 local objects.[K
To https://github.com/GUNAPILLCO/neural_profit.git
   aab5afd..6f1cbab  main -> main
