## How to run pyspark in Jupyter Notebook

 1. Set following environment variables
 ```sh
 export SPARK_HOME /PATH_SPARK_INSTALLATION/spark3
 export PYSPARK_PYTHON /PATH_PYTHON
 
 ```
 2. If you dont have it then install findspark using following command
 ```sh
 python -m pip install findspark
 
 ```
 3. Start Jupyter Notebook
 4. Enjoy pyspark coding!

In [133]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
# Cargamos los archivos de entrada en DataFrames
products_df = spark.read.parquet("/Users/cefranlly.perez/sparkprojects/analyticalways/data/products.parquet")
sales_df = spark.read.parquet("/Users/cefranlly.perez/sparkprojects/analyticalways/data/sales.parquet")
stock_df = spark.read.parquet("/Users/cefranlly.perez/sparkprojects/analyticalways/data/stock_movements.parquet")

In [4]:
# Miramos los datos para empezar a jugar con ellos
"""
 Input:
    ProductRootCode (integer): código raíz del producto, sin talla.
    ProductId (integer): código del producto.
    SupplierPrice (float): coste del producto.
    RetailPrice (float): precio del producto de venta al público.
    Family (string): familia de productos a la que pertenece.
    
    rows -> 9347
"""

products_df.show(10)

+---------------+---------+--------+-------------+-----------+
|ProductRootCode|ProductId|  Family|SupplierPrice|RetailPrice|
+---------------+---------+--------+-------------+-----------+
|              1|        2|BRACELET|        36.02|      180.0|
|              3|        4|BRACELET|        41.43|      220.0|
|              5|        6|BRACELET|        53.58|      230.0|
|              7|        8|BRACELET|        62.35|      250.0|
|              9|       10|BRACELET|         0.64|       59.0|
|             11|       12|BRACELET|         0.64|       69.0|
|             13|       14|BRACELET|          7.9|       59.0|
|             15|       16|BRACELET|        40.11|      210.0|
|             17|       18|BRACELET|         6.47|       49.0|
|             19|       20|EARRINGS|         3.81|       45.0|
+---------------+---------+--------+-------------+-----------+
only showing top 10 rows



In [6]:
"""
 Input:
    StoreId (integer): Código de la tienda.
    ProductId (integer): Código del producto.
    Date (date): fecha de la venta.
    Quantity (integer): unidades vendidas (los valores negativos corresponden a devoluciones).
    rows -> 597426
"""

sales_df.show(10)

+-------+---------+----------+--------+
|StoreId|ProductId|      Date|Quantity|
+-------+---------+----------+--------+
|      2|     6652|2020-12-29|       1|
|     50|     6652|2020-06-30|       1|
|      8|     6652|2020-02-11|       1|
|     30|     6652|2020-01-11|       1|
|     41|     6652|2019-05-24|       0|
|     25|     6652|2019-01-16|      -1|
|     27|     6652|2019-02-14|       1|
|      8|     6652|2020-02-10|      -1|
|     10|     6652|2019-02-13|       1|
|     50|     6652|2020-11-06|       1|
+-------+---------+----------+--------+
only showing top 10 rows



In [8]:
"""
 Input:
    StoreId (integer): Código de la tienda.
    ProductId (integer): Código del producto.
    Date (date): fecha del movimiento.
    Quantity (integer): unidades de entrada (+) o salida (-).
    rows -Z 1891344
"""

stock_df.show(10)

+-------+---------+----------+--------+
|StoreId|ProductId|      Date|Quantity|
+-------+---------+----------+--------+
|     13|     4666|2017-01-01|       2|
|      1|     4666|2017-01-01|       0|
|     14|     2829|2017-01-01|       1|
|      2|     2829|2017-01-31|      -1|
|      2|     2829|2017-06-30|       1|
|      2|     2829|2018-01-21|      -1|
|      2|     2829|2017-01-01|       1|
|     37|     2829|2017-01-31|       1|
|     14|     2829|2017-09-30|       4|
|     14|     2829|2017-07-20|      -1|
+-------+---------+----------+--------+
only showing top 10 rows



# Calculo del stock diario
A partir de los movimientos de stock hay que calcular el stock diario desde el 01/01/2019, a nivel de código raíz de producto

In [10]:
stock_union_df = stock_df.join(products_df.drop("Family", "SupplierPrice", "RetailPrice"), ['productId']).filter(stock_union_df.ProductRootCode.isNotNull())
stock_union_df.show(10)

+---------+-------+----------+--------+---------------+
|ProductId|StoreId|      Date|Quantity|ProductRootCode|
+---------+-------+----------+--------+---------------+
|     4666|     13|2017-01-01|       2|           4665|
|     4666|      1|2017-01-01|       0|           4665|
|     2829|     14|2017-01-01|       1|           2828|
|     2829|      2|2017-01-31|      -1|           2828|
|     2829|      2|2017-06-30|       1|           2828|
|     2829|      2|2018-01-21|      -1|           2828|
|     2829|      2|2017-01-01|       1|           2828|
|     2829|     37|2017-01-31|       1|           2828|
|     2829|     14|2017-09-30|       4|           2828|
|     2829|     14|2017-07-20|      -1|           2828|
+---------+-------+----------+--------+---------------+
only showing top 10 rows



In [16]:
# Agrupamos por Producto Raíz, tienda y fecha para sumar las cantidades y ver como se fue reduciendo o manteniendo el stock en el paso de los días
stock_grouped_df = stock_union_df.groupBy("ProductRootCode", "StoreId", "Date").sum("Quantity").withColumnRenamed("sum(Quantity)", "Quantity").orderBy("StoreId", "ProductRootCode", "Date")
stock_grouped_df.show()

In [29]:
# Para el calculo del stock se usa las ventanas de tiempo de spark para entender como fue moviendose la mercancia por tienda
from pyspark.sql import Window as W, functions as F
w = W.partitionBy(["ProductRootCode", "StoreId"]).orderBy("StoreId", "ProductRootCode", "Date")
stock_date_df = stock_grouped_df.withColumn("stock", F.sum("Quantity").over(w))
stock_date_df.cache()
stock_date_df.show()

DataFrame[ProductRootCode: int, StoreId: int, Date: date, Quantity: bigint, stock: bigint]

In [32]:
# Creamos un valor temporal para aquellas fechas menores al 2019-01-01 de manera de ser usada para calcular el stock luego del 2019 inclusive

df = stock_date_df.withColumn("condicionAdicional", F.when(F.col("Date") < '2019-01-01', "menor").otherwise("mayor")).groupBy("ProductRootCode", "StoreId", "condicionAdicional").agg(F.max("Date").alias("Date"), F.max("Stock").alias("Stock")).drop("condicionAdicional")
df.orderBy("StoreId", "ProductRootCode", "Date").show()


In [38]:
"""
Ahora del valor temporal creados, forzamos la fecha al 2019 del maximo valor por tienda/prodcuto para aquellas con
fecha menor al 2019 y conservamos los demas posteriores a esa fecha
"""
df = df.withColumn("Date", F.when(df.Date < '2019-01-01', '2019-01-01').otherwise(col("Date"))).filter((df.Stock > 0) & (df.Date >= '2019-01-01'))

df.orderBy("StoreId", "ProductRootCode", "Date").show()



## Calculamos para cada código raíz de producto – tienda, los intervalos de fechas y el stock en ese intervalo.

In [102]:
df_daily = df.groupBy("ProductRootCode", "StoreId").agg(F.min("Date").alias("StartDate"), F.max("Date").alias("endDate"), F.min("Stock").alias("Stock")) \
            .withColumn("endDate", F.when(col("StartDate") == col("endDate"), '2020-12-31').otherwise(col("endDate"))) 

df_daily.show()


### Escribimos los resultados en el directorio

In [69]:

df_daily.coalesce(1).write.parquet("/Users/cefranlly.perez/sparkprojects/analyticalways/data/interval_stock/")


# Calcular KPIs

## Calculando el beneficio de cada producto
### sales.quantity * products.RetailPrice - stock.quantity * products.SupplierPrice
Siempre que stock.quantity > 0

In [236]:
"""
Agrupamos por ProductId y tienda
Tomamos en cuenta los valores positivos del stock ya que son los que aún se conservan en cada tienda (Pasivo)
Unimos por Producto y tienda para evitar los cross joins
"""
benefits_df = sales_df.groupBy("ProductId", "StoreId").agg(F.sum(sales_df.Quantity).alias('q_sales')) . \
    join(
        stock_df.filter(stock_df.Quantity > 0).groupBy("ProductId", "StoreId").agg(F.sum(stock_df.Quantity).alias('q_stock')), ['productId', 'StoreId']
    ). \
        join(
            products_df, 'productId'
        ). \
            drop("ProductRootCode", "Family")

In [237]:
benefits_df.show()

+---------+-------+-------+-------+-------------+-----------+
|ProductId|StoreId|q_sales|q_stock|SupplierPrice|RetailPrice|
+---------+-------+-------+-------+-------------+-----------+
|      158|     27|      1|      4|         1.32|       12.0|
|      162|      3|      5|      9|         1.32|       12.0|
|     1929|      8|     10|     32|         2.22|       19.0|
|    15089|     23|      5|     16|         2.22|       19.0|
|      150|     34|      7|      9|         1.85|       12.0|
|     1923|      7|      6|     25|         1.69|       12.0|
|      154|     13|      5|      9|         1.85|       12.0|
|      156|     22|      4|      6|         1.85|       12.0|
|     2418|      7|      0|      3|        23.33|      120.0|
|     2267|     27|      1|      8|          4.3|       29.0|
|     9375|      3|      2|      4|         7.22|       42.0|
|     9679|     37|      1|     22|         5.79|       42.0|
|    10688|     41|      1|      2|        20.62|      120.0|
|    100

In [238]:

benefits_product_df = sales_df.withColumn('year', F.year(F.to_timestamp('Date', 'yyyy-MM-dd'))). \
    groupBy("ProductId", "year").agg(F.sum(sales_df.Quantity).alias('q_sales')) . \
    join(
        stock_df.withColumn('year', F.year(F.to_timestamp('Date', 'yyyy-MM-dd'))). \
    filter(stock_df.Quantity > 0).groupBy("ProductId", "year").agg(F.sum(stock_df.Quantity).alias('q_stock')), ['productId', 'year']
    ). \
        join(
            products_df, 'productId'
        ). \
            drop("Family")

In [242]:
# Function para el calculo del beneficio
from pyspark.sql.types import DoubleType, DecimalType

def benefit(q_sale, q_stock, supplier_price, retail_price):    
    result = (q_sale * retail_price) - (q_stock * supplier_price)
    return result
calculate_udf = F.udf(benefit, DoubleType())

## Calculando el beneficio por familia producto y año

In [243]:
benefits_result_product_df = benefits_product_df. \
    groupBy("ProductRootCode", "year").agg(F.sum("q_sales").alias("q_sales"), F.sum("q_stock").alias("q_stock"), F.max("RetailPrice").alias("RetailPrice"), F.max("SupplierPrice").alias("SupplierPrice")). \
        withColumn("benefit", 
                    calculate_udf("q_sales", "q_stock", "SupplierPrice", "RetailPrice"))
benefits_result_product_df.show()



+---------------+----+-------+-------+-----------+-------------+------------------+
|ProductRootCode|year|q_sales|q_stock|RetailPrice|SupplierPrice|           benefit|
+---------------+----+-------+-------+-----------+-------------+------------------+
|          13150|2020|      2|      2|       42.0|         7.22|  69.5600004196167|
|           1644|2019|      1|      1|       79.0|        13.57| 65.43000030517578|
|           7862|2019|    674|   1976|       19.0|         3.24|6403.7599811553955|
|          14475|2019|      4|     17|       69.0|        10.65| 94.95000648498535|
|          14622|2019|      2|      8|       25.0|          4.3|15.599998474121094|
|           7613|2019|     89|    281|      120.0|        26.33|3281.2700214385986|
|           6846|2020|     67|     94|       35.0|         5.94|1786.6399946212769|
|           1830|2019|      2|      1|       62.0|        10.65|113.35000038146973|
|           2160|2019|      6|     12|       39.0|         6.14|160.32000160

## Calculando el beneficio por tienda y año

In [247]:
"""
Agrupamos por Tienda y año
Tomamos en cuenta los valores positivos del stock ya que son los que aún se conservan en cada tienda (Pasivo)
"""

benefits_retail_df = sales_df.withColumn('year', F.year(F.to_timestamp('Date', 'yyyy-MM-dd'))). \
    groupBy("ProductId", "StoreId", "year").agg(F.sum(sales_df.Quantity).alias('q_sales')) . \
    join(
        stock_df.withColumn('year', F.year(F.to_timestamp('Date', 'yyyy-MM-dd'))). \
    filter(stock_df.Quantity > 0).groupBy("ProductId", "StoreId", "year").agg(F.sum(stock_df.Quantity).alias('q_stock')), ['productId', 'StoreId','year']
    ). \
        join(
            products_df, 'productId'
        ). \
            drop("Family", "ProductRootCode", "ProductId")


calculate_udf = F.udf(benefit, DoubleType())
benefits_result_store_df = benefits_retail_df. \
        withColumn("benefit", 
                    calculate_udf("q_sales", "q_stock", "SupplierPrice", "RetailPrice"))

benefits_result_store_df.show()



+-------+----+-------+-------+-------------+-----------+-------------------+
|StoreId|year|q_sales|q_stock|SupplierPrice|RetailPrice|            benefit|
+-------+----+-------+-------+-------------+-----------+-------------------+
|     25|2020|      4|      5|         1.32|       12.0|  41.39999973773956|
|     14|2019|     10|     17|         2.22|       19.0|  152.2599995136261|
|     27|2019|      2|      3|         2.22|       19.0|  31.33999991416931|
|      3|2020|      4|      8|         1.85|       12.0|  33.19999980926514|
|      8|2019|      6|      8|         1.69|       12.0|  58.47999954223633|
|     18|2019|      1|      1|        58.05|      210.0| 151.95000076293945|
|     18|2019|      1|      3|         7.72|       45.0|  21.84000062942505|
|     10|2020|      1|      1|        15.29|       89.0|  73.71000003814697|
|     43|2020|      1|      1|         26.4|      130.0| 103.60000038146973|
|     18|2019|      1|      1|         26.4|      130.0| 103.60000038146973|

## Calculo de la rotación