# Cuaderno ingesta de datos 

En este cuaderno traeremos la informaciòn de datos abiertos

In [0]:
# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv"

# Descargar contenido
response_secop = requests.get(url_secop)
response_men = requests.get(url_men)

# Convertir contenido a pandas usando StringIO
df_secop_pd = pd.read_csv(StringIO(response_secop.text))
df_men_pd = pd.read_csv(StringIO(response_men.text))

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)
df_men = spark.createDataFrame(df_men_pd)

# Mostrar en Databricks
display(df_secop)
display(df_men)
 
 

In [0]:
display(df_secop_pd.head())


In [0]:
display(df_men_pd.head())

In [0]:

# Celda 2: Guardar los DataFrames como tablas Delta
# La función .saveAsTable() guarda los datos y registra la tabla en el Unity Catalog.
# El modo "overwrite" reemplaza la tabla si ya existe, ideal para actualizaciones.
df_secop.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.secop")
df_men.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.men_estadisticas")

print("¡Tablas guardadas exitosamente en el catálogo 'main', esquema 'diplomado_datos'!")

In [0]:
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Inicializar Spark
spark = SparkSession.builder.getOrCreate()

# Configuración
base_url = "https://www.datos.gov.co/resource/rpmr-utcd.csv"
limit = 100000
total_registros = 16000000
max_iter = total_registros // limit

# Obtener esquema de la tabla destino
target_schema = spark.table("main.diplomado_datos.secop").schema

# Descarga por lotes
for i in range(max_iter):
    offset = i * limit
    print(f"⏳ Iteración {i+1}/{max_iter} — Offset: {offset}")

    url = f"{base_url}?$limit={limit}&$offset={offset}"
    response = requests.get(url)

    if response.status_code != 200:
        print(f"❌ Error al descargar en offset {offset}. Código: {response.status_code}")
        break

    if len(response.text.strip()) == 0:
        print("✅ No hay más datos disponibles.")
        break

    # ✅ Corrección aquí: forzar todas las columnas a tipo texto
    df_pd = pd.read_csv(StringIO(response.text), delimiter=",", header=0, dtype=str, low_memory=False)

    if df_pd.empty:
        print("✅ Descarga terminada: datos vacíos.")
        break

    # Convertir a Spark y alinear tipos con el esquema destino
    df_spark = spark.createDataFrame(df_pd)
    df_aligned = df_spark.select(
        [col(field.name).cast(field.dataType) for field in target_schema.fields if field.name in df_spark.columns]
    )

    # Guardar lote en tabla Delta
    df_aligned.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable("main.diplomado_datos.secop")

    print(f"✅ Guardados {df_pd.shape[0]} registros. Total acumulado: {(i+1)*limit}")

print("🎉 Proceso completado.")


In [0]:
# Contar registros en la tabla Delta
df_secop = spark.table("main.diplomado_datos.secop")
total_registros = df_secop.count()

print(f"🔎 Total de registros en 'main.diplomado_datos.secop': {total_registros:,}")


In [0]:
# Mostrar primeras filas
display(df_secop.limit(10))
