In [0]:
# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# CSV endpoint of the dataset; the $limit query parameter is set to 100000.
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"

# Download the content. The timeout keeps the cell from hanging forever on a
# stalled connection, and raise_for_status() stops on an HTTP error instead of
# letting the error body be parsed as if it were CSV data.
response_secop = requests.get(url_secop, timeout=60)
response_secop.raise_for_status()

# Parse the CSV text into pandas through an in-memory text buffer.
df_secop_pd = pd.read_csv(StringIO(response_secop.text))

# Convert the pandas frame to a Spark DataFrame.
df_secop = spark.createDataFrame(df_secop_pd)

# Render the result in Databricks.
display(df_secop)

In [0]:
# Databricks notebook – Ingesta SECOP II (~19,4 M filas) 
# ============================================================================
# 1️⃣  IMPORTS Y SESIÓN SPARK
import requests, time, math, pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# ============================================================================
# 2️⃣  PARAMETERS
# Where the data comes from and where it lands.
BASE_URL = "https://www.datos.gov.co/resource/rpmr-utcd.csv"
TARGET_TABLE = "main.diplomado.secop"

# Paging.
CHUNK_SIZE = 100_000       # rows requested per block
START_OFFSET = 0           # set to >0 when resuming a previous run
TOTAL_ROWS = 19_400_000    # approximate; only used for the progress message

# Network behaviour.
MAX_RETRIES = 3            # download attempts per block
SLEEP_SEC = 1              # pause between consecutive calls
# spark.conf.set("spark.sql.ansi.enabled", "false")  # quick alternative

# ============================================================================
# 3️⃣  TARGET SCHEMA
# Read from the existing table, so the table must already exist at this point.
target_schema = spark.table(TARGET_TABLE).schema

# ============================================================================
# 4️⃣  FUNCIÓN TRY_CAST SEGURA
def safe_cast(col_name: str, spark_dtype: str):
    """Build a column expression that converts ``col_name`` with ``try_cast``.

    Spark SQL's ``try_cast`` yields NULL for values that cannot be converted
    instead of raising an error.

    Args:
        col_name: Name of the source column. It is emitted as a
            backtick-quoted identifier; backticks inside the name are doubled
            so the name cannot terminate the quoting early.
        spark_dtype: Target type written as Spark SQL text (for example
            ``"bigint"``). It is inserted into the expression verbatim.

    Returns:
        The value returned by ``expr`` for ``try_cast(`<col>` AS <type>)``.
    """
    quoted_name = col_name.replace("`", "``")
    return expr(f"try_cast(`{quoted_name}` AS {spark_dtype})")

# ============================================================================
# 5️⃣  BUCLE PRINCIPAL
# Progress bookkeeping. n_loops is derived from the approximate TOTAL_ROWS and
# is only used in the progress message; the loop itself ends when the source
# returns an empty page.
n_loops = math.ceil(TOTAL_ROWS / CHUNK_SIZE)
offset  = START_OFFSET
bloc    = (offset // CHUNK_SIZE) + 1

while True:
    # $order=:id pins a stable row order across requests. Without an explicit
    # order, offset-based paging may return the same row in two blocks or skip
    # rows entirely.
    url = f"{BASE_URL}?$limit={CHUNK_SIZE}&$offset={offset}&$order=:id"

    # ---------- Download with retries ----------
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get(url, timeout=60)
            resp.raise_for_status()
            break
        except Exception as e:
            if attempt == MAX_RETRIES:
                # Chain the original exception so its traceback is preserved.
                raise RuntimeError(
                    f"Fallo tras {MAX_RETRIES} intentos en offset {offset}: {e}"
                ) from e
            # Wait longer after each failed attempt.
            time.sleep(2 * attempt)
    # -------------------------------------------

    # Read every column as text; typing happens later through try_cast.
    pdf = pd.read_csv(StringIO(resp.text), dtype=str)
    if pdf.empty:
        print(f"Fin de datos en offset {offset:,}. ¡Carga completa!")
        break

    # ---------- Pandas → Spark ----------
    # pandas marks missing cells with float NaN even in text columns. Replace
    # them with None and declare every column as string, so Spark does not
    # have to infer types from mixed str/float values (an all-empty column
    # could otherwise be typed as double).
    pdf = pdf.astype(object).where(pdf.notna(), None)
    raw_schema = ", ".join(
        "`" + str(name).replace("`", "``") + "` string" for name in pdf.columns
    )
    sdf_raw = spark.createDataFrame(pdf, schema=raw_schema)

    # ---------- Alignment + try_cast ----------
    # Emit exactly the target table's columns, in its order: cast the ones
    # present in this block and fill the missing ones with typed NULLs.
    sdf = sdf_raw.select(
        [
            (
                safe_cast(field.name, field.dataType.simpleString())
                if field.name in sdf_raw.columns
                else expr("NULL").cast(field.dataType)
            ).alias(field.name)
            for field in target_schema.fields
        ]
    )

    # ---------- Delta write ----------
    # A run that starts at offset 0 replaces the table contents with the first
    # block; every later block (and any resumed run) appends.
    write_mode = "overwrite" if offset == 0 else "append"
    (sdf.write
        .format("delta")
        .mode(write_mode)
        .option("mergeSchema", "true")
        .saveAsTable(TARGET_TABLE)
    )

    print(f"✅ Bloque {bloc}/{n_loops} cargado ({len(pdf):,} filas, offset {offset:,}, modo {write_mode}).")

    # Prepare the next iteration.
    offset += CHUNK_SIZE
    bloc   += 1
    time.sleep(SLEEP_SEC)