Se tiene un RDD registros de ventas de producto con forma (fecha de venta, cdigo de producto, precio de venta), y en otro RDD detalle de los productos con (codigo de producto, descripcion del producto, categoria). Se pide resolver utilizando PySpark:
**A) Producto mas vendido;
B)Categoria de productos mas vendida;
C)Top5 de productos mas vendidos generando un RDD con (codigo de producto, descripcion, cantidad de ventas);
D)Cual es el producto que registro mayor aumento de precio en el ultimo año, tomando para este analisis solo los productos que cuenten con al menos 50 ventas en el ultimo año;
E) idem anterior pero calculando la categoria de productos que registro mayor variacion de precios en el ultimo año.**

In [2]:
from fechasRandom import *
import random

ventas = []
productos = []
fechaInicial = '2018-01-01'
fechaFinal = '2019-12-30'
categoriasProductos = ['Alimentos','Calzado','Jardineria','Limpieza','Detailing']

for i in range(1000):
    fechaVenta = random_date(fechaInicial, fechaFinal)
    codigoProducto = random.randint(1,10)
    precioVenta = float("{0:.2f}".format(random.uniform(10,20000)))
    ventas.append((fechaVenta,codigoProducto,precioVenta))
for j in range(1,11):
    codigoProducto = j
    descripcionProducto = "Descripcion {}".format(j)
    categoria = random.choice(categoriasProductos)
    productos.append((codigoProducto,descripcionProducto,categoria))



In [3]:
ventasRDD = sc.parallelize(ventas)
productosRDD = sc.parallelize(productos)

In [4]:
ventasRDD.collect()

[('2018-08-17', 9, 6944.21),
 ('2018-01-05', 5, 5231.64),
 ('2018-08-26', 6, 10762.98),
 ('2018-12-31', 9, 4940.65),
 ('2018-05-26', 9, 8881.46),
 ('2019-12-03', 4, 17657.29),
 ('2018-01-12', 7, 7813.6),
 ('2019-04-03', 6, 589.28),
 ('2018-12-22', 5, 10354.49),
 ('2018-10-18', 7, 18793.87),
 ('2019-01-01', 2, 8056.1),
 ('2019-04-03', 10, 18322.44),
 ('2018-02-08', 6, 16532.41),
 ('2018-01-13', 6, 3197.6),
 ('2018-07-01', 1, 1871.5),
 ('2018-02-01', 4, 15477.1),
 ('2019-04-01', 3, 15411.98),
 ('2019-03-29', 4, 17192.59),
 ('2019-09-06', 2, 5457.19),
 ('2018-11-29', 8, 1440.88),
 ('2018-10-03', 5, 18664.71),
 ('2019-06-03', 8, 16725.33),
 ('2019-07-12', 4, 10859.8),
 ('2019-04-09', 1, 6547.69),
 ('2018-08-13', 6, 9512.12),
 ('2018-06-11', 8, 4462.42),
 ('2018-08-15', 3, 19025.53),
 ('2018-02-10', 10, 19846.64),
 ('2018-11-13', 9, 17315.01),
 ('2019-06-05', 3, 5768.77),
 ('2019-04-12', 5, 9315.18),
 ('2018-07-20', 2, 5810.28),
 ('2019-10-13', 1, 15204.76),
 ('2018-01-09', 4, 1320.14),
 ('

In [5]:
productosRDD.collect()

[(1, 'Descripcion 1', 'Calzado'),
 (2, 'Descripcion 2', 'Alimentos'),
 (3, 'Descripcion 3', 'Jardineria'),
 (4, 'Descripcion 4', 'Detailing'),
 (5, 'Descripcion 5', 'Calzado'),
 (6, 'Descripcion 6', 'Jardineria'),
 (7, 'Descripcion 7', 'Limpieza'),
 (8, 'Descripcion 8', 'Detailing'),
 (9, 'Descripcion 9', 'Alimentos'),
 (10, 'Descripcion 10', 'Detailing')]

**A) Producto mas vendido**

In [6]:
#agrego la cantidad de veces que aparece un producto con map
ventasRDD1 = ventasRDD.map(lambda x: (x[1],1))

#uso reduceByKey para agrupar por clave valor y hacer la suma
ventasRDD1 = ventasRDD1.reduceByKey(lambda x,y: x+y)

#obtengo lista ordenada por valor (de menor valor a mayor valor)
ventasRDD1 = ventasRDD1.takeOrdered(ventasRDD1.count(),lambda x: x[1])

#ahora me quedo con el ultimo
print("El producto mas vendido fue el de codigo {} con {} ventas".format(ventasRDD1[-1][0], ventasRDD1[-1][1]))

El producto mas vendido fue el de codigo 7 con 111 ventas


**B)Categoria de productos mas vendida**

In [7]:
#la categoria mas vendida fue la del producto mas vendido, puedo usar datos
#del item anterior
productosRDD1 = productosRDD.filter(lambda x: x[0] == ventasRDD1[-1][0])
productosRDD1 = productosRDD1.take(1)
print("La categoria de productos mas vendida fue {} con un total de {} ventas".format(productosRDD1[0][2],ventasRDD1[-1][1]))

La categoria de productos mas vendida fue Limpieza con un total de 111 ventas


**C)Top5 de productos mas vendidos generando un RDD con (codigo de producto, descripcion, cantidad de ventas)**

In [8]:
#me quedo con (codigoProducto,descripcion)
productosRDD2 = productosRDD.map(lambda x: (x[0],x[1]))

#busco los de mayor cantidad de ventas
top5RDD = ventasRDD.map(lambda x: (x[1],1)).reduceByKey(lambda x,y:x+y)

#busco los 5 mas vendidos
top5RDD = top5RDD.top(5, lambda x: x[1])

#convierto en RDD
top5RDD = sc.parallelize(top5RDD)

#uno los 2 RDD: va a quedar de la forma (codigoProducto,(cantidadVentas,descripcion))
top5RDD = top5RDD.join(productosRDD2)

#lo paso a (codigoProducto, Descripcion, cantidadVentas)
top5RDD = top5RDD.map(lambda x: (x[0],x[1][1],x[1][0]))

#ahora ordeno de mayor a menor
top5RDD.takeOrdered(5, lambda x: x[2])[::-1]


[(7, 'Descripcion 7', 111),
 (1, 'Descripcion 1', 107),
 (4, 'Descripcion 4', 104),
 (9, 'Descripcion 9', 102),
 (2, 'Descripcion 2', 100)]

**D)Cual es el producto que registro mayor aumento de precio en el ultimo año, tomando para este analisis solo los productos que cuenten con al menos 50 ventas en el ultimo año;**

In [9]:
#tomo como ultimo año el 2019
ventasUltimoAño = ventasRDD.filter(lambda x: x[0]>='2018-12-30' and x[0]<='2019-12-30')

#busco los productos con mayor cantidad de ventas
ventasUltimoAño = ventasUltimoAño.map(lambda x: (x[1],1))
ventasUltimoAño = ventasUltimoAño.reduceByKey(lambda x,y: x+y)
#me quedo con los que tienen 50 o mas ventas
ventasUltimoAño = ventasUltimoAño.filter(lambda x: x[1]>=50)

#(codigo_producto,((fecha,precio),cantidad)) a (codigo_producto,(fecha,precio))
ventasUltAñoCompleto = ventasRDD.map(lambda x: (x[1],(x[0],x[2])))
ventasUltAñoCompleto = ventasUltAñoCompleto.join(ventasUltimoAño)
ventasUltAñoCompleto = ventasUltAñoCompleto.map(lambda x: (x[0],(x[1][0][0],x[1][0][1])))

#busco el precio final de un producto por fecha (la ultima fecha q se compro un producto va a ser el ultimo precio)
#con el reducebykey: x es (fecha0,precio0), y es (fecha1,precio1): me quedo con X si la fecha de X es mayor a la de Y
#sino me quedo con Y
preciosFinalesRDD = ventasUltAñoCompleto.reduceByKey(lambda x,y: x if x[0]>y[0] else y)

#busco el precio inicial de un producto por fecha (la primer fecha que se compro un producto es la del precio inicial)
preciosInicialesRDD = ventasUltAñoCompleto.reduceByKey(lambda x,y: x if x[0]<y[0] else y)

#uno los 2 rdd y queda de la forma (producto, ((fechaINicial,PrecioInicial),(fechaFinal,precioFinal)))
rddPrecios = preciosInicialesRDD.join(preciosFinalesRDD)

#ahora quiero que me quede (producto, precioInicial,precioFinal)
rddPrecios = rddPrecios.map(lambda x: (x[0],x[1][0][1],x[1][1][1]))

#ahora quiero que me quede (producto, diferenciaPrecios)
rddPrecios = rddPrecios.map(lambda x: (x[0], x[2]-x[1]))

#busco la mayor diferencia de precios
rddPrecios = rddPrecios.takeOrdered(rddPrecios.count(), lambda x: x[1])
print("El producto que más aumento en el ultimo año fue el producto {}, que aumento {} pesos".format(rddPrecios[-1][0],rddPrecios[-1][1]))

El producto que más aumento en el ultimo año fue el producto 10, que aumento -2518.91 pesos


'1'