# Limpieza y enriquecimiento de índices bursátiles – Capa Silver

**Descripcion:**<br>
Toma los archivos Parquet de la capa *Bronze* que contienen precios diarios históricos de los principales índices bursátiles de EE. UU. y construye un **panel diario completo** imputando fechas faltantes mediante *forward‑fill*, calculando rendimientos diarios y marcando la calidad de cada registro. El resultado se escribe como Parquet particionado por `ticker/year` en la capa *Silver* dentro de Azure Data Lake Storage Gen2.<br>

**Objetivos**:<br>
- Leer y consolidar todos los datos Bronze de índices.<br>
- Construir un calendario uniforme (sin huecos) para cada índice.<br>
- Imputar valores faltantes de precios y volumen mediante *forward‑fill*.<br>
- Registrar la procedencia** de cada punto con la columna `data_quality_flag` ("source" vs "ffill").<br>
- Calcular el rendimiento diario (`daily_return`)** con precisión de 6 decimales.<br>
- Persistir en Parquet particionado** (`ticker` / `year`) en la capa Silver para lecturas eficientes.

## 1. Configuración básica, librerías y Parámetros globales del pipeline

In [1]:
# ================================================================
# 1.1 Configuración básica y librerías
# ================================================================
import os, io, pandas as pd, datetime as dt
import numpy as np  # para vectorización de cálculos
from dotenv import load_dotenv  # lee variables de entorno locales
import adlfs                              # interfaz fsspec para ADLS

# Utilidades propias para subir archivos y obtener un cliente ADLS
from utils_adls import upload_bytes, _client, CONTAINER

# Carga credenciales (AZ_STORAGE_ACCOUNT, AZ_ACCOUNT_KEY, etc.)--
load_dotenv()
ACCOUNT  = os.getenv("AZ_STORAGE_ACCOUNT")
KEY      = os.getenv("AZ_ACCOUNT_KEY")

# Prefijos / rutas de trabajo -----------------------------------
BRONZE_PREFIX = "bronze/indices"
SILVER_PREFIX = "silver/indices_clean"
LOCAL_TMP     = "data/silver_tmp"; os.makedirs(LOCAL_TMP, exist_ok=True)

# Instancia del sistema de archivos ADLS vía fsspec ------------
fs = adlfs.AzureBlobFileSystem(account_name=ACCOUNT,
                               account_key=KEY,
                               use_ssl=True)

## 2. Leer Capa Bronze, limpieza y muestreo de datos iniciales

In [2]:
# ================================================================
# 2.1 Leer Bronze
# ================================================================
# Carga parquet Bronze -------------------------------------------
#   • Localiza todos los Parquet de Bronze (particionados por year).
#   • Concatena en un único DataFrame ordenado por ticker + date.
# -----------------------------------------------------------------

CONTAINER = "market"                                # contenedor ADLS
BRONZE_PREFIX = f"{CONTAINER}/bronze/indices"       # prefijo completo

# Lista recursiva de archivos Year‑Partitioned
paths = fs.glob(f"{BRONZE_PREFIX}/*/year=*/*.parquet")

# Lectura perezosa (abre cada blob como archivo binario)
open_files = [fs.open(p, "rb") for p in paths]

# DataFrame consolidado
bronze_df = (
    pd.concat(pd.read_parquet(f) for f in open_files)
      .sort_values(["ticker", "date"])
)
bronze_df.tail(30)

Unnamed: 0,date,close,high,low,open,volume,ticker,year
110,2025-06-12,60366.988281,60378.179688,59969.980469,60189.710938,0,WILSHIRE5000,2025
111,2025-06-13,59645.230469,60366.988281,59511.320312,60366.988281,0,WILSHIRE5000,2025
112,2025-06-16,60226.660156,60399.5,59645.230469,59645.230469,0,WILSHIRE5000,2025
113,2025-06-17,59723.570312,60226.660156,59661.53125,60226.660156,0,WILSHIRE5000,2025
114,2025-06-18,59744.488281,60108.558594,59651.929688,59723.570312,0,WILSHIRE5000,2025
115,2025-06-20,59614.179688,60116.371094,59457.75,59744.488281,0,WILSHIRE5000,2025
116,2025-06-23,60174.429688,60206.789062,59321.78125,59614.179688,0,WILSHIRE5000,2025
117,2025-06-24,60870.570312,60964.851562,60174.429688,60174.429688,0,WILSHIRE5000,2025
118,2025-06-25,60801.589844,61020.851562,60707.96875,60870.570312,0,WILSHIRE5000,2025
119,2025-06-26,61334.011719,61380.960938,60801.589844,60801.589844,0,WILSHIRE5000,2025


In [3]:
# ================================================================
# 2.2 Limpieza y enriquecimiento de Bronze
# ================================================================
#   • Para cada ticker se construye el calendario diario completo.
#   • Se imputan valores con forward‑fill y se crea una bandera de calidad.
#   • Se calcula el retorno porcentual diario si existe la columna 'close'.
# ----------------------------------------------------------------

# Rango de fechas overall (desde el primer al último día disponible)
date_range = pd.date_range(
    bronze_df["date"].min(),
    bronze_df["date"].max(),
    freq="D"
)

panel_list = []  # almacenará cada panel de ticker limpio

for tk, grp in bronze_df.groupby("ticker"):
    # ---------------- sub‑bloque por ticker --------------------
    grp = grp.set_index("date").sort_index()

    # calendario completo
    filled = grp.reindex(date_range)

    # Identificar NAs originales (antes de imputación)
    orig_na = filled["close"].isna() if "close" in filled.columns else pd.Series(False, index=filled.index)

    # Forward‑fill de columnas numéricas
    want_numeric = ["open", "high", "low", "close", "adj_close", "volume"]
    numeric_cols = [c for c in want_numeric if c in filled.columns]
    filled[numeric_cols] = filled[numeric_cols].ffill()

    # Forward‑fill de metadatos (ingest_ts, source) para trazabilidad
    for c in ["ingest_ts", "source"]:
        if c in filled.columns:
            filled[c] = filled[c].ffill()

    # Flag de calidad: 'source' si el valor es real, 'ffill' si fue imputado
    filled["data_quality_flag"] = np.where(orig_na, "ffill", "source")

    # Cálculo de retorno diario (% change) si existe 'close'
    if "close" in filled.columns:
        filled["daily_return"] = filled["close"].pct_change().round(6)

    # Añade metadatos y acumula resultado
    filled = (
        filled.reset_index()
              .rename(columns={"index": "date"})
              .assign(ticker=tk,
                      year=lambda x: x["date"].dt.year)
    )
    panel_list.append(filled)


# DataFrame Silver final --------------------------------------
silver_df = (
    pd.concat(panel_list, ignore_index=True)
      .sort_values(["ticker", "date"])
)

silver_df.head()

Unnamed: 0,date,close,high,low,open,volume,ticker,year,data_quality_flag,daily_return
0,1992-01-02,3172.399902,3172.629883,3139.310059,3152.100098,23550000.0,DJIA,1992,source,
1,1992-01-03,3201.5,3210.639893,3165.919922,3172.399902,23620000.0,DJIA,1992,source,0.009173
2,1992-01-04,3201.5,3210.639893,3165.919922,3172.399902,23620000.0,DJIA,1992,ffill,0.0
3,1992-01-05,3201.5,3210.639893,3165.919922,3172.399902,23620000.0,DJIA,1992,ffill,0.0
4,1992-01-06,3200.100098,3213.330078,3191.860107,3201.5,27280000.0,DJIA,1992,source,-0.000437


## 3. Escribir capa Silver (Local y Azure Data Lake Storage)

In [4]:
# ================================================================
# 3.1 Escribir Silver local + subir a ADLS
# ================================================================
#   • Parquet particionado `ticker/year`.
#   • Carga cada archivo al contenedor ADLS en la ruta Silver.
# ------------------------------------------------------------------------------

for (tk, yr), sub in silver_df.groupby(["ticker", "year"]):
    # Guarda archivo temporal local
    tmp_file = f"{LOCAL_TMP}/{tk}_{yr}.parquet"
    sub.to_parquet(tmp_file, index=False, engine="pyarrow")

    # Sube a ADLS (sobrescribe si existe)
    remote = f"{SILVER_PREFIX}/{tk}/year={yr}/{tk}_{yr}.parquet"
    with open(tmp_file, "rb") as f:
        upload_bytes(remote, f.read())       # overwrite=True
    print(f"▲ {remote}  ({len(sub)} filas)")

▲ silver/indices_clean/DJIA/year=1992/DJIA_1992.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=1993/DJIA_1993.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=1994/DJIA_1994.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=1995/DJIA_1995.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=1996/DJIA_1996.parquet  (366 filas)
▲ silver/indices_clean/DJIA/year=1997/DJIA_1997.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=1998/DJIA_1998.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=1999/DJIA_1999.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=2000/DJIA_2000.parquet  (366 filas)
▲ silver/indices_clean/DJIA/year=2001/DJIA_2001.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=2002/DJIA_2002.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=2003/DJIA_2003.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=2004/DJIA_2004.parquet  (366 filas)
▲ silver/indices_clean/DJIA/year=2005/DJIA_2005.parquet  (365 filas)
▲ silver/indices_clean/DJIA/year=2

In [5]:
"""
**No funciona de momento***
# incremental Silver

Incremental Silver – índices bursátiles
Lee Bronze del año vigente, crea panel diario con forward-fill,
y sube a silver/indices_clean/… únicamente las fechas posteriores
a la última ya guardada en Silver.


import os, io, datetime as dt, pandas as pd, numpy as np
from dotenv import load_dotenv
import adlfs
from utils_adls import _client, upload_bytes, CONTAINER

# ─────────────────── 0. Config  ─────────────────────────────────────────
load_dotenv()
ACCOUNT   = os.getenv("AZ_STORAGE_ACCOUNT")
KEY       = os.getenv("AZ_ACCOUNT_KEY")

BRONZE_PREFIX = "market/bronze/indices"        # contenedor + carpeta
SILVER_PREFIX = "market/silver/indices_clean"  # destino Silver
LOCAL_TMP     = "data/silver_tmp"; os.makedirs(LOCAL_TMP, exist_ok=True)

fs  = adlfs.AzureBlobFileSystem(account_name=ACCOUNT, account_key=KEY)
svc = _client()

# ────────────────── 1. Leer Bronze (solo año actual) ────────────────────
year_now = dt.date.today().year
paths = fs.glob(f"{BRONZE_PREFIX}/*/year={year_now}/*.parquet")
if not paths:
    print("⚠️  No hay ficheros Bronze para el año actual"); exit()

open_files = [fs.open(p, "rb") for p in paths]
bronze_df  = pd.concat(pd.read_parquet(f) for f in open_files)

# ────────────────── 2. Construir silver_df (panel diario) ───────────────
date_range = pd.date_range(bronze_df["date"].min(),
                           bronze_df["date"].max(),
                           freq="D")

panel_list = []
for tk, grp in bronze_df.groupby("ticker"):
    grp = grp.set_index("date").sort_index()
    filled = grp.reindex(date_range)

    # ** forward-fill **
    want_num = ["open", "high", "low", "close", "adj_close", "volume"]
    num_cols = [c for c in want_num if c in filled.columns]
    orig_na  = filled["close"].isna() if "close" in filled.columns else pd.Series(False, index=filled.index)

    filled[num_cols] = filled[num_cols].ffill()
    for c in ["ingest_ts", "source"]:
        if c in filled.columns: filled[c] = filled[c].ffill()

    filled["data_quality_flag"] = np.where(orig_na, "ffill", "source")
    if "close" in filled.columns:
        filled["daily_return"] = filled["close"].pct_change().round(6)

    filled = (filled.reset_index()
                     .rename(columns={"index":"date"})
                     .assign(ticker=tk,
                             year=lambda x: x["date"].dt.year))
    panel_list.append(filled)

silver_df = (pd.concat(panel_list)
               .query("year == @year_now")      # aseguramos solo año actual
               .sort_values(["ticker","date"])
               .reset_index(drop=True))

# ────────────────── 3. Filtra solo filas nuevas ────────────────────────
def last_date_silver(ticker:str, year:int):
    file_path = f"{SILVER_PREFIX}/{ticker}/year={year}/{ticker}_{year}.parquet"
    full_path = f"{CONTAINER}/{file_path}"
    if not fs.exists(full_path):
        return None
    with fs.open(full_path, "rb") as f:
        return pd.read_parquet(f, columns=["date"])["date"].max().date()

uploads = 0
for (tk, yr), sub in silver_df.groupby(["ticker","year"]):
    last_dt = last_date_silver(tk, yr)
    if last_dt:
        sub = sub[sub["date"] > pd.to_datetime(last_dt)]
    if sub.empty:
        print(f"{tk} {yr}: sin novedades"); continue

    tmp = f"{LOCAL_TMP}/{tk}_{yr}.parquet"
    sub.to_parquet(tmp, index=False)

    remote = f"{SILVER_PREFIX}/{tk}/year={yr}/{tk}_{yr}.parquet"
    with open(tmp, "rb") as f:
        upload_bytes(remote, f.read())         # overwrite=True
    uploads += len(sub)
    print(f"▲ {tk} {yr}: +{len(sub)} filas cargadas")

print("✅ Incremental Silver terminado – filas nuevas:", uploads)
"""

'\n**No funciona de momento***\n# incremental Silver\n\nIncremental Silver – índices bursátiles\nLee Bronze del año vigente, crea panel diario con forward-fill,\ny sube a silver/indices_clean/… únicamente las fechas posteriores\na la última ya guardada en Silver.\n\n\nimport os, io, datetime as dt, pandas as pd, numpy as np\nfrom dotenv import load_dotenv\nimport adlfs\nfrom utils_adls import _client, upload_bytes, CONTAINER\n\n# ─────────────────── 0. Config  ─────────────────────────────────────────\nload_dotenv()\nACCOUNT   = os.getenv("AZ_STORAGE_ACCOUNT")\nKEY       = os.getenv("AZ_ACCOUNT_KEY")\n\nBRONZE_PREFIX = "market/bronze/indices"        # contenedor + carpeta\nSILVER_PREFIX = "market/silver/indices_clean"  # destino Silver\nLOCAL_TMP     = "data/silver_tmp"; os.makedirs(LOCAL_TMP, exist_ok=True)\n\nfs  = adlfs.AzureBlobFileSystem(account_name=ACCOUNT, account_key=KEY)\nsvc = _client()\n\n# ────────────────── 1. Leer Bronze (solo año actual) ────────────────────\nyear_now =