In [1]:
import pyspark
import numpy as np
from datetime import datetime as dt

try:
    type(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')

In [2]:
# rdd de ventas = (fecha de venta, código de producto, precio de venta)
# rdd de productos = (codigo, descripcion, categoria)
# A) Producto mas vendido
# B) Categoria de productos mas vendida
# C) top 5 de productos mas vendidos en un rdd de la forma [(codigo, descripcion, cantidad de ventas)]
# D) producto que registro mayor aumento de precios en el ultimo año, contando solo aquellos que vendieron mas de 50 productos
# E) categoria de productos que mas aumento en el ultimo año

In [3]:
productos = [
    (1, 'pituto que va en el cosito', 1),
    (2, 'cosito que va en el pituto', 1),
    (3, 'dulce de membrillo', 2),
    (4, 'tomates en conserva', 2),
    (5, 'tomates', 2),
    (6, 'conserva', 3)
]

def get_random_id():
    return np.random.randint(1, 7)

def get_random_price():
    return np.random.randint(50, 150)

def get_random_date():
    day = np.random.randint(1, 28)
    month = np.random.randint(1, 12)
    year = np.random.randint(1995, 2020)
    return dt.strptime(("%s-%s-%s" % (month, day, year)), '%m-%d-%Y')

ventas = []
for i in range (1, 7000):
    ventas.append( (get_random_date(), get_random_id(), get_random_price()) )
    
productos_rdd = sc.parallelize(productos) 
    
ventas_rdd = sc.parallelize(ventas)
ventas_rdd.take(5)

[(datetime.datetime(1998, 11, 23, 0, 0), 6, 112),
 (datetime.datetime(2014, 8, 6, 0, 0), 6, 79),
 (datetime.datetime(2012, 9, 19, 0, 0), 3, 127),
 (datetime.datetime(2016, 10, 16, 0, 0), 2, 70),
 (datetime.datetime(2010, 9, 18, 0, 0), 5, 60)]

In [4]:
producto_mas_vendido = ventas_rdd.map(lambda x: (x[1], 1))\
                                 .reduceByKey(lambda x, y: x + y)\
                                 .top(1, key=lambda x: x[1])
producto_mas_vendido

[(2, 1217)]

In [5]:
productos_con_categoria = productos_rdd.map(lambda x: (x[0], x[2]))

categoria_mas_vendida = ventas_rdd.map(lambda x: ((x[1]), 1))\
                                  .reduceByKey(lambda x, y: x + y)\
                                  .join(productos_con_categoria)\
                                  .map(lambda x: (x[1][1], x[1][0]))\
                                  .reduceByKey(lambda x, y: x + y)\
                                  .top(1, key= lambda x: x[1])
categoria_mas_vendida

[(2, 3469)]

In [6]:
top5_mas_vendidos = ventas_rdd.map(lambda x: (x[1], 1))\
                              .reduceByKey(lambda x, y: x + y)\
                              .top(5, key=lambda x: x[1])

top5_mas_vendidos

[(2, 1217), (6, 1178), (4, 1158), (5, 1157), (3, 1154)]

In [7]:
cutoff_date = dt.strptime('11-16-2018', '%m-%d-%Y')

ventas_2018 = ventas_rdd.filter(lambda x: x[0] > cutoff_date)

productos_con_ventas = ventas_2018.map(lambda x: (x[1], 1) )\
                                 .reduceByKey(lambda x,y: x+y)\
                                 .filter(lambda x: x[1] > 50)\

mayor_variacion = ventas_2018.map(lambda x: (x[1], (x[2])))\
                             .join(productos_con_ventas)\
                             .map(lambda x: (x[0], (x[1][0], x[1][0])))\
                             .reduceByKey(lambda x,y: (max(x[0],y[0]), min(x[1],y[1])))\
                             .map(lambda x: (x[0], x[1][0]-x[1][1]))\
                             .top(1, key=lambda x: x[1])

print("El articulo con mayor variacion fue #%i con $%i" % mayor_variacion[0])

El articulo con mayor variacion fue #1 con $99


In [8]:
categorias_por_producto = productos_rdd.map(lambda x: (x[0], x[2]))

ventas_con_categorias = ventas_rdd.map(lambda x: (x[1], (x[0], x[2])))\
                                  .join(categorias_por_producto)\
                                  .map(lambda x: (x[1][1], (x[1][0]) ))\
                                  .filter(lambda x: x[1][0] > cutoff_date)
# id_categoria, (fecha, precio))

categorias_con_50_ventas = ventas_con_categorias.map(lambda x: (x[0], 1))\
                                                .reduceByKey(lambda x, y: (x+y))\
                                                .filter(lambda x: x[1] > 50)\

mayor_variacion_categoria = ventas_con_categorias.join(categorias_con_50_ventas)\
                                                 .map(lambda x: (x[0], (x[1][0][1], x[1][0][1])) )\
                                                 .reduceByKey(lambda x, y: ((max(x[0], y[0]), min(x[0], y[0]))))\
                                                 .map(lambda x: (x[0], x[1][0] - x[1][1]) )\
                                                 .top(1, key=lambda x: x[1])

mayor_variacion_categoria

[(1, 28)]