# 1. Spark Session + Crear Dataframe + Filter

In [1]:
# 1. Importar la clase SparkSession desde la librería pyspark.sql
from pyspark.sql import SparkSession

# 2. Crear una instancia de SparkSession, que es el punto de entrada a la funcionalidad de Spark.
# .appName("Test") le da un nombre a la aplicación.
# .getOrCreate() obtiene una sesión existente o crea una nueva.
spark = SparkSession.builder.appName("Test").getOrCreate()

# 3. Definir los datos que se usarán para el DataFrame.
# Es una lista de tuplas, donde cada tupla representa una fila.
data = [("Victor", "Manuel Rámirez", 22, "México", "SLP", "Ingeniero"),
        ("Juan", "Pérez", 30, "México", "CDMX", "Doctor"),
        ("Ana", "García", 25, "México", "Jalisco", "Abogada"),
        ("Luis", "Hernández", 28, "México", "Nuevo León", "Arquitecto"),
        ("María", "López", 35, "México", "Puebla", "Maestra"),
        ("Pedro", "Martínez", 40, "México", "Guanajuato", "Contador")]

# 4. Definir los nombres de las columnas para el DataFrame.
# El orden corresponde al de los datos en las tuplas.
columns = ["Nombre", "Apellido", "Edad", "País", "Estado", "Profesión"]

# 5. Crear el DataFrame a partir de los datos y las columnas definidas.
# Un DataFrame es una estructura de datos distribuida similar a una tabla.
df = spark.createDataFrame(data, columns)

# 6. Mostrar las primeras 20 filas del DataFrame para verificar su contenido.
# Esta es una acción que ejecuta las transformaciones previas.
df.show()

# 7. Aplicar una transformación de filtro para crear un nuevo DataFrame.
# La condición es seleccionar solo las filas donde la columna "Edad" sea mayor a 30.
df_edad = df.filter(df.Edad > 30)

# 8. Mostrar el DataFrame resultante después del filtrado.
df_edad.show()

# 9. Detener la sesión de Spark para liberar los recursos del clúster.
# Es una buena práctica hacerlo al final de cada script.
spark.stop()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/28 20:05:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+------+--------------+----+------+----------+----------+
|Nombre|      Apellido|Edad|  País|    Estado| Profesión|
+------+--------------+----+------+----------+----------+
|Victor|Manuel Rámirez|  22|México|       SLP| Ingeniero|
|  Juan|         Pérez|  30|México|      CDMX|    Doctor|
|   Ana|        García|  25|México|   Jalisco|   Abogada|
|  Luis|     Hernández|  28|México|Nuevo León|Arquitecto|
| María|         López|  35|México|    Puebla|   Maestra|
| Pedro|      Martínez|  40|México|Guanajuato|  Contador|
+------+--------------+----+------+----------+----------+

+------+--------+----+------+----------+---------+
|Nombre|Apellido|Edad|  País|    Estado|Profesión|
+------+--------+----+------+----------+---------+
| María|   López|  35|México|    Puebla|  Maestra|
| Pedro|Martínez|  40|México|Guanajuato| Contador|
+------+--------+----+------+----------+---------+



# 2. GroupBy + Aggregate (agg)

In [2]:
# 1. Importar funciones específicas para realizar agregaciones desde pyspark.sql.functions
from pyspark.sql.functions import avg, count, max

# Importar la clase SparkSession para iniciar la sesión
from pyspark.sql import SparkSession

# 2. Crear o reutilizar una sesión de Spark con el nombre "Agregaciones"
spark = SparkSession.builder.appName("Agregaciones").getOrCreate()

# 3. Definir los datos de ejemplo en una lista de tuplas
data = [("Victor", "Manuel Rámirez", 22, "México", "SLP", "Ingeniero"),
        ("Juan", "Pérez", 30, "México", "CDMX", "Doctor"),
        ("Ana", "García", 25, "México", "Jalisco", "Abogada"),
        ("Luis", "Hernández", 28, "México", "Nuevo León", "Arquitecto"),
        ("María", "López", 35, "México", "Puebla", "Maestra"),
        ("Pedro", "Martínez", 40, "México", "Guanajuato", "Contador")]

# Definir los nombres de las columnas correspondientes
columns = ["Nombre", "Apellido", "Edad", "País", "Estado", "Profesión"]

# Crear el DataFrame inicial a partir de los datos y columnas
df = spark.createDataFrame(data, columns)

# Mostrar el DataFrame original para verificarlo
print("DataFrame Original:")
df.show()

# 4. Agrupar el DataFrame y aplicar funciones de agregación
# df.groupBy("Estado") agrupa todas las filas por los valores únicos de la columna "Estado".
# .agg() aplica una o más funciones de agregación a cada uno de esos grupos.
df_agregado = df.groupBy("Estado").agg(
    # count("*") cuenta el número de filas en cada grupo.
    # .alias() renombra la nueva columna a "numero_personas".
    count("*").alias("numero_personas"),
    
    # avg("Edad") calcula el promedio de la columna "Edad" para cada grupo.
    # Se renombra la columna resultante a "edad_promedio".
    avg("Edad").alias("edad_promedio"),
    
    # max("Edad") encuentra el valor máximo de la columna "Edad" en cada grupo.
    # Se renombra la columna a "edad_maxima".
    max("Edad").alias("edad_maxima")
)

# 5. Mostrar el DataFrame resultante con los datos agregados
print("DataFrame Agregado por Estado:")
df_agregado.show()

# 6. Detener la sesión de Spark para liberar los recursos
spark.stop()

DataFrame Original:


                                                                                

+------+--------------+----+------+----------+----------+
|Nombre|      Apellido|Edad|  País|    Estado| Profesión|
+------+--------------+----+------+----------+----------+
|Victor|Manuel Rámirez|  22|México|       SLP| Ingeniero|
|  Juan|         Pérez|  30|México|      CDMX|    Doctor|
|   Ana|        García|  25|México|   Jalisco|   Abogada|
|  Luis|     Hernández|  28|México|Nuevo León|Arquitecto|
| María|         López|  35|México|    Puebla|   Maestra|
| Pedro|      Martínez|  40|México|Guanajuato|  Contador|
+------+--------------+----+------+----------+----------+

DataFrame Agregado por Estado:
+----------+---------------+-------------+-----------+
|    Estado|numero_personas|edad_promedio|edad_maxima|
+----------+---------------+-------------+-----------+
|       SLP|              1|         22.0|         22|
|      CDMX|              1|         30.0|         30|
|   Jalisco|              1|         25.0|         25|
|Nuevo León|              1|         28.0|         28|
|  

# 3. Leer archivos CSV + InferSchema

In [3]:
# --- 1. Importaciones Necesarias ---
from pyspark.sql import SparkSession
# Importamos las funciones que usaremos para las agregaciones
from pyspark.sql.functions import count, avg, max

# --- 2. Creación de la SparkSession ---
# Es el punto de entrada para cualquier aplicación de Spark
spark = SparkSession.builder.appName("LecturaArchivos").getOrCreate()

# --- 3. Lectura de un Archivo CSV ---
# spark.read es la interfaz para leer datos de fuentes externas.
# .csv() especifica que el formato del archivo es CSV.
df_productos = spark.read.csv(
    "datos.csv",          # Ruta del archivo a leer
    header=True,          # La primera fila del CSV se usará como encabezado (nombres de columna)
    inferSchema=True      # Spark intentará adivinar el tipo de dato de cada columna (Integer, String, etc.)
)

# --- 4. Inspección Inicial del DataFrame ---
# .printSchema() muestra la estructura del DataFrame: nombres de columna y sus tipos de datos.
print("Estructura del DataFrame leído del CSV:")
df_productos.printSchema()

# .show() muestra las primeras 20 filas del DataFrame.
print("Contenido del DataFrame de productos:")
df_productos.show()

# --- 5. Filtrado con Múltiples Condiciones ---
# Se filtran los productos que cumplen dos condiciones a la vez.
# Nota: Cada condición debe ir entre paréntesis y se unen con '&' (AND) o '|' (OR).
print("Productos de la categoría 'Electronica' con más de 100 en stock:")
df_productos.filter(
    (df_productos.categoria == "Electronica") & (df_productos.stock > 100)
).show()
    
print("Productos de la categoría 'Muebles' con más de 100 en stock:")
df_productos.filter(
    (df_productos.categoria == "Muebles") & (df_productos.stock > 100)
).show()

# --- 6. Agregación de Datos por Categoría ---
# .groupBy() agrupa las filas según una columna para poder aplicar cálculos a cada grupo.
# .agg() ejecuta las funciones de agregación sobre los grupos.
print("\nResumen de datos por categoría:")
df_productos.groupBy("categoria").agg(
    count("*").alias("numero_productos"), # Contar cuántos productos hay en cada categoría
    avg("precio").alias("precio_promedio"),   # Calcular el precio promedio por categoría
    max("stock").alias("stock_maximo")        # Encontrar el stock máximo por categoría
).show()

# --- 7. Finalización de la Sesión ---
# Es una buena práctica detener la sesión para liberar los recursos.
spark.stop()

Estructura del DataFrame leído del CSV:
root
 |-- producto_id: integer (nullable = true)
 |-- nombre_producto: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- stock: integer (nullable = true)

Contenido del DataFrame de productos:
+-----------+----------------+-----------+------+-----+
|producto_id| nombre_producto|  categoria|precio|stock|
+-----------+----------------+-----------+------+-----+
|        101|      Laptop Pro|Electronica|1200.0|   50|
|        102|     Silla Gamer|    Muebles| 350.5|  120|
|        103|Teclado Mecanico|Electronica|150.75|  200|
|        104| Mesa de Oficina|    Muebles| 250.0|   80|
|        105|      Monitor 4K|Electronica| 800.0|   75|
+-----------+----------------+-----------+------+-----+

Productos de la categoría 'Electronica' con más de 100 en stock:
+-----------+----------------+-----------+------+-----+
|producto_id| nombre_producto|  categoria|precio|stock|
+-----------+-------------

# 4. Join

In [4]:
# --- 1. Importaciones y Creación de la Sesión ---
from pyspark.sql import SparkSession

# Crear o reutilizar una sesión de Spark con el nombre "EjemploJoins"
spark = SparkSession.builder.appName("EjemploJoins").getOrCreate()

# --- 2. Creación de los DataFrames de Ejemplo ---

# DataFrame de Empleados: Contiene ID de empleado, nombre y el ID del departamento al que pertenece.
datos_empleados = [
    (1, "Ana", 3),
    (2, "Luis", 1),
    (3, "Marta", 2),
    (4, "Juan", 3),
    (5, "Sofia", None) # Sofia no tiene un departamento asignado (valor nulo)
]
df_empleados = spark.createDataFrame(datos_empleados, ["id", "nombre", "departamento_id"])

# DataFrame de Departamentos: Contiene el ID del departamento y su nombre.
datos_deptos = [
    (1, "Ventas"),
    (2, "Marketing"),
    (3, "Ingeniería"),
    (4, "Recursos Humanos") # El depto. de RRHH no tiene empleados asignados en nuestra tabla de empleados.
]
df_deptos = spark.createDataFrame(datos_deptos, ["id", "nombre_depto"])

# --- 3. Mostrar los DataFrames Originales ---
print("Empleados:")
df_empleados.show()

print("Departamentos:")
df_deptos.show()

# --- 4. Unir (Join) los dos DataFrames ---
# La operación 'join' combina filas de dos DataFrames basándose en una condición de igualdad.
# Sintaxis: df1.join(df2, condición_de_unión, tipo_de_join)
df_unido = df_empleados.join(
    df_deptos,                                 # El segundo DataFrame a unir.
    df_empleados.departamento_id == df_deptos.id, # La condición que conecta las dos tablas.
    "inner"                                    # Tipo de join. "inner" solo incluye filas donde la clave existe en AMBAS tablas.
)
# En un 'inner join':
# - 'Sofia' será

Empleados:


                                                                                

+---+------+---------------+
| id|nombre|departamento_id|
+---+------+---------------+
|  1|   Ana|              3|
|  2|  Luis|              1|
|  3| Marta|              2|
|  4|  Juan|              3|
|  5| Sofia|           NULL|
+---+------+---------------+

Departamentos:
+---+----------------+
| id|    nombre_depto|
+---+----------------+
|  1|          Ventas|
|  2|       Marketing|
|  3|      Ingeniería|
|  4|Recursos Humanos|
+---+----------------+



# 5. Write file as parquet & csv

In [5]:
# --- 1. Importaciones y Creación de la Sesión ---
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EscrituraArchivos").getOrCreate()

# --- 2. Preparación de Datos ---
# Se recrean los DataFrames de empleados y departamentos.
datos_empleados = [(1, "Ana", 3), (2, "Luis", 1), (3, "Marta", 2), (4, "Juan", 3)]
df_empleados = spark.createDataFrame(datos_empleados, ["id", "nombre", "departamento_id"])

datos_deptos = [(1, "Ventas"), (2, "Marketing"), (3, "Ingeniería")]
df_deptos = spark.createDataFrame(datos_deptos, ["id", "nombre_depto"])

# Se realiza el 'join' para combinar los datos.
df_unido = df_empleados.join(df_deptos, df_empleados.departamento_id == df_deptos.id, "inner")

# --- 3. Limpieza del DataFrame ---
# Después de un join, es común tener columnas de ID duplicadas.
# Se elimina la columna 'id' que viene del DataFrame de departamentos para evitar redundancia.
df_unido = df_unido.drop(df_deptos.id)
print("DataFrame final que vamos a guardar:")
df_unido.show()

# --- 4. Escritura de Archivos ---
# 'df.write' es la interfaz para guardar un DataFrame en un sistema de archivos.

# --- Guardar como Parquet ---
# Parquet es un formato de archivo columnar muy eficiente para análisis de Big Data.
df_unido.write.mode("overwrite").parquet("salida_parquet")
# .mode() especifica el comportamiento si el archivo/directorio ya existe:
#  - "overwrite": Reemplaza los datos existentes.
#  - "append": Agrega los nuevos datos a los existentes.
#  - "ignore": No hace nada si ya existen datos.
#  - "error" (o "errorifexists"): Lanza un error (comportamiento por defecto).

# --- Guardar como CSV ---
# CSV es un formato de texto plano, útil por su compatibilidad.
df_unido.write.mode("overwrite").option("header", "true").csv("salida_csv")
# .option("header", "true") es específico para CSV e indica que se debe escribir
# una primera fila con los nombres de las columnas.

print("¡DataFrames guardados exitosamente!")

# --- 5. Finalización de la Sesión ---
spark.stop()

DataFrame final que vamos a guardar:


25/07/28 20:06:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+------+---------------+------------+
| id|nombre|departamento_id|nombre_depto|
+---+------+---------------+------------+
|  2|  Luis|              1|      Ventas|
|  3| Marta|              2|   Marketing|
|  1|   Ana|              3|  Ingeniería|
|  4|  Juan|              3|  Ingeniería|
+---+------+---------------+------------+



                                                                                

¡DataFrames guardados exitosamente!


# 6. UDF & withColumn

In [6]:
# --- 1. Importaciones Necesarias ---
from pyspark.sql import SparkSession
# udf: Función para registrar una función de Python como una UDF de Spark.
from pyspark.sql.functions import udf
# StringType: Tipo de dato para especificar qué devolverá nuestra UDF.
from pyspark.sql.types import StringType

# --- 2. Creación de la SparkSession ---
spark = SparkSession.builder.appName("EjemploUDF").getOrCreate()

# --- 3. Creación del DataFrame de Ejemplo ---
datos = [("Ana", 28),
         ("Luis", 35),
         ("Marta", 42),
         ("Juan", 22),
         ("Sofia", 55)]
df = spark.createDataFrame(datos, ["nombre", "edad"])

# --- 4. Definición y Registro de una UDF ---

# Paso 1: Definimos una función de Python normal.
# Esta función toma un valor (la edad) y devuelve una categoría como un string.
def categorizar_experiencia(edad):
    if edad < 25:
        return "Junior"
    elif 25 <= edad < 40:
        return "Semi-Senior"
    else:
        return "Senior"

# Paso 2: Registramos la función de Python como una UDF de Spark.
# Le decimos a Spark cuál función usar (categorizar_experiencia) y
# qué tipo de dato va a devolver (StringType). Esto es crucial.
experiencia_udf = udf(categorizar_experiencia, StringType())

# --- 5. Aplicación de la UDF para Crear una Nueva Columna ---

# Paso 3: Usamos .withColumn para agregar una nueva columna llamada "experiencia".
# Para generar los valores de la nueva columna, llamamos a nuestra UDF
# y le pasamos la columna 'edad' del DataFrame como argumento.
df_con_experiencia = df.withColumn(
    "experiencia",
    experiencia_udf(df.edad)
)

# --- 6. Visualización y Finalización ---
print("DataFrame con la categoría de experiencia:")
df_con_experiencia.show()

spark.stop()

DataFrame con la categoría de experiencia:


                                                                                

+------+----+-----------+
|nombre|edad|experiencia|
+------+----+-----------+
|   Ana|  28|Semi-Senior|
|  Luis|  35|Semi-Senior|
| Marta|  42|     Senior|
|  Juan|  22|     Junior|
| Sofia|  55|     Senior|
+------+----+-----------+



# 7. Spark UI 

In [7]:
# --- 1. Importaciones y Creación de la Sesión ---
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Iniciar la sesión de Spark. El nombre "ExplorandoSparkUI" te ayudará a identificarla.
spark = SparkSession.builder.appName("ExplorandoSparkUI").getOrCreate()

# --- 2. Definición de Transformaciones (Operaciones Perezosas) ---
# Spark no ejecuta nada todavía, solo construye un plan (DAG) de lo que tiene que hacer.

# Leer un archivo CSV de vuelos. Asegúrate de tener un archivo con este nombre.
df_vuelos = spark.read.option("header", "true").csv("vuelos.csv")

# Filtrar para quedarnos solo con los vuelos que originan en "JFK".
df_vuelos = df_vuelos.filter(df_vuelos.origin == "JFK")

# Agrupar por destino y contar el número de vuelos para cada uno.
df_agrupado = df_vuelos.groupBy("dest").agg(count("*").alias("total_vuelos"))

# Ordenar los resultados para ver los destinos más populares primero.
df_ordenado = df_agrupado.orderBy("total_vuelos", ascending=False)

# --- 3. Ejecución de una Acción (Lanzamiento del Job) ---
# La llamada a .show() es una "acción". Obliga a Spark a ejecutar el plan de transformaciones.
# En este momento se lanza un "Job" de Spark, que podrás ver en la Spark UI.
print("Top 5 destinos desde JFK:")
df_ordenado.show(5)

# --- 4. Pausa para Explorar la Spark UI ---
# La Spark UI normalmente solo está activa mientras el script se ejecuta.
# Este 'input()' pausa el script, manteniendo la sesión y la UI activas.
# Abre tu navegador y ve a http://localhost:4040 para explorar los detalles del Job.
input("Presiona Enter en la terminal para terminar el script y cerrar la Spark UI...")

# --- 5. Detención de la Sesión ---
# Una vez que presionas Enter, el script continúa y detiene la sesión.
# Esto también apaga la Spark UI.
spark.stop()

Top 5 destinos desde JFK:
+----+------------+
|dest|total_vuelos|
+----+------------+
| LAX|           3|
| ORD|           2|
| MIA|           1|
| SFO|           1|
+----+------------+



# 8. Lectura y escritura de archivos JSON -> parquet/csv

In [8]:
# --- 1. Importaciones Necesarias ---
from pyspark.sql import SparkSession
# 'explode' es la función clave para trabajar con columnas de tipo Array (listas).
from pyspark.sql.functions import explode

# --- 2. Creación de la Sesión ---
spark = SparkSession.builder.appName("DatosComplejos").getOrCreate()

# --- 3. Lectura de Datos Complejos (JSON) ---
# Spark puede leer archivos JSON directamente y entender su estructura anidada,
# creando columnas de tipo Struct (objetos) y Array (listas).
df = spark.read.json("datos_pedidos.json")

# Es muy útil imprimir el esquema para entender la estructura que Spark ha inferido.
print("Esquema inferido del JSON:")
df.printSchema()

# truncate=False es útil para ver el contenido completo de las columnas complejas.
df.show(truncate=False)

# --- 4. Acceso a Datos Anidados ---

# Se puede acceder a los campos dentro de una estructura (Struct) usando la notación de punto.
print("Accediendo a un campo anidado (ciudad):")
df.select("order_id", "shipping_address.city").show()

# --- 5. "Explotar" un Array para Aplanar los Datos ---

# La función 'explode' toma una columna de tipo Array ('items') y crea una nueva fila
# por cada elemento que contiene el array.
df_exploded = df.withColumn("item", explode("items"))

print("DataFrame después de 'explode' (una fila por cada item del pedido):")
df_exploded.show(truncate=False)

# --- 6. Seleccionar los Campos Finales ---

# Ahora que cada 'item' está en su propia fila, podemos acceder a sus campos
# internos (product_id, quantity) también con la notación de punto.
# Este proceso completo se conoce como "aplanar" la estructura de datos.
df_final = df_exploded.select(
    "order_id",
    "customer_name",
    "shipping_address.city",
    "item.product_id",
    "item.quantity"
)

print("Resultado final y aplanado:")
df_final.show()

# --- 7. Guardar el Resultado ---
# Guardamos el DataFrame aplanado en los formatos deseados.
df_final.write.mode("overwrite").parquet("pedidos_a_planar_parquet")
df_final.write.mode("overwrite").option("header", "true").csv("pedidos_a_planar_csv")

# --- 8. Finalización ---
spark.stop()

Esquema inferido del JSON:
root
 |-- customer_name: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- shipping_address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

+----------------+----------------------+--------+--------------------------------------+
|customer_name   |items                 |order_id|shipping_address                      |
+----------------+----------------------+--------+--------------------------------------+
|Carlos Ruiz     |[{P001, 2}, {P002, 1}]|101     |{Springfield, Calle Falsa 123}        |
|Ana Lopez       |[{P003, 5}]           |102     |{Shelbyville, Avenida Siempreviva 742}|
|Luis Gomez      |[{P001, 1}, {P004, 3}]|103     |{Springfield, Calle del Sol 456}      |
|Maria Perez     |[{P002, 2}

# 9. EJEMPLO: Dataframe de estudiantes

In [9]:
# --- 1. Importaciones y Creación de la Sesión ---
from pyspark.sql import SparkSession

# Crear la sesión de Spark, que es el punto de entrada a toda su funcionalidad.
spark = SparkSession.builder.appName("estudiantes").getOrCreate()

# --- 2. Carga y Exploración de Datos ---
# Obtener los datos de un archivo JSON. Spark infiere automáticamente el esquema complejo.
df_estudiantes = spark.read.json("estudiantes.json")

# Mostrar el esquema del DataFrame para entender su estructura (columnas, tipos y si son anidados).
df_estudiantes.printSchema()

# Mostrar el contenido del DataFrame. truncate=False evita que el contenido de las columnas se corte.
df_estudiantes.show(truncate=False)

# --- 3. Agrupación y Filtrado ---
# Agrupar por la columna 'gender' y contar cuántas filas (estudiantes) hay en cada grupo.
df_estudiantes.groupBy("gender").count().show()

# Filtrar el DataFrame para crear uno nuevo que solo contenga a los hombres.
df_hombres = df_estudiantes.filter(df_estudiantes.gender == "Masculino")
df_hombres.show()

# Filtrar para crear un DataFrame que solo contenga a las mujeres.
df_mujeres = df_estudiantes.filter(df_estudiantes.gender == "Femenino")
df_mujeres.show()

# --- 4. Cálculo de Promedios Accediendo a Datos Anidados ---
# Importar las funciones 'round' para redondear y 'col' para referenciar columnas.
from pyspark.sql.functions import round, col

# Añadir una nueva columna "promedio" al DataFrame de hombres.
df_hombres_calif = df_hombres.withColumn("promedio", 
    # Usamos round() para redondear el resultado final a 2 decimales.
    round(
        # Se accede a cada calificación anidada con la notación de punto (ej: "grades.parcial_1").
        # Se suman todas las calificaciones y se dividen entre 6 para obtener el promedio.
        ((col("grades.parcial_1") + col("grades.parcial_2") + col("grades.parcial_3") +
          col("grades.parcial_4") + col("grades.parcial_5") + col("grades.parcial_6")) / 6),
        2  # Número de decimales para el redondeo.
    )
)
df_hombres_calif.show()

# Se repite el mismo cálculo para el DataFrame de mujeres.
df_mujeres_calif = df_mujeres.withColumn("promedio",
    round(
        ((col("grades.parcial_1") + col("grades.parcial_2") + col("grades.parcial_3") + 
          col("grades.parcial_4") + col("grades.parcial_5") + col("grades.parcial_6")) / 6),
        2
    )
)
df_mujeres_calif.show()

# --- 5. Aplanar Datos con 'explode' ---
# Importar la función 'explode' para trabajar con columnas de tipo array.
from pyspark.sql.functions import explode

# 'explode' crea una nueva fila por cada elemento en el array 'extracurricular_activities'.
# La nueva columna se llamará 'actividades'.
df_hombres_exploded = df_hombres_calif.withColumn("actividades", explode("extracurricular_activities"))

# Eliminar la columna original del array para evitar datos redundantes.
df_hombres_exploded = df_hombres_exploded.drop("extracurricular_activities")

# Mostrar el resultado final aplanado.
df_hombres_exploded.show(truncate=False)

# --- 6. Finalización ---
# Detener la sesión de Spark para liberar los recursos.
spark.stop()

root
 |-- age: long (nullable = true)
 |-- enrollment_year: long (nullable = true)
 |-- extracurricular_activities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- grades: struct (nullable = true)
 |    |-- parcial_1: double (nullable = true)
 |    |-- parcial_2: double (nullable = true)
 |    |-- parcial_3: double (nullable = true)
 |    |-- parcial_4: double (nullable = true)
 |    |-- parcial_5: double (nullable = true)
 |    |-- parcial_6: double (nullable = true)
 |-- last_name: string (nullable = true)
 |-- student_id: string (nullable = true)

+---+---------------+--------------------------+----------+---------+--------------------------------+---------+----------+
|age|enrollment_year|extracurricular_activities|first_name|gender   |grades                          |last_name|student_id|
+---+---------------+--------------------------+----------+---------+-----------------