# Fundamentos de Apache Spark

## Objetivos de Aprendizaje
- Entender la diferencia entre transformaciones y acciones en Spark
- Aprender las operaciones b√°sicas con DataFrames
- Practicar lectura y escritura de datos
- Comprender el concepto de "lazy evaluation"

## Prerequisitos
- Haber completado `01_environment_check.ipynb`
- Entorno de Spark funcionando

## Tiempo Estimado
‚è±Ô∏è 30 minutos

## M√≥dulo AWS Academy Relacionado
üìö M√≥dulo 9: Big Data Processing (EMR, Spark)

---
# === SECCI√ìN 1 ===
## 1. Inicializar Spark

### Explicaci√≥n Conceptual
Antes de trabajar con Spark, siempre necesitamos crear una sesi√≥n. Es como abrir una aplicaci√≥n antes de usarla.

In [None]:
# Importamos SparkSession para crear nuestra conexion con Spark
from pyspark.sql import SparkSession

# Importamos funciones de Spark que usaremos frecuentemente
# El alias 'F' es una convencion comun para acceder a las funciones
from pyspark.sql import functions as F

# Importamos tipos de datos para definir esquemas
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Creamos la sesion de Spark
# master("local[*]") usa todos los cores disponibles en modo local
spark = SparkSession.builder \
    .appName("SparkBasics") \
    .master("local[*]") \
    .getOrCreate()

# Reducimos la verbosidad de los logs
spark.sparkContext.setLogLevel("WARN")

print("Spark inicializado correctamente")
print(f"Version: {spark.version}")

---
# === SECCI√ìN 2 ===
## 2. Crear DataFrames

### Explicaci√≥n Conceptual
Hay varias formas de crear un DataFrame en Spark:
1. **Desde listas de Python** - Para datos peque√±os o pruebas
2. **Desde archivos** - CSV, JSON, Parquet, etc.
3. **Desde bases de datos** - JDBC, Hive, etc.

**Analog√≠a del mundo real:** Crear un DataFrame es como preparar una hoja de c√°lculo. Puedes escribir los datos a mano, importarlos de un archivo, o conectarte a un sistema externo.

In [None]:
# METODO 1: Crear DataFrame desde una lista de tuplas
# Cada tupla es una fila de datos

# Datos de ventas de ejemplo
ventas_data = [
    ("2024-01-01", "Laptop", "Electronica", 2, 15000.0),
    ("2024-01-01", "Mouse", "Electronica", 10, 350.0),
    ("2024-01-02", "Escritorio", "Muebles", 1, 5000.0),
    ("2024-01-02", "Silla", "Muebles", 4, 2500.0),
    ("2024-01-03", "Monitor", "Electronica", 3, 8000.0),
    ("2024-01-03", "Teclado", "Electronica", 8, 800.0),
    ("2024-01-04", "Lampara", "Muebles", 5, 600.0),
    ("2024-01-04", "Laptop", "Electronica", 1, 15000.0),
    ("2024-01-05", "Mouse", "Electronica", 15, 350.0),
    ("2024-01-05", "Silla", "Muebles", 2, 2500.0)
]

# Nombres de las columnas
ventas_columnas = ["fecha", "producto", "categoria", "cantidad", "precio_unitario"]

# Creamos el DataFrame
df_ventas = spark.createDataFrame(ventas_data, ventas_columnas)

# Mostramos las primeras filas
print("DataFrame de ventas:")
df_ventas.show()

# Output esperado:
# +----------+----------+-----------+--------+---------------+
# |     fecha|  producto|  categoria|cantidad|precio_unitario|
# +----------+----------+-----------+--------+---------------+
# |2024-01-01|    Laptop|Electronica|       2|        15000.0|
# |2024-01-01|     Mouse|Electronica|      10|          350.0|
# ...

In [None]:
# METODO 2: Crear DataFrame con esquema explicito
# Esto da mas control sobre los tipos de datos

# Definimos el esquema usando StructType y StructField
# StructField(nombre, tipo, puede_ser_nulo)
esquema_clientes = StructType([
    StructField("cliente_id", StringType(), False),    # No puede ser nulo
    StructField("nombre", StringType(), True),         # Puede ser nulo
    StructField("edad", IntegerType(), True),
    StructField("ciudad", StringType(), True),
    StructField("credito_limite", DoubleType(), True)
])

# Datos de clientes
clientes_data = [
    ("C001", "Ana Garcia", 28, "CDMX", 50000.0),
    ("C002", "Carlos Lopez", 35, "Guadalajara", 75000.0),
    ("C003", "Maria Rodriguez", 42, "Monterrey", 100000.0),
    ("C004", "Juan Martinez", 31, "CDMX", 60000.0),
    ("C005", "Laura Sanchez", 25, "Puebla", 40000.0)
]

# Creamos el DataFrame con el esquema definido
df_clientes = spark.createDataFrame(clientes_data, esquema_clientes)

# Mostramos el esquema para verificar los tipos
print("Esquema del DataFrame de clientes:")
df_clientes.printSchema()

# Mostramos los datos
df_clientes.show()

# Output esperado:
# root
#  |-- cliente_id: string (nullable = false)
#  |-- nombre: string (nullable = true)
#  |-- edad: integer (nullable = true)
#  ...

---
# === SECCI√ìN 3 ===
## 3. Transformaciones vs Acciones (Lazy Evaluation)

### Explicaci√≥n Conceptual
Spark usa un concepto llamado **"lazy evaluation"** (evaluaci√≥n perezosa):

- **Transformaciones**: Definen QU√â hacer, pero NO ejecutan nada todav√≠a. Son como escribir una receta.
  - Ejemplos: `filter()`, `select()`, `groupBy()`, `withColumn()`

- **Acciones**: Ejecutan TODAS las transformaciones pendientes. Son como cocinar la receta.
  - Ejemplos: `show()`, `count()`, `collect()`, `write()`

**Analog√≠a del mundo real:** Imagina que est√°s en un restaurante. Las transformaciones son como agregar platos a tu orden (no se cocina nada a√∫n). La acci√≥n es cuando dices "eso es todo" y el mesero env√≠a la orden a la cocina.

**¬øPor qu√© lazy?** Porque Spark puede optimizar todas las operaciones juntas antes de ejecutarlas.

In [None]:
# TRANSFORMACIONES (no ejecutan nada todavia)

# Transformacion 1: Filtrar ventas de Electronica
# filter() selecciona filas que cumplen una condicion
df_electronica = df_ventas.filter(df_ventas["categoria"] == "Electronica")

# Transformacion 2: Agregar columna de total
# withColumn() agrega o reemplaza una columna
# F.col() referencia una columna existente
df_con_total = df_electronica.withColumn(
    "total",                                      # Nombre de la nueva columna
    F.col("cantidad") * F.col("precio_unitario")  # Calculo: cantidad * precio
)

# Transformacion 3: Seleccionar solo algunas columnas
# select() elige que columnas mostrar
df_resultado = df_con_total.select("fecha", "producto", "total")

# NOTA: Hasta aqui, Spark NO ha procesado ningun dato
# Solo ha guardado las instrucciones de que hacer

print("Transformaciones definidas (aun no ejecutadas)")
print("Tipo del resultado:", type(df_resultado))

In [None]:
# ACCION: Ahora si ejecutamos todo
# show() es una ACCION que dispara la ejecucion

print("Ejecutando transformaciones con show():")
df_resultado.show()

# Output esperado:
# +----------+-------+-------+
# |     fecha|producto|  total|
# +----------+-------+-------+
# |2024-01-01| Laptop|30000.0|
# |2024-01-01|  Mouse| 3500.0|
# |2024-01-03|Monitor|24000.0|
# ...

In [None]:
# Podemos ver el "plan de ejecucion" que Spark genera
# explain() muestra como Spark planea ejecutar las transformaciones

print("Plan de ejecucion:")
df_resultado.explain()

# Esto muestra las operaciones que Spark realizara
# Es util para entender y optimizar consultas complejas

---
# === SECCI√ìN 4 ===
## 4. Operaciones B√°sicas con DataFrames

### Explicaci√≥n Conceptual
Las operaciones m√°s comunes en DataFrames son:
- **select()**: Elegir columnas (como SELECT en SQL)
- **filter()**: Filtrar filas (como WHERE en SQL)
- **withColumn()**: Agregar o modificar columnas
- **groupBy()**: Agrupar datos (como GROUP BY en SQL)
- **orderBy()**: Ordenar datos (como ORDER BY en SQL)

In [None]:
# SELECT: Elegir columnas especificas

# Forma 1: Usando nombres de columnas como strings
df_ventas.select("producto", "cantidad").show(5)

# Forma 2: Usando F.col() para mas flexibilidad
df_ventas.select(
    F.col("producto"),
    F.col("cantidad"),
    (F.col("cantidad") * F.col("precio_unitario")).alias("total")  # Columna calculada
).show(5)

# Output esperado: tabla con producto, cantidad, y total calculado

In [None]:
# FILTER: Filtrar filas por condiciones

# Filtro simple: cantidad mayor a 5
print("Ventas con cantidad > 5:")
df_ventas.filter(F.col("cantidad") > 5).show()

# Filtros multiples: usando & (AND) y | (OR)
# IMPORTANTE: Cada condicion debe estar entre parentesis
print("Electronica con cantidad >= 5:")
df_ventas.filter(
    (F.col("categoria") == "Electronica") &  # AND
    (F.col("cantidad") >= 5)
).show()

In [None]:
# WITHCOLUMN: Agregar o modificar columnas

# Agregar columna de total
df_con_total = df_ventas.withColumn(
    "total",
    F.col("cantidad") * F.col("precio_unitario")
)

# Agregar columna de fecha como tipo Date (conversion)
df_con_fecha = df_con_total.withColumn(
    "fecha_date",
    F.to_date(F.col("fecha"), "yyyy-MM-dd")  # Convertir string a fecha
)

# Agregar columna con valor constante
df_con_region = df_con_fecha.withColumn(
    "region",
    F.lit("Norte")  # F.lit() crea un valor literal/constante
)

df_con_region.show(5)
df_con_region.printSchema()

In [None]:
# GROUPBY + AGREGACIONES: Resumir datos

# Agrupar por categoria y calcular metricas
print("Resumen por categoria:")
df_ventas.groupBy("categoria").agg(
    F.count("*").alias("num_ventas"),           # Contar filas
    F.sum("cantidad").alias("cantidad_total"),  # Sumar cantidades
    F.avg("precio_unitario").alias("precio_promedio"),  # Promedio
    F.max("precio_unitario").alias("precio_max")  # Maximo
).show()

# Agrupar por multiples columnas
print("Resumen por fecha y categoria:")
df_ventas.groupBy("fecha", "categoria").agg(
    F.sum("cantidad").alias("cantidad_total"),
    F.sum(F.col("cantidad") * F.col("precio_unitario")).alias("ingreso_total")
).show()

In [None]:
# ORDERBY: Ordenar resultados

# Ordenar por precio descendente
print("Ventas ordenadas por precio (mayor a menor):")
df_ventas.orderBy(F.col("precio_unitario").desc()).show(5)

# Ordenar por multiples columnas
print("Ordenado por categoria ASC, luego por cantidad DESC:")
df_ventas.orderBy(
    F.col("categoria").asc(),
    F.col("cantidad").desc()
).show()

---
# === SECCI√ìN 5 ===
## 5. Spark SQL

### Explicaci√≥n Conceptual
Spark permite usar SQL directamente sobre DataFrames. Esto es √∫til si ya conoces SQL o prefieres esa sintaxis.

**Analog√≠a del mundo real:** Es como tener dos idiomas para comunicarte: puedes usar la API de DataFrames (como Python) o SQL (como el ingl√©s del mundo de bases de datos). Ambos logran lo mismo.

In [None]:
# Primero, registramos el DataFrame como una "tabla temporal"
# Esto permite que SQL acceda a los datos
df_ventas.createOrReplaceTempView("ventas")

print("Tabla 'ventas' registrada para SQL")

In [None]:
# Ahora podemos usar SQL directamente
# spark.sql() ejecuta consultas SQL y retorna un DataFrame

# Consulta simple
print("SELECT con SQL:")
spark.sql("""
    SELECT producto, cantidad, precio_unitario
    FROM ventas
    WHERE cantidad > 5
    ORDER BY cantidad DESC
""").show()

In [None]:
# Consulta con agregaciones
print("Agregacion con SQL:")
spark.sql("""
    SELECT 
        categoria,
        COUNT(*) as num_ventas,
        SUM(cantidad) as total_unidades,
        ROUND(AVG(precio_unitario), 2) as precio_promedio,
        SUM(cantidad * precio_unitario) as ingreso_total
    FROM ventas
    GROUP BY categoria
    ORDER BY ingreso_total DESC
""").show()

In [None]:
# Consulta con subconsulta
print("Productos con precio mayor al promedio:")
spark.sql("""
    SELECT producto, precio_unitario
    FROM ventas
    WHERE precio_unitario > (SELECT AVG(precio_unitario) FROM ventas)
    ORDER BY precio_unitario DESC
""").show()

---
# === SECCI√ìN 6 ===
## 6. Lectura y Escritura de Archivos

### Explicaci√≥n Conceptual
Spark puede leer y escribir datos en m√∫ltiples formatos:
- **CSV**: Texto separado por comas (f√°cil de leer, pero no eficiente)
- **JSON**: Datos estructurados (flexible pero verboso)
- **Parquet**: Formato columnar binario (muy eficiente para Big Data)

**Analog√≠a del mundo real:** Es como guardar fotos: puedes usar JPG (peque√±o pero pierde calidad), PNG (sin p√©rdida pero grande), o RAW (m√°xima calidad para profesionales). Parquet es como el RAW del mundo de datos.

In [None]:
# Definimos las rutas de salida
ruta_csv = "/home/jovyan/data/sample/ventas_ejemplo.csv"
ruta_parquet = "/home/jovyan/data/sample/ventas_ejemplo.parquet"
ruta_json = "/home/jovyan/data/sample/ventas_ejemplo.json"

# ESCRIBIR CSV
# mode("overwrite") reemplaza si ya existe
# header=True incluye los nombres de columnas
df_ventas.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(ruta_csv)

print(f"CSV guardado en: {ruta_csv}")

# ESCRIBIR PARQUET (formato recomendado para Big Data)
df_ventas.write \
    .mode("overwrite") \
    .parquet(ruta_parquet)

print(f"Parquet guardado en: {ruta_parquet}")

# ESCRIBIR JSON
df_ventas.write \
    .mode("overwrite") \
    .json(ruta_json)

print(f"JSON guardado en: {ruta_json}")

In [None]:
# LEER CSV
df_desde_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(ruta_csv)

print("Datos leidos desde CSV:")
df_desde_csv.show(3)

# LEER PARQUET (mas rapido, preserva tipos)
df_desde_parquet = spark.read.parquet(ruta_parquet)

print("Datos leidos desde Parquet:")
df_desde_parquet.show(3)

# LEER JSON
df_desde_json = spark.read.json(ruta_json)

print("Datos leidos desde JSON:")
df_desde_json.show(3)

In [None]:
# Comparar esquemas de cada formato
print("Esquema desde CSV (inferido):")
df_desde_csv.printSchema()

print("Esquema desde Parquet (preservado exacto):")
df_desde_parquet.printSchema()

# Parquet preserva los tipos exactos, CSV necesita inferirlos

---
# === EJERCICIOS PR√ÅCTICOS ===

### üéØ Ejercicio 2.1: An√°lisis de Ventas

Usando el DataFrame `df_ventas`:
1. Calcula el ingreso total por producto (cantidad √ó precio)
2. Encuentra el producto con mayor ingreso total
3. Muestra solo los 3 productos principales

**Pistas:**
- Usa `groupBy()` y `agg()` con `F.sum()`
- Usa `orderBy()` con `.desc()` para orden descendente
- Usa `limit(3)` para mostrar solo 3 filas

In [None]:
# TODO: Calcula el top 3 productos por ingreso



### ‚úÖ Soluci√≥n Ejercicio 2.1

In [None]:
# Solucion: Top 3 productos por ingreso

# Paso 1: Calcular ingreso por fila
# withColumn agrega una columna con el calculo
df_con_ingreso = df_ventas.withColumn(
    "ingreso",
    F.col("cantidad") * F.col("precio_unitario")
)

# Paso 2: Agrupar por producto y sumar ingresos
# groupBy agrupa, agg aplica funciones de agregacion
df_por_producto = df_con_ingreso.groupBy("producto").agg(
    F.sum("ingreso").alias("ingreso_total"),      # Suma de ingresos
    F.sum("cantidad").alias("unidades_vendidas")  # Suma de unidades
)

# Paso 3: Ordenar descendente y limitar a 3
top_3 = df_por_producto \
    .orderBy(F.col("ingreso_total").desc()) \
    .limit(3)

print("Top 3 productos por ingreso:")
top_3.show()

# Output esperado:
# +--------+-------------+-----------------+
# |producto|ingreso_total|unidades_vendidas|
# +--------+-------------+-----------------+
# |  Laptop|      45000.0|                3|
# | Monitor|      24000.0|                3|
# |   Silla|      15000.0|                6|
# +--------+-------------+-----------------+

### üéØ Ejercicio 2.2: Filtrado Avanzado

Encuentra todas las ventas que cumplan TODAS estas condiciones:
1. Categor√≠a es "Electronica"
2. Precio unitario mayor a 500
3. Cantidad mayor o igual a 2

**Pistas:**
- Usa `filter()` con condiciones m√∫ltiples
- Conecta condiciones con `&` (AND)
- Cada condici√≥n debe estar entre par√©ntesis

In [None]:
# TODO: Filtra las ventas que cumplan todas las condiciones



### ‚úÖ Soluci√≥n Ejercicio 2.2

In [None]:
# Solucion: Filtrado avanzado

# Definimos cada condicion
# F.col() referencia la columna para la comparacion
condicion_categoria = F.col("categoria") == "Electronica"
condicion_precio = F.col("precio_unitario") > 500
condicion_cantidad = F.col("cantidad") >= 2

# Aplicamos todas las condiciones con & (AND)
df_filtrado = df_ventas.filter(
    condicion_categoria & 
    condicion_precio & 
    condicion_cantidad
)

print("Ventas filtradas (Electronica, precio>500, cantidad>=2):")
df_filtrado.show()

# Forma alternativa en una sola expresion:
df_ventas.filter(
    (F.col("categoria") == "Electronica") &
    (F.col("precio_unitario") > 500) &
    (F.col("cantidad") >= 2)
).show()

# Output esperado:
# +----------+-------+-----------+--------+---------------+
# |     fecha|producto|  categoria|cantidad|precio_unitario|
# +----------+-------+-----------+--------+---------------+
# |2024-01-01| Laptop|Electronica|       2|        15000.0|
# |2024-01-03|Monitor|Electronica|       3|         8000.0|
# |2024-01-03|Teclado|Electronica|       8|          800.0|
# +----------+-------+-----------+--------+---------------+

### üéØ Ejercicio 2.3: Spark SQL

Usando Spark SQL (no la API de DataFrame), escribe una consulta que:
1. Calcule el ingreso total por fecha
2. Ordene por fecha
3. Muestre fecha e ingreso total

**Pistas:**
- La tabla ya est√° registrada como "ventas"
- Usa `SUM(cantidad * precio_unitario)`
- Usa `GROUP BY fecha`

In [None]:
# TODO: Escribe la consulta SQL



### ‚úÖ Soluci√≥n Ejercicio 2.3

In [None]:
# Solucion: Ingreso por fecha con SQL

resultado_sql = spark.sql("""
    SELECT 
        fecha,
        SUM(cantidad * precio_unitario) as ingreso_total
    FROM ventas
    GROUP BY fecha
    ORDER BY fecha
""")

print("Ingreso total por fecha:")
resultado_sql.show()

# Output esperado:
# +----------+-------------+
# |     fecha|ingreso_total|
# +----------+-------------+
# |2024-01-01|      33500.0|
# |2024-01-02|      15000.0|
# |2024-01-03|      30400.0|
# |2024-01-04|      18000.0|
# |2024-01-05|      10250.0|
# +----------+-------------+

---
# === RESUMEN FINAL ===

## Resumen

### Conceptos Clave
- **Lazy Evaluation**: Spark no ejecuta nada hasta que hay una acci√≥n. Las transformaciones son "recetas", las acciones son "cocinar".
- **Transformaciones**: `select()`, `filter()`, `withColumn()`, `groupBy()`, `orderBy()` - definen qu√© hacer
- **Acciones**: `show()`, `count()`, `collect()`, `write()` - ejecutan el plan
- **Spark SQL**: Permite usar SQL est√°ndar sobre DataFrames
- **Formatos de archivo**: CSV (legible), Parquet (eficiente), JSON (flexible)

### Conexi√≥n con AWS
- **Amazon EMR**: Ejecuta estas mismas operaciones en clusters grandes
- **AWS Glue**: Servicio ETL que usa PySpark internamente
- **Amazon Athena**: Usa SQL similar a Spark SQL sobre datos en S3
- **Amazon S3**: Almacena archivos Parquet/CSV que Spark puede leer

### Siguiente Paso
Contin√∫a con: `labs/01_data_fundamentals/` para aprender sobre las 5 Vs del Big Data

In [None]:
# Limpieza final (opcional)
import shutil
import os

# Eliminar archivos de ejemplo creados
for ruta in [ruta_csv, ruta_parquet, ruta_json]:
    if os.path.exists(ruta):
        shutil.rmtree(ruta)
        print(f"Eliminado: {ruta}")

print("\nLimpieza completada")