### To read an Excel file with PySpark, you first need to read the file using pandas, then convert it to a PySpark DataFrame

## Step 1: Import all necessary libraries

In [None]:
# pandas is imported in this cell because it sits above the notebook's import
# cell; without it a fresh "Restart & Run All" stops here with
# NameError: name 'pd' is not defined.
import pandas as pd

# Load the Excel workbook into a pandas DataFrame.
df = pd.read_excel("Sales-Data_01.xlsx")

In [None]:
# Preview the first rows of the pandas DataFrame as a quick sanity check.
df.head()

In [None]:
# Save the data as CSV (index=False leaves the pandas index out of the file);
# "sales_data.csv" is the file loaded with spark.read.csv further down.
df.to_csv("sales_data.csv", index=False)

In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Start the Spark session (getOrCreate reuses a session if one already exists).
# The session is given the application name 'tutorial'; start-up may take
# longer the first time it runs.
spark = (
    SparkSession.builder
    .appName('tutorial')
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)

In [3]:
# Evaluate the session object so the notebook displays it.
spark

**Read Data using Spark**

# Task 1: Sales Data

In [4]:
# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to work out each column's data type from the data.
df_pyspark_2 = spark.read.csv(
    "sales_data.csv",
    header=True,
    inferSchema=True,
)

## Step 3: Data Transformation & Cleaning

In [5]:
# View each column's data type.
df_pyspark_2.printSchema()

root
 |-- S/N: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Branch: string (nullable = true)
 |-- Pizza Sold: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Sales Discount: double (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Time Range: string (nullable = true)



In [6]:
# Print a preview of the sales rows.
df_pyspark_2.show()

+---+-------------------+-------+----------------+-----+--------+--------------+--------------------+-------------+
|S/N|               Date| Branch|      Pizza Sold|Price|Quantity|Sales Discount|                Time|   Time Range|
+---+-------------------+-------+----------------+-----+--------+--------------+--------------------+-------------+
|  1|2021-06-10 00:00:00|  Italy|         Meatzaa| 2000|       5|          0.03| 2022-12-17 15:57:36|Before 9:00am|
|  2|2020-10-05 00:00:00| France|    Extravaganza| 2000|       4|          0.07|2022-12-17 17:54:...|Before 9:00am|
|  3|2020-11-04 00:00:00|  Spain|     BBQ Chicken| 4000|       5|          0.05|2022-12-17 09:04:...|Before 9:00am|
|  4|2021-10-27 00:00:00| France|    Extravaganza| 2000|       1|           0.0|2022-12-17 16:03:...|Before 9:00am|
|  5|2020-10-20 00:00:00|England|         Meatzaa| 2000|       4|           0.0|2022-12-17 08:54:...|Before 9:00am|
|  6|2020-09-10 00:00:00|Germany|      Hot Veggie| 4000|       2|       

In [7]:
from pyspark.sql.functions import to_date, col

# Amount sold per row: quantity x price, reduced by the discount fraction.
df_pyspark_2 = df_pyspark_2.withColumn(
    "Total_Amount_Sold",
    col("Quantity") * col("Price") * (1 - col("Sales Discount")),
)


In [8]:
# Show each pizza name next to its computed amount.
df_pyspark_2.select("Pizza Sold", "Total_Amount_Sold").show()

+----------------+-----------------+
|      Pizza Sold|Total_Amount_Sold|
+----------------+-----------------+
|         Meatzaa|           9700.0|
|    Extravaganza|7439.999999999999|
|     BBQ Chicken|          19000.0|
|    Extravaganza|           2000.0|
|         Meatzaa|           8000.0|
|      Hot Veggie|           7200.0|
|BBQ Philly Steak|          19000.0|
|   Chicken Feast|           2000.0|
|         Meatzaa|           5700.0|
|    Chicken Suya|          20000.0|
|  Chicken Legend|           9700.0|
|BBQ Philly Steak|          15200.0|
|    Chicken Suya|           7600.0|
|   Chicken Feast|           9700.0|
|   Chicken Feast|           8000.0|
|       Beef Suya|          14250.0|
|   Chicken Feast|           9500.0|
|      Hot Veggie|          19400.0|
|         Meatzaa|           9000.0|
|         Meatzaa|           3800.0|
+----------------+-----------------+
only showing top 20 rows



### Final Aggregate Value for the Data Analyst

In [9]:
# Total amount sold for each pizza (displayed only; nothing is reassigned here).
(
    df_pyspark_2
    .groupBy("Pizza Sold")
    .agg({"Total_Amount_Sold": "sum"})
    .show()
)

+-------------------+----------------------+
|         Pizza Sold|sum(Total_Amount_Sold)|
+-------------------+----------------------+
|     Pepperoni Suya|             3109110.0|
|      Chicken Feast|             1681880.0|
|          Beef Suya|             2839650.0|
|          Margarita|             3358920.0|
|Hot Pepperoni Feast|             4125800.0|
|   BBQ Philly Steak|             3669920.0|
|     Veggie Supreme|             2916810.0|
|         Hot Veggie|             3662480.0|
|    Pepperoni Feast|             3483400.0|
|            Meatzaa|             1620900.0|
|       Extravaganza|             1752760.0|
|        BBQ Chicken|             3466680.0|
|       Chicken Bali|             1716320.0|
|           Italiano|             2842020.0|
|     Chicken Legend|             1702500.0|
|       Chicken Suya|             3675360.0|
+-------------------+----------------------+



In [10]:
# Replace the row-level frame with the per-pizza totals.
# The assignment reuses the name df_pyspark_2, so after the first run the frame
# no longer has a "Total_Amount_Sold" column. The guard keeps this cell safe to
# re-run: without it a second execution fails because the column being summed
# is gone.
if "Total_Amount_Sold" in df_pyspark_2.columns:
    df_pyspark_2 = df_pyspark_2.groupBy("Pizza Sold").agg({"Total_Amount_Sold": "sum"})

In [11]:
# Display the frame that now holds the per-pizza totals.
df_pyspark_2.show()

+-------------------+----------------------+
|         Pizza Sold|sum(Total_Amount_Sold)|
+-------------------+----------------------+
|     Pepperoni Suya|             3109110.0|
|      Chicken Feast|             1681880.0|
|          Beef Suya|             2839650.0|
|          Margarita|             3358920.0|
|Hot Pepperoni Feast|             4125800.0|
|   BBQ Philly Steak|             3669920.0|
|     Veggie Supreme|             2916810.0|
|         Hot Veggie|             3662480.0|
|    Pepperoni Feast|             3483400.0|
|            Meatzaa|             1620900.0|
|       Extravaganza|             1752760.0|
|        BBQ Chicken|             3466680.0|
|       Chicken Bali|             1716320.0|
|           Italiano|             2842020.0|
|     Chicken Legend|             1702500.0|
|       Chicken Suya|             3675360.0|
+-------------------+----------------------+



# Task 2: Target Data

In [12]:
# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to work out each column's data type from the data.
df_pyspark_1 = spark.read.csv(
    "Sales-Target.csv",
    header=True,
    inferSchema=True,
)

In [13]:
# Print a preview of the target rows as loaded from the CSV.
df_pyspark_1.show()

+------+----+-------+--------------------+----------------+-----------+------------+-------------+--------------+------------+------------------+-------------------+-----------+------------------+--------------------+-----------+---------------+--------------+--------------+-----------+--------------------+
| Years|Date| Branch|         BBQ Chicken|BBQ Philly Steak|  Beef Suya|Chicken Bali|Chicken Feast|Chicken Legend|Chicken Suya|      Extravaganza|Hot Pepperoni Feast| Hot Veggie|          Italiano|           Margarita|    Meatzaa|Pepperoni Feast|Pepperoni Suya|Veggie Supreme|Unnamed: 19|© UrBizEdge Limited |
+------+----+-------+--------------------+----------------+-----------+------------+-------------+--------------+------------+------------------+-------------------+-----------+------------------+--------------------+-----------+---------------+--------------+--------------+-----------+--------------------+
|2020.0| Jan|England|           2791833.6|       1872384.0| 1.005396E7|  

In [14]:
# View each column's data type.
df_pyspark_1.printSchema()

root
 |-- Years: double (nullable = true)
 |-- Date: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- BBQ Chicken: double (nullable = true)
 |-- BBQ Philly Steak: double (nullable = true)
 |-- Beef Suya: double (nullable = true)
 |-- Chicken Bali: double (nullable = true)
 |-- Chicken Feast: double (nullable = true)
 |-- Chicken Legend: double (nullable = true)
 |-- Chicken Suya: double (nullable = true)
 |-- Extravaganza: double (nullable = true)
 |-- Hot Pepperoni Feast: double (nullable = true)
 |-- Hot Veggie: double (nullable = true)
 |-- Italiano: double (nullable = true)
 |-- Margarita: double (nullable = true)
 |-- Meatzaa: double (nullable = true)
 |-- Pepperoni Feast: double (nullable = true)
 |-- Pepperoni Suya: double (nullable = true)
 |-- Veggie Supreme: double (nullable = true)
 |-- Unnamed: 19: string (nullable = true)
 |-- © UrBizEdge Limited : string (nullable = true)



In [15]:
# Drop the 'Unnamed: 19' column and the copyright-notice column
# (the second name ends with a space, which must be kept for the match).
df_pyspark_1 = df_pyspark_1.drop("Unnamed: 19", "© UrBizEdge Limited ")


In [16]:
# List the column names that remain after the drop.
df_pyspark_1.columns

['Years',
 'Date',
 'Branch',
 'BBQ Chicken',
 'BBQ Philly Steak',
 'Beef Suya',
 'Chicken Bali',
 'Chicken Feast',
 'Chicken Legend',
 'Chicken Suya',
 'Extravaganza',
 'Hot Pepperoni Feast',
 'Hot Veggie',
 'Italiano',
 'Margarita',
 'Meatzaa',
 'Pepperoni Feast',
 'Pepperoni Suya',
 'Veggie Supreme']

In [17]:
# Replace the spaces in the multi-word pizza column names with underscores,
# e.g. "BBQ Philly Steak" becomes "BBQ_Philly_Steak".
for spaced_name in (
    "BBQ Chicken",
    "BBQ Philly Steak",
    "Beef Suya",
    "Chicken Bali",
    "Chicken Feast",
    "Chicken Legend",
    "Chicken Suya",
    "Hot Pepperoni Feast",
    "Hot Veggie",
    "Pepperoni Feast",
    "Pepperoni Suya",
    "Veggie Supreme",
):
    df_pyspark_1 = df_pyspark_1.withColumnRenamed(
        spaced_name, spaced_name.replace(" ", "_")
    )

## You need to unpivot some columns

In [18]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import array
from pyspark.sql.functions import struct

In [19]:
# Unpivot: turn the 16 per-pizza columns into (Pizza_Products, Target_Amount_Sold) rows.
# Each stack() pair is 'label', column. The column names are written unquoted
# inside the expression, so they must not contain spaces.
unpivotExpr = (
    "stack(16, "
    "'BBQ_Chicken', BBQ_Chicken, "
    "'BBQ_Philly_Steak', BBQ_Philly_Steak, "
    "'Beef_Suya', Beef_Suya, "
    "'Chicken_Bali', Chicken_Bali, "
    "'Chicken_Feast', Chicken_Feast, "
    "'Chicken_Legend', Chicken_Legend, "
    "'Chicken_Suya', Chicken_Suya, "
    "'Extravaganza', Extravaganza, "
    "'Hot_Pepperoni_Feast', Hot_Pepperoni_Feast, "
    "'Hot_Veggie', Hot_Veggie, "
    "'Italiano', Italiano, "
    "'Margarita', Margarita, "
    "'Meatzaa', Meatzaa, "
    "'Pepperoni_Feast', Pepperoni_Feast, "
    "'Pepperoni_Suya', Pepperoni_Suya, "
    "'Veggie_Supreme', Veggie_Supreme) "
    "as (Pizza_Products,Target_Amount_Sold)"
)

# The result is assigned back to df_pyspark_1, so after the first run the
# per-pizza columns no longer exist. The guard keeps the cell safe to re-run
# instead of failing on the missing columns.
if "Pizza_Products" not in df_pyspark_1.columns:
    df_pyspark_1 = df_pyspark_1.select("Years", "Date", "Branch", expr(unpivotExpr))

In [20]:
# Print a preview of the unpivoted target rows.
df_pyspark_1.show()

+------+----+-------+-------------------+------------------+
| Years|Date| Branch|     Pizza_Products|Target_Amount_Sold|
+------+----+-------+-------------------+------------------+
|2020.0| Jan|England|        BBQ_Chicken|         2791833.6|
|2020.0| Jan|England|   BBQ_Philly_Steak|         1872384.0|
|2020.0| Jan|England|          Beef_Suya|        1.005396E7|
|2020.0| Jan|England|       Chicken_Bali|         2075048.0|
|2020.0| Jan|England|      Chicken_Feast|         2526216.0|
|2020.0| Jan|England|     Chicken_Legend|         3381012.0|
|2020.0| Jan|England|       Chicken_Suya|         3295864.0|
|2020.0| Jan|England|       Extravaganza|         2099229.6|
|2020.0| Jan|England|Hot_Pepperoni_Feast|         1824960.0|
|2020.0| Jan|England|         Hot_Veggie|         2196135.0|
|2020.0| Jan|England|           Italiano|         2831976.0|
|2020.0| Jan|England|          Margarita|         6102995.2|
|2020.0| Jan|England|            Meatzaa|         7971931.0|
|2020.0| Jan|England|   

In [24]:
'''from pyspark.sql.functions import upward_fill, downward_fill

# Fill up the column
#df = df.withColumn("column_name", fillna(upward_fill("column_name")))

# Fill down the column
df_pyspark_1.withColumn("Years", fillna(downward_fill("Years")))'''
# The block above is a string literal, not executed code; the notebook only echoes it.
# NOTE(review): upward_fill / downward_fill do not appear to exist in
# pyspark.sql.functions, so upgrading PySpark is unlikely to make this work.
# A fill-down is normally written with last(col, ignorenulls=True) over an
# ordered Window - TODO confirm before reviving this cell.

'from pyspark.sql.functions import upward_fill, downward_fill\n\n# Fill up the column\n#df = df.withColumn("column_name", fillna(upward_fill("column_name")))\n\n# Fill down the column\ndf_pyspark_1.withColumn("Years", fillna(downward_fill("Years")))'

## Aggregate (GroupBy) the Pizza and Amount Sold

In [25]:
# List the column names of the unpivoted frame before aggregating.
df_pyspark_1.columns

['Years', 'Date', 'Branch', 'Pizza_Products', 'Target_Amount_Sold']

In [26]:
# Replace the unpivoted frame with the total target amount per pizza product.
# The assignment reuses the name df_pyspark_1, so after the first run the frame
# no longer has a "Target_Amount_Sold" column. The guard keeps this cell safe
# to re-run: without it a second execution fails because the column being
# summed is gone.
if "Target_Amount_Sold" in df_pyspark_1.columns:
    df_pyspark_1 = df_pyspark_1.groupBy("Pizza_Products").agg({"Target_Amount_Sold": "sum"})
df_pyspark_1.show()

+-------------------+-----------------------+
|     Pizza_Products|sum(Target_Amount_Sold)|
+-------------------+-----------------------+
|      Chicken_Feast|    5.870164388000001E8|
|     Veggie_Supreme|           6.12804425E8|
|     Chicken_Legend|          6.127479926E8|
|          Margarita|   1.9355369595199997E9|
|       Chicken_Bali|          5.767242822E8|
|   BBQ_Philly_Steak|           5.88206461E8|
|        BBQ_Chicken|    9.600649076800001E8|
|       Chicken_Suya|          5.679765358E8|
|            Meatzaa|         1.8048419424E9|
|       Extravaganza|   4.7222161423999995E8|
|    Pepperoni_Feast|           5.88681188E8|
|         Hot_Veggie|    6.020760175999999E8|
|     Pepperoni_Suya|    5.891687681999999E8|
|Hot_Pepperoni_Feast|          5.871240264E8|
|           Italiano|    4.669655563199999E8|
|          Beef_Suya|   2.3177112623999996E9|
+-------------------+-----------------------+

