In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


# Upload the CSV manually in sidebar if UI shows upload, or simulate from a path
df = spark.read.option("header", True).csv("file:/Workspace/Users/Username/Superstore.csv")
df.show(5)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|       OrderID| OrderDate| Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|StoreID|      ProductID|       Category|Sub-Category|        Product Name|   Price|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420|  South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somer

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id, rand

df_clean = df.select(
    col("OrderID"),
    col("ProductID"),
    col("OrderDate"),
    col("StoreID"),
    col("Quantity").cast("int"),
    col("Price").cast("double"),
    col("Row ID").alias("RowID")
)

df_clean.show(5)

+--------------+---------------+----------+-------+--------+--------+-----+
|       OrderID|      ProductID| OrderDate|StoreID|Quantity|   Price|RowID|
+--------------+---------------+----------+-------+--------+--------+-----+
|CA-2016-152156|FUR-BO-10001798| 11/8/2016|  South|       2|  261.96|    1|
|CA-2016-152156|FUR-CH-10000454| 11/8/2016|  South|       3|  731.94|    2|
|CA-2016-138688|OFF-LA-10000240| 6/12/2016|   West|       2|   14.62|    3|
|US-2015-108966|FUR-TA-10000577|10/11/2015|  South|       5|957.5775|    4|
|US-2015-108966|OFF-ST-10000760|10/11/2015|  South|       2|  22.368|    5|
+--------------+---------------+----------+-------+--------+--------+-----+
only showing top 5 rows


In [0]:
df_clean.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable("bronze_retail")


In [0]:
bronze_df = spark.read.table("bronze_retail")
bronze_df.show(5)


+--------------+---------------+----------+--------+--------+-----+-------+
|       OrderID|      ProductID| OrderDate|Quantity|   Price|RowID|StoreID|
+--------------+---------------+----------+--------+--------+-----+-------+
|CA-2016-152156|FUR-BO-10001798| 11/8/2016|       2|  261.96|    1|  South|
|CA-2016-152156|FUR-CH-10000454| 11/8/2016|       3|  731.94|    2|  South|
|CA-2016-138688|OFF-LA-10000240| 6/12/2016|       2|   14.62|    3|   West|
|US-2015-108966|FUR-TA-10000577|10/11/2015|       5|957.5775|    4|  South|
|US-2015-108966|OFF-ST-10000760|10/11/2015|       2|  22.368|    5|  South|
+--------------+---------------+----------+--------+--------+-----+-------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import to_date

silver_df = bronze_df \
    .filter("Quantity > 0 AND Price > 0") \
    .withColumn("OrderDate", to_date(col("OrderDate"), "MM/dd/yyyy")) \
    .dropna(subset=["OrderID", "ProductID", "StoreID", "OrderDate", "Quantity", "Price"])

silver_df.show(5)


+--------------+---------------+----------+--------+--------+-----+-------+
|       OrderID|      ProductID| OrderDate|Quantity|   Price|RowID|StoreID|
+--------------+---------------+----------+--------+--------+-----+-------+
|US-2015-108966|FUR-TA-10000577|2015-10-11|       5|957.5775|    4|  South|
|US-2015-108966|OFF-ST-10000760|2015-10-11|       2|  22.368|    5|  South|
|US-2015-118983|OFF-AP-10002311|2015-11-22|       5|   68.81|   15|Central|
|US-2015-118983|OFF-BI-10000756|2015-11-22|       3|   2.544|   16|Central|
|CA-2014-105893|OFF-ST-10004186|2014-11-11|       6|  665.88|   17|Central|
+--------------+---------------+----------+--------+--------+-----+-------+
only showing top 5 rows


In [0]:
silver_df.write.format("delta").mode("overwrite").saveAsTable("silver_retail")


In [0]:
invalid_rows = bronze_df.filter("Quantity <= 0 OR Price <= 0 OR OrderID IS NULL")
invalid_rows.write.format("delta").mode("overwrite").saveAsTable("invalid_records")


In [0]:
silver_df = spark.read.table("silver_retail")
silver_df.show(5)


+--------------+---------------+----------+--------+--------+-----+-------+
|       OrderID|      ProductID| OrderDate|Quantity|   Price|RowID|StoreID|
+--------------+---------------+----------+--------+--------+-----+-------+
|US-2015-108966|FUR-TA-10000577|2015-10-11|       5|957.5775|    4|  South|
|US-2015-108966|OFF-ST-10000760|2015-10-11|       2|  22.368|    5|  South|
|US-2015-118983|OFF-AP-10002311|2015-11-22|       5|   68.81|   15|Central|
|US-2015-118983|OFF-BI-10000756|2015-11-22|       3|   2.544|   16|Central|
|CA-2014-105893|OFF-ST-10004186|2014-11-11|       6|  665.88|   17|Central|
+--------------+---------------+----------+--------+--------+-----+-------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import sum, countDistinct

gold_df = silver_df.groupBy("StoreID", "OrderDate").agg(
    sum(col("Quantity")).alias("TotalItemsSold"),
    sum(col("Quantity") * col("Price")).alias("TotalRevenue"),
    countDistinct("OrderID").alias("TotalOrders")
)

gold_df.show(10)


+-------+----------+--------------+------------------+-----------+
|StoreID| OrderDate|TotalItemsSold|      TotalRevenue|TotalOrders|
+-------+----------+--------------+------------------+-----------+
|Central|2017-10-26|             5|           143.542|          2|
|   East|2017-12-18|            47|         33141.294|          4|
|Central|2014-11-18|            33| 7234.429999999999|          4|
|   West|2015-10-20|            19|          2433.678|          2|
|  South|2015-12-25|            17|2502.3799999999997|          2|
|  South|2015-11-30|            21|1754.2160000000001|          4|
|Central|2014-10-17|            10|           424.452|          2|
|Central|2016-12-27|            14| 6879.360000000001|          2|
|Central|2016-10-29|             5|             335.0|          1|
|Central|2014-12-30|            16|1592.7939999999999|          2|
+-------+----------+--------------+------------------+-----------+
only showing top 10 rows


In [0]:
gold_df.write.format("delta").mode("overwrite").saveAsTable("gold_retail")


In [0]:
top_products_df = silver_df.groupBy("ProductID").agg(
    sum(col("Quantity")).alias("TotalUnitsSold"),
    sum(col("Quantity") * col("Price")).alias("TotalProductRevenue")
).orderBy(col("TotalProductRevenue").desc())

top_products_df.write.format("delta").mode("overwrite").saveAsTable("top_products")
