In [None]:
import pandas as pd
import aiohttp
import os
from datetime import datetime
import asyncio
import duckdb


In [2]:
def control_panel():

    # Load metadata
    df_metadata = pd.read_csv(
        "https://estadisticas.bcrp.gob.pe/estadisticas/series/metadata",
        sep=";",
        encoding="latin1"
    )

    # Keep only necessary columns and rename them
    df_control = df_metadata[["Código de serie", "Nombre de serie"]].copy()
    df_control = df_control.rename(columns={
        "Código de serie": "codigo_serie",
        "Nombre de serie": "nombre_serie"
    })

    # Add control columns
    df_control["estado"] = "pendiente"
    df_control["ultima_actualizacion"] = ""
    df_control["observaciones"] = ""

    # Save control panel index
    df_control.to_csv("../data/metadata/control_series_index.csv", sep='|', index=False, encoding="utf-8")

    print(f"Saved control panel with {len(df_control)} series to metadata/control_series_index.csv")

In [16]:
async def fetch_series(session, codigo_serie, formato, inicio, fin, idioma, output_dir):
    url = f"https://estadisticas.bcrp.gob.pe/estadisticas/series/api/{codigo_serie}/{formato}/{inicio}/{fin}/{idioma}"

    try:
        async with session.get(url, timeout=30) as response:
            if response.status == 200:
                json_data = await response.json()
                periods = json_data.get("periods", [])

                if not periods:
                    return codigo_serie, "vacio", "", "Sin datos en API"

                df = pd.DataFrame(periods)

                os.makedirs(output_dir, exist_ok=True)
                output_path = os.path.join(output_dir, f"{codigo_serie}.parquet")
                df.to_parquet(output_path, index=False)

                return codigo_serie, "exito", datetime.utcnow().isoformat(), ""
            else:
                return codigo_serie, "fallo", "", f"HTTP {response.status}"

    except Exception as e:
        return codigo_serie, "fallo", "", str(e)

In [13]:
async def run_batch(df_control, formato, inicio, fin, idioma, output_dir, batch_size=100):
    import time
    from tqdm.notebook import tqdm

    # Keep only pendientes
    df_pending = df_control[df_control["estado"] == "pendiente"].copy()

    total = len(df_pending)
    print(f"Total series pendientes: {total}")
    
    updates = []

    async with aiohttp.ClientSession() as session:
        for i in range(0, total, batch_size):
            batch = df_pending.iloc[i:i+batch_size]
            print(f"\n🔄 Processing batch {i//batch_size + 1} ({i} to {i+len(batch)-1})")

            start = time.time()
            tasks = [
                fetch_series(session, row.codigo_serie, formato, inicio, fin, idioma, output_dir)
                for _, row in batch.iterrows()
            ]
            results = await asyncio.gather(*tasks)

            successes = 0
            failures = 0
            empties = 0

            for codigo, estado, timestamp, obs in results:
                updates.append((codigo, estado, timestamp, obs))

                if estado == "exito":
                    successes += 1
                    print(f"✅ {codigo} descargado.")
                elif estado == "vacio":
                    empties += 1
                    print(f"⚪ {codigo} sin datos.")
                else:
                    failures += 1
                    print(f"❌ {codigo} falló: {obs}")

            elapsed = round(time.time() - start, 2)
            print(f"⏱️ Batch done in {elapsed}s — ✅ {successes} | ⚪ {empties} | ❌ {failures}")

    # Apply updates
    for codigo, estado, timestamp, obs in updates:
        df_control.loc[df_control["codigo_serie"] == codigo, "estado"] = estado
        df_control.loc[df_control["codigo_serie"] == codigo, "ultima_actualizacion"] = timestamp
        df_control.loc[df_control["codigo_serie"] == codigo, "observaciones"] = obs

    df_control.to_csv("../data/metadata/control_series_index.csv", sep="|", index=False)
    print(f"\n✅ Control panel updated with {len(updates)} results.")


In [9]:
control_panel()

Saved control panel with 16418 series to metadata/control_series_index.csv


In [14]:
df_control = pd.read_csv("../data/metadata/control_series_index.csv", sep="|")

In [18]:
await run_batch(
    df_control=df_control,
    formato="json",
    inicio="2000-1",
    fin="2025-12",
    idioma="esp",
    output_dir="../data/bronze/",
    batch_size=100  # or tweak as needed
)


Total series pendientes: 16418

🔄 Processing batch 1 (0 to 99)


  return codigo_serie, "exito", datetime.utcnow().isoformat(), ""


✅ PN00001MM descargado.
✅ PN00002MM descargado.
✅ PN00003MM descargado.
✅ PN00004MM descargado.
✅ PN00005MM descargado.
✅ PN00006MM descargado.
✅ PN00007MM descargado.
✅ PN00008MM descargado.
✅ PN00009MM descargado.
✅ PN00010MM descargado.
✅ PN00011MM descargado.
✅ PN00012MM descargado.
✅ PN00013MM descargado.
✅ PN00014MM descargado.
✅ PN00015MM descargado.
✅ PN00016MM descargado.
✅ PN00021MM descargado.
✅ PN00023MM descargado.
✅ PN00025MM descargado.
✅ PN00026MM descargado.
✅ PN00030MM descargado.
✅ PN00031MM descargado.
✅ PN00032MM descargado.
✅ PN00033MM descargado.
✅ PN00034MM descargado.
✅ PN00039MM descargado.
✅ PN00040MM descargado.
✅ PN00041MM descargado.
✅ PN00042MM descargado.
✅ PN00043MM descargado.
✅ PN00044MM descargado.
✅ PN00045MM descargado.
✅ PN00046MM descargado.
✅ PN00050MM descargado.
✅ PN00051MM descargado.
✅ PN00052MM descargado.
✅ PN00053MM descargado.
✅ PN00054MM descargado.
✅ PN00058MM descargado.
✅ PN00059MM descargado.
✅ PN00060MM descargado.
✅ PN00061MM desc

  df_control.loc[df_control["codigo_serie"] == codigo, "ultima_actualizacion"] = timestamp
  df_control.loc[df_control["codigo_serie"] == codigo, "observaciones"] = obs



✅ Control panel updated with 16418 results.


In [24]:
# Read all 16k Parquet files and write into one
duckdb.sql("""
    COPY (
        SELECT * FROM read_parquet('../data/bronze/*.parquet')
    ) TO '../data/silver/series_all.parquet' (FORMAT PARQUET);
""")