In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.bronze;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.silver;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.gold;


In [0]:
## Design 3 Layer Architecture
bronze_path = "dbfs:/Volumes/workspace/ecommerce/bronze/events"
silver_path = "dbfs:/Volumes/workspace/ecommerce/silver/events_clean"
gold_path   = "dbfs:/Volumes/workspace/ecommerce/gold/daily_sales"


In [0]:
dbutils.fs.rm(bronze_path, recurse=True)
dbutils.fs.rm(silver_path, recurse=True)
dbutils.fs.rm(gold_path, recurse=True)


False

In [0]:
## Bronze Layer
from pyspark.sql.functions import current_timestamp

events_nov = spark.read.csv(
    "dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    header=True,
    inferSchema=True
)

bronze_events = events_nov.withColumn(
    "ingestion_time",
    current_timestamp()
)

bronze_events.write \
    .format("delta") \
    .mode("overwrite") \
    .save(bronze_path)

print("Bronze layer created for November")


Bronze layer created for November


In [0]:
spark.read.format("delta").load(bronze_path).printSchema()


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- ingestion_time: timestamp (nullable = true)



In [0]:
## Silver Layer
from pyspark.sql import functions as F

bronze_df = spark.read.format("delta").load(bronze_path)

silver_events = (
    bronze_df
    .filter(F.col("user_id").isNotNull())
    .filter(F.col("product_id").isNotNull())
    .filter(F.col("price") > 0)
    .dropDuplicates(["user_id", "event_time", "product_id"])
)

silver_events.write \
    .format("delta") \
    .mode("overwrite") \
    .save(silver_path)

print("Silver layer created")


Silver layer created


In [0]:
spark.read.format("delta").load(silver_path).printSchema()


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- ingestion_time: timestamp (nullable = true)



In [0]:
## Gold Layer

from pyspark.sql import functions as F

silver_df = spark.read.format("delta").load(silver_path)

gold_daily_sales = (
    silver_df
    .withColumn("event_date", F.to_date("event_time"))
    .groupBy("event_date", "event_type")
    .agg(
        F.count("*").alias("total_events"),
        F.round(F.sum("price"), 2).alias("total_revenue")
    )
)

gold_daily_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_path)

print("Gold layer created")


Gold layer created


In [0]:
spark.read.format("delta").load(gold_path).show(10)


+----------+----------+------------+--------------+
|event_date|event_type|total_events| total_revenue|
+----------+----------+------------+--------------+
|2019-11-07|  purchase|       24861|    7061896.16|
|2019-11-10|      view|     1844865|5.4579721268E8|
|2019-11-17|  purchase|      185191| 5.777424917E7|
|2019-11-03|      cart|       19538|    6731075.23|
|2019-11-19|      view|     1626483|4.6377692778E8|
|2019-11-09|      view|     1782443|5.2136967144E8|
|2019-11-22|      cart|       67344| 1.709623936E7|
|2019-11-12|      cart|       67070| 1.779703573E7|
|2019-11-08|      cart|       74427| 2.000707914E7|
|2019-11-20|      cart|       69240| 1.708541646E7|
+----------+----------+------------+--------------+
only showing top 10 rows
