In [1]:
import pandas as pd
from pyspark.sql import *

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)
ventas = sqlc.read.csv("ventas.csv", header=True).rdd
ventas = ventas.map(lambda x:(pd.to_datetime(x[0],"coerce"), \
                     pd.to_numeric(x[1],"coerce"), pd.to_numeric(x[2],"coerce"))).cache()
productos = sqlc.read.csv("productos.csv", header=True).rdd
productos = productos.map(lambda x:(pd.to_numeric(x[0],"coerce"), x[1], x[2])).cache()

#ventas: (fecha de venta, código de producto, precio de venta)
#productos: (código de producto, descripción del producto, categoría)

In [3]:
#A: Cuál es el producto más vendido
ventas_por_producto = ventas.map(lambda x: (x[1], 1)).reduceByKey(lambda x,y: x+y).cache()
mayor_venta = ventas_por_producto.reduce(lambda x,y:x if x[1]>y[1] else y)[1]
ventas_por_producto.filter(lambda x:x[1]==mayor_venta).collect()#Por si se repite el maximo

[(16, 64), (44, 64)]

In [4]:
#B: Cuál es la categoría de productos más vendida
id_y_categoria = productos.map(lambda x:(x[0], x[2])).cache()
categoria_y_ventas = ventas_por_producto.join(id_y_categoria).\
                map(lambda x:(x[1][1], x[1][0]))
categoria_y_ventas = categoria_y_ventas.reduceByKey(lambda x,y:x+y).cache()
max_cantidad = categoria_y_ventas.reduce(lambda x,y: x if x[1]>y[1] else y)[1]
categoria_y_ventas.filter(lambda x: x[1]==max_cantidad).collect()#Por si se repite el maximo

[('categoria_4', 250)]

In [5]:
#C: Cuál es el top5 de productos más vendidos generando un RDD con 
#(código de producto, descripción, cantidad de ventas)
prod_mas_vend = sc.parallelize(ventas_por_producto.takeOrdered(5,lambda x:-x[1]))#Ni idea que hacer 
                                                                                 #aca con los repetidos
prod_mas_vend.leftOuterJoin(productos.map(lambda x:(x[0],x[1]))).map(lambda x:(x[0],x[1][1],x[1][0])).collect()

[(115, 'xIU', 10),
 (155, 'WIoE', 9),
 (16, 'gbIPijV', 64),
 (44, 'HiDne', 64),
 (294, 'vgiBzfM', 10)]

In [6]:
#D: Cuál es el producto que registró mayor aumento de precio en el último año, tomando para 
#este análisis solo los productos que cuenten con al menos 50 ventas en el último año.
ultimo_anio = ventas.reduce(lambda x,y:x if x[0].year>y[0].year else y)[0].year

#Me quedo con las ventas del ultimo anio
ventas_ultimo_anio = ventas.filter(lambda x:x[0].year==ultimo_anio)

#Cuento cuantas veces se vendio cada producto y saco los que tengan menos de 50
ventas_totales = ventas_ultimo_anio.map(lambda x:(x[1],1)).reduceByKey(lambda x,y:x+y)
ventas_mas_de_50 = ventas_totales.filter(lambda x:x[1]>=50).cache()

#Me quedo con el id y el precio
ventas_id_precio = ventas_ultimo_anio.map(lambda x: (x[1], x[2]))

#Hago join para quedarme con los datos de los productos con 50 ventas o mas
ventas_id_precio_mas_de_50 = ventas_id_precio.join(ventas_mas_de_50).\
                        map(lambda x:(x[0], x[1][0])).cache()#(id,precio)

#Me fijo el maximo y minimo valor de cada producto
menor_precio = ventas_id_precio_mas_de_50.reduceByKey(lambda x,y:x if x<y else y)
mayor_precio = ventas_id_precio_mas_de_50.reduceByKey(lambda x,y:x if x>y else y)

#Hago join para poder quedarme con el que tenga mayor diferencia
menor_precio.join(mayor_precio).\
        reduce(lambda x,y: x if __builtin__.abs(x[1][1]-x[1][0]) > __builtin__.abs(y[1][1]-y[1][0]) else y)

(44, (21, 85))

In [11]:
#D: Idem anterior, pero calculando la categoría de productos que registró mayor variación 
#de precios en el último año.

#Hago join de los productos con 50 ventas o mas del ultimo año y de la categoria
categoria_y_precio = ventas_id_precio_mas_de_50.join(id_y_categoria).\
                        map(lambda x:(x[1][1], x[1][0]))#(categoria,precio)

#Calculo el maximo y el minimo precio de cada categoria
menor_precio_categoria = categoria_y_precio.reduceByKey(lambda x,y: x if x<y else y)
mayor_precio_categoria = categoria_y_precio.reduceByKey(lambda x,y: x if x>y else y)
categoria_menor_mayor = menor_precio_categoria.join(mayor_precio_categoria)#(categoria(menor_precio,mayor_precio))
categoria_menor_mayor.reduce(lambda x,y: x if __builtin__.abs(x[1][1]-x[1][0])>__builtin__.abs(y[1][1]-y[1][0]) else y)

('categoria_5', (21, 85))