# Notebook 1 - Carga de Datos

Este notebook deja el dataset listo para N2 con enfoque `fail-fast`: si un control critico falla, se detiene la ejecucion.

Flujo del notebook:
1. Configuracion y parametros operativos.
2. Carga del parquet base y validaciones tecnicas.
3. Resumen de cobertura y validacion de benchmark (SPY).
4. Exportes finales + checklist de readiness para N2.


## 1) Configuracion del Notebook
Se definen parametros del enunciado, rutas de artefactos y esquema esperado.
Por que: mantener trazabilidad y consistencia desde N1 hasta N5.


In [1]:
# Bloque 1: configuracion global, esquema esperado y utilidades fail-fast.
import numpy as np
import pandas as pd
import pyarrow as pa
import yfinance as yf

# Parametros maestros del enunciado para N1-N5.
PARAMS = {
    "initial_capital": 250000,
    "backtest_start": "2015-01-01",
    "benchmark_ticker": "SPY",
    "universe_rule": "in_sp500_point_in_time_last_13_months",
    "transaction_fee_rate": 0.0023,
    "min_fee_per_order": 23.0,
}

# Rutas de entrada/salida usadas por el pipeline de datos.
INPUT_PARQUET = "data/raw/sp500_history.parquet"
OUT_CANONICAL = "data/processed/sp500_canonical.parquet"
OUT_SPY_PARQUET = "data/raw/spy_yfinance.parquet"
OUT_CHECKS_CSV = "data/processed/n1_data_quality_checks.csv"
OUT_COVERAGE_CSV = "data/processed/n1_coverage_summary.csv"
OUT_RUNTIME_PARAMS_CSV = "data/processed/n1_runtime_params.csv"

# Esquema esperado para asegurar consistencia entre notebooks.
EXPECTED_COLUMNS = [
    "date",
    "symbol",
    "assetid",
    "security_name",
    "sector",
    "industry",
    "subsector",
    "in_sp500",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "unadjusted_close",
]

# Columnas que deben ser numericas para calcular senales y costes despues.
NUMERIC_COLUMNS = [
    "in_sp500",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "unadjusted_close",
]

# Campos minimos para mantener integridad del backtest.
CRITICAL_COLUMNS = [
    "date",
    "symbol",
    "in_sp500",
    "open",
    "high",
    "low",
    "close",
    "volume",
]


def fail_if(condition: bool, message: str) -> None:
    """Aplica fail-fast para cortar ejecucion ante datos no confiables."""
    if condition:
        raise ValueError(message)


def record_check(
    results: list,
    check_name: str,
    failed: bool,
    details: str,
) -> None:
    """Guarda un check tecnico y aborta el pipeline cuando falla."""
    passed = not failed
    results.append({"check": check_name, "passed": passed, "details": details})
    fail_if(failed, f"[{check_name}] {details}")



## 2) Carga y Validacion Tecnica del Dataset Base
Se carga `sp500_history.parquet` y se ejecutan checks de integridad, tipos, nulos, duplicados y coherencia OHLCV.
Por que: evitar propagar errores a N2/N3/N4.


In [2]:
# Bloque 2: carga del parquet base y controles tecnicos de calidad.
def load_input_data(
    input_parquet: str,
    expected_columns: list[str],
) -> pd.DataFrame:
    """Carga el parquet base y normaliza tipos clave para evitar errores aguas abajo."""
    input_exists = pd.io.common.file_exists(input_parquet)
    fail_if(
        not input_exists,
        f"Input parquet no encontrado: {input_parquet}",
    )

    # Seleccion explicita de columnas para detectar schema drift temprano.
    df = pd.read_parquet(
        input_parquet,
        columns=expected_columns,
        engine="pyarrow",
    )

    # Tipado base para joins/orden temporal determinista en todos los notebooks.
    df.columns = [str(col).lower() for col in df.columns]
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["symbol"] = df["symbol"].astype("string")
    df = df.sort_values(["date", "symbol"]).reset_index(drop=True)
    return df


def run_data_quality_checks(
    df: pd.DataFrame,
    expected_columns: list[str],
) -> pd.DataFrame:
    """Ejecuta controles de estructura y coherencia para habilitar el backtest."""
    validation_rows = []

    # 1) Integridad de esquema.
    missing_columns = sorted(set(expected_columns) - set(df.columns))
    record_check(
        validation_rows,
        "required_columns_present",
        bool(missing_columns),
        f"missing_columns={missing_columns}",
    )

    # 2) Tipos esenciales.
    date_is_datetime = pd.api.types.is_datetime64_any_dtype(df["date"])
    record_check(
        validation_rows,
        "date_is_datetime",
        not date_is_datetime,
        "date dtype invalido",
    )

    symbol_is_string = pd.api.types.is_string_dtype(df["symbol"])
    record_check(
        validation_rows,
        "symbol_is_string",
        not symbol_is_string,
        "symbol dtype invalido",
    )

    numeric_type_failures = [
        col for col in NUMERIC_COLUMNS
        if not pd.api.types.is_numeric_dtype(df[col])
    ]
    record_check(
        validation_rows,
        "numeric_columns_dtype",
        bool(numeric_type_failures),
        f"invalid_numeric_dtype={numeric_type_failures}",
    )

    # 3) Unicidad y nulos criticos.
    duplicate_count = int(df.duplicated(["date", "symbol"]).sum())
    record_check(
        validation_rows,
        "duplicate_date_symbol",
        duplicate_count > 0,
        f"duplicate_rows={duplicate_count}",
    )

    critical_null_count = int(df[CRITICAL_COLUMNS].isna().sum().sum())
    record_check(
        validation_rows,
        "critical_nulls",
        critical_null_count > 0,
        f"critical_null_count={critical_null_count}",
    )

    # 4) Reglas economicas minimas para OHLCV.
    price_array = df[["open", "high", "low", "close"]].to_numpy(dtype=float)
    invalid_price_mask = (~np.isfinite(price_array)) | (price_array <= 0)
    invalid_price_count = int(invalid_price_mask.any(axis=1).sum())
    record_check(
        validation_rows,
        "prices_positive",
        invalid_price_count > 0,
        f"invalid_price_rows={invalid_price_count}",
    )

    volume_array = df["volume"].to_numpy(dtype=float)
    invalid_volume_mask = (~np.isfinite(volume_array)) | (volume_array < 0)
    invalid_volume_count = int(invalid_volume_mask.sum())
    record_check(
        validation_rows,
        "volume_non_negative",
        invalid_volume_count > 0,
        f"invalid_volume_rows={invalid_volume_count}",
    )

    high_inconsistency = df["high"] < df[["open", "close", "low"]].max(axis=1)
    low_inconsistency = df["low"] > df[["open", "close", "high"]].min(axis=1)
    invalid_ohlc_count = int((high_inconsistency | low_inconsistency).sum())
    record_check(
        validation_rows,
        "ohlc_consistency",
        invalid_ohlc_count > 0,
        f"invalid_ohlc_rows={invalid_ohlc_count}",
    )

    # 5) Bandera point-in-time del universo en dominio binario.
    invalid_in_sp500_count = int((~df["in_sp500"].isin([0, 1])).sum())
    record_check(
        validation_rows,
        "in_sp500_binary",
        invalid_in_sp500_count > 0,
        f"invalid_in_sp500_rows={invalid_in_sp500_count}",
    )

    checks_df = pd.DataFrame(validation_rows)
    return checks_df



## 3) Cobertura del Universo y Benchmark
Se resume cobertura temporal/universo y se descarga/valida SPY con fallback local.
Por que: asegurar comparativa con benchmark y continuidad operativa offline.


In [3]:
# Bloque 3: cobertura historica del universo y benchmark SPY.
def build_coverage_summary(df: pd.DataFrame) -> dict:
    """Resume cobertura temporal y amplitud del universo historico."""
    date_min = df["date"].min()
    date_max = df["date"].max()
    total_symbols = int(df["symbol"].nunique())

    last_data_date = date_max
    symbols_on_last_date = int(
        df.loc[df["date"] == last_data_date, "symbol"].nunique()
    )

    # Activo/inactivo se define por presencia en la ultima fecha disponible.
    last_date_by_symbol = df.groupby("symbol", observed=True)["date"].max()
    active_symbols = int((last_date_by_symbol == last_data_date).sum())
    inactive_symbols = int(total_symbols - active_symbols)

    # Conteo diario point-in-time de miembros del S&P500.
    sp500_daily_count = (
        df.loc[df["in_sp500"] == 1]
        .groupby("date", observed=True)["symbol"]
        .nunique()
        .rename("sp500_members")
    )

    coverage_summary = {
        "date_min": date_min.date().isoformat(),
        "date_max": date_max.date().isoformat(),
        "total_rows": int(len(df)),
        "total_symbols": total_symbols,
        "symbols_on_last_date": symbols_on_last_date,
        "active_symbols": active_symbols,
        "inactive_symbols": inactive_symbols,
        "sp500_members_min": int(sp500_daily_count.min()),
        "sp500_members_median": float(sp500_daily_count.median()),
        "sp500_members_max": int(sp500_daily_count.max()),
    }
    return coverage_summary


def _normalize_yfinance_columns(spy_raw: pd.DataFrame) -> pd.DataFrame:
    """Convierte columnas de yfinance a minusculas y snake_case."""
    if isinstance(spy_raw.columns, pd.MultiIndex):
        spy_raw.columns = [
            str(col[0]).lower().replace(" ", "_")
            for col in spy_raw.columns
        ]
    else:
        spy_raw.columns = [
            str(col).lower().replace(" ", "_")
            for col in spy_raw.columns
        ]
    return spy_raw


def download_benchmark(
    benchmark_ticker: str,
    start_date: str,
    out_spy_parquet: str,
) -> tuple[pd.DataFrame, dict, str]:
    """Descarga SPY y aplica fallback local para permitir reejecucion offline."""
    spy_source = "yfinance_download"
    try:
        spy_raw = yf.download(
            tickers=benchmark_ticker,
            start=start_date,
            progress=False,
            auto_adjust=False,
        )
        fail_if(spy_raw.empty, "No se pudo descargar SPY desde yfinance.")

        spy_raw = _normalize_yfinance_columns(spy_raw)
        spy_df = spy_raw.reset_index()
        spy_df.columns = [
            str(col).lower().replace(" ", "_")
            for col in spy_df.columns
        ]
        spy_df["date"] = pd.to_datetime(spy_df["date"], errors="coerce")
        spy_df["symbol"] = benchmark_ticker
    except Exception as exc:
        spy_source = "local_parquet_fallback"
        fallback_exists = pd.io.common.file_exists(out_spy_parquet)
        fail_if(
            not fallback_exists,
            "No se pudo descargar SPY y no existe fallback local. "
            f"Error original: {exc}",
        )
        spy_df = pd.read_parquet(out_spy_parquet, engine="pyarrow")

    # Columnas minimas requeridas para benchmark en notebooks posteriores.
    required_columns = ["date", "symbol", "open", "high", "low", "close", "volume"]
    missing_columns = sorted(set(required_columns) - set(spy_df.columns))

    spy_check = {
        "check": "spy_required_columns_present",
        "passed": not bool(missing_columns),
        "details": f"missing_columns={missing_columns}",
    }
    fail_if(
        bool(missing_columns),
        f"[spy_required_columns_present] {spy_check['details']}",
    )

    spy_df["date"] = pd.to_datetime(spy_df["date"], errors="coerce")
    spy_df = spy_df[required_columns].sort_values("date").reset_index(drop=True)
    spy_df.to_parquet(out_spy_parquet, engine="pyarrow", index=False)

    return spy_df, spy_check, spy_source


def export_artifacts(
    canonical_df: pd.DataFrame,
    checks_df: pd.DataFrame,
    coverage_summary: dict,
    runtime_params: dict,
    out_canonical: str,
    out_checks_csv: str,
    out_coverage_csv: str,
    out_runtime_csv: str,
) -> dict:
    """Exporta datasets y tablas de control para continuidad N2-N5."""
    canonical_df.to_parquet(out_canonical, engine="pyarrow", index=False)
    checks_df.to_csv(out_checks_csv, index=False)
    pd.DataFrame([coverage_summary]).to_csv(out_coverage_csv, index=False)

    runtime_df = pd.DataFrame(
        [{"parameter": key, "value": value} for key, value in runtime_params.items()]
    )
    runtime_df.to_csv(out_runtime_csv, index=False)

    artifacts = {
        "canonical_exists": pd.io.common.file_exists(out_canonical),
        "benchmark_exists": pd.io.common.file_exists(OUT_SPY_PARQUET),
        "checks_csv_exists": pd.io.common.file_exists(out_checks_csv),
        "coverage_csv_exists": pd.io.common.file_exists(out_coverage_csv),
        "runtime_params_csv_exists": pd.io.common.file_exists(out_runtime_csv),
    }
    return artifacts



## 4) Orquestacion y Exportes de N1
Se ejecuta el pipeline completo, se exportan artefactos y se valida readiness para N2.
Por que: dejar un punto de entrada estable y auditable para el siguiente notebook.


In [4]:
# Bloque 4: orquestacion, exportes y checklist de readiness.
def run_notebook_1() -> dict:
    """Orquesta todo N1 y devuelve objetos de control para inspeccion."""
    # 1) Carga y validacion del dataset principal del universo.
    df = load_input_data(INPUT_PARQUET, EXPECTED_COLUMNS)
    checks_df = run_data_quality_checks(df, EXPECTED_COLUMNS)
    coverage_summary = build_coverage_summary(df)

    # 2) Carga del benchmark para comparativas y metricas futuras.
    spy_df, spy_check, spy_source = download_benchmark(
        benchmark_ticker=PARAMS["benchmark_ticker"],
        start_date=PARAMS["backtest_start"],
        out_spy_parquet=OUT_SPY_PARQUET,
    )
    checks_df = pd.concat([checks_df, pd.DataFrame([spy_check])], ignore_index=True)

    # 3) Parametros operativos centralizados en CSV (sin JSON global).
    runtime_params = {
        "initial_capital": PARAMS["initial_capital"],
        "backtest_start": PARAMS["backtest_start"],
        "benchmark_ticker": PARAMS["benchmark_ticker"],
        "universe_rule": PARAMS["universe_rule"],
        "transaction_fee_rate": PARAMS["transaction_fee_rate"],
        "min_fee_per_order": PARAMS["min_fee_per_order"],
        "backtest_end_data": coverage_summary["date_max"],
        "input_parquet": INPUT_PARQUET,
        "canonical_parquet": OUT_CANONICAL,
        "benchmark_parquet": OUT_SPY_PARQUET,
        "checks_csv": OUT_CHECKS_CSV,
        "coverage_csv": OUT_COVERAGE_CSV,
        "runtime_params_csv": OUT_RUNTIME_PARAMS_CSV,
        "spy_source": spy_source,
        "numpy_version": np.__version__,
        "pandas_version": pd.__version__,
        "pyarrow_version": pa.__version__,
        "yfinance_version": yf.__version__,
    }

    # 4) Exportes finales del notebook.
    artifacts = export_artifacts(
        canonical_df=df,
        checks_df=checks_df,
        coverage_summary=coverage_summary,
        runtime_params=runtime_params,
        out_canonical=OUT_CANONICAL,
        out_checks_csv=OUT_CHECKS_CSV,
        out_coverage_csv=OUT_COVERAGE_CSV,
        out_runtime_csv=OUT_RUNTIME_PARAMS_CSV,
    )

    # 5) Checklist de readiness para desbloquear Notebook_2.
    readiness_checklist = {
        "all_critical_checks_passed": bool(checks_df["passed"].all()),
        "canonical_dataset_created": artifacts["canonical_exists"],
        "benchmark_dataset_created": artifacts["benchmark_exists"],
        "checks_csv_created": artifacts["checks_csv_exists"],
        "coverage_csv_created": artifacts["coverage_csv_exists"],
        "runtime_params_csv_created": artifacts["runtime_params_csv_exists"],
    }

    readiness_df = pd.DataFrame(
        [{"item": key, "passed": value} for key, value in readiness_checklist.items()]
    )

    fail_if(
        not all(readiness_checklist.values()),
        "Data readiness FAILED: revisar checks y artefactos.",
    )

    return {
        "readiness_df": readiness_df,
        "checks_df": checks_df,
        "coverage_summary": coverage_summary,
        "runtime_params": runtime_params,
        "artifacts": artifacts,
        "spy_rows": int(len(spy_df)),
    }


# Ejecucion principal del notebook.
results = run_notebook_1()
print("DATA READINESS: PASS. El proyecto puede continuar en Notebook_2.")
results["readiness_df"]



DATA READINESS: PASS. El proyecto puede continuar en Notebook_2.


Unnamed: 0,item,passed
0,all_critical_checks_passed,True
1,canonical_dataset_created,True
2,benchmark_dataset_created,True
3,checks_csv_created,True
4,coverage_csv_created,True
5,runtime_params_csv_created,True


## 5) Salidas de Control
Se muestran checks, cobertura y estado de artefactos para revision rapida.


In [5]:
# Resumen de checks tecnicos ejecutados en N1.
results["checks_df"]


Unnamed: 0,check,passed,details
0,required_columns_present,True,missing_columns=[]
1,date_is_datetime,True,date dtype invalido
2,symbol_is_string,True,symbol dtype invalido
3,numeric_columns_dtype,True,invalid_numeric_dtype=[]
4,duplicate_date_symbol,True,duplicate_rows=0
5,critical_nulls,True,critical_null_count=0
6,prices_positive,True,invalid_price_rows=0
7,volume_non_negative,True,invalid_volume_rows=0
8,ohlc_consistency,True,invalid_ohlc_rows=0
9,in_sp500_binary,True,invalid_in_sp500_rows=0


In [6]:
# Resumen de cobertura temporal y de universo.
pd.DataFrame([results["coverage_summary"]])


Unnamed: 0,date_min,date_max,total_rows,total_symbols,symbols_on_last_date,active_symbols,inactive_symbols,sp500_members_min,sp500_members_median,sp500_members_max
0,1990-01-02,2026-01-30,7250110,1289,650,650,639,498,500.0,507


In [7]:
# Estado de artefactos exportados por el pipeline.
pd.DataFrame([results["artifacts"]])


Unnamed: 0,canonical_exists,benchmark_exists,checks_csv_exists,coverage_csv_exists,runtime_params_csv_exists
0,True,True,True,True,True


## Fin Notebook 1
Todos los controles criticos deben quedar en `passed=True` para continuar en N2.
