Autor: Natalia Betancur Herrera

Fecha: 22/09/2025

# Laboratorio 3 Conexión a Apache Spark y trabajo con RDDs

En esta sesión entenderemos cómo se estructura y opera un entorno de Big Data moderno, y cómo podemos simularlo con herramientas accesibles como Google Colab y PySpark. Este laboratorio te ayudará a visualizar cómo se conectan los componentes clave de una arquitectura distribuida y cómo puedes empezar a trabajar con datos masivos desde la nube, de forma escalable y eficiente.

## ¿Qué es un RDD?

Un **RDD (Resilient Distributed Dataset)** es la estructura de datos fundamental en Apache Spark.  Representa una colección **inmutable, distribuida y tolerante a fallos** de elementos que pueden procesarse en paralelo.

En este laboratorio aprenderás a:
* Crear un RDD desde un DataFrame.
* Aplicar transformaciones como `filter`, `map`, `sortBy`.
* Usar acciones como `take`, `count`, `collect`.

In [3]:
# Instalación de PySpark
!pip install pyspark

# Importar librerías principales
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Crear la sesión de Spark
spark = SparkSession.builder \
    .appName("Laboratorio_3_RDDs") \
    .getOrCreate()

print(" Sesión de Spark iniciada correctamente")

 Sesión de Spark iniciada correctamente


In [7]:
# Crear un DataFrame simulado (similar al del primer laboratorio)
data = [
    (1, "Bogotá", "Femenino", "Primaria", 8.5),
    (2, "Pereira", "Masculino", "Secundaria", 7.0),
    (3, "Manizales", "Femenino", "Primaria", 9.1),
    (4, "Bogotá", "Femenino", "Primaria", 6.8),
    (5, "Armenia", "Masculino", "Secundaria", 8.0),
    (6, "Manizales", "Femenino", "Secundaria", 7.3)
]

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Ciudad", StringType(), True),
    StructField("Genero", StringType(), True),
    StructField("Grado", StringType(), True),
    StructField("Puntaje", FloatType(), True)
])

df = spark.createDataFrame(data, schema)
print(" Datos cargados en PySpark")
df.show()

 Datos cargados en PySpark
+---+---------+---------+----------+-------+
| ID|   Ciudad|   Genero|     Grado|Puntaje|
+---+---------+---------+----------+-------+
|  1|   Bogotá| Femenino|  Primaria|    8.5|
|  2|  Pereira|Masculino|Secundaria|    7.0|
|  3|Manizales| Femenino|  Primaria|    9.1|
|  4|   Bogotá| Femenino|  Primaria|    6.8|
|  5|  Armenia|Masculino|Secundaria|    8.0|
|  6|Manizales| Femenino|Secundaria|    7.3|
+---+---------+---------+----------+-------+



In [8]:
# Convertir el DataFrame a un RDD
rdd = df.rdd
print(" DataFrame convertido a RDD")

# Ver los primeros elementos del RDD
print(rdd.take(3))

 DataFrame convertido a RDD
[Row(ID=1, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=8.5), Row(ID=2, Ciudad='Pereira', Genero='Masculino', Grado='Secundaria', Puntaje=7.0), Row(ID=3, Ciudad='Manizales', Genero='Femenino', Grado='Primaria', Puntaje=9.100000381469727)]


In [9]:
# Almacenar en memoria cache nuestro RDD para utilizarlo varias veces
rdd.cache()

# Ver cuántas particiones tiene nuestro RDD
print("Número de particiones inicial:", rdd.getNumPartitions())

# Reparticionar nuestro RDD, por ejemplo, en 4 particiones
rdd = rdd.repartition(4)
print("Número de particiones después de reparticionar:", rdd.getNumPartitions())

# Mostrar algunas filas del RDD
rdd.take(5)

Número de particiones inicial: 2
Número de particiones después de reparticionar: 4


[Row(ID=1, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=8.5),
 Row(ID=2, Ciudad='Pereira', Genero='Masculino', Grado='Secundaria', Puntaje=7.0),
 Row(ID=3, Ciudad='Manizales', Genero='Femenino', Grado='Primaria', Puntaje=9.100000381469727),
 Row(ID=4, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=6.800000190734863),
 Row(ID=5, Ciudad='Armenia', Genero='Masculino', Grado='Secundaria', Puntaje=8.0)]

In [10]:
# TRANSFORMACIÓN - Filter: filtrar estudiantes con puntaje mayor a 8.0
rdd_filtrado = rdd.filter(lambda row: row["Puntaje"] > 8.0)
rdd_filtrado.take(5)

[Row(ID=1, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=8.5),
 Row(ID=3, Ciudad='Manizales', Genero='Femenino', Grado='Primaria', Puntaje=9.100000381469727)]

In [12]:
# TRANSFORMACIÓN - Map: convertir el nombre de ciudad a mayúsculas
rdd_mayusculas = rdd.map(lambda row: row["Ciudad"].upper())
rdd_mayusculas.take(5)

['BOGOTÁ', 'PEREIRA', 'MANIZALES', 'BOGOTÁ', 'ARMENIA']

In [13]:
# TRANSFORMACIÓN - Map y Reduce: promedio de puntaje por Ciudad
rdd_map = rdd.map(lambda x: (x["Ciudad"], x["Puntaje"]))
rdd_promedios = rdd_map \
    .mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda x: x[0] / x[1])

print("🏅 Promedio de puntaje por Ciudad:")
print(rdd_promedios.collect())

🏅 Promedio de puntaje por Ciudad:
[('Bogotá', 7.650000095367432), ('Manizales', 8.200000286102295), ('Pereira', 7.0), ('Armenia', 8.0)]


In [15]:
# Orden descendente (de mayor a menor puntaje)
rdd_ordenado_desc = rdd.sortBy(lambda row: row['Puntaje'], ascending=False)
print(" Orden descendente por Puntaje:")
rdd_ordenado_desc.take(5)

 Orden descendente por Puntaje:


[Row(ID=3, Ciudad='Manizales', Genero='Femenino', Grado='Primaria', Puntaje=9.100000381469727),
 Row(ID=1, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=8.5),
 Row(ID=5, Ciudad='Armenia', Genero='Masculino', Grado='Secundaria', Puntaje=8.0),
 Row(ID=6, Ciudad='Manizales', Genero='Femenino', Grado='Secundaria', Puntaje=7.300000190734863),
 Row(ID=2, Ciudad='Pereira', Genero='Masculino', Grado='Secundaria', Puntaje=7.0)]

In [17]:
# Acción 1: TAKE Devuelve los primeros N elementos del RDD
print("Ejemplo de registros del RDD original:")
print(rdd.take(5))


# Acción 2: COUNT Retorna el número total de elementos en el RDD
total_registros = rdd.count()
print(f"Total de registros en el RDD: {total_registros}")


# Acción 3: COLLECT Recupera todo el contenido del RDD y lo devuelve al driver.
print("Colectando todos los registros (solo para datasets pequeños):")
todos_los_registros = rdd.collect()

# Mostrar los primeros 5 elementos colectados
for i in todos_los_registros[:5]:
    print(i)

Ejemplo de registros del RDD original:
[Row(ID=1, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=8.5), Row(ID=2, Ciudad='Pereira', Genero='Masculino', Grado='Secundaria', Puntaje=7.0), Row(ID=3, Ciudad='Manizales', Genero='Femenino', Grado='Primaria', Puntaje=9.100000381469727), Row(ID=4, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=6.800000190734863), Row(ID=5, Ciudad='Armenia', Genero='Masculino', Grado='Secundaria', Puntaje=8.0)]
Total de registros en el RDD: 6
Colectando todos los registros (solo para datasets pequeños):
Row(ID=1, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=8.5)
Row(ID=2, Ciudad='Pereira', Genero='Masculino', Grado='Secundaria', Puntaje=7.0)
Row(ID=3, Ciudad='Manizales', Genero='Femenino', Grado='Primaria', Puntaje=9.100000381469727)
Row(ID=4, Ciudad='Bogotá', Genero='Femenino', Grado='Primaria', Puntaje=6.800000190734863)
Row(ID=5, Ciudad='Armenia', Genero='Masculino', Grado='Secundaria', Puntaje=8.0)
