In [1]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName("sales-analytics").getOrCreate()
spark

24/06/08 11:03:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## EXTRACT

In [8]:
df_order = spark.read.option("header", "true").option("quote", '"').csv("file:///home/ngocthang/Documents/Code/Sales-Analytics/sales-analytics/data/raw-data/orders.csv")

                                                                                

In [22]:
df_order.show()

+-----------+---------------+---------------------+-------------+---------+------------+----------------+---------------------------------+-------------------+
|Customer ID|Customer Status|Date Order was placed|Delivery Date| Order ID|  Product ID|Quantity Ordered|Total Retail Price for This Order|Cost Price Per Unit|
+-----------+---------------+---------------------+-------------+---------+------------+----------------+---------------------------------+-------------------+
|        579|         Silver|            01-Jan-17|    07-Jan-17|123002578|220101400106|               2|                             92.6|               20.7|
|       7574|         SILVER|            01-Jan-17|    05-Jan-17|123004074|210201000009|               1|                             21.7|               9.95|
|      28861|           Gold|            01-Jan-17|    04-Jan-17|123000871|230100500068|               1|                              1.7|                0.8|
|      43796|           Gold|           

In [23]:
df_order.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Status: string (nullable = true)
 |-- Date Order was placed: string (nullable = true)
 |-- Delivery Date: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Total Retail Price for This Order: string (nullable = true)
 |-- Cost Price Per Unit: string (nullable = true)



In [24]:
df_order.count()

                                                                                

185013

In [43]:
df_product = spark.read.\
    option("header", "true")\
    .option("quote", '"')\
    .csv("file:///home/ngocthang/Documents/Code/Sales-Analytics/sales-analytics/data/raw-data/product-supplier.csv")

In [44]:
df_product.count()

5504

In [45]:
df_product.show()

+------------+------------+-----------------+--------------------+--------------------+----------------+--------------------+-----------+
|  Product ID|Product Line| Product Category|       Product Group|        Product Name|Supplier Country|       Supplier Name|Supplier ID|
+------------+------------+-----------------+--------------------+--------------------+----------------+--------------------+-----------+
|210100100001|    Children|Children Outdoors|Outdoor things, Kids|Boy's and Girl's ...|              NO|Scandinavian Clot...|         50|
|210100100002|    Children|Children Outdoors|Outdoor things, Kids|   Children's Jacket|              ES| Luna sastreria S.A.|       4742|
|210100100003|    Children|Children Outdoors|Outdoor things, Kids|Children's Jacket...|              NO|Scandinavian Clot...|         50|
|210100100004|    Children|Children Outdoors|Outdoor things, Kids| Children's Rain Set|              NO|Scandinavian Clot...|         50|
|210100100005|    Children|Childre

## TRANSFORM

In [28]:
from pyspark.sql.functions import to_date, date_format, col, when, udf, count, isnan
from pyspark.sql.types import StringType

In [29]:
df_order = df_order.dropDuplicates()

In [30]:
# check if there is any wrong cloumn
df_order.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_order.columns]).show()



+-----------+---------------+---------------------+-------------+--------+----------+----------------+---------------------------------+-------------------+
|Customer ID|Customer Status|Date Order was placed|Delivery Date|Order ID|Product ID|Quantity Ordered|Total Retail Price for This Order|Cost Price Per Unit|
+-----------+---------------+---------------------+-------------+--------+----------+----------------+---------------------------------+-------------------+
|          0|              0|                    0|            0|       0|         0|               0|                                0|                  0|
+-----------+---------------+---------------------+-------------+--------+----------+----------------+---------------------------------+-------------------+



                                                                                

In [127]:
standardize = udf(lambda s : s.capitalize(), StringType()) # Capitalize the first letter and convert the rest to lowercase

df_order_transform = df_order.withColumn("Date Order was placed", to_date("Date Order was placed", "dd-MMM-yy")) \
                            .withColumn("Delivery Date", to_date("Delivery Date", "dd-MMM-yy")) \
                            .withColumn("Customer Status", standardize("Customer Status")) \
                            .withColumn("Total Retail Price for This Order", col("Total Retail Price for This Order").cast("double")) \
                            .withColumn("Cost Price Per Unit", col("Cost Price Per Unit").cast("double"))

In [128]:
df_order_transform = df_order_transform.withColumnRenamed("Date Order was placed", "Order Date").\
                                        withColumnRenamed("Total Retail Price for This Order", "Order Price").\
                                        withColumnRenamed("Cost Price Per Unit", "Cost Per Product").\
                                        withColumnRenamed("Quantity Ordered", "Order Quantity")

In [129]:
df_order_transform.count()

                                                                                

185013

In [130]:
df_order_transform.show(5)

[Stage 183:>                                                        (0 + 3) / 3]

+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+
|Customer ID|Customer Status|Order Date|Delivery Date| Order ID|  Product ID|Order Quantity|Order Price|Cost Per Product|
+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+
|      92956|         Silver|2017-01-02|   2017-01-02|123009195|230100100013|             2|      226.2|            58.9|
|      62039|           Gold|2017-01-03|   2017-01-03|123013358|220100100036|             1|       41.2|            20.7|
|      40267|         Silver|2017-01-05|   2017-01-05|123024632|210200600084|             1|       34.8|            14.9|
|      50664|           Gold|2017-01-07|   2017-01-07|123037269|240800200043|             2|       84.6|           18.45|
|      55786|         Silver|2017-01-07|   2017-01-07|123034300|210200600013|             1|       83.7|            41.8|
+-----------+-----------

                                                                                

In [131]:
df_order_transform.filter(col("Customer ID") == "579").show()

+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+
|Customer ID|Customer Status|Order Date|Delivery Date| Order ID|  Product ID|Order Quantity|Order Price|Cost Per Product|
+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+
|        579|         Silver|2018-09-28|   2018-09-28|123306088|240100100235|             1|       13.2|            5.65|
|        579|         Silver|2017-01-01|   2017-01-07|123002578|220101400106|             2|       92.6|            20.7|
|        579|         Silver|2021-09-24|   2021-09-24|124350126|220101400027|             2|      139.0|            31.7|
|        579|           Gold|2021-04-29|   2021-04-29|124209728|240100100458|             1|        6.7|            2.95|
+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+



In [132]:
df_order_transform.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Status: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Delivery Date: date (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Order Quantity: string (nullable = true)
 |-- Order Price: double (nullable = true)
 |-- Cost Per Product: double (nullable = true)



//////////

In [46]:
df_product = df_product.dropDuplicates()

In [47]:
df_product.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_product.columns]).show()

+----------+------------+----------------+-------------+------------+----------------+-------------+-----------+
|Product ID|Product Line|Product Category|Product Group|Product Name|Supplier Country|Supplier Name|Supplier ID|
+----------+------------+----------------+-------------+------------+----------------+-------------+-----------+
|         0|           0|               0|            0|           0|               0|            0|          0|
+----------+------------+----------------+-------------+------------+----------------+-------------+-----------+



In [48]:
import pycountry

def get_country_name(country_code):
    try:
        country = pycountry.countries.get(alpha_2=country_code)
        if country:
            return country.name
        else:
            return None
    except:
        return None
    
get_country_name_udf = udf(get_country_name, StringType())

# Apply the UDF to the DataFrame column

df_product_transform = df_product.withColumn("Supplier Country Name", get_country_name_udf(col("Supplier Country")))

In [49]:
df_product_transform = df_product_transform.withColumnRenamed("Supplier Country", "Supplier Country Code")

In [54]:
df_product_transform.printSchema()

root
 |-- Product ID: string (nullable = true)
 |-- Product Line: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Product Group: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Supplier Country Code: string (nullable = true)
 |-- Supplier Name: string (nullable = true)
 |-- Supplier ID: string (nullable = true)
 |-- Supplier Country Name: string (nullable = true)



In [50]:
df_product_transform.show()

+------------+---------------+--------------------+--------------------+--------------------+---------------------+--------------------+-----------+---------------------+
|  Product ID|   Product Line|    Product Category|       Product Group|        Product Name|Supplier Country Code|       Supplier Name|Supplier ID|Supplier Country Name|
+------------+---------------+--------------------+--------------------+--------------------+---------------------+--------------------+-----------+---------------------+
|210200900019|       Children|     Children Sports|        Osprey, Kids|Osprey Cellerator...|                   US|Triple Sportswear...|       3664|        United States|
|220100100113|Clothes & Shoes|             Clothes|    Eclipse Clothing|Big Guy Men's Col...|                   US|         Eclipse Inc|       1303|        United States|
|220100100252|Clothes & Shoes|             Clothes|    Eclipse Clothing|Big Guy Men's Sol...|                   US|         Eclipse Inc|       13

In [135]:
dimCountry = df_product_transform.select(col("Supplier Country Code").alias("countryID"), col("Supplier Country Name").alias("countryName")).distinct()
dimCountry.show()

+---------+--------------+
|countryID|   countryName|
+---------+--------------+
|       NO|        Norway|
|       SE|        Sweden|
|       DE|       Germany|
|       FR|        France|
|       NL|   Netherlands|
|       GB|United Kingdom|
|       AU|     Australia|
|       DK|       Denmark|
|       US| United States|
|       CA|        Canada|
|       PT|      Portugal|
|       ES|         Spain|
|       BE|       Belgium|
+---------+--------------+



In [None]:
dimSup

save file as parquet format

In [65]:
df_order_parquet = df_order_transform.repartition(1)
df_order_parquet.write.mode('overwrite').parquet("D:\Code\DataEngieering-Projects\Projects\Sales-Analytics\data\data-transformed\order")

In [66]:
df_product_parquet = df_product_transform.repartition(1)
df_product_parquet.write.mode('overwrite').parquet("D:\Code\DataEngieering-Projects\Projects\Sales-Analytics\data\data-transformed\product")

### LOAD

load file to hdfs

In [10]:
order_file = spark.read.\
    parquet("file:///home/ngocthang/Documents/Code/Sales-Analytics/sales-analytics/data/data-transformed/order/part-00000-44c3a1bf-7c6a-4bb9-8a78-74cf24fb6596-c000.snappy.parquet")

                                                                                

In [11]:
order_file.show()

[Stage 2:>                                                          (0 + 1) / 1]

+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+
|Customer ID|Customer Status|Order Date|Delivery Date| Order ID|  Product ID|Order Quantity|Order Price|Cost Per Product|
+-----------+---------------+----------+-------------+---------+------------+--------------+-----------+----------------+
|      92956|         Silver|02/01/2017|   02/01/2017|123009195|230100100013|             2|      226.2|            58.9|
|      62039|           Gold|03/01/2017|   03/01/2017|123013358|220100100036|             1|       41.2|            20.7|
|      40267|         Silver|05/01/2017|   05/01/2017|123024632|210200600084|             1|       34.8|            14.9|
|      50664|           Gold|07/01/2017|   07/01/2017|123037269|240800200043|             2|       84.6|           18.45|
|      55786|         Silver|07/01/2017|   07/01/2017|123034300|210200600013|             1|       83.7|            41.8|
|      46746|           

                                                                                

In [12]:
product_file = spark.read.\
    parquet("file:///home/ngocthang/Documents/Code/Sales-Analytics/sales-analytics/data/data-transformed/product/part-00000-7877a23d-2971-48eb-93e2-9db7fab3794f-c000.snappy.parquet")

In [13]:
product_file.show()

+------------+---------------+--------------------+--------------------+--------------------+---------------------+--------------------+-----------+---------------------+
|  Product ID|   Product Line|    Product Category|       Product Group|        Product Name|Supplier Country Code|       Supplier Name|Supplier ID|Supplier Country Name|
+------------+---------------+--------------------+--------------------+--------------------+---------------------+--------------------+-----------+---------------------+
|210200900019|       Children|     Children Sports|        Osprey, Kids|Osprey Cellerator...|                   US|Triple Sportswear...|       3664|        United States|
|220100100113|Clothes & Shoes|             Clothes|    Eclipse Clothing|Big Guy Men's Col...|                   US|         Eclipse Inc|       1303|        United States|
|220100100252|Clothes & Shoes|             Clothes|    Eclipse Clothing|Big Guy Men's Sol...|                   US|         Eclipse Inc|       13

In [14]:
order_file.write.mode('overwrite').parquet("hdfs://localhost:9000/sales-analytics-data/transformed-data/orderfile.parquet")

                                                                                

In [15]:
product_file.write.mode('overwrite').parquet("hdfs://localhost:9000/sales-analytics-data/transformed-data/productfile.parquet")

load to data warehouse

In [21]:
dimCountry = product_file.select(col("Supplier Country Name").alias("countryName"), col("Supplier Country Name").alias("countryName"))
dimCountry.show()

+-------------+---------------------+
|  countryName|Supplier Country Code|
+-------------+---------------------+
|United States|                   US|
|United States|                   US|
|United States|                   US|
|United States|                   US|
|United States|                   US|
|United States|                   US|
|  Netherlands|                   NL|
|  Netherlands|                   NL|
|       Canada|                   CA|
|United States|                   US|
|United States|                   US|
|    Australia|                   AU|
|  Netherlands|                   NL|
|    Australia|                   AU|
|        Spain|                   ES|
|United States|                   US|
|      Belgium|                   BE|
|United States|                   US|
|       Norway|                   NO|
|        Spain|                   ES|
+-------------+---------------------+
only showing top 20 rows

