In [1]:
from pyspark.sql import SparkSession
import time

# Creamos la sesión conectada al Master
spark = SparkSession.builder \
    .appName("Prueba_Tour_Spark") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

print("¡Sesión Iniciada! Revisa el Master UI.")

# Dormimos el programa 5 minutos para que te de tiempo de ver las otras webs
time.sleep(300)
 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/17 00:30:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


¡Sesión Iniciada! Revisa el Master UI.


26/01/17 00:30:25 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
# %% [markdown]
# # 1. Ingesta (Capa Bronce)
# Convertimos CSV crudo a formato Delta Lake.

# %%
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from delta import *
import os

# URL del Master (definida en docker-compose)
master_url = "spark://spark-master:7077"

# Configuración: Añadimos Delta Lake
builder = SparkSession.builder \
    .appName("Lab_SECOP_Bronze") \
    .master(master_url) \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.executor.memory", "1g") 

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# %%
# LECTURA CSV
print("Leyendo CSV crudo...")
df_raw = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("inferSchema", "true") \
    .load("../data/SECOP_II_Contratos_Electronicos.csv")




Leyendo CSV crudo...


In [5]:
df_raw.show()

[Stage 4:>                                                          (0 + 1) / 1]

+--------------+-----------+------------+------+------+-----------------------+----------------+-------------------------+---------------------------------------+--------------+----------------------------+-------------------------+-----------+-----------+------------+
|       Entidad|Nit Entidad|Departamento|Ciudad|Estado|Descripcion del Proceso|Tipo de Contrato|Modalidad de Contratacion|Justificacion Modalidad de Contratacion|Fecha de Firma|Fecha de Inicio del Contrato|Fecha de Fin del Contrato|Precio Base|Valor Total|Valor Pagado|
+--------------+-----------+------------+------+------+-----------------------+----------------+-------------------------+---------------------------------------+--------------+----------------------------+-------------------------+-----------+-----------+------------+
|   ALCALDIA 90|       NULL|     BOLIVAR|  NULL|  NULL|                   NULL|            NULL|                     NULL|                                   NULL|    2023-02-28|             

                                                                                

In [7]:
nuevas_columnas= [

    c.strip().replace(" ", "_").replace("(", "").replace(")", "").lower() 
for c in df_raw.columns

]


# Aplicamos los nuevos nombres al DataFrame
df_raw = df_raw.toDF(*nuevas_columnas)
 
print("Nuevas columnas:", df_raw.columns)

Nuevas columnas: ['entidad', 'nit_entidad', 'departamento', 'ciudad', 'estado', 'descripcion_del_proceso', 'tipo_de_contrato', 'modalidad_de_contratacion', 'justificacion_modalidad_de_contratacion', 'fecha_de_firma', 'fecha_de_inicio_del_contrato', 'fecha_de_fin_del_contrato', 'precio_base', 'valor_total', 'valor_pagado']


In [8]:
# %%
# ESCRITURA BRONCE (Delta)
print("Escribiendo en capa Bronce...")
output_path = "../data/lakehouse/bronze/secop"
df_raw.write.format("delta").mode("overwrite").save(output_path)

print(f"Ingesta completada. Registros procesados: {df_raw.count()}")

Escribiendo en capa Bronce...


26/01/17 00:59:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Ingesta completada. Registros procesados: 1000


In [14]:
delta_secop = "/app/data/lakehouse/bronze/secop"
spark.sql(f"DESCRIBE HISTORY delta.`{delta_secop}`").show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2026-01-17 00:59:...|  NULL|    NULL|    WRITE|{mode -> Overwrit...|NULL|    NULL|     NULL|       NULL|  Serializable|        false|{numFiles -> 1, n...|        NULL|Apache-Spark/3.5....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



Transformación: (Ej: filter, select) Spark solo planea. Es instantáneo.
Acción: (Ej: show, count, save) Aquí es donde Spark realmente trabaja.
Por qué importa: Si tu código da error, a menudo el error salta en la última línea (show), aunque el problema esté al principio.

In [15]:
df_raw.printSchema()

root
 |-- entidad: string (nullable = true)
 |-- nit_entidad: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- descripcion_del_proceso: string (nullable = true)
 |-- tipo_de_contrato: string (nullable = true)
 |-- modalidad_de_contratacion: string (nullable = true)
 |-- justificacion_modalidad_de_contratacion: string (nullable = true)
 |-- fecha_de_firma: date (nullable = true)
 |-- fecha_de_inicio_del_contrato: string (nullable = true)
 |-- fecha_de_fin_del_contrato: string (nullable = true)
 |-- precio_base: long (nullable = true)
 |-- valor_total: string (nullable = true)
 |-- valor_pagado: string (nullable = true)



In [17]:
df_raw.show(5,vertical=True)

-RECORD 0-------------------------------------------------
 entidad                                 | ALCALDIA 90    
 nit_entidad                             | NULL           
 departamento                            | BOLIVAR        
 ciudad                                  | NULL           
 estado                                  | NULL           
 descripcion_del_proceso                 | NULL           
 tipo_de_contrato                        | NULL           
 modalidad_de_contratacion               | NULL           
 justificacion_modalidad_de_contratacion | NULL           
 fecha_de_firma                          | 2023-02-28     
 fecha_de_inicio_del_contrato            | NULL           
 fecha_de_fin_del_contrato               | NULL           
 precio_base                             | 3582652        
 valor_total                             | NULL           
 valor_pagado                            | NULL           
-RECORD 1-----------------------------------------------

In [18]:
df_raw.count()

1000

In [20]:
df_raw.columns

['entidad',
 'nit_entidad',
 'departamento',
 'ciudad',
 'estado',
 'descripcion_del_proceso',
 'tipo_de_contrato',
 'modalidad_de_contratacion',
 'justificacion_modalidad_de_contratacion',
 'fecha_de_firma',
 'fecha_de_inicio_del_contrato',
 'fecha_de_fin_del_contrato',
 'precio_base',
 'valor_total',
 'valor_pagado']

2. Seleccionar y Filtrar (El "Bisturí")
Recortar los datos a lo que necesitas.
Python
 

In [24]:
# Seleccionar columnas específicas
df_nuevo = df_raw.select("entidad", "nit_entidad", "precio_base")
df_nuevo.show(5)
# Filtrar (WHERE)# Opción A (Estilo SQL):
df_filtrado = df_raw.filter("precio_base > 100000 ")
df_filtrado.count()
# Opción B (Estilo Pythonico - Recomendado
df_filtrado = df_raw.filter((col("edad") > 18) & (col("ciudad") == "Bogota"))


+--------------+-----------+-----------+
|       entidad|nit_entidad|precio_base|
+--------------+-----------+-----------+
|   ALCALDIA 90|       NULL|    3582652|
|   ALCALDIA 78|       NULL| 2745675528|
|   HOSPITAL 23|       NULL| 3704121732|
|UNIVERSIDAD 87|       NULL| 1993838284|
|  INSTITUTO 61|       NULL| 4437422507|
+--------------+-----------+-----------+
only showing top 5 rows



1000

In [25]:
# Crear una columna nueva (o reemplazar una existente)# Ejemplo: Crear columna 'edad_doble'
df_mod = df_raw.withColumn("precio_base_doble", col("precio_base") * 2)
df_mod.show()

+--------------+-----------+------------+------+------+-----------------------+----------------+-------------------------+---------------------------------------+--------------+----------------------------+-------------------------+-----------+-----------+------------+-----------------+
|       entidad|nit_entidad|departamento|ciudad|estado|descripcion_del_proceso|tipo_de_contrato|modalidad_de_contratacion|justificacion_modalidad_de_contratacion|fecha_de_firma|fecha_de_inicio_del_contrato|fecha_de_fin_del_contrato|precio_base|valor_total|valor_pagado|precio_base_doble|
+--------------+-----------+------------+------+------+-----------------------+----------------+-------------------------+---------------------------------------+--------------+----------------------------+-------------------------+-----------+-----------+------------+-----------------+
|   ALCALDIA 90|       NULL|     BOLIVAR|  NULL|  NULL|                   NULL|            NULL|                     NULL|              

In [None]:
# Renombrar columna
df_renom = df.withColumnRenamed("nombre_viejo", "nombre_nuevo")
# Borrar columna
df_menos =df.drop("columna_innecesaria")
# Condicionales (CASE WHEN)
df_cat = df.withColumn("categoria", 
             when(col("edad") < 18, "Menor")
             .when(col("edad") >= 65, "Jubilado")
             .otherwise("Adulto")
         )
 

In [30]:
help(sum)

Help on built-in function sum in module builtins:

sum(iterable, /, start=0)
    Return the sum of a 'start' value (default: 0) plus an iterable of numbers
    
    When the iterable is empty, return the start value.
    This function is intended specifically for use with numeric values and may
    reject non-numeric types.



In [32]:
# Agrupar por ciudad y sumar salarios
from pyspark.sql.functions import sum, avg, count, col
 
 

df_resumen = df_raw.groupBy("ciudad") \
               .agg(                   sum("precio_base").alias("inversion_total"),
                   avg("precio_base").alias("promedio_precio_base"),
                   count("*").alias("cantidad")
               )
    
df_resumen.show() 

+------+---------------+--------------------+--------+
|ciudad|inversion_total|promedio_precio_base|cantidad|
+------+---------------+--------------------+--------+
|  NULL|  2565343069588|    2.565343069588E9|    1000|
+------+---------------+--------------------+--------+



In [28]:
df_raw.printSchema()

root
 |-- entidad: string (nullable = true)
 |-- nit_entidad: string (nullable = true)
 |-- departamento: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- descripcion_del_proceso: string (nullable = true)
 |-- tipo_de_contrato: string (nullable = true)
 |-- modalidad_de_contratacion: string (nullable = true)
 |-- justificacion_modalidad_de_contratacion: string (nullable = true)
 |-- fecha_de_firma: date (nullable = true)
 |-- fecha_de_inicio_del_contrato: string (nullable = true)
 |-- fecha_de_fin_del_contrato: string (nullable = true)
 |-- precio_base: long (nullable = true)
 |-- valor_total: string (nullable = true)
 |-- valor_pagado: string (nullable = true)



In [33]:
# 1. Registras el DataFrame como una "tabla temporal" (vista)

df_raw.createOrReplaceTempView("temporal_tabla")



# 2. Escribes SQL puro

df_sql = spark.sql("""

    SELECT ciudad, avg(precio_base) as precio_base_promedio

    FROM temporal_tabla

    GROUP BY ciudad

""")



df_sql.show()

+------+--------------------+
|ciudad|precio_base_promedio|
+------+--------------------+
|  NULL|    2.565343069588E9|
+------+--------------------+

