In [1]:
import findspark
findspark.init('/usr/local/spark')

import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("ETL-Assessment").getOrCreate()
spark

In [3]:
# 1. Load all the files using SparkSession. (PySpark -Dataframe)
cust_dimen = spark.read.csv("/home/hduser/deloitte_24/Datasets - Project/cust_dimen.csv", header = True, inferSchema=True)

In [4]:
market_fact = spark.read.csv("/home/hduser/deloitte_24/Datasets - Project/market_fact.csv", header = True, inferSchema=True)
orders_dimen = spark.read.csv("/home/hduser/deloitte_24/Datasets - Project/orders_dimen.csv", header = True, inferSchema=True)
prod_dimen = spark.read.csv("/home/hduser/deloitte_24/Datasets - Project/prod_dimen.csv", header = True, inferSchema=True)
shipping_dimen = spark.read.csv("/home/hduser/deloitte_24/Datasets - Project/shipping_dimen.csv", header = True, inferSchema=True)

In [5]:
cust_dimen.printSchema()
market_fact.printSchema()
orders_dimen.printSchema()
prod_dimen.printSchema()
shipping_dimen.printSchema()

root
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Cust_id: string (nullable = true)

root
 |-- Ord_id: string (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Ship_id: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: string (nullable = true)

root
 |-- Order_ID: integer (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Ord_id: string (nullable = true)

root
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Prod_id: string (nullable = true)

root
 |-- Order_ID: integer (nullable = true)
 

In [6]:
# 2. Join all the Data frames and create a new Data frame called Full_DataFrame in such a way
# that the new data frame does not contain duplicate columns. 
# (cust_dimen, market_fact, orders_dimen, prod_dimen, shipping_dimen)


#Renaming the column to avoid duplicate column names
shipping_dimen = shipping_dimen.withColumnRenamed("Ship_id","Ship_id_shipping_dimen")

In [7]:
join1 = cust_dimen.join(market_fact,"Cust_id","inner")
join2 = join1.join(orders_dimen,"Ord_id","inner")
join3 = join2.join(prod_dimen,"Prod_id","inner")
Full_DataFrame = join3.join(shipping_dimen,"Order_ID","inner")

In [8]:
Full_DataFrame.printSchema()

root
 |-- Order_ID: integer (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Ship_id: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Ship_id_shipping_dimen: string (nullable = true)



In [9]:
#Convert the Order_Date and Ship_Date columns type into Date type. And print the schema and 
#show the top 5 records for Order_Date and Ship_Date columns

from pyspark.sql.functions import *

#converting datatype of Order_Date(string) to a new columnn Order_dates (date datatype)
Full_DataFrame = Full_DataFrame.withColumn("Order_Dates",to_date(Full_DataFrame.Order_Date, "dd-MM-yyy"))

#converting datatype of Ship_Date(string) to a new columnn Ship_dates (date datatype)
Full_DataFrame = Full_DataFrame.withColumn("Ship_Dates",to_date(Full_DataFrame.Ship_Date, "dd-MM-yyy"))
Full_DataFrame.printSchema()

root
 |-- Order_ID: integer (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Ship_id: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Ship_id_shipping_dimen: string (nullable = true)
 |-- Order_Dates: date (nullable = true)
 |-- 

In [10]:
#show the top 5 records for Order_Date and Ship_Date columns
Full_DataFrame.select("Order_Dates","Ship_Dates").show(5)

+-----------+----------+
|Order_Dates|Ship_Dates|
+-----------+----------+
| 2010-10-13|2010-10-20|
| 2012-10-01|2012-10-03|
| 2012-10-01|2012-10-02|
| 2012-10-01|2012-10-03|
| 2012-10-01|2012-10-02|
+-----------+----------+
only showing top 5 rows



In [11]:
#4.Find the top 3 customers who have the maximum number of orders.
df_max_ord = Full_DataFrame.groupBy("Cust_id").agg(count("Order_ID").alias("Total_Order")).orderBy("Total_Order",ascending=False).limit(3)
df_max_ord.show()                                                   

+---------+-----------+
|  Cust_id|Total_Order|
+---------+-----------+
|Cust_1140|         62|
| Cust_572|         52|
| Cust_188|         47|
+---------+-----------+



In [12]:
#5.Create a new column DaysTakenForDelivery that contains the 
#date difference between Order_Date and Ship_Date

Full_DataFrame = Full_DataFrame.withColumn("DaysTakenForDelivery",datediff(Full_DataFrame.Ship_Dates,Full_DataFrame.Order_Dates))
Full_DataFrame.show(5)

+--------+-------+------+-------+------------------+--------+-------+----------------+-------+--------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+----------------------+-----------+----------+--------------------+
|Order_ID|Prod_id|Ord_id|Cust_id|     Customer_Name|Province| Region|Customer_Segment|Ship_id|   Sales|Discount|Order_Quantity| Profit|Shipping_Cost|Product_Base_Margin|Order_Date|Order_Priority|Product_Category|Product_Sub_Category|     Ship_Mode| Ship_Date|Ship_id_shipping_dimen|Order_Dates|Ship_Dates|DaysTakenForDelivery|
+--------+-------+------+-------+------------------+--------+-------+----------------+-------+--------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+--------------+----------+----------------------+-----------+----------+--------------------+
|       3| Prod_1| 

In [13]:
#6.Find the customer whose order took the maximum time to get delivered

max_time = Full_DataFrame.orderBy("DaysTakenForDelivery",ascending=False).limit(1).select("Cust_id","DaysTakenForDelivery")
max_time.show()

+---------+--------------------+
|  Cust_id|DaysTakenForDelivery|
+---------+--------------------+
|Cust_1460|                  92|
+---------+--------------------+



In [14]:
#7. Using the windows function, retrieve total sales made by each product from the data.

from pyspark.sql.window import Window

window_spec = Window.partitionBy(Full_DataFrame.Prod_id).orderBy(Full_DataFrame.Sales)

total_sale = Full_DataFrame.withColumn("Total_Sales",sum(Full_DataFrame.Sales).over(window_spec))

total_sale.show(5)

+--------+-------+--------+---------+----------------+--------+-------+----------------+--------+-------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+-----------+----------+----------------------+-----------+----------+--------------------+-----------+
|Order_ID|Prod_id|  Ord_id|  Cust_id|   Customer_Name|Province| Region|Customer_Segment| Ship_id|  Sales|Discount|Order_Quantity| Profit|Shipping_Cost|Product_Base_Margin|Order_Date|Order_Priority|Product_Category|Product_Sub_Category|  Ship_Mode| Ship_Date|Ship_id_shipping_dimen|Order_Dates|Ship_Dates|DaysTakenForDelivery|Total_Sales|
+--------+-------+--------+---------+----------------+--------+-------+----------------+--------+-------+--------+--------------+-------+-------------+-------------------+----------+--------------+----------------+--------------------+-----------+----------+----------------------+-----------+----------+--------------------

In [15]:
#8. Count the total number of unique customers in January and how many of 
#them came back every month over the entire year in 2011.

#customers ordered in january
jan = Full_DataFrame.filter(month("Order_Dates")==1)

print("Unique customers in jan",jan.distinct().count())

#customers ordered in 2011
yr2011 = Full_DataFrame.filter(year("Order_Dates")==2011)

#join -> to get cutsomers who have returned
returning_cust = yr2011.join(jan,"Cust_Id","inner")

#selecting the unique columns
unique_cols = ["Order_ID","Prod_id","Ord_id",'Cust_id','Customer_Name','Province','Region','Customer_Segment',
               'Sales','Discount','Order_Quantity','Profit','Shipping_Cost','Product_Base_Margin','Order_Priority',
              'Product_Category','Product_Sub_Category','Ship_Mode','Ship_id','Order_Date','Order_Dates','Ship_Date','Ship_Dates','DaysTakenForDelivery']

returning_cust = Full_DataFrame.select(*unique_cols)
returning_cust.printSchema()


Unique customers in jan 1258
root
 |-- Order_ID: integer (nullable = true)
 |-- Prod_id: string (nullable = true)
 |-- Ord_id: string (nullable = true)
 |-- Cust_id: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping_Cost: double (nullable = true)
 |-- Product_Base_Margin: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Ship_id: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_Dates: date (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Ship_Dates: date (nullabl

In [16]:
#Save the above Q:8 output as a count_month.json file.

returning_cust.write.json('/home/hduser/deloitte_24/count_month.json')