In [103]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [104]:
#2- Se tiene un RDD registros de ventas de producto con la forma (fecha de venta, código de producto, 
#precio de venta) y en otro RDD detalle de los productos con (código de producto, descripción del producto, 
#categoría).

rdd_data_1 = [
    ("2020-03-06",1,200),
    ("2019-05-06",2,125),
    ("2018-10-26",1,100),
    ("2020-01-10",2,450),
    ("2020-05-04",2,300),
    ("2020-07-30",2,600),
    ("2019-08-22",2,700),
    ("2020-05-12",2,300),
    ("2016-03-03",3,500),
    ("2005-08-13",3,500),
    ("2006-04-10",3,500),
    ("2013-08-07",4,700),
    ("2009-06-10",4,500),
    ("2020-08-20",5,100),
    ("2018-06-10",5,70),
    ("2019-07-10",6,1000),
    ("2020-10-10",6,1500)
]

rdd_data_2 = [
    (1, "Afeitadora", "Cuidados Personales"),
    (2, "Auriculares", "Electronica"),
    (3, "Zapallo", "Verduras"),
    (4, "Pantufla", "Ropa"),
    (5, "Aguacate", "Verduras"),
    (6, "Camisa", "Ropa")
]

rdd_ventas = sc.parallelize(rdd_data_1)
rdd_prods = sc.parallelize(rdd_data_2)

In [105]:
ventas_df = rdd_ventas.toDF(['fecha','id','precio'])
rdd_prods = rdd_prods.toDF(['id', 'descripcion', 'categoria']).rdd
ventas_df = ventas_df.withColumn("fecha", (col("fecha").cast("date")))
rdd_ventas = ventas_df.rdd
rdd_ventas.collect()

[Row(fecha=datetime.date(2020, 3, 6), id=1, precio=200),
 Row(fecha=datetime.date(2019, 5, 6), id=2, precio=125),
 Row(fecha=datetime.date(2018, 10, 26), id=1, precio=100),
 Row(fecha=datetime.date(2020, 1, 10), id=2, precio=450),
 Row(fecha=datetime.date(2020, 5, 4), id=2, precio=300),
 Row(fecha=datetime.date(2020, 7, 30), id=2, precio=600),
 Row(fecha=datetime.date(2019, 8, 22), id=2, precio=700),
 Row(fecha=datetime.date(2020, 5, 12), id=2, precio=300),
 Row(fecha=datetime.date(2016, 3, 3), id=3, precio=500),
 Row(fecha=datetime.date(2005, 8, 13), id=3, precio=500),
 Row(fecha=datetime.date(2006, 4, 10), id=3, precio=500),
 Row(fecha=datetime.date(2013, 8, 7), id=4, precio=700),
 Row(fecha=datetime.date(2009, 6, 10), id=4, precio=500),
 Row(fecha=datetime.date(2020, 8, 20), id=5, precio=100),
 Row(fecha=datetime.date(2018, 6, 10), id=5, precio=70),
 Row(fecha=datetime.date(2019, 7, 10), id=6, precio=1000),
 Row(fecha=datetime.date(2020, 10, 10), id=6, precio=1500)]

In [106]:
rdd_products_mapped = rdd_prods.map(lambda x: (x.id, (x.categoria, x.descripcion)))
rdd_products_mapped.collect()

[(1, ('Cuidados Personales', 'Afeitadora')),
 (2, ('Electronica', 'Auriculares')),
 (3, ('Verduras', 'Zapallo')),
 (4, ('Ropa', 'Pantufla')),
 (5, ('Verduras', 'Aguacate')),
 (6, ('Ropa', 'Camisa'))]

In [107]:
rdd_joined = rdd_ventas.map(lambda x: (x.id, (x.fecha, x.precio))).join(rdd_products_mapped).cache()
rdd_joined.take(1)

[(1,
  ((datetime.date(2020, 3, 6), 200), ('Cuidados Personales', 'Afeitadora')))]

In [108]:
rdd_joined_ordered = rdd_joined.map(lambda x: (x[0], (1, x[1][1][1]))).cache()
rdd_joined_ordered.reduceByKey(lambda x,y: ((x[0]+y[0],x[1]))).map(lambda x:(x[1][1], x[1][0]))\
.takeOrdered(1, lambda x:-x[1])

[('Auriculares', 6)]

In [109]:
#Cuál es la categoría de productos más vendida.
rdd_joined.map(lambda x: (x[1][1][0], (1))).reduceByKey(lambda x,y: x+y).takeOrdered(1, lambda x: -x[1])

[('Electronica', 6)]

In [116]:
#Cuál es el top5 de productos más vendidos generando un RDD con (código de producto, 
#descripción, cantidad de ventas)

rdd_joined_ordered.reduceByKey(lambda x,y:(x[0]+y[0],x[1])).map(lambda x: (x[0], x[1][1], x[1][0]))\
.takeOrdered(5, lambda x: (-x[2]))

[(2, 'Auriculares', 6),
 (3, 'Zapallo', 3),
 (1, 'Afeitadora', 2),
 (4, 'Pantufla', 2),
 (5, 'Aguacate', 2)]