#Cuaderno de ingesta de datos
En este bloque traeremos la información desde datos abiertos. 

In [0]:
# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"
 

# Descargar contenido
response_secop = requests.get(url_secop)
response_men = requests.get(url_men)

# Convertir contenido a pandas usando StringIO
df_secop_pd = pd.read_csv(StringIO(response_secop.text))
df_men_pd = pd.read_csv(StringIO(response_men.text))

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)
df_men = spark.createDataFrame(df_men_pd)

# Mostrar en Databricks
display(df_secop)
display(df_men)
     

In [0]:
df_secop.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.secop")

df_men.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.men_estadisticas")


In [0]:
# Celda 2: Guardar los DataFrames como tablas Delta
# La función .saveAsTable() guarda los datos y registra la tabla en el Unity Catalog.
# El modo "overwrite" reemplaza la tabla si ya existe, ideal para actualizaciones.
df_secop.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.secop")
df_men.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.men_estadisticas")

print("¡Tablas guardadas exitosamente en el catálogo")

### Cargue Datasets Secop

In [0]:
# Forzar la columna conflictiva a tipo str antes de pasarlo a Spark
df_pd["nit_de_la_entidad"] = df_pd["nit_de_la_entidad"].astype(str)


In [0]:
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crear sesión Spark si no existe
spark = SparkSession.builder.getOrCreate()

# Leer esquema base de la tabla Delta destino
target_schema = spark.table("main.diplomado_datos.secop").schema

# Parámetros para la paginación
base_url = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000&$offset={offset}"
offset = 0
chunk_size = 100000
max_rows = 19450000  # Número estimado de registros
chunk_number = 0

while offset < max_rows:
    print(f"📦 Descargando bloque desde offset {offset}")

    # Construir URL para el bloque
    url = base_url.format(offset=offset)

    # Descargar el contenido
    response = requests.get(url)
    if response.status_code != 200:
        print(f"❌ Error al descargar datos en offset {offset}")
        break

    # Leer CSV con pandas y forzar a string para evitar errores de conversión
    df_pd = pd.read_csv(StringIO(response.text), delimiter=',', header=0, low_memory=False)
    df_pd = df_pd.astype(str)

    # Verificar si está vacío
    if df_pd.empty:
        print("✅ Ingesta completa")
        break

    # Convertir a Spark y alinear con el esquema objetivo
    df_spark = spark.createDataFrame(df_pd)
    df_spark_aligned = df_spark.select([col(f.name).try_cast(f.dataType) for f in target_schema.fields])

    # Guardar en Delta Lake en modo append
    df_spark_aligned.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable("main.diplomado_datos.secop")

    print(f"✅ Bloque {chunk_number} ingresado con éxito")

    # Actualizar offset
    offset += chunk_size
    chunk_number += 1

In [0]:
# Contar registros ya cargados
df_secop = spark.table("main.diplomado_datos.secop")
conteo_total = df_secop.count()

# Calcular offset siguiente
next_offset = (conteo_total // 100000) * 100000
print(f"👉 Puedes continuar desde offset = {next_offset}")


In [0]:
from io import StringIO
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Crear sesión Spark si no existe
spark = SparkSession.builder.getOrCreate()

# Leer esquema base de la tabla Delta destino
target_schema = spark.table("main.diplomado_datos.secop").schema

# Parámetros para la paginación
base_url = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000&$offset={offset}"
offset = 16900000 
chunk_size = 100000
max_rows = 19549032  # Total datos
chunk_number = offset // chunk_size

while offset < max_rows:
    print(f"📦 Descargando bloque {chunk_number} desde offset {offset}")
    
    try:
        url = base_url.format(offset=offset)
        response = requests.get(url, timeout=30)  # timeout explícito
        if response.status_code != 200:
            print(f"❌ Error al descargar datos en offset {offset}: HTTP {response.status_code}")
            break

        # Leer CSV como texto plano con pandas
        df_pd = pd.read_csv(StringIO(response.text), delimiter=',', header=0, low_memory=False)
        df_pd = df_pd.astype(str)  # Forzar todo a string

        if df_pd.empty:
            print("✅ Ingesta finalizada: No hay más registros disponibles.")
            break

        # Convertir a Spark DataFrame
        df_spark = spark.createDataFrame(df_pd)

        # Aplicar try_cast para que errores de conversión no detengan el proceso
        df_spark_aligned = df_spark.select([
            expr(f"try_cast(`{f.name}` as {f.dataType.simpleString()})").alias(f.name)
            for f in target_schema.fields
        ])

        # Guardar en Delta Lake
        df_spark_aligned.write.format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .saveAsTable("main.diplomado_datos.secop")

        print(f"✅ Bloque {chunk_number} ingresado con éxito ({len(df_pd)} filas)")
        
        offset += chunk_size
        chunk_number += 1

    except Exception as e:
        print(f"❌ Error inesperado en offset {offset}: {e}")
        break


In [0]:
df_secop = spark.table("main.diplomado_datos.secop")
total_registros = df_secop.count()

print(f"🔎 Total de registros en 'main.diplomado_datos.secop': {total_registros:,}")

In [0]:
df_secop = spark.table("main.diplomado_datos.secop")

In [0]:
display(df_secop.limit(15))
