In [0]:
#!/usr/bin/env python
# Databricks / PySpark + Delta Lake
# pip install sodapy

from sodapy import Socrata
from delta.tables import DeltaTable
from pyspark.sql import functions as F
import json, os


In [0]:
### ─────────────────────────────────────────────
### 1. Parámetros y utilidades
### ─────────────────────────────────────────────
TOKEN          = dbutils.secrets.get("claves", "token_app")
DATASET_ID     = dbutils.widgets.get("codigo_dataset")                 # p.e. "4k7j-ux6u"
DELTA_TABLE    = "main.diplomado.ids_contratos_procesos"
BATCH_SIZE     = 50_000                                                # máximo seguro para SODA 2.0
CTRL_PATH      = "/dbfs/FileStore/tmp/secop_offset.json"               # para reanudar

client = Socrata("www.datos.gov.co", TOKEN, timeout=60)

In [0]:
# Leer último offset si existe
if os.path.exists(CTRL_PATH):
    with open(CTRL_PATH) as fh:
        offset = json.load(fh).get("offset", 0)
else:
    offset = 0

keys = ["numero_del_contrato",
        "numero_de_proceso",
        "nit_de_la_entidad",
        "documento_proveedor",
        "estado_del_proceso"]

In [0]:
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable

def delta_exists(tbl_name: str) -> bool:
    """
    Comprueba si una tabla Delta existe sin tocar la JVM
    (funciona en Databricks Serverless).
    """
    try:
        _ = DeltaTable.forName(spark, tbl_name)
        return True
    except AnalysisException:
        return False


In [0]:
# ── Antes (falla en Serverless)
# if not spark._jsparkSession.catalog().tableExists(DELTA_TABLE):

# ── Después
if not delta_exists(DELTA_TABLE):
    (spark
       .createDataFrame([], """
            numero_del_contrato  STRING,
            numero_de_proceso    STRING,
            nit_de_la_entidad    STRING,
            documento_proveedor  STRING,
            estado_del_proceso   STRING
        """)
       .write.format("delta")
       .mode("overwrite")
       .saveAsTable(DELTA_TABLE)
    )

# Ya podemos abrirla con la API Delta estándar
delta_tbl = DeltaTable.forName(spark, DELTA_TABLE)


In [0]:
# ────────────────────────────────────────────────────────────────
# 3️⃣  BUCLE DE PAGINACIÓN, UPSERT Y CONTROL DE OFFSET
#     (Serverless-friendly, con retries y back-off)
# ────────────────────────────────────────────────────────────────
#
# • Requiere que en el paso 1 ya tengas:
#     - client         → Socrata(domain, TOKEN, timeout=180)
#     - DATASET_ID     → ID del dataset (widget o string)
#     - BATCH_SIZE     → ej. 20_000  (≤ 50 000)
#     - keys           → lista de las 5 columnas clave
# • Requiere que en el paso 2 ya tengas:
#     - delta_tbl      → DeltaTable destino con esas mismas columnas
# • Este bloque
#     1. crea (si no existe) la tabla de offsets
#     2. lee el último offset
#     3. descarga lotes con retries y back-off
#     4. hace MERGE (upsert) evitando duplicados
#     5. actualiza el offset después de cada lote
# ────────────────────────────────────────────────────────────────

import time
from requests.exceptions import ReadTimeout, ConnectionError
from pyspark.sql.utils import AnalysisException

# --------------------- 3.A  tabla de control --------------------
spark.sql("""
CREATE TABLE IF NOT EXISTS main.diplomado.sec_offsets (
    dataset_id STRING COMMENT 'Socrata dataset ID',
    offset     BIGINT  COMMENT 'Último offset cargado',
    updated_at TIMESTAMP
) USING DELTA
TBLPROPERTIES (delta.minReaderVersion='2', delta.minWriterVersion='5')
""")

def get_offset(ds_id: str) -> int:
    """Devuelve el último offset registrado, o 0 si no existe."""
    row = (spark.sql(f"""
            SELECT offset
            FROM   main.diplomado.sec_offsets
            WHERE  dataset_id = '{ds_id}'
            ORDER  BY updated_at DESC
            LIMIT  1""")
           .collect())
    return row[0].offset if row else 0

def set_offset(ds_id: str, off: int) -> None:
    """Inserta o actualiza el offset del dataset."""
    spark.sql(f"""
        MERGE INTO main.diplomado.sec_offsets AS tgt
        USING (SELECT '{ds_id}' AS dataset_id,
                      {off}     AS offset,
                      current_timestamp() AS updated_at) src
        ON tgt.dataset_id = src.dataset_id
        WHEN MATCHED THEN UPDATE SET offset = src.offset,
                                     updated_at = src.updated_at
        WHEN NOT MATCHED THEN INSERT *
    """)

# --------------------- 3.B  parámetros de retries --------------
MAX_RETRIES  = 5          # nº de intentos por lote
BACKOFF_SECS = 5          # pausa base (aumenta: 5s,10s,15s,…)

# --------------------- 3.C  arranque: offset --------------------
offset      = get_offset(DATASET_ID)
merge_cond  = " AND ".join([f"dest.{c}=src.{c}" for c in keys])

# --------------------- 3.D  bucle de paginación -----------------
while True:
    soql = (f"select {', '.join(keys)} "
            f"order by :id "
            f"limit {BATCH_SIZE} offset {offset}")

    # 3.D-1  Descargar lote con retries
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            batch = client.get(DATASET_ID, query=soql)
            break                                      # ✔ éxito
        except (ReadTimeout, ConnectionError) as e:
            wait = BACKOFF_SECS * attempt              # 5s,10s,…
            print(f"[retry {attempt}/{MAX_RETRIES}] offset {offset:,} – "
                  f"{type(e).__name__}: {e}; reintentando en {wait}s")
            time.sleep(wait)
    else:
        raise RuntimeError(f"❌ agotados {MAX_RETRIES} retries en offset {offset:,}")

    if not batch:          # lote vacío → fin
        break

    batch_df = spark.createDataFrame(batch)

    # 3.D-2  UPSERT (MERGE) para evitar duplicados
    (delta_tbl.alias("dest")
              .merge(batch_df.alias("src"), merge_cond)
              .whenNotMatchedInsertAll()
              .execute())

    # 3.D-3  Avanza offset y lo persiste
    offset += BATCH_SIZE
    set_offset(DATASET_ID, offset)

print(f"✅ Descarga completa. Offset final procesado: {offset:,}")
