In [1]:
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
import findspark
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import lag, lead, first, last
from pyspark.sql.functions import row_number,lit
from pyspark.sql.types import DoubleType, IntegerType, StringType
import dtale
sc = SparkContext('local')
spark = SparkSession(sc)
sc

21/10/21 17:38:22 WARN Utils: Your hostname, Ankurs-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.1.11 instead (on interface en0)
21/10/21 17:38:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/10/21 17:38:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [27]:
raw_data = spark.read.load("../data/raw_data/raw_data.csv" ,format="csv", sep=",", inferSchema="true", header="true")

In [28]:
raw_data.columns

['Child Product',
 'CPG',
 'Month',
 'Shipment in Child Cases',
 'PPG',
 'Parent Prod',
 ' Shipment in parent Prod Cases ',
 'Inventory_Parent_Cases']

In [29]:
raw_data = raw_data.withColumnRenamed("Child Product", "Child_Product")\
                    .withColumnRenamed(" Shipment in parent Prod Cases ", "Shipment")
raw_data = raw_data.select("Child_Product", "CPG", "PPG","Month", "Shipment", "Inventory_Parent_Cases")

In [37]:
raw_data.coalesce(1).write.mode('overwrite').parquet("../data/raw_data/raw_data.parquet")

In [2]:
raw_data = spark.read.parquet("../data/raw_data/raw_data.parquet")

                                                                                

In [3]:
#convert the dataframe to Pandas
raw_data.limit(2).toPandas()

Unnamed: 0,Child_Product,CPG,PPG,Month,Shipment,Inventory_Parent_Cases
0,S1,C5,P1,201801,26,0
1,S2,C5,P1,201801,25,0


In [40]:
#Columns selection
col_sel = raw_data.select('Child_Product', 'CPG', 'Month')
col_sel.show(2)

+-------------+---+------+
|Child_Product|CPG| Month|
+-------------+---+------+
|           S1| C5|201801|
|           S2| C5|201801|
+-------------+---+------+
only showing top 2 rows



In [41]:
#Sorting in Spark
raw_data.sort("Shipment", ascending = False).show(2)

+-------------+---+---+------+--------+----------------------+
|Child_Product|CPG|PPG| Month|Shipment|Inventory_Parent_Cases|
+-------------+---+---+------+--------+----------------------+
|          S40| C2|P11|201903|     999|                     0|
|          S33| C2|P15|201805|     998|                    10|
+-------------+---+---+------+--------+----------------------+
only showing top 2 rows



In [42]:
raw_data.sort(F.desc('Shipment')).show(2)

+-------------+---+---+------+--------+----------------------+
|Child_Product|CPG|PPG| Month|Shipment|Inventory_Parent_Cases|
+-------------+---+---+------+--------+----------------------+
|          S40| C2|P11|201903|     999|                     0|
|          S33| C2|P15|201805|     998|                    10|
+-------------+---+---+------+--------+----------------------+
only showing top 2 rows



In [43]:
#cast columns
raw_data = raw_data.withColumn('CPG', F.col('CPG').cast(StringType()))

In [44]:
raw_data.toPandas().dtypes, raw_data.toPandas().shape

(Child_Product             object
 CPG                       object
 PPG                       object
 Month                      int32
 Shipment                  object
 Inventory_Parent_Cases     int32
 dtype: object,
 (2079, 6))

In [45]:
#Filter data in pyspark
_filter = raw_data.filter((raw_data.CPG == 'C1') & (raw_data.PPG == 'P1'))
_filter.show(2)

+-------------+---+---+------+--------+----------------------+
|Child_Product|CPG|PPG| Month|Shipment|Inventory_Parent_Cases|
+-------------+---+---+------+--------+----------------------+
|           S1| C1| P1|201801|       5|                     0|
|           S2| C1| P1|201801|      38|                     1|
+-------------+---+---+------+--------+----------------------+
only showing top 2 rows



In [49]:
#groupby operations in pyspark
_group = raw_data.groupby(['CPG', 'PPG', 'Month']).agg(F.sum('Shipment').alias('Shipment'))
_group.show(2)

+---+---+------+--------+
|CPG|PPG| Month|Shipment|
+---+---+------+--------+
| C2|P18|201803|    10.0|
| C3| P6|201803|    68.0|
+---+---+------+--------+
only showing top 2 rows



In [52]:
#Window functions
window = Window.partitionBy('CPG', 'PPG')
raw_data = raw_data.withColumn('len_ts', F.count('Shipment').over(window))\
                    .withColumn('sum_ts', F.sum('Shipment').over(window))
raw_data.limit(2).toPandas()

Unnamed: 0,Child_Product,CPG,PPG,Month,Shipment,Inventory_Parent_Cases,len_ts,sum_ts
0,S30,C2,P19,201801,13,1,26,447.0
1,S30,C2,P19,201802,14,2,26,447.0


In [53]:
raw_data.select('CPG', 'PPG').distinct().count()

                                                                                

96

In [4]:
rolled_data = raw_data.groupby('CPG', 'PPG', 'Month').agg(F.sum('Shipment').alias('Shipment'),
                                                         F.sum('Inventory_Parent_Cases').alias('Inventory_Parent_Cases'))

In [5]:
rolled_data = rolled_data.sort('CPG', 'PPG', 'Month')

In [6]:
rolled_data.limit(2).toPandas()

                                                                                

Unnamed: 0,CPG,PPG,Month,Shipment,Inventory_Parent_Cases
0,C1,P1,201801,43.0,1
1,C1,P1,201802,13.0,1


In [17]:
#Rolling average
window = Window.partitionBy('CPG', 'PPG').orderBy('Month').rowsBetween(-2,-1)
rolled_data = rolled_data.withColumn('rolling_average', F.mean('Shipment').over(window))
rolled_data.limit(2).toPandas()

                                                                                

Unnamed: 0,CPG,PPG,Month,Shipment,Inventory_Parent_Cases,rolling_average
0,C2,P19,201801,13.0,1,
1,C2,P19,201802,15.0,2,13.0


In [23]:
_filter = rolled_data.filter((raw_data.CPG == 'C1') & (raw_data.PPG == 'P1'))
_filter.limit(2).toPandas()

                                                                                

Unnamed: 0,CPG,PPG,Month,Shipment,Inventory_Parent_Cases,rolling_average
0,C1,P1,201801,43.0,1,
1,C1,P1,201802,13.0,1,43.0


In [33]:
calendar = rolled_data.select('Month').distinct().sort('Month')
calendar = calendar.withColumn('Month_ID', F.row_number().over(Window().orderBy(lit('A'))))
calendar.show(2)

21/10/21 22:51:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+------+--------+
| Month|Month_ID|
+------+--------+
|201801|       1|
|201802|       2|
+------+--------+
only showing top 2 rows





In [35]:
rolled_data = rolled_data.join(calendar, on = "Month", how = "inner")
rolled_data.limit(2).toPandas()

21/10/21 22:54:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Unnamed: 0,Month,CPG,PPG,Shipment,Inventory_Parent_Cases,rolling_average,Month_number,Month_ID
0,201801,C2,P19,13.0,1,,,1
1,201802,C2,P19,15.0,2,13.0,,2


In [1]:
# dtale.show(rolled_data.toPandas())