In [0]:
#!/usr/bin/env python
import pandas as pd
from sodapy import Socrata

#client = Socrata("www.datos.gov.co", None)

token = dbutils.secrets.get("claves","token_app")
codigo_dataset = dbutils.widgets.get("codigo_dataset")
# Example authenticated client (needed for non-public datasets):
client = Socrata("www.datos.gov.co", str(token), timeout=100)

In [0]:
import time
# 3. Parámetros de paginación
limit = 500000
offset = 0
write_mode = "overwrite"
reintentos=5

print(f"Iniciando carga por lotes para el dataset: {codigo_dataset}")

# 4. Bucle para obtener y cargar los datos por lotes
while True:
    intentos = 0

    while intentos < reintentos:
        try:
            print(f"Obteniendo y cargando lote {offset}...")
            # Construye y ejecuta la consulta para el lote actual
            query = f"SELECT numero_del_contrato, numero_de_proceso, nit_de_la_entidad, documento_proveedor, estado_del_proceso LIMIT {limit} OFFSET {offset}"
            results = client.get(codigo_dataset, query=query) 

            # Si la API no devuelve más registros, se termina el bucle
            if not results:
                print("Carga de datos finalizada.")
                break

            # Convierte el lote a un DataFrame de Spark y lo escribe en la tabla Delta
            spark.createDataFrame(results).write \
                .format("delta") \
                .mode(write_mode) \
                .option("overwriteSchema", "true") \
                .saveAsTable("main.diplomado_datos.ids_contratos_procesos")

            print(f"Lote de {len(results)} registros desde offset {offset} cargado.")

            # Se cambia a modo 'append' para las siguientes iteraciones y se incrementa el offset
            write_mode = "append"
            offset += limit
            break

        except Exception as e:
            intentos +=1
            print("Error al obtener o cargar el lote:", e)
            print(f"Intento {intentos} de {reintentos}...")
            time.sleep(20)
    else:
        print("Se alcanzó el número máximo de intentos. Terminando la carga.")
        break

In [0]:
df_secop_id=spark.table("main.diplomado_datos.ids_contratos_procesos")
df_secop_id.count()

In [0]:

# Importar las funciones necesarias de PySpark
from pyspark.sql.functions import sha2, concat_ws, col

# 1. Cargar la tabla correcta desde el catálogo a un DataFrame
# Se asume que esta es la tabla que contiene las columnas que mencionaste.
df_secop_id = spark.table("main.diplomado_datos.ids_contratos_procesos")

# 2. Definir la lista corregida de columnas para el identificador único
columnas_para_hash = [
    "numero_del_contrato",
    "numero_de_proceso",
    "nit_de_la_entidad",
    "documento_proveedor",
    "estado_del_proceso"
]

In [0]:
# 3. Añadir la nueva columna 'id_unico'
# Se concatenan las columnas clave con un separador y se les aplica un hash SHA-2.
df_con_id = df_secop_id.withColumn(
    "id_unico_con_estado",
    sha2(concat_ws("||", *[col(c) for c in columnas_para_hash]), 256)
)

columnas_para_hash_se = [
    "numero_del_contrato",
    "numero_de_proceso",
    "nit_de_la_entidad",
    "documento_proveedor"
]
df_con_id = df_con_id.withColumn(
    "id_unico_sin_estado",
    sha2(concat_ws("||", *[col(c) for c in columnas_para_hash_se]), 256)
)
df_con_id.display()

In [0]:

df_con_id.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("main.diplomado_datos.secop_id_bronze")

In [0]:
%sql
-- Explorar dataset
SELECT * FROM main.diplomado_datos.secop_id_bronze LIMIT 10;

## Trabajo en clase

In [0]:
# Columnas clave iniciales
columnas_clave = [
    "numero_del_contrato",
    "numero_de_proceso",
    "nit_de_la_entidad",
    "documento_proveedor",
    "estado_del_proceso"
]

# lo que faltar
columnas_faltantes = [
    "nivel_entidad",
    "codigo_entidad_en_secop",
    "nombre_de_la_entidad",
    "departamento_entidad",
    "municipio_entidad",
    "modalidad_de_contrataci_n",
    "objeto_a_contratar",
    "objeto_del_proceso",
    "tipo_de_contrato",
    "fecha_de_firma_del_contrato",
    "fecha_inicio_ejecuci_n",
    "fecha_fin_ejecuci_n",
    "valor_contrato",
    "nom_raz_social_contratista",
    "url_contrato",
    "origen",
    "tipo_documento_proveedor"
]

columnas_total = columnas_clave + columnas_faltantes

query_cols = ", ".join(columnas_total)

In [0]:
### traer por lotes

limit = 500000
offset = 0
write_mode = "overwrite"
reintentos = 5

while True:
    intentos = 0
    
    while intentos < reintentos:
        try:
            print(f"Obteniendo y cargando lote con offset {offset} ...")

            # Construye la query
            query = f"SELECT {query_cols} LIMIT {limit} OFFSET {offset}"
            
            results = client.get(codigo_dataset, query=query)

            if not results:
                print("✅ No hay más registros que descargar.")
                break

            # Crea Spark DataFrame
            df_restante = spark.createDataFrame(results)

            # Guarda en Delta (en tabla intermedia)
            df_restante.write \
                .format("delta") \
                .mode(write_mode) \
                .option("overwriteSchema", "true") \
                .saveAsTable("main.diplomado_datos.secop_restante_tmp")

            print(f"✓ Lote de {len(results)} registros cargado.")

            # A partir del segundo lote, usa append
            write_mode = "append"
            offset += limit
            break

        except Exception as e:
            intentos += 1
            print(f"Error en intento {intentos}: {e}")
            time.sleep(10)
    else:
        print("⚠️ Se alcanzó el número máximo de reintentos.")
        break

In [0]:
# tabla restante
df_restante = spark.table("main.diplomado_datos.secop_restante_tmp")

# id_unico_con_estado
df_restante = df_restante.withColumn(
    "id_unico_con_estado",
    sha2(
        concat_ws("||",
                  col("numero_del_contrato"),
                  col("numero_de_proceso"),
                  col("nit_de_la_entidad"),
                  col("documento_proveedor"),
                  col("estado_del_proceso")
        ), 256
    )
)

df_restante.select("id_unico_con_estado").show(5)

In [0]:
# columnas unicas (excluyendo duplicadas)
columnas_restante = [
    c for c in df_restante.columns
    if c not in df_ids.columns or c == "id_unico_con_estado"
]

df_restante_reduced = df_restante.select(columnas_restante)

print(f"Columnas seleccionadas en df_restante_reduced: {df_restante_reduced.columns}")

In [0]:

# join usando el id único
df_final = df_ids.join(
    df_restante_reduced.dropDuplicates(["id_unico_con_estado"]),
    on="id_unico_con_estado",
    how="left"
)

In [0]:
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("main.diplomado_datos.secop_completo")

In [0]:
%sql
SELECT COUNT(*) AS total_filas
FROM main.diplomado_datos.secop_completo;

In [0]:
%sql
DESCRIBE main.diplomado_datos.secop_completo;

In [0]:

%sql
SELECT *
FROM main.diplomado_datos.secop_completo
LIMIT 10;
