# Ingesta de datos

In [0]:
# Step 1: download the first page of the SECOP and MEN datasets from
# datos.gov.co with requests, parse the CSV text with pandas and convert the
# results to Spark DataFrames.
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# `$order=:id` gives SECOP a stable row order so that the later `$offset`
# pages neither repeat nor skip rows (Socrata does not guarantee an order
# for paged queries without `$order`).
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000&$order=:id"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"

# Download the content. requests has no default timeout, so a stalled server
# would otherwise hang the cell forever.
response_secop = requests.get(url_secop, timeout=120)
response_men = requests.get(url_men, timeout=120)

# Fail loudly on HTTP errors instead of parsing an error body as CSV data.
response_secop.raise_for_status()
response_men.raise_for_status()

# Parse the CSV text. NOTE: pandas infers column types from this page alone,
# and those types become the table schema when df_secop / df_men are saved.
df_secop_pd = pd.read_csv(StringIO(response_secop.text))
df_men_pd = pd.read_csv(StringIO(response_men.text))

# Convert pandas to Spark.
df_secop = spark.createDataFrame(df_secop_pd)
df_men = spark.createDataFrame(df_men_pd)

# Render in Databricks (`display` is provided by the notebook runtime).
display(df_secop)
display(df_men)

In [0]:
# Row counts of both DataFrames. A notebook only renders the value of the
# last expression in a cell, so a bare `df_secop.count()` here would be
# computed and then discarded; print both counts explicitly instead.
print(f"SECOP: {df_secop.count():,} filas")
print(f"MEN: {df_men.count():,} filas")

In [0]:
# Cell 2: persist both DataFrames as Delta tables.
# `saveAsTable` writes the data and registers the table in the catalog
# (catalog `main`, schema `diplomado_datos`). Mode "overwrite" replaces an
# existing table, so re-running this cell refreshes the data.
tablas_delta = [
    (df_secop, "main.diplomado_datos.secop"),
    (df_men, "main.diplomado_datos.men_estadisticas"),
]
for df_origen, nombre_tabla in tablas_delta:
    (
        df_origen.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(nombre_tabla)
    )

print("¡Tablas guardadas exitosamente en el catálogo 'main', esquema 'diplomado_datos'!")

In [0]:
# TODO: check whether the remote dataset has rows that are not yet in the
# local Delta table, and collect them in a DataFrame (`DF_revisar`).
# The previous placeholder was a `for i in range(10):` loop whose body held
# only comments. That is a syntax error that aborted the cell, so the idea is
# kept here as a comment until the check is implemented.


In [0]:


# Step 1b: download the second SECOP page (0-based rows 100000-199999) with
# requests, parse it with pandas and convert it to a Spark DataFrame.
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# `$order=:id` keeps row order stable across `$offset` pages, so this page
# neither overlaps nor skips rows of the first page.
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000&$offset=100000&$order=:id"

# Download the content. The explicit timeout avoids hanging forever on a
# stalled server.
response_secop = requests.get(url_secop, timeout=120)
# Stop on HTTP errors instead of parsing an error body as CSV data.
response_secop.raise_for_status()

# Parse the CSV text with pandas.
df_secop_pd = pd.read_csv(StringIO(response_secop.text), delimiter=',', header=0)

# Convert pandas to Spark.
df_secop = spark.createDataFrame(df_secop_pd)

# Render in Databricks (`display` is provided by the notebook runtime).
display(df_secop)
 

# Datasets

In [0]:

# Align the downloaded SECOP page with the schema of the existing Delta table
# and append it to that table.
from pyspark.sql.functions import col, lit


target_schema = spark.table("main.diplomado_datos.secop").schema
columnas_pagina = set(df_secop.columns)

# Cast every target column to its Delta type, in the table's column order.
# A target column absent from the page becomes a typed NULL instead of making
# `col(...)` raise. `.alias` pins the output name to the target column name.
df_secop_aligned = df_secop.select(
    [
        (col(field.name) if field.name in columnas_pagina else lit(None))
        .cast(field.dataType)
        .alias(field.name)
        for field in target_schema.fields
    ]
)


df_secop_aligned.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("main.diplomado_datos.secop")

In [0]:
# Compute how many `limite`-sized pages remain after `offset_inicial`.
# NOTE(review): total_registros is a hard-coded snapshot of the dataset size;
# confirm it is still current before a full run.
total_registros = 19446266
offset_inicial = 200000
limite = 100000

# Ceiling division. The former `(x // limite) + 1` requested one extra, empty
# page whenever x was an exact multiple of `limite`. The result is clamped at
# 0 when the offset is already past the end.
registros_restantes = max(0, total_registros - offset_inicial)
paginas_faltantes = -(-registros_restantes // limite)

print(f"Quedan {paginas_faltantes} bloques por descargar...")
     

In [0]:
# Bulk load: download the remaining SECOP pages from datos.gov.co, clean the
# numeric columns according to the Delta table schema, and append each page.
# `paginas_faltantes`, `offset_inicial` and `limite` come from the previous cell.
import time
from io import StringIO

import pandas as pd
import requests
from pyspark.sql.functions import col, lit, trim, when
from pyspark.sql.types import (
    ByteType,
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    ShortType,
)

TABLA_DESTINO = "main.diplomado_datos.secop"
URL_BASE = "https://www.datos.gov.co/resource/rpmr-utcd.csv"
TIMEOUT_SEGUNDOS = 120
MAX_INTENTOS = 3

# Classify columns with isinstance, not type-name strings. PySpark's
# `typeName()` returns "long"/"integer" (not "bigint"/"int"), so a name-based
# check never recognised integer columns.
TIPOS_ENTEROS = (ByteType, ShortType, IntegerType, LongType)
TIPOS_DECIMALES = (FloatType, DoubleType, DecimalType)

# A trimmed value must match one of these patterns to be cast. Anything else
# (e.g. "NO DEFINIDO", "N/A", "S/I", "") becomes NULL. An optional leading
# minus sign is accepted, so negative amounts are kept instead of being nulled.
PATRON_ENTERO = r"^-?[0-9]+$"
PATRON_DECIMAL = r"^-?[0-9]+([.][0-9]+)?$"


def _descargar_pagina(url):
    """Return the HTTP response for *url*, retrying transient failures.

    Retries network errors, HTTP 429 and HTTP 5xx up to MAX_INTENTOS times,
    with exponential backoff. Returns the last response received (possibly
    non-200), or None if every attempt raised a requests exception.
    """
    response = None
    for intento in range(1, MAX_INTENTOS + 1):
        try:
            response = requests.get(url, timeout=TIMEOUT_SEGUNDOS)
        except requests.RequestException as err:
            print(f"⚠️ Intento {intento}/{MAX_INTENTOS} falló: {err}")
            response = None
        else:
            if response.status_code != 429 and response.status_code < 500:
                return response
        if intento < MAX_INTENTOS:
            time.sleep(2 ** intento)
    return response


def _limpiar_y_alinear(df, schema):
    """Return *df* cast to *schema*, with column names and order taken from *schema*.

    Integer and fractional target columns are cast only when the trimmed text
    looks numeric; otherwise they become NULL. Other columns are cast as-is.
    Target columns absent from *df* are added as typed NULLs.
    """
    columnas = set(df.columns)
    expresiones = []
    for field in schema.fields:
        nombre, tipo = field.name, field.dataType
        if nombre not in columnas:
            expresion = lit(None).cast(tipo)
        else:
            valor = trim(col(nombre))
            # `when` without `otherwise` yields NULL for non-matching rows.
            # Casting straight to the target type keeps decimal precision,
            # which an intermediate double cast would lose.
            if isinstance(tipo, TIPOS_ENTEROS):
                expresion = when(valor.rlike(PATRON_ENTERO), valor.cast(tipo))
            elif isinstance(tipo, TIPOS_DECIMALES):
                expresion = when(valor.rlike(PATRON_DECIMAL), valor.cast(tipo))
            else:
                expresion = col(nombre).cast(tipo)
        expresiones.append(expresion.alias(nombre))
    return df.select(expresiones)


# Loop-invariant: every page is written with exactly these columns, so the
# table schema does not change between iterations.
target_schema = spark.table(TABLA_DESTINO).schema

for i in range(paginas_faltantes):
    offset = offset_inicial + (i * limite)
    # `$order=:id` keeps page boundaries stable across requests, so pages
    # neither overlap nor skip rows.
    url = f"{URL_BASE}?$limit={limite}&$offset={offset}&$order=:id"

    print(f"Descargando página {i+1} con offset {offset}")

    response = _descargar_pagina(url)

    if response is None or response.status_code != 200:
        if response is None:
            print(f"⚠️ Error de red en página {i+1}")
        else:
            print(f"⚠️ Error HTTP {response.status_code} en página {i+1}")
        # Earlier pages are already appended, so resume from here rather than
        # re-running from the old offset, which would duplicate rows.
        print(f"Reanudar con offset_inicial = {offset}")
        break

    try:
        df_secop_pd = pd.read_csv(
            StringIO(response.text),
            dtype=str,
            low_memory=False,
        )
    except pd.errors.EmptyDataError:
        # A completely empty body (not even a header) means no more data.
        df_secop_pd = pd.DataFrame()

    if df_secop_pd.empty:
        print(f"⚠️ Página {i+1} llegó vacía. Posiblemente sin más datos.")
        break

    df_secop = spark.createDataFrame(df_secop_pd)
    df_secop_aligned = _limpiar_y_alinear(df_secop, target_schema)

    df_secop_aligned.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable(TABLA_DESTINO)

    print(f"✓ Página {i+1} almacenada con {df_secop_pd.shape[0]} filas")