# Guía Práctica de Apache Spark para Big Data

Este notebook proporciona una introducción práctica a Apache Spark, desde conceptos básicos hasta operaciones intermedias, utilizando PySpark en Google Colab.

## Contenido
1. Configuración del entorno en Colab
2. Introducción a Apache Spark
3. Creación y Operaciones con RDDs
4. DataFrames y Datasets
5. Spark SQL
6. Ejercicio práctico: Análisis de datos reales

## 1. Configuración del entorno en Colab

Primero, necesitamos instalar y configurar PySpark en Google Colab. Ejecuta la siguiente celda:

In [1]:
# Reemplaza la sección de instalación con:
import findspark
findspark.init()
print("¡Spark está configurado y listo para usar!")

¡Spark está configurado y listo para usar!


### Crear una SparkSession

La SparkSession es el punto de entrada para interactuar con Spark. A partir de Spark 2.0, la SparkSession proporciona un punto de entrada unificado a todas las funcionalidades de Spark.

In [4]:
from pyspark.sql import SparkSession

# Crear una SparkSession
spark = SparkSession.builder \
    .appName("PySpark Tutorial") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Verificar versión de Spark
print(f"Versión de Apache Spark: {spark.version}")

# SparkContext está disponible como sc
sc = spark.sparkContext
print(f"URL de la interfaz web: {sc.uiWebUrl}")

Versión de Apache Spark: 3.5.0
URL de la interfaz web: http://eeefdd88d7c2:4040


## 2. Introducción a Apache Spark

Apache Spark es un framework de computación distribuida diseñado para el procesamiento de grandes volúmenes de datos. Lo que distingue a Spark de otros frameworks como Hadoop MapReduce es su capacidad para procesar datos en memoria, lo que lo hace significativamente más rápido.

### Características principales de Spark
- **Velocidad**: 100x más rápido que Hadoop MapReduce para procesamiento en memoria
- **Facilidad de uso**: APIs en Java, Scala, Python y R
- **Generalidad**: Combina SQL, streaming y análisis complejo
- **Compatibilidad**: Funciona con diversas fuentes de datos (HDFS, S3, HBase, etc.)

### Arquitectura de Spark
- **Driver Program**: Contiene la aplicación principal y crea el SparkContext
- **Cluster Manager**: Asigna recursos (YARN, Mesos, Kubernetes, Standalone)
- **Worker Nodes**: Ejecutan las tareas de computación
- **Executors**: Procesos JVM que ejecutan tareas en cada nodo

### Componentes del Ecosistema Spark
- **Spark Core**: Base del sistema, APIs para RDDs
- **Spark SQL**: Procesamiento de datos estructurados
- **Spark Streaming**: Procesamiento en tiempo real
- **MLlib**: Biblioteca de machine learning
- **GraphX**: Procesamiento de grafos

## 3. Creación y Operaciones con RDDs

Los RDDs (Resilient Distributed Datasets) son la abstracción fundamental de Spark. Son colecciones inmutables de objetos distribuidos a través de un clúster, que pueden ser procesados en paralelo.

### Creación de RDDs

Hay dos formas principales de crear RDDs:

In [5]:
# 1. Paralelizando una colección existente
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(data, numSlices=4)  # numSlices es el número de particiones

print(f"RDD1: {rdd1}")
print(f"Número de particiones: {rdd1.getNumPartitions()}")
print(f"Primeros 3 elementos: {rdd1.take(3)}")
print(f"Conteo: {rdd1.count()}")

RDD1: ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289
Número de particiones: 4
Primeros 3 elementos: [1, 2, 3]
Conteo: 10


In [6]:
# 2. Referenciando un dataset externo
# Primero creamos un archivo de ejemplo
!echo "Línea 1\nLínea 2\nLínea 3\nLínea 4\nLínea 5" > ejemplo.txt

# Luego creamos un RDD a partir del archivo
rdd2 = sc.textFile("ejemplo.txt")

print(f"RDD2: {rdd2}")
print(f"Contenido: {rdd2.collect()}")

RDD2: ejemplo.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0
Contenido: ['Línea 1\\nLínea 2\\nLínea 3\\nLínea 4\\nLínea 5']


### Operaciones con RDDs

Las operaciones en RDDs se dividen en dos categorías:

- **Transformaciones**: Crean un nuevo RDD a partir de uno existente (map, filter, etc.)
- **Acciones**: Devuelven un valor al driver después de ejecutar un cálculo (reduce, count, etc.)

Las transformaciones son perezosas (lazy), lo que significa que no se ejecutan inmediatamente. En cambio, Spark recuerda las transformaciones aplicadas a un RDD y las ejecuta solo cuando se requiere una acción.

In [7]:
# Ejemplos de transformaciones

# map: aplica una función a cada elemento del RDD
squared = rdd1.map(lambda x: x * x)
print(f"Después de map (x²): {squared.collect()}")

# filter: selecciona elementos que cumplen una condición
evens = rdd1.filter(lambda x: x % 2 == 0)
print(f"Números pares: {evens.collect()}")

# flatMap: aplica función que devuelve múltiples elementos
rdd_text = sc.parallelize(["Hola mundo", "Apache Spark", "Big Data"])
words = rdd_text.flatMap(lambda line: line.split(" "))
print(f"Palabras individuales: {words.collect()}")

Después de map (x²): [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
Números pares: [2, 4, 6, 8, 10]
Palabras individuales: ['Hola', 'mundo', 'Apache', 'Spark', 'Big', 'Data']


In [8]:
# Ejemplos de acciones

# reduce: agrega los elementos usando una función
sum_all = rdd1.reduce(lambda a, b: a + b)
print(f"Suma de todos los elementos: {sum_all}")

# count: devuelve el número de elementos
count = rdd1.count()
print(f"Número de elementos: {count}")

# first: devuelve el primer elemento
first_element = rdd1.first()
print(f"Primer elemento: {first_element}")

# take: devuelve n elementos
first_n = rdd1.take(3)
print(f"Primeros 3 elementos: {first_n}")

# foreach: ejecuta una función en cada elemento (sin retorno)
rdd1.foreach(lambda x: print(f"Elemento: {x}"))

Suma de todos los elementos: 55
Número de elementos: 10
Primer elemento: 1
Primeros 3 elementos: [1, 2, 3]


### RDDs de pares (key-value)

Los RDDs de pares son RDDs con elementos en forma de tuplas (clave, valor). Estos RDDs ofrecen operaciones adicionales como reduceByKey, join, etc.

In [9]:
# Crear un RDD de pares
pairs_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5), ("c", 6)])
print(f"RDD de pares: {pairs_rdd.collect()}")

# reduceByKey: combina valores con la misma clave
sums = pairs_rdd.reduceByKey(lambda a, b: a + b)
print(f"Suma por clave: {sums.collect()}")

# groupByKey: agrupa valores con la misma clave
grouped = pairs_rdd.groupByKey().mapValues(list)
print(f"Agrupado por clave: {grouped.collect()}")

# Crear otro RDD de pares para demostraciones
other_rdd = sc.parallelize([("a", "x"), ("b", "y"), ("c", "z")])

# join: une RDDs por clave
joined = pairs_rdd.join(other_rdd)
print(f"Join de RDDs: {joined.collect()}")

# countByKey: cuenta ocurrencias de cada clave
counts = pairs_rdd.countByKey()
print(f"Conteo por clave: {counts}")

RDD de pares: [('a', 1), ('b', 2), ('a', 3), ('c', 4), ('b', 5), ('c', 6)]
Suma por clave: [('c', 10), ('a', 4), ('b', 7)]
Agrupado por clave: [('c', [4, 6]), ('a', [1, 3]), ('b', [2, 5])]
Join de RDDs: [('c', (4, 'z')), ('c', (6, 'z')), ('a', (1, 'x')), ('a', (3, 'x')), ('b', (2, 'y')), ('b', (5, 'y'))]
Conteo por clave: defaultdict(<class 'int'>, {'a': 2, 'b': 2, 'c': 2})


### Persistencia (Caching)

Cuando queremos reutilizar un RDD múltiples veces, podemos persistirlo en memoria o disco para mejorar el rendimiento.

In [10]:
# Crear un RDD y realizar una operación costosa
large_rdd = sc.parallelize(range(1, 1000000))
filtered = large_rdd.filter(lambda x: x % 10 == 0)

# Sin persistencia (calculará dos veces)
import time

start = time.time()
count1 = filtered.count()
end1 = time.time() - start

start = time.time()
count2 = filtered.count()
end2 = time.time() - start

print(f"Sin persistencia - Primera ejecución: {end1:.2f} segundos")
print(f"Sin persistencia - Segunda ejecución: {end2:.2f} segundos")

# Con persistencia
filtered.cache()  # equivalente a filtered.persist(StorageLevel.MEMORY_ONLY)

start = time.time()
count3 = filtered.count()
end3 = time.time() - start

start = time.time()
count4 = filtered.count()
end4 = time.time() - start

print(f"Con persistencia - Primera ejecución: {end3:.2f} segundos")
print(f"Con persistencia - Segunda ejecución: {end4:.2f} segundos")

# Liberar RDD de memoria
filtered.unpersist()

Sin persistencia - Primera ejecución: 0.26 segundos
Sin persistencia - Segunda ejecución: 0.22 segundos
Con persistencia - Primera ejecución: 0.26 segundos
Con persistencia - Segunda ejecución: 0.12 segundos


PythonRDD[40] at RDD at PythonRDD.scala:53

### Ejemplo clásico: WordCount

Implementaremos el clásico ejemplo de contar palabras en un texto.

In [11]:
# Crear un texto de ejemplo
text = """Spark es un framework de procesamiento distribuido.
Spark es rápido y fácil de usar.
Spark puede procesar datos en memoria.
Spark tiene APIs para Java, Scala, Python y R.
Spark incluye módulos para SQL, streaming, machine learning y procesamiento de grafos."""

# Crear archivo de texto
with open('ejemplo_wordcount.txt', 'w') as f:
    f.write(text)

# WordCount en Spark
lines = sc.textFile("ejemplo_wordcount.txt")

# Separar líneas en palabras y convertir a minúsculas
words = lines.flatMap(lambda line: line.lower().split())

# Eliminar signos de puntuación
import re
clean_words = words.map(lambda word: re.sub(r'[^\w]', '', word))

# Filtrar palabras vacías
filtered_words = clean_words.filter(lambda word: word != '')

# Crear pares (palabra, 1)
word_pairs = filtered_words.map(lambda word: (word, 1))

# Sumar ocurrencias de cada palabra
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

# Ordenar por frecuencia (de mayor a menor)
sorted_counts = word_counts.sortBy(lambda x: -x[1])

# Mostrar resultados
for word, count in sorted_counts.collect():
    print(f"{word}: {count}")

spark: 5
de: 3
y: 3
es: 2
procesamiento: 2
para: 2
un: 1
framework: 1
distribuido: 1
rápido: 1
fácil: 1
usar: 1
puede: 1
memoria: 1
java: 1
python: 1
incluye: 1
streaming: 1
machine: 1
learning: 1
procesar: 1
datos: 1
en: 1
tiene: 1
apis: 1
scala: 1
r: 1
módulos: 1
sql: 1
grafos: 1


## 4. DataFrames y Datasets

A partir de Spark 1.3, se introdujo el concepto de DataFrames, una abstracción de más alto nivel que los RDDs. Los DataFrames representan datos estructurados en formato de tabla, similares a las tablas en bases de datos relacionales.

### Creación de DataFrames

Hay varias formas de crear DataFrames en Spark:

In [12]:
# 1. Desde una lista de datos
data = [("Juan", 30), ("Ana", 25), ("Carlos", 35), ("María", 28)]
df1 = spark.createDataFrame(data, ["nombre", "edad"])

print("DataFrame desde lista:")
df1.show()
df1.printSchema()

DataFrame desde lista:
+------+----+
|nombre|edad|
+------+----+
|  Juan|  30|
|   Ana|  25|
|Carlos|  35|
| María|  28|
+------+----+

root
 |-- nombre: string (nullable = true)
 |-- edad: long (nullable = true)



In [13]:
# 2. Desde un RDD
rdd = sc.parallelize([("Marketing", 1000), ("Ventas", 2000), ("IT", 1500), ("RH", 800)])
df2 = rdd.toDF(["departamento", "presupuesto"])

print("DataFrame desde RDD:")
df2.show()

DataFrame desde RDD:
+------------+-----------+
|departamento|presupuesto|
+------------+-----------+
|   Marketing|       1000|
|      Ventas|       2000|
|          IT|       1500|
|          RH|        800|
+------------+-----------+



In [14]:
# 3. Desde datos con esquema explícito
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Definir esquema
schema = StructType([
    StructField("producto", StringType(), True),
    StructField("precio", DoubleType(), True),
    StructField("cantidad", IntegerType(), True)
])

# Datos
productos = [
    ("Laptop", 1200.50, 10),
    ("Smartphone", 800.99, 20),
    ("Tablet", 450.75, 15),
    ("Auriculares", 150.25, 30)
]

df3 = spark.createDataFrame(productos, schema)

print("DataFrame con esquema explícito:")
df3.show()
df3.printSchema()

DataFrame con esquema explícito:
+-----------+------+--------+
|   producto|precio|cantidad|
+-----------+------+--------+
|     Laptop|1200.5|      10|
| Smartphone|800.99|      20|
|     Tablet|450.75|      15|
|Auriculares|150.25|      30|
+-----------+------+--------+

root
 |-- producto: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- cantidad: integer (nullable = true)



In [15]:
# 4. Desde archivos
# Crear un CSV simple para el ejemplo
# Borrar el archivo mal creado
!rm ciudades.csv

# Crear el CSV correctamente
csv_content = """id,ciudad,pais,poblacion
1,Madrid,España,3200000
2,Barcelona,España,1600000
3,Lisboa,Portugal,500000
4,Paris,Francia,2200000"""

with open('ciudades.csv', 'w') as f:
    f.write(csv_content)
# Leer CSV
df4 = spark.read.csv("ciudades.csv", header=True, inferSchema=True)

print("DataFrame desde CSV:")
df4.show()
df4.printSchema()

rm: cannot remove 'ciudades.csv': No such file or directory
DataFrame desde CSV:
+---+---------+--------+---------+
| id|   ciudad|    pais|poblacion|
+---+---------+--------+---------+
|  1|   Madrid|  España|  3200000|
|  2|Barcelona|  España|  1600000|
|  3|   Lisboa|Portugal|   500000|
|  4|    Paris| Francia|  2200000|
+---+---------+--------+---------+

root
 |-- id: integer (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- poblacion: integer (nullable = true)



### Operaciones con DataFrames

Los DataFrames ofrecen una API rica para manipular datos estructurados.

In [16]:
# Operaciones básicas con DataFrames

# Seleccionar columnas
print("Selección de columnas:")
df3.select("producto", "precio").show()

# Filtrar filas
print("\nFiltrando productos con precio > 500:")
df3.filter(df3.precio > 500).show()

# Ordenar
print("\nProductos ordenados por precio (descendente):")
df3.orderBy(df3.precio.desc()).show()

# Añadir columna calculada
print("\nAñadiendo columna de valor total:")
df3.withColumn("valor_total", df3.precio * df3.cantidad).show()

Selección de columnas:
+-----------+------+
|   producto|precio|
+-----------+------+
|     Laptop|1200.5|
| Smartphone|800.99|
|     Tablet|450.75|
|Auriculares|150.25|
+-----------+------+


Filtrando productos con precio > 500:
+----------+------+--------+
|  producto|precio|cantidad|
+----------+------+--------+
|    Laptop|1200.5|      10|
|Smartphone|800.99|      20|
+----------+------+--------+


Productos ordenados por precio (descendente):
+-----------+------+--------+
|   producto|precio|cantidad|
+-----------+------+--------+
|     Laptop|1200.5|      10|
| Smartphone|800.99|      20|
|     Tablet|450.75|      15|
|Auriculares|150.25|      30|
+-----------+------+--------+


Añadiendo columna de valor total:
+-----------+------+--------+-----------+
|   producto|precio|cantidad|valor_total|
+-----------+------+--------+-----------+
|     Laptop|1200.5|      10|    12005.0|
| Smartphone|800.99|      20|    16019.8|
|     Tablet|450.75|      15|    6761.25|
|Auriculares|150.25

In [17]:
# Operaciones de agregación

# Estadísticas descriptivas
print("Estadísticas descriptivas:")
df3.describe().show()

# Agregaciones específicas
print("\nAgregaciones específicas:")
from pyspark.sql.functions import sum, avg, min, max, count

df3.select(
    sum("cantidad").alias("total_unidades"),
    avg("precio").alias("precio_promedio"),
    min("precio").alias("precio_minimo"),
    max("precio").alias("precio_maximo")
).show()

# Agrupar y agregar
print("\nAgrupación por país y cálculo de población promedio:")
df4.groupBy("pais").agg(
    count("ciudad").alias("numero_ciudades"),
    sum("poblacion").alias("poblacion_total"),
    avg("poblacion").alias("poblacion_promedio")
).show()

Estadísticas descriptivas:
+-------+-----------+------------------+-----------------+
|summary|   producto|            precio|         cantidad|
+-------+-----------+------------------+-----------------+
|  count|          4|                 4|                4|
|   mean|       NULL|          650.6225|            18.75|
| stddev|       NULL|452.87868319414935|8.539125638299666|
|    min|Auriculares|            150.25|               10|
|    max|     Tablet|            1200.5|               30|
+-------+-----------+------------------+-----------------+


Agregaciones específicas:
+--------------+---------------+-------------+-------------+
|total_unidades|precio_promedio|precio_minimo|precio_maximo|
+--------------+---------------+-------------+-------------+
|            75|       650.6225|       150.25|       1200.5|
+--------------+---------------+-------------+-------------+


Agrupación por país y cálculo de población promedio:
+--------+---------------+---------------+------------

In [18]:
# Operaciones de join

# Crear DataFrames para demostrar joins
clientes = spark.createDataFrame([
    (1, "Juan Pérez", "juan@example.com"),
    (2, "María García", "maria@example.com"),
    (3, "Carlos López", "carlos@example.com"),
    (4, "Ana Martínez", "ana@example.com")
], ["id", "nombre", "email"])

pedidos = spark.createDataFrame([
    (101, 1, "2023-01-15", 120.50),
    (102, 3, "2023-01-18", 85.75),
    (103, 2, "2023-01-20", 220.00),
    (104, 1, "2023-01-25", 65.30),
    (105, 5, "2023-01-27", 110.25)  # Cliente 5 no existe
], ["id_pedido", "id_cliente", "fecha", "importe"])

print("DataFrame de Clientes:")
clientes.show()

print("\nDataFrame de Pedidos:")
pedidos.show()

# Inner join
print("\nInner Join:")
clientes.join(pedidos, clientes.id == pedidos.id_cliente).show()

# Left join
print("\nLeft Join:")
clientes.join(pedidos, clientes.id == pedidos.id_cliente, "left").show()

# Right join
print("\nRight Join:")
clientes.join(pedidos, clientes.id == pedidos.id_cliente, "right").show()

DataFrame de Clientes:
+---+------------+------------------+
| id|      nombre|             email|
+---+------------+------------------+
|  1|  Juan Pérez|  juan@example.com|
|  2|María García| maria@example.com|
|  3|Carlos López|carlos@example.com|
|  4|Ana Martínez|   ana@example.com|
+---+------------+------------------+


DataFrame de Pedidos:
+---------+----------+----------+-------+
|id_pedido|id_cliente|     fecha|importe|
+---------+----------+----------+-------+
|      101|         1|2023-01-15|  120.5|
|      102|         3|2023-01-18|  85.75|
|      103|         2|2023-01-20|  220.0|
|      104|         1|2023-01-25|   65.3|
|      105|         5|2023-01-27| 110.25|
+---------+----------+----------+-------+


Inner Join:
+---+------------+------------------+---------+----------+----------+-------+
| id|      nombre|             email|id_pedido|id_cliente|     fecha|importe|
+---+------------+------------------+---------+----------+----------+-------+
|  1|  Juan Pérez|  jua

### Funciones definidas por el usuario (UDFs)

Las UDFs permiten aplicar funciones personalizadas a los datos en DataFrames.

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# UDF para convertir nombres a mayúsculas
upper_udf = udf(lambda x: x.upper(), StringType())

print("Nombres en mayúsculas:")
clientes.withColumn("nombre_mayus", upper_udf(clientes.nombre)).show()

# UDF para clasificar productos según su precio
def clasificar_precio(precio):
    if precio < 200:
        return "Económico"
    elif precio < 800:
        return "Medio"
    else:
        return "Premium"

clasificar_udf = udf(clasificar_precio, StringType())

print("\nProductos clasificados por precio:")
df3.withColumn("categoria", clasificar_udf(df3.precio)).show()

Nombres en mayúsculas:
+---+------------+------------------+------------+
| id|      nombre|             email|nombre_mayus|
+---+------------+------------------+------------+
|  1|  Juan Pérez|  juan@example.com|  JUAN PÉREZ|
|  2|María García| maria@example.com|MARÍA GARCÍA|
|  3|Carlos López|carlos@example.com|CARLOS LÓPEZ|
|  4|Ana Martínez|   ana@example.com|ANA MARTÍNEZ|
+---+------------+------------------+------------+


Productos clasificados por precio:
+-----------+------+--------+---------+
|   producto|precio|cantidad|categoria|
+-----------+------+--------+---------+
|     Laptop|1200.5|      10|  Premium|
| Smartphone|800.99|      20|  Premium|
|     Tablet|450.75|      15|    Medio|
|Auriculares|150.25|      30|Económico|
+-----------+------+--------+---------+



## 5. Spark SQL

Spark SQL permite ejecutar consultas SQL sobre datos estructurados. Es una capa encima de la API de DataFrames que proporciona una interfaz para programación con SQL.

In [20]:
# Registrar DataFrames como vistas temporales
df3.createOrReplaceTempView("productos")
df4.createOrReplaceTempView("ciudades")
clientes.createOrReplaceTempView("clientes")
pedidos.createOrReplaceTempView("pedidos")

# Ejecutar consultas SQL
print("Productos con precio > 500:")
spark.sql("""
SELECT 
    producto, 
    precio, 
    cantidad, 
    precio * cantidad AS valor_total
FROM productos
WHERE precio > 500
ORDER BY precio DESC
""").show()

print("\nGrupos de ciudades por país:")
spark.sql("""
SELECT 
    pais, 
    COUNT(*) AS num_ciudades, 
    SUM(poblacion) AS poblacion_total, 
    AVG(poblacion) AS poblacion_promedio
FROM ciudades
GROUP BY pais
HAVING COUNT(*) > 0
""").show()

print("\nClientes con sus pedidos:")
spark.sql("""
SELECT 
    c.id, 
    c.nombre, 
    p.id_pedido, 
    p.fecha, 
    p.importe
FROM clientes c
LEFT JOIN pedidos p ON c.id = p.id_cliente
ORDER BY c.id, p.fecha
""").show()

Productos con precio > 500:
+----------+------+--------+-----------+
|  producto|precio|cantidad|valor_total|
+----------+------+--------+-----------+
|    Laptop|1200.5|      10|    12005.0|
|Smartphone|800.99|      20|    16019.8|
+----------+------+--------+-----------+


Grupos de ciudades por país:
+--------+------------+---------------+------------------+
|    pais|num_ciudades|poblacion_total|poblacion_promedio|
+--------+------------+---------------+------------------+
| Francia|           1|        2200000|         2200000.0|
|  España|           2|        4800000|         2400000.0|
|Portugal|           1|         500000|          500000.0|
+--------+------------+---------------+------------------+


Clientes con sus pedidos:
+---+------------+---------+----------+-------+
| id|      nombre|id_pedido|     fecha|importe|
+---+------------+---------+----------+-------+
|  1|  Juan Pérez|      101|2023-01-15|  120.5|
|  1|  Juan Pérez|      104|2023-01-25|   65.3|
|  2|María Gar

In [21]:
# Ejemplos de funciones de ventana con SQL
print("Análisis de pedidos por cliente con Window Functions:")
spark.sql("""
SELECT 
    c.id, 
    c.nombre, 
    p.id_pedido, 
    p.fecha, 
    p.importe,
    SUM(p.importe) OVER (PARTITION BY c.id) AS total_cliente,
    RANK() OVER (PARTITION BY c.id ORDER BY p.importe DESC) AS ranking_importe
FROM clientes c
JOIN pedidos p ON c.id = p.id_cliente
ORDER BY c.id, ranking_importe
""").show()

Análisis de pedidos por cliente con Window Functions:
+---+------------+---------+----------+-------+-------------+---------------+
| id|      nombre|id_pedido|     fecha|importe|total_cliente|ranking_importe|
+---+------------+---------+----------+-------+-------------+---------------+
|  1|  Juan Pérez|      101|2023-01-15|  120.5|        185.8|              1|
|  1|  Juan Pérez|      104|2023-01-25|   65.3|        185.8|              2|
|  2|María García|      103|2023-01-20|  220.0|        220.0|              1|
|  3|Carlos López|      102|2023-01-18|  85.75|        85.75|              1|
+---+------------+---------+----------+-------+-------------+---------------+



### Optimización de consultas

Spark SQL utiliza el optimizador Catalyst para transformar consultas y mejorar su rendimiento. Podemos ver el plan de ejecución con el método `explain()`.

In [22]:
from pyspark.sql.functions import col, lit, countDistinct, sum, avg, monotonically_increasing_id

# Opción 2: Asociar productos a ciudades (con relación artificial)
# Agregar ID temporal a productos
productos_con_id = df3.withColumn("temp_id", monotonically_increasing_id())

# Crear relación producto-ciudad (ejemplo: rotación modular)
relacion_producto_ciudad = productos_con_id.withColumn(
    "ciudad_id",
    (col("temp_id") % 4) + 1  # Asigna a ciudades con ID 1-4
)

# Hacer join con ciudades
analisis_por_pais = relacion_producto_ciudad.join(
    df4, 
    relacion_producto_ciudad.ciudad_id == df4.id
).groupBy("pais").agg(
    countDistinct("producto").alias("num_productos"),
    sum(col("precio") * col("cantidad")).alias("valor_total"),
    avg("precio").alias("precio_promedio")
).orderBy("valor_total", ascending=False)

analisis_por_pais.show()

+------+-------------+-----------+---------------+
|  pais|num_productos|valor_total|precio_promedio|
+------+-------------+-----------+---------------+
|España|            4|   39293.55|       650.6225|
+------+-------------+-----------+---------------+



## 6. Ejercicio práctico: Análisis de datos reales

Vamos a trabajar con un conjunto de datos real: el dataset de E-Commerce. Primero, descarguemos el dataset desde Kaggle.

In [23]:
# Instalamos la API de Kaggle y configuramos credenciales
!pip install kaggle

# Nota: Para usar la API de Kaggle, necesitas tus credenciales
# Puedes descargar kaggle.json desde tu cuenta de Kaggle y subir manualmente
# O usar un dataset de ejemplo que creamos aquí

# Creamos un dataset de ejemplo (simulando datos de E-Commerce)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Generar datos aleatorios
np.random.seed(42)
n_rows = 1000

# Productos
productos = ['Laptop', 'Smartphone', 'Tablet', 'Auriculares', 'Monitor', 'Teclado', 'Mouse', 'Impresora', 'Cámara', 'Altavoces']
categorias = ['Electrónica', 'Informática', 'Accesorios', 'Audio', 'Fotografía']
product_category = {
    'Laptop': 'Informática',
    'Smartphone': 'Electrónica',
    'Tablet': 'Electrónica',
    'Auriculares': 'Audio',
    'Monitor': 'Informática',
    'Teclado': 'Accesorios',
    'Mouse': 'Accesorios',
    'Impresora': 'Informática',
    'Cámara': 'Fotografía',
    'Altavoces': 'Audio'
}

# Precios base
precios_base = {
    'Laptop': 1200,
    'Smartphone': 800,
    'Tablet': 500,
    'Auriculares': 150,
    'Monitor': 300,
    'Teclado': 80,
    'Mouse': 50,
    'Impresora': 250,
    'Cámara': 600,
    'Altavoces': 200
}

# Países
paises = ['España', 'México', 'Argentina', 'Colombia', 'Chile', 'Perú', 'Estados Unidos', 'Reino Unido', 'Francia', 'Alemania']

# Generar datos
invoice_no = np.arange(1, n_rows+1)
stock_code = np.random.randint(10000, 99999, size=n_rows)
description = np.random.choice(productos, size=n_rows)
quantity = np.random.randint(1, 10, size=n_rows)

# Fechas entre 2022-01-01 y 2022-12-31 (parte corregida)
start_date = datetime(2022, 1, 1)
end_date = datetime(2022, 12, 31)
days_between = (end_date - start_date).days
random_days = np.random.randint(0, days_between, size=n_rows)
invoice_date = [start_date + timedelta(days=int(day)) for day in random_days]  # Conversión a int

# Precios con variación aleatoria
unit_price = [precios_base[prod] * (0.9 + np.random.random() * 0.2) for prod in description]

# Clientes (500 clientes únicos)
customer_id = np.random.randint(10000, 19999, size=n_rows)
country = np.random.choice(paises, size=n_rows)

# Categoría
category = [product_category[prod] for prod in description]

# Crear DataFrame
ecommerce_df = pd.DataFrame({
    'InvoiceNo': invoice_no,
    'StockCode': stock_code,
    'Description': description,
    'Quantity': quantity,
    'InvoiceDate': invoice_date,
    'UnitPrice': unit_price,
    'CustomerID': customer_id,
    'Country': country,
    'Category': category
})

# Guardar como CSV
ecommerce_df.to_csv('../data/ecommerce_data.csv', index=False)
print("Dataset de ejemplo creado con éxito!")
print(ecommerce_df.head())

Collecting kaggle
  Downloading kaggle-1.7.4.2-py3-none-any.whl.metadata (16 kB)
Collecting python-slugify (from kaggle)
  Downloading python_slugify-8.0.4-py2.py3-none-any.whl.metadata (8.5 kB)
Collecting text-unidecode (from kaggle)
  Downloading text_unidecode-1.3-py2.py3-none-any.whl.metadata (2.4 kB)
Downloading kaggle-1.7.4.2-py3-none-any.whl (173 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.2/173.2 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0mm
[?25hDownloading python_slugify-8.0.4-py2.py3-none-any.whl (10 kB)
Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.2/78.2 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: text-unidecode, python-slugify, kaggle
Successfully installed kaggle-1.7.4.2 python-slugify-8.0.4 text-unidecode-1.3
Dataset de ejemplo creado con éxito!
   InvoiceNo  StockCode Description  Quantity InvoiceDate   Unit

In [24]:
# Cargar dataset en Spark
from pyspark.sql.functions import col, year, month, dayofmonth, to_date, expr

# Definir esquema para mejor rendimiento
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

schema = StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", TimestampType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Category", StringType(), True)
])

# Cargar datos CSV
sales_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/ecommerce_data.csv")

# Mostrar esquema y primeras filas
sales_df.printSchema()
sales_df.show(5)

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Category: string (nullable = true)

+---------+---------+-----------+--------+-----------+-----------------+----------+-----------+-----------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|        UnitPrice|CustomerID|    Country|   Category|
+---------+---------+-----------+--------+-----------+-----------------+----------+-----------+-----------+
|        1|    25795|     Tablet|       4| 2022-06-05|487.3554801047682|     12350|     México|Electrónica|
|        2|    10860|    Monitor|       3| 2022-06-21|273.0293366647445|     11269|    Francia|Informática|
|        3|    86820|    Monitor|       8| 2022-12-01|307.6491943070568|     11260

In [26]:
from pyspark.sql.functions import col, year, month, dayofmonth

# Preprocesamiento de datos

# Añadir columna de importe total
sales_df = sales_df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

# Filtrar registros válidos (VERSIÓN CORREGIDA)
valid_sales = sales_df.filter(
    (col("Quantity") > 0) &  # Cada condición entre paréntesis
    (col("UnitPrice") > 0) & 
    (col("CustomerID").isNotNull())
)

# Extraer componentes de fecha
sales_with_date = valid_sales \
    .withColumn("InvoiceYear", year(col("InvoiceDate"))) \
    .withColumn("InvoiceMonth", month(col("InvoiceDate"))) \
    .withColumn("InvoiceDay", dayofmonth(col("InvoiceDate")))

# Guardar como vista temporal para SQL
sales_with_date.createOrReplaceTempView("sales")

# Mostrar el resultado del preprocesamiento
sales_with_date.show(5)

+---------+---------+-----------+--------+-----------+-----------------+----------+-----------+-----------+------------------+-----------+------------+----------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|        UnitPrice|CustomerID|    Country|   Category|       TotalAmount|InvoiceYear|InvoiceMonth|InvoiceDay|
+---------+---------+-----------+--------+-----------+-----------------+----------+-----------+-----------+------------------+-----------+------------+----------+
|        1|    25795|     Tablet|       4| 2022-06-05|487.3554801047682|     12350|     México|Electrónica|1949.4219204190729|       2022|           6|         5|
|        2|    10860|    Monitor|       3| 2022-06-21|273.0293366647445|     11269|    Francia|Informática| 819.0880099942335|       2022|           6|        21|
|        3|    86820|    Monitor|       8| 2022-12-01|307.6491943070568|     11260|Reino Unido|Informática|2461.1935544564544|       2022|          12|         1|
|        4|    64886| 

In [27]:
# Análisis 1: Ventas por país
country_sales = valid_sales \
    .groupBy("Country") \
    .agg(
        sum("TotalAmount").alias("TotalSales"),
        count("InvoiceNo").alias("NumberOfTransactions"),
        countDistinct("CustomerID").alias("UniqueCustomers"),
        avg("TotalAmount").alias("AverageTransactionValue")
    ) \
    .orderBy(col("TotalSales").desc())

print("Ventas por país:")
country_sales.show()

Ventas por país:
+--------------+------------------+--------------------+---------------+-----------------------+
|       Country|        TotalSales|NumberOfTransactions|UniqueCustomers|AverageTransactionValue|
+--------------+------------------+--------------------+---------------+-----------------------+
|      Alemania| 269689.3437270837|                 123|            122|      2192.596290464095|
|          Perú|255152.10721917168|                 101|             99|     2526.2584873185315|
|Estados Unidos|249191.25035328942|                 103|            102|      2419.332527701839|
|        México|247759.50095566988|                 105|            104|     2359.6142948159036|
|         Chile| 207585.8689953313|                 109|            108|     1904.4575137186357|
|      Colombia| 206236.5155897261|                 101|            100|      2041.945698908179|
|        España| 184932.9844480791|                  93|             91|     1988.5267144954744|
|   Reino Uni

In [28]:
# Análisis 2: Ventas por mes
monthly_sales = sales_with_date \
    .groupBy("InvoiceYear", "InvoiceMonth") \
    .agg(sum("TotalAmount").alias("MonthlySales")) \
    .orderBy("InvoiceYear", "InvoiceMonth")

print("Ventas mensuales:")
monthly_sales.show()

Ventas mensuales:
+-----------+------------+------------------+
|InvoiceYear|InvoiceMonth|      MonthlySales|
+-----------+------------+------------------+
|       2022|           1| 218126.2293537872|
|       2022|           2| 157076.3029914686|
|       2022|           3| 144535.6833339949|
|       2022|           4| 231195.9429719927|
|       2022|           5|166185.54915347442|
|       2022|           6| 213549.5577464724|
|       2022|           7| 188387.0938485728|
|       2022|           8|141107.81360383847|
|       2022|           9|   138842.63058643|
|       2022|          10|157190.78718466626|
|       2022|          11| 189568.6890119039|
|       2022|          12|180221.20192642946|
+-----------+------------+------------------+



In [29]:
# Análisis 3: Productos más vendidos
top_products = valid_sales \
    .groupBy("Description", "Category") \
    .agg(
        sum("Quantity").alias("TotalQuantity"),
        sum("TotalAmount").alias("TotalRevenue"),
        count("InvoiceNo").alias("TimesOrdered")
    ) \
    .orderBy(col("TotalQuantity").desc())

print("Top productos más vendidos:")
top_products.show(10)

Top productos más vendidos:
+-----------+-----------+-------------+------------------+------------+
|Description|   Category|TotalQuantity|      TotalRevenue|TimesOrdered|
+-----------+-----------+-------------+------------------+------------+
|  Impresora|Informática|          587|147819.45644712358|         104|
| Smartphone|Electrónica|          556|446559.81206883455|         110|
|    Monitor|Informática|          538|159613.39130268156|         103|
|     Cámara| Fotografía|          514|305325.25830868346|         100|
|     Laptop|Informática|          511| 617639.7801776406|         106|
|    Teclado| Accesorios|          507| 40690.36405932423|          98|
|      Mouse| Accesorios|          501|  25065.4345990297|          97|
|     Tablet|Electrónica|          458|230217.90379250183|          97|
|  Altavoces|      Audio|          456|  91458.7916752584|          96|
|Auriculares|      Audio|          412| 61597.28928195333|          89|
+-----------+-----------+-----------

In [30]:
# Análisis 4: Ventas por categoría de producto
category_sales = valid_sales \
    .groupBy("Category") \
    .agg(
        sum("TotalAmount").alias("TotalSales"),
        sum("Quantity").alias("TotalQuantity"),
        countDistinct("Description").alias("UniqueProducts")
    ) \
    .orderBy(col("TotalSales").desc())

print("Ventas por categoría:")
category_sales.show()

Ventas por categoría:
+-----------+------------------+-------------+--------------+
|   Category|        TotalSales|TotalQuantity|UniqueProducts|
+-----------+------------------+-------------+--------------+
|Informática| 925072.6279274457|         1636|             3|
|Electrónica| 676777.7158613363|         1014|             2|
| Fotografía|305325.25830868346|          514|             1|
|      Audio|153056.08095721173|          868|             2|
| Accesorios| 65755.79865835393|         1008|             2|
+-----------+------------------+-------------+--------------+



In [31]:
# Análisis 5: RFM Analysis (Recency, Frequency, Monetary)
from pyspark.sql.window import Window
from pyspark.sql.functions import datediff, max as sql_max, lit, when, col, countDistinct, sum, count, avg
from pyspark.sql.functions import desc  # <-- Esta es la importación que faltaba

# Obtener fecha máxima en el dataset
max_date = valid_sales.agg(sql_max("InvoiceDate")).collect()[0][0]

# Calcular métricas RFM
rfm = valid_sales \
    .groupBy("CustomerID") \
    .agg(
        datediff(lit(max_date), sql_max("InvoiceDate")).alias("Recency"),
        countDistinct("InvoiceNo").alias("Frequency"),
        sum("TotalAmount").alias("MonetaryValue")
    )

# Categorizar clientes por segmentos
rfm = rfm \
    .withColumn("RecencyScore", 
                when(col("Recency") <= 30, 5)
                .when(col("Recency") <= 60, 4)
                .when(col("Recency") <= 90, 3)
                .when(col("Recency") <= 120, 2)
                .otherwise(1)) \
    .withColumn("FrequencyScore",
                when(col("Frequency") >= 20, 5)
                .when(col("Frequency") >= 10, 4)
                .when(col("Frequency") >= 5, 3)
                .when(col("Frequency") >= 2, 2)
                .otherwise(1)) \
    .withColumn("MonetaryScore",
                when(col("MonetaryValue") >= 5000, 5)
                .when(col("MonetaryValue") >= 2500, 4)
                .when(col("MonetaryValue") >= 1000, 3)
                .when(col("MonetaryValue") >= 500, 2)
                .otherwise(1))

# Calcular RFM Score final
rfm = rfm.withColumn("RFMScore", 
                     col("RecencyScore") + col("FrequencyScore") + col("MonetaryScore"))

# Mostrar resultados de clientes por segmento
rfm_segments = rfm \
    .withColumn("Segment", 
                when(col("RFMScore") >= 13, "Champions")
                .when(col("RFMScore") >= 10, "Loyal Customers")
                .when(col("RFMScore") >= 7, "Potential Loyalists")
                .when(col("RFMScore") >= 4, "At Risk")
                .otherwise("Hibernating"))

segment_summary = rfm_segments \
    .groupBy("Segment") \
    .agg(
        count("CustomerID").alias("CustomerCount"),
        avg("MonetaryValue").alias("AverageSpend")
    ) \
    .orderBy(desc("AverageSpend"))  # <-- Ahora funcionará correctamente

print("Segmentación de clientes (RFM):")
segment_summary.show()

# Mostrar algunos ejemplos de clientes por segmento
print("\nEjemplos de clientes por segmento:")
rfm_segments.orderBy(desc("RFMScore")).show(5)

Segmentación de clientes (RFM):
+-------------------+-------------+------------------+
|            Segment|CustomerCount|      AverageSpend|
+-------------------+-------------+------------------+
|    Loyal Customers|           42|  5837.11226771252|
|Potential Loyalists|          263|3692.9203136183496|
|            At Risk|          507|1718.0587056503234|
|        Hibernating|          135| 285.4441497982637|
+-------------------+-------------+------------------+


Ejemplos de clientes por segmento:
+----------+-------+---------+------------------+------------+--------------+-------------+--------+---------------+
|CustomerID|Recency|Frequency|     MonetaryValue|RecencyScore|FrequencyScore|MonetaryScore|RFMScore|        Segment|
+----------+-------+---------+------------------+------------+--------------+-------------+--------+---------------+
|     11301|      6|        2| 9125.115667016027|           5|             2|            5|      12|Loyal Customers|
|     10178|     36|   

In [32]:
# Guardar resultados procesados
country_sales.write.mode("overwrite").csv("country_sales.csv")
top_products.write.mode("overwrite").csv("top_products.csv")
rfm_segments.write.mode("overwrite").csv("customer_segments.csv")

print("Resultados guardados!")

Resultados guardados!


## Conclusión

En este notebook, hemos aprendido los fundamentos de Apache Spark, trabajando con RDDs, DataFrames y Spark SQL. También hemos aplicado estos conocimientos a un análisis de datos real de E-Commerce.

Conceptos clave que hemos cubierto:
1. RDDs: operaciones básicas, transformaciones, acciones y persistencia
2. DataFrames: creación, operaciones y agregaciones
3. Spark SQL: consultas SQL en datos estructurados
4. Análisis de datos reales con técnicas como RFM

Próximos pasos para seguir aprendiendo:
- Explorar Spark Streaming para datos en tiempo real
- Aprender MLlib para machine learning distribuido
- Profundizar en GraphX para procesamiento de grafos
- Trabajar con clusters de Spark reales

Para más información, consulta la [documentación oficial de Apache Spark](https://spark.apache.org/docs/latest/).

In [33]:
# Detener la sesión de Spark
spark.stop()
print("Sesión de Spark detenida.")

Sesión de Spark detenida.
