Retail Orders ETL Pipeline

In [4]:
import findspark

# findspark must be initialised with the Spark install directory before any
# pyspark import, which is why the pyspark import sits below this call.
findspark.init("/home/hadup/spark")

from pyspark.sql import SparkSession

# Reuse an already-running session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Retail Orders ETL")
    .getOrCreate()
)

Extract

In [8]:
# Location of the raw orders extract on HDFS.
orders_csv_path = "hdfs://localhost:9000/user/hadup/retail_project/raw/orders.csv"

# Treat the first line as column names. No schema is supplied or inferred here,
# so the numeric/date columns are cast explicitly in a later cell.
df = spark.read.csv(orders_csv_path, header=True)

# Quick look at the first rows to confirm the file parsed as expected.
df.show(5)

#df.printSchema()

+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+
|Order Id|Order Date|     Ship Mode|  Segment|      Country|           City|     State|Postal Code|Region|       Category|Sub Category|     Product Id|cost price|List Price|Quantity|Discount Percent|
+--------+----------+--------------+---------+-------------+---------------+----------+-----------+------+---------------+------------+---------------+----------+----------+--------+----------------+
|       1|2023-03-01|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|
|       2|2023-08-15|  Second Class| Consumer|United States|      Henderson|  Kentucky|      42420| South|      Furniture|      Chairs|FUR-CH-10000454|       600|       730|       3|               3|


Transform

In [11]:
# Columns removed from the dataset in this step.
columns_to_drop = ["Ship Mode", "Segment", "Country", "Postal Code"]
df = df.drop(*columns_to_drop)

# Confirm the remaining columns and their (still untyped) schema.
df.show(5)

df.printSchema()

+--------+----------+---------------+----------+------+---------------+------------+---------------+----------+----------+--------+----------------+
|Order Id|Order Date|           City|     State|Region|       Category|Sub Category|     Product Id|cost price|List Price|Quantity|Discount Percent|
+--------+----------+---------------+----------+------+---------------+------------+---------------+----------+----------+--------+----------------+
|       1|2023-03-01|      Henderson|  Kentucky| South|      Furniture|   Bookcases|FUR-BO-10001798|       240|       260|       2|               2|
|       2|2023-08-15|      Henderson|  Kentucky| South|      Furniture|      Chairs|FUR-CH-10000454|       600|       730|       3|               3|
|       3|2023-01-10|    Los Angeles|California|  West|Office Supplies|      Labels|OFF-LA-10000240|        10|        10|       2|               5|
|       4|2022-06-18|Fort Lauderdale|   Florida| South|      Furniture|      Tables|FUR-TA-10000577|      

In [13]:
from pyspark.sql.functions import col, isnan, when, count

# Count missing values per column: NULLs everywhere, plus NaNs where a NaN check
# is meaningful.
#
# isnan() is applied only to string/float/double columns. Applying it to every
# column (as before) works while all columns are still strings, but is expected
# to fail with a data-type mismatch once `df` holds date columns -- which is the
# case after the cast cell rebinds `df` (Order Date -> date). Restricting the
# check by dtype keeps this cell re-runnable at any stage of the notebook while
# producing the same counts on the all-string frame.
nan_checkable_types = {"string", "float", "double"}

missing_counts = []
for column_name, column_type in df.dtypes:
    is_missing = col(column_name).isNull()
    if column_type in nan_checkable_types:
        is_missing = is_missing | isnan(col(column_name))
    # when() yields a non-null literal only for missing rows; count() ignores
    # the NULLs produced for all other rows.
    missing_counts.append(count(when(is_missing, column_name)).alias(column_name))

df.select(missing_counts).show()

+--------+----------+----+-----+------+--------+------------+----------+----------+----------+--------+----------------+
|Order Id|Order Date|City|State|Region|Category|Sub Category|Product Id|cost price|List Price|Quantity|Discount Percent|
+--------+----------+----+-----+------+--------+------------+----------+----------+----------+--------+----------------+
|       0|         0|   0|    0|     0|       0|           0|         0|         0|         0|       0|               0|
+--------+----------+----+-----+------+--------+------------+----------+----------+----------+--------+----------------+



In [16]:
from pyspark.sql.types import DoubleType, IntegerType, FloatType, DateType

# Target type for each column that should not stay a string.
# NOTE(review): values that cannot be parsed are presumably turned into NULL by
# cast() rather than raising -- the missing-value check runs before this cell,
# so confirm whether a post-cast check is wanted.
target_types = {
    "Order Id": IntegerType(),
    "cost price": DoubleType(),
    "List Price": DoubleType(),
    "Quantity": IntegerType(),
    "Discount Percent": FloatType(),
    "Order Date": DateType(),
}

# withColumn on an existing name replaces that column in place, so the column
# order of the frame is unchanged.
for column_name, target_type in target_types.items():
    df = df.withColumn(column_name, col(column_name).cast(target_type))

df.printSchema()

root
 |-- Order Id: integer (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Product Id: string (nullable = true)
 |-- cost price: double (nullable = true)
 |-- List Price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount Percent: float (nullable = true)



In [23]:
# Profit  = List Price - cost price   (per row, quantity not applied)
# Revenue = Profit * Quantity
#
# NOTE(review): "Revenue" as computed here is profit scaled by quantity, not
# sales value, and "Discount Percent" is not used in either figure -- confirm
# this matches the intended business definitions before relying on the names.
price_margin = col("List Price") - col("cost price")

df = (
    df.withColumn("Profit", price_margin)
      .withColumn("Revenue", col("Profit") * col("Quantity"))
)

# Show the inputs next to the derived columns for a quick sanity check.
preview_columns = ["Product Id", "List Price", "cost price", "Profit", "Quantity", "Revenue"]
df.select(*preview_columns).show(5)

+---------------+----------+----------+------+--------+-------+
|     Product Id|List Price|cost price|Profit|Quantity|Revenue|
+---------------+----------+----------+------+--------+-------+
|FUR-BO-10001798|     260.0|     240.0|  20.0|       2|   40.0|
|FUR-CH-10000454|     730.0|     600.0| 130.0|       3|  390.0|
|OFF-LA-10000240|      10.0|      10.0|   0.0|       2|    0.0|
|FUR-TA-10000577|     960.0|     780.0| 180.0|       5|  900.0|
|OFF-ST-10000760|      20.0|      20.0|   0.0|       2|    0.0|
+---------------+----------+----------+------+--------+-------+
only showing top 5 rows



In [24]:
# Preview the full frame, now including the derived Profit and Revenue columns.
df.show(n=5)

+--------+----------+---------------+----------+------+---------------+------------+---------------+----------+----------+--------+----------------+------+-------+
|Order Id|Order Date|           City|     State|Region|       Category|Sub Category|     Product Id|cost price|List Price|Quantity|Discount Percent|Profit|Revenue|
+--------+----------+---------------+----------+------+---------------+------------+---------------+----------+----------+--------+----------------+------+-------+
|       1|2023-03-01|      Henderson|  Kentucky| South|      Furniture|   Bookcases|FUR-BO-10001798|     240.0|     260.0|       2|             2.0|  20.0|   40.0|
|       2|2023-08-15|      Henderson|  Kentucky| South|      Furniture|      Chairs|FUR-CH-10000454|     600.0|     730.0|       3|             3.0| 130.0|  390.0|
|       3|2023-01-10|    Los Angeles|California|  West|Office Supplies|      Labels|OFF-LA-10000240|      10.0|      10.0|       2|             5.0|   0.0|    0.0|
|       4|2022-0