In [None]:
pip install pyspark



In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("VentasPorCiudad").setMaster("local")
sc = SparkContext(conf=conf)

ventas = [
    ("Lima", [("Producto1", 5, 10.0), ("Producto2", 2, 20.0)]),
    ("Cusco", [("Producto1", 3, 10.0), ("Producto3", 1, 15.0)]),
    ("Lima", [("Producto1", 1, 10.0), ("Producto2", 1, 20.0)]),
    ("Cusco", [("Producto3", 5, 15.0), ("Producto2", 1, 20.0)]),
    ("Arequipa", [("Producto1", 2, 10.0), ("Producto3", 1, 15.0)])
]
rdd = sc.parallelize(ventas)

ventas_por_ciudad = rdd.map(lambda ciudad: (
    ciudad[0],  # Nombre de la ciudad
    sum([cantidad * precio for _, cantidad, precio in ciudad[1]])  # Calcular el total de ventas en esa ciudad
))

# Reducir por clave (ciudad) para obtener el total de ventas final en cada ciudad
ventas_totales_por_ciudad = ventas_por_ciudad.reduceByKey(lambda x, y: x + y)
print(ventas_totales_por_ciudad.collect())
sc.stop()

[('Lima', 120.0), ('Cusco', 140.0), ('Arequipa', 35.0)]


In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("FiltrarVentasPorCiudad").setMaster("local")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(ventas)

# Calcular el total de ventas por ciudad
ventas_por_ciudad = rdd.map(lambda ciudad: (
    ciudad[0],  # Nombre de la ciudad
    sum([cantidad * precio for _, cantidad, precio in ciudad[1]])  # Calcular el total de ventas en esa ciudad
))

# Filtrar solo las ciudades con ventas mayores a 50
ventas_filtradas = ventas_por_ciudad.filter(lambda x: x[1] > 50)
print(ventas_filtradas.collect())
sc.stop()


[('Lima', 90.0), ('Cusco', 95.0)]


In [None]:
ventas = [
    ("Lima", [("Producto1", 5, 10.0), ("Producto2", 2, 20.0)]),
    ("Cusco", [("Producto1", 3, 10.0), ("Producto3", 1, 15.0)]),
    ("Lima", [("Producto1", 1, 10.0), ("Producto2", 1, 20.0)]),
    ("Cusco", [("Producto3", 5, 15.0), ("Producto2", 1, 20.0)]),
    ("Arequipa", [("Producto1", 2, 10.0), ("Producto3", 1, 15.0)])
]
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("VentasIndividuales").setMaster("local")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(ventas)
ventas_individuales = rdd.flatMap(lambda ciudad: [
    (ciudad[0], producto, cantidad, cantidad * precio)  # Calcula el total por producto
    for producto, cantidad, precio in ciudad[1]
])
print(ventas_individuales.collect())

# Detener el contexto de Spark al final
sc.stop()

[('Lima', 'Producto1', 5, 50.0), ('Lima', 'Producto2', 2, 40.0), ('Cusco', 'Producto1', 3, 30.0), ('Cusco', 'Producto3', 1, 15.0), ('Lima', 'Producto1', 1, 10.0), ('Lima', 'Producto2', 1, 20.0), ('Cusco', 'Producto3', 5, 75.0), ('Cusco', 'Producto2', 1, 20.0), ('Arequipa', 'Producto1', 2, 20.0), ('Arequipa', 'Producto3', 1, 15.0)]


In [None]:
from pyspark import SparkConf, SparkContext
try:
    sc.stop()
except:
    pass
conf = SparkConf().setAppName("UnionEstudiantes").setMaster("local")
sc = SparkContext(conf=conf)
estudiantes_colegio_A = [
    ("Ana", "Colegio A", 85),
    ("Luis", "Colegio A", 78),
    ("Carlos", "Colegio A", 92)
]

estudiantes_colegio_B = [
    ("Maria", "Colegio B", 88),
    ("Jose", "Colegio B", 73),
    ("Lucia", "Colegio B", 95)
]
rdd_colegio_A = sc.parallelize(estudiantes_colegio_A)
rdd_colegio_B = sc.parallelize(estudiantes_colegio_B)
rdd_estudiantes_total = rdd_colegio_A.union(rdd_colegio_B)
print(rdd_estudiantes_total.collect())
sc.stop()


[('Ana', 'Colegio A', 85), ('Luis', 'Colegio A', 78), ('Carlos', 'Colegio A', 92), ('Maria', 'Colegio B', 88), ('Jose', 'Colegio B', 73), ('Lucia', 'Colegio B', 95)]


In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("IntersectionEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Datos de ejemplo: productos vendidos a los clientes
productos_vendidos = [
    ("ProductoA", 10),
    ("ProductoB", 15),
    ("ProductoC", 5),
    ("ProductoD", 20)
]

# Datos de ejemplo: productos solicitados por los clientes
productos_solicitados = [
    ("ProductoB", 15),
    ("ProductoD", 20),
    ("ProductoE", 30),
    ("ProductoF", 25)
]
rdd_vendidos = sc.parallelize(productos_vendidos)
rdd_solicitados = sc.parallelize(productos_solicitados)

# Encontrar los productos comunes entre ambos RDDs usando intersection()
productos_comunes = rdd_vendidos.intersection(rdd_solicitados)
print(productos_comunes.collect())
sc.stop()


[('ProductoD', 20), ('ProductoB', 15)]


In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("DistinctEjemplo").setMaster("local")
sc = SparkContext(conf=conf)
ventas = [
    ("ProductoA", 10),
    ("ProductoB", 15),
    ("ProductoA", 10),
    ("ProductoC", 5),
    ("ProductoB", 15),
    ("ProductoD", 20),
    ("ProductoC", 5)
]
rdd_ventas = sc.parallelize(ventas)

# Eliminar duplicados usando distinct()
ventas_unicas = rdd_ventas.distinct()
print(ventas_unicas.collect())

# Detener el contexto de Spark al final
sc.stop()


[('ProductoA', 10), ('ProductoB', 15), ('ProductoC', 5), ('ProductoD', 20)]


In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("GroupByKeyEjemplo").setMaster("local")
sc = SparkContext(conf=conf)
ventas = [
    ("Lima", 10),
    ("Moyobamba", 15),
    ("Lima", 20),
    ("Rioja", 5),
    ("Moyobamba", 10),
    ("Rioja", 7)
]
rdd_ventas = sc.parallelize(ventas)

ventas_por_ciudad = rdd_ventas.groupByKey()

for ciudad, ventas in ventas_por_ciudad.collect():
    print(f"Ciudad: {ciudad}, Ventas: {list(ventas)}")

# Detener el contexto de Spark al final
sc.stop()


Ciudad: Lima, Ventas: [10, 20]
Ciudad: Moyobamba, Ventas: [15, 10]
Ciudad: Rioja, Ventas: [5, 7]


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("SortByKeyEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Datos de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 20)
]
rdd_ventas = sc.parallelize(ventas)
ventas_ordenadas = rdd_ventas.sortByKey()

# Mostrar los resultados
for ciudad, total_ventas in ventas_ordenadas.collect():
    print(f"Ciudad: {ciudad}, Total de Ventas: {total_ventas}")
sc.stop()

Ciudad: Arequipa, Total de Ventas: 18
Ciudad: Cusco, Total de Ventas: 20
Ciudad: Lima, Total de Ventas: 30
Ciudad: Moyobamba, Total de Ventas: 25
Ciudad: Rioja, Total de Ventas: 12


In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("JoinEjemplo").setMaster("local")
sc = SparkContext(conf=conf)
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 20)
]
poblacion = [
    ("Lima", 9000000),
    ("Moyobamba", 50000),
    ("Rioja", 35000),
    ("Arequipa", 1000000),
    ("Cusco", 500000)
]
rdd_ventas = sc.parallelize(ventas)
rdd_poblacion = sc.parallelize(poblacion)

# Realizar el join entre los RDDs por la clave (ciudad)
rdd_join = rdd_ventas.join(rdd_poblacion)
# Mostrar los resultados
for ciudad, (total_ventas, poblacion_ciudad) in rdd_join.collect():
    print(f"Ciudad: {ciudad}, Ventas: {total_ventas}, Población: {poblacion_ciudad}")
# Detener el contexto de Spark al final
sc.stop()

Ciudad: Moyobamba, Ventas: 25, Población: 50000
Ciudad: Rioja, Ventas: 12, Población: 35000
Ciudad: Arequipa, Ventas: 18, Población: 1000000
Ciudad: Lima, Ventas: 30, Población: 9000000
Ciudad: Cusco, Ventas: 20, Población: 500000


In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("CogroupEjemplo").setMaster("local")
sc = SparkContext(conf=conf)
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18)
]
salarios = [
    ("Lima", 1200),
    ("Moyobamba", 800),
    ("Cusco", 900),
    ("Arequipa", 1100)
]
rdd_ventas = sc.parallelize(ventas)
rdd_salarios = sc.parallelize(salarios)
rdd_cogroup = rdd_ventas.cogroup(rdd_salarios)
for ciudad, (ventas_ciudad, salarios_ciudad) in rdd_cogroup.collect():
    print(f"Ciudad: {ciudad}")
    print(f"  Ventas: {list(ventas_ciudad)}")
    print(f"  Salarios: {list(salarios_ciudad)}")
    print("-" * 30)
sc.stop()


Ciudad: Moyobamba
  Ventas: [25]
  Salarios: [800]
------------------------------
Ciudad: Rioja
  Ventas: [12]
  Salarios: []
------------------------------
Ciudad: Arequipa
  Ventas: [18]
  Salarios: [1100]
------------------------------
Ciudad: Lima
  Ventas: [30]
  Salarios: [1200]
------------------------------
Ciudad: Cusco
  Ventas: []
  Salarios: [900]
------------------------------


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("CoalesceEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear los datos de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 10),
    ("Iquitos", 22)
]

# Crear un RDD de ventas
rdd_ventas = sc.parallelize(ventas, 6)  # Utilizamos 6 particiones al principio

# Mostrar el número de particiones antes de usar coalesce
print(f"Particiones antes de coalesce: {rdd_ventas.getNumPartitions()}")

# Reducir las particiones a 2
rdd_coalesced = rdd_ventas.coalesce(2)

# Mostrar el número de particiones después de usar coalesce
print(f"Particiones después de coalesce: {rdd_coalesced.getNumPartitions()}")

# Mostrar los datos después de coalesce
print(rdd_coalesced.collect())

# Detener el contexto de Spark al final
sc.stop()


Particiones antes de coalesce: 6
Particiones después de coalesce: 2
[('Lima', 30), ('Moyobamba', 25), ('Rioja', 12), ('Arequipa', 18), ('Cusco', 10), ('Iquitos', 22)]


In [None]:
#Acciones

In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("ReduceEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear un RDD de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 10),
    ("Iquitos", 22)
]

rdd_ventas = sc.parallelize(ventas)

# Usar reduce para calcular el total de ventas
# Función de reducción que suma los valores
total_ventas = rdd_ventas.map(lambda x: x[1]).reduce(lambda a, b: a + b)

# Mostrar el total de ventas
print(f"Total de ventas: {total_ventas}")

# Detener el contexto de Spark al final
sc.stop()


Total de ventas: 117


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("CollectEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear un RDD de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 10),
    ("Iquitos", 22)
]

rdd_ventas = sc.parallelize(ventas)

# Usar collect() para recuperar todos los elementos del RDD
resultado = rdd_ventas.collect()

# Mostrar el resultado
print("Ventas por ciudad:", resultado)

# Detener el contexto de Spark al final
sc.stop()


Ventas por ciudad: [('Lima', 30), ('Moyobamba', 25), ('Rioja', 12), ('Arequipa', 18), ('Cusco', 10), ('Iquitos', 22)]


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("CountEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear un RDD de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 10),
    ("Iquitos", 22)
]

rdd_ventas = sc.parallelize(ventas)

# Usar count() para contar el número de elementos en el RDD
total_elementos = rdd_ventas.count()

# Mostrar el total de elementos
print(f"Total de elementos en el RDD: {total_elementos}")

# Detener el contexto de Spark al final
sc.stop()


Total de elementos en el RDD: 6


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("FirstEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear un RDD de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 10),
    ("Iquitos", 22)
]

rdd_ventas = sc.parallelize(ventas)

# Usar first() para obtener el primer elemento del RDD
primer_venta = rdd_ventas.first()

# Mostrar el primer elemento
print(f"Primer elemento del RDD: {primer_venta}")

# Detener el contexto de Spark al final
sc.stop()


Primer elemento del RDD: ('Lima', 30)


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("TakeEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear un RDD de ventas
ventas = [
    ("Lima", 30),
    ("Moyobamba", 25),
    ("Rioja", 12),
    ("Arequipa", 18),
    ("Cusco", 10),
    ("Iquitos", 22)
]

rdd_ventas = sc.parallelize(ventas)

# Usar take(n) para obtener los primeros 3 elementos del RDD
primeros_3 = rdd_ventas.take(3)

# Mostrar los primeros 3 elementos
print(f"Primeros 3 elementos del RDD: {primeros_3}")

# Detener el contexto de Spark al final
sc.stop()


Primeros 3 elementos del RDD: [('Lima', 30), ('Moyobamba', 25), ('Rioja', 12)]


In [None]:
from pyspark import SparkConf, SparkContext
import shutil
try:
    sc.stop()
except:
    pass  # Si no hay un contexto activo, se ignora el error
# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("SaveAsTextFileEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear el RDD con los datos de ventas
ventas = [
    ("T001", "Laptop", 2),
    ("T002", "Smartphone", 5),
    ("T003", "Tablet", 1),
    ("T004", "Laptop", 3),
    ("T005", "Smartphone", 2),
    ("T006", "Smartwatch", 4),
    ("T007", "Laptop", 1),
    ("T008", "Smartphone", 3)
]
rdd_ventas = sc.parallelize(ventas)
# Crear un RDD con (producto, cantidad) usando map
rdd_productos = rdd_ventas.map(lambda x: (x[1], x[2]))
# Calcular el total vendido por cada producto utilizando reduceByKey
rdd_totales = rdd_productos.reduceByKey(lambda x, y: x + y)
# Convertir cada tupla en una cadena de texto para guardarlas como texto
rdd_totales_texto = rdd_totales.map(lambda x: f"{x[0]}: {x[1]}")
# Eliminar el directorio de salida si ya existe
try:
    shutil.rmtree("ventas_totales_salida")
except FileNotFoundError:
    pass  # Si el directorio no existe, ignora el error
# Guardar el resultado como archivo de texto
rdd_totales_texto.saveAsTextFile("ventas_totales_salida")
# Detener el contexto de Spark al final
sc.stop()


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("MaxMinEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear el RDD con los datos de ventas
ventas = [
    ("T001", "Laptop", 2),
    ("T002", "Smartphone", 5),
    ("T003", "Tablet", 1),
    ("T004", "Laptop", 3),
    ("T005", "Smartphone", 2),
    ("T006", "Smartwatch", 4),
    ("T007", "Laptop", 1),
    ("T008", "Smartphone", 3)
]

rdd_ventas = sc.parallelize(ventas)

# Crear un RDD con (producto, cantidad) usando map
rdd_productos = rdd_ventas.map(lambda x: (x[1], x[2]))

# Encontrar el producto con la venta más alta (max) y la más baja (min)
max_venta = rdd_productos.reduceByKey(lambda x, y: x + y).max(lambda x: x[1])
min_venta = rdd_productos.reduceByKey(lambda x, y: x + y).min(lambda x: x[1])

# Imprimir los resultados
print(f"Producto con la venta más alta: {max_venta}")
print(f"Producto con la venta más baja: {min_venta}")

# Detener el contexto de Spark
sc.stop()


Producto con la venta más alta: ('Smartphone', 10)
Producto con la venta más baja: ('Tablet', 1)


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("CountByKeyEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear el RDD con los datos de ventas
ventas = [
    ("Laptop", 2),
    ("Smartphone", 5),
    ("Tablet", 1),
    ("Laptop", 3),
    ("Smartphone", 2),
    ("Smartwatch", 4),
    ("Laptop", 1),
    ("Smartphone", 3)
]

rdd_ventas = sc.parallelize(ventas)

# Contar cuántas veces se vendió cada producto usando countByKey
conteo_por_producto = rdd_ventas.countByKey()

# Imprimir los resultados
for producto, conteo in conteo_por_producto.items():
    print(f"{producto}: {conteo}")

# Detener el contexto de Spark
sc.stop()


Laptop: 3
Smartphone: 3
Tablet: 1
Smartwatch: 1


In [None]:
from pyspark import SparkConf, SparkContext

# Configurar y crear el contexto de Spark
conf = SparkConf().setAppName("ForEachEjemplo").setMaster("local")
sc = SparkContext(conf=conf)

# Crear el RDD con los datos de ventas
ventas = [
    ("Laptop", 2),
    ("Smartphone", 5),
    ("Tablet", 1),
    ("Laptop", 3),
    ("Smartphone", 2),
    ("Smartwatch", 4),
    ("Laptop", 1),
    ("Smartphone", 3)
]

rdd_ventas = sc.parallelize(ventas)

# Función que imprimirá cada venta
def imprimir_venta(venta):
    producto, cantidad = venta
    print(f"Producto: {producto}, Cantidad vendida: {cantidad}")

# Aplicar foreach para imprimir cada venta
rdd_ventas.foreach(imprimir_venta)

# Detener el contexto de Spark
sc.stop()
