<a href="https://colab.research.google.com/github/PXamir/RDD_Tranformaciones-Acciones/blob/main/Ejemplos_Acciones_Transformaciones_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# Instala PySpark si no lo tienes ya instalado
!pip install pyspark

from pyspark.sql import SparkSession

# Crear una sesión de Spark
spark = SparkSession.builder.appName("Tranformacion/Accion").getOrCreate()

# Obtener el SparkContext desde SparkSession
sc = spark.sparkContext



In [None]:
######     MAP     ######

# Crear datos
datos_ventas = [
#   (  producto,  cantidad, precio_por_unidad)
    ("Producto A",   3,          10.0),
    ("Producto B",   2,          15.5),
    ("Producto C",   5,          7.2)
]

# Crear el RDD a partir de la lista
rdd_ventas = sc.parallelize(datos_ventas)

# Usar map para calcular el precio total y agregarlo a cada tupla
rdd_ventas_totales = rdd_ventas.map(lambda x: (x[0], x[1], x[2], x[1] * x[2]))

# Mostrar los resultados
print(rdd_ventas_totales.collect())

[('Producto A', 3, 10.0, 30.0), ('Producto B', 2, 15.5, 31.0), ('Producto C', 5, 7.2, 36.0)]


In [None]:
######     FILTER     ######

# Crear datos
datos_empleados = [
#   (nombre,   edad)
    ("Ana",     25),
    ("José",    32),
    ("Carlos",  28),
    ("Daniela", 35),
    ("Elisa",   30)
]

# Crear el RDD a partir de la lista
rdd_empleados = sc.parallelize(datos_empleados)

# Usar filter para seleccionar empleados mayores de 30 años
rdd_empleados_mayores_30 = rdd_empleados.filter(lambda x: x[1] > 30)

# Contar empleados mayores de 30 años
num_empleados_mayores_30 = rdd_empleados_mayores_30.count()
print("Número de empleados mayores de 30 años:", num_empleados_mayores_30)

# Mostrar los resultados
print(rdd_empleados_mayores_30.collect())

Número de empleados mayores de 30 años: 2
[('José', 32), ('Daniela', 35)]


In [5]:
#Ejemplo con map, reduceByKey, sortBy y first
# Datos de uso de gimnasio: (tipo_membresia, cantidad_usos)
gimnasio_data = [
    ("premium", 15), ("básico", 7), ("premium", 10),
    ("vip", 5), ("básico", 9), ("vip", 8), ("vip", 1),
]

# Crear RDD de gimnasio
gimnasio_rdd = sc.parallelize(gimnasio_data)

# Transformar a (tipo_membresia, cantidad_usos)
usos_map = gimnasio_rdd.map(lambda x: (x[0], x[1]))

# Sumar los usos por cada tipo de membresía
usos_totales = usos_map.reduceByKey(lambda x, y: x + y)

# Ordenar por cantidad de usos de mayor a menor
usos_ordenados = usos_totales.sortBy(lambda x: x[1], ascending=False)

# Obtener la primera membresía en la lista ordenada por cantidad de usos
membresia_primera = usos_ordenados.first()

print("Membresía con mayor cantidad de usos:", membresia_primera)

Membresía con mayor cantidad de usos: ('premium', 25)


In [6]:
#Ejemplo con map, distinct, countByKey y reduce
# Datos de préstamos en una biblioteca: (categoría, cantidad_préstamos)
prestamos_data = [
    ("Novela", 12), ("Ciencia", 5), ("Tecnología", 8),
    ("Novela", 3), ("Historia", 7), ("Ciencia", 4),
]

# Crear RDD de préstamos
prestamos_rdd = sc.parallelize(prestamos_data)

# 1. `distinct`: Obtener categorías únicas de libros
categorias_unicas = prestamos_rdd.map(lambda x: x[0]).distinct()

# 2. `countByKey`: Contar los préstamos por cada categoría
conteo_por_categoria = prestamos_rdd.countByKey()

# 3. `reduce`: Calcular el total de préstamos
total_prestamos = prestamos_rdd.map(lambda x: x[1]).reduce(lambda x, y: x + y)

print("Categorías únicas:", categorias_unicas.collect())
print("Conteo por categoría:", dict(conteo_por_categoria))
print("Total de préstamos:", total_prestamos)

Categorías únicas: ['Ciencia', 'Tecnología', 'Historia', 'Novela']
Conteo por categoría: {'Novela': 2, 'Ciencia': 2, 'Tecnología': 1, 'Historia': 1}
Total de préstamos: 39


In [7]:
#Ejemplo con map, reduceByKey, filter y saveAsTextFile
# Datos de viajes en un servicio de transporte: (destino, cantidad_viajes, tarifa)
transporte_data = [
    ("Centro", 10, 15), ("Aeropuerto", 5, 30), ("Centro", 3, 15),
    ("Estación", 8, 10), ("Aeropuerto", 2, 30), ("Estación", 6, 10),
]

# Crear RDD de transporte
transporte_rdd = sc.parallelize(transporte_data)

# 1. `map`: Calcular el ingreso por cada viaje
tarifas_totales = transporte_rdd.map(lambda x: (x[0], x[1] * x[2]))

# 2. `reduceByKey`: Sumar las tarifas totales por destino
ingresos_por_destino = tarifas_totales.reduceByKey(lambda x, y: x + y)

# 3. `filter`: Filtrar destinos con ingresos mayores a 100
destinos_con_altas_tarifas = ingresos_por_destino.filter(lambda x: x[1] > 100)

# 4. `saveAsTextFile`: Guardar el resultado en un archivo
destinos_con_altas_tarifas.saveAsTextFile("/content/altas_tarifas.txt")

print("Destinos con tarifas altas guardados en archivo.")

Destinos con tarifas altas guardados en archivo.


In [8]:
#Ejemplo con groupByKey, reduceByKey, sortByKey y collect
# Datos de mascotas atendidas en una veterinaria: (tipo_mascota, cantidad_visitas)
mascotas_data = [
    ("perro", 5), ("gato", 3), ("perro", 2),
    ("conejo", 1), ("gato", 4), ("conejo", 2),
]

# Crear RDD de mascotas
mascotas_rdd = sc.parallelize(mascotas_data)

# 1. `groupByKey`: Agrupar las visitas por tipo de mascota
visitas_agrupadas = mascotas_rdd.groupByKey().mapValues(list)

# 2. `reduceByKey`: Sumar las visitas totales por tipo de mascota
visitas_totales = mascotas_rdd.reduceByKey(lambda x, y: x + y)

# 3. `sortByKey`: Ordenar las visitas por tipo de mascota alfabéticamente
visitas_ordenadas = visitas_totales.sortByKey()

# 4. `collect`: Recoger los resultados ordenados
resultado = visitas_ordenadas.collect()

print("Visitas por tipo de mascota:", resultado)

Visitas por tipo de mascota: [('conejo', 3), ('gato', 7), ('perro', 7)]


In [9]:
#Ejemplo con flatMap, count y take
# Datos: (tipo de visitante, cantidad de visitas por grupo)
visitantes_data = [
    ("adulto", 3), ("niño", 1), ("adulto", 2),
    ("ancianos", 1), ("niño", 6), ("adulto", 6), ("ancianos", 5)
]
visitantes_rdd = sc.parallelize(visitantes_data)

# 1. `flatMap`: Duplicar cada visita individual en el registro (ej. "adulto" con 5 visitas se convierte en 5 entradas de "adulto")
visitas_expandidas = visitantes_rdd.flatMap(lambda x: [x[0]] * x[1])

# 2. `count`: Contar el total de entradas registradas
total_visitas = visitas_expandidas.count()
print("Total de visitas individuales:", total_visitas)

# 3. `take`: Ver las primeras 5 visitas para una inspección rápida
primeras_visitas = visitas_expandidas.take(12)
print("Primeras visitas:", primeras_visitas)

Total de visitas individuales: 24
Primeras visitas: ['adulto', 'adulto', 'adulto', 'niño', 'adulto', 'adulto', 'ancianos', 'niño', 'niño', 'niño', 'niño', 'niño']


In [10]:
#Ejemplo con union, intersection y count
# Datos: IDs de usuarios en dos servicios diferentes
usuarios_streaming = sc.parallelize([201, 202, 203, 204, 205])
usuarios_noticias = sc.parallelize([203, 204, 206, 207, 208])

# 1. `union`: Unir ambos RDDs para obtener la lista de todos los usuarios de ambos servicios
todos_usuarios = usuarios_streaming.union(usuarios_noticias)

# 2. `intersection`: Encontrar usuarios que utilizan ambos servicios
usuarios_ambos_servicios = usuarios_streaming.intersection(usuarios_noticias)

# 3. `count`: Contar el total de usuarios que usan ambos servicios
total_usuarios_comunes = usuarios_ambos_servicios.count()
print("Total de usuarios en ambos servicios:", total_usuarios_comunes)

Total de usuarios en ambos servicios: 2


In [11]:
#Ejemplo con join y foreach
# Datos: información de estudiantes y sus notas
estudiantes = sc.parallelize([
    ("1", "Ana García"),
    ("2", "Carlos López"),
    ("3", "María Rodríguez"),
    ("4", "Juan Pérez"),
    ("5", "Laura Martínez")
])

notas = sc.parallelize([
    ("1", (85, "Matemáticas")),
    ("2", (92, "Matemáticas")),
    ("3", (78, "Matemáticas")),
    ("4", (95, "Matemáticas")),
    ("5", (88, "Matemáticas"))
])

# 1. `join`: Combinar información de estudiantes con sus notas
estudiantes_notas = estudiantes.join(notas)

print("Lista de Estudiantes y Calificaciones:")
print("-" * 50)

# 2. `foreach`: Mostrar la información de cada estudiante con su nota
resultados = estudiantes_notas.collect()
for resultado in resultados:
    id_estudiante = resultado[0]
    nombre = resultado[1][0]
    nota = resultado[1][1][0]
    asignatura = resultado[1][1][1]
    print(f"ID: {id_estudiante}, Estudiante: {nombre}, Asignatura: {asignatura}, Nota: {nota}")

print("-" * 50)

Lista de Estudiantes y Calificaciones:
--------------------------------------------------
ID: 4, Estudiante: Juan Pérez, Asignatura: Matemáticas, Nota: 95
ID: 3, Estudiante: María Rodríguez, Asignatura: Matemáticas, Nota: 78
ID: 1, Estudiante: Ana García, Asignatura: Matemáticas, Nota: 85
ID: 2, Estudiante: Carlos López, Asignatura: Matemáticas, Nota: 92
ID: 5, Estudiante: Laura Martínez, Asignatura: Matemáticas, Nota: 88
--------------------------------------------------


In [12]:
#Ejemplo con cogroup, coalesce y max, min
# Crear dos RDDs con datos de ventas por región
ventas_enero = sc.parallelize([
    ("Norte", 1500),
    ("Sur", 2000),
    ("Este", 1800),
    ("Oeste", 1600)
])

ventas_febrero = sc.parallelize([
    ("Norte", 1600),
    ("Sur", 1900),
    ("Este", 2100),
    ("Oeste", 1700)
])

# 1. Transformación cogroup: Agrupar ventas por región
ventas_agrupadas = ventas_enero.cogroup(ventas_febrero)

# Mostrar resultado del cogroup
print("Resultado del cogroup:")
for region in ventas_agrupadas.collect():
    print(f"Región: {region[0]}")
    print(f"Ventas Enero: {list(region[1][0])}")
    print(f"Ventas Febrero: {list(region[1][1])}\n")

# 2. Transformación coalesce: Reducir el número de particiones a 2
ventas_reducidas = ventas_agrupadas.coalesce(2)
print(f"Número de particiones después de coalesce: {ventas_reducidas.getNumPartitions()}")

# 3. Acciones max y min
# Primero convertimos los datos a un formato más manejable
ventas_totales_enero = ventas_enero.map(lambda x: x[1])
ventas_totales_febrero = ventas_febrero.map(lambda x: x[1])

# Encontrar máximo y mínimo de ventas por mes
max_enero = ventas_totales_enero.max()
min_enero = ventas_totales_enero.min()
max_febrero = ventas_totales_febrero.max()
min_febrero = ventas_totales_febrero.min()

print("\nEstadísticas de ventas:")
print(f"Enero - Venta máxima: {max_enero}, Venta mínima: {min_enero}")
print(f"Febrero - Venta máxima: {max_febrero}, Venta mínima: {min_febrero}")

Resultado del cogroup:
Región: Norte
Ventas Enero: [1500]
Ventas Febrero: [1600]

Región: Sur
Ventas Enero: [2000]
Ventas Febrero: [1900]

Región: Oeste
Ventas Enero: [1600]
Ventas Febrero: [1700]

Región: Este
Ventas Enero: [1800]
Ventas Febrero: [2100]

Número de particiones después de coalesce: 2

Estadísticas de ventas:
Enero - Venta máxima: 2000, Venta mínima: 1500
Febrero - Venta máxima: 2100, Venta mínima: 1600
