In [0]:
# Lista de variables(principales y las faltantes)

variables_principales = ["documento_proveedor", "estado_del_proceso", "numero_de_proceso", "nit_de_la_entidad", "numero_del_contrato"]
variables_complementarias = ["departamento_entidad", "nombre_de_la_entidad", "fecha_de_firma_del_contrato", "objeto_a_contratar", 
                            "url_contrato", "municipio_entidad", "valor_contrato", "tipo_documento_proveedor", 
                            "modalidad_de_contrataci_n", "codigo_entidad_en_secop", "origen", "nivel_entidad", 
                            "fecha_inicio_ejecuci_n", "fecha_fin_ejecuci_n", "nombre_del_contratista", 
                            "tipo_de_contrato", "objeto_del_proceso", "forma_de_pago"]

# Unificación para consulta
variables_totales = variables_principales + variables_complementarias
columnas_sql = ", ".join(variables_totales)

In [0]:
from sodapy import Socrata
from pyspark.sql import SparkSession
import time

# Inicialización de SparkSession y cliente Socrata
spark = SparkSession.builder.getOrCreate()
cliente_secop = Socrata("www.datos.gov.co", app_token)

# Parámetros para paginar la consulta
tamano_lote = 50000
avance = 0
modo_escritura = "overwrite"
intentos_maximos = 4

# Descarga y carga por bloques
while True:
    errores = 0
    while errores < intentos_maximos:
        try:
            consulta = f"SELECT numero_del_contrato, numero_de_proceso, nit_de_la_entidad, documento_proveedor, estado_del_proceso LIMIT {tamano_lote} OFFSET {avance}"
            print(f"Cargando registros desde offset: {avance}")
            respuesta = cliente_secop.get(codigo_dataset, query=consulta)

            if not respuesta:
                print(" Fin de la extracción. No se encontraron más registros.")
                break

            df_parcial = spark.createDataFrame(respuesta)
            df_parcial.write \
                .format("delta") \
                .mode(modo_escritura) \
                .option("overwriteSchema", "true") \
                .saveAsTable("main.diplomado_datos.tabla_restante_secop")

            print(f"Bloque de {len(respuesta)} registros cargado correctamente.")

            avance += tamano_lote
            modo_escritura = "append"
            break
        except Exception as error:
            errores += 1
            print(f"Error en intento {errores}: {error}")
            time.sleep(10)
    else:
        print(" Se agotaron los intentos en este bloque. Terminando proceso.")
        break

print("Todos los datos fueron cargados en la tabla.")


In [0]:
# Agregar una columna hash que combine los campos clave incluyendo estado_del_proceso
col_combinada = concat_ws("||",
    col("numero_del_contrato"),
    col("nit_de_la_entidad"),
    col("documento_proveedor"),
    col("estado_del_proceso").
    col("numero_de_proceso")
)

tabla_identificada = tabla_tmp_faltantes.withColumn("identificador_hash_estado", sha2(col_combinada, 256))

In [0]:
from pyspark.sql.functions import sha2, concat_ws, col

# Traer tabla con registros faltantes
restantes_df = spark.table("main.diplomado_datos.tabla_restante_secop")

# Generar clave hash combinando campos relevantes + estado
restantes_df = restantes_df.withColumn("id_unico_con_estado", sha2(concat_ws("||",col("documento_proveedor"),col("estado_del_proceso"),col("numero_de_proceso"),col("nit_de_la_entidad"),col("numero_del_contrato")),256))

# Verificar los resultados generados
restantes_df.select("id_unico_con_estado").show(5)
     

In [0]:
# Identificar columnas que no están repetidas (salvo la llave hash)
columnas_exclusivas = [campo for campo in restantes_df.columns if campo not in df_ids.columns or campo == "id_unico_con_estado"]

# Crear nuevo DataFrame solo con columnas únicas
df_limpio_restante = restantes_df.select(columnas_exclusivas)

# Mostrar lista final de columnas incluidas
print(f"Variables finales incluidas: {df_limpio_restante.columns}")

In [0]:

# Combinar los datos a través de la llave hash generada previamente
df_completo_secop = df_ids.join(
    df_limpio_restante.dropDuplicates(["id_unico_con_estado"]),
    on="id_unico_con_estado",
    how="left"
)

In [0]:
# Guardar el DataFrame consolidado como tabla Delta
df_completo_secop.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("main.diplomado_datos.tabla_final_SECOP")