In [0]:
oct_events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True
)

In [0]:
nov_events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    header=True,
    inferSchema=True
)

In [0]:
# Only use columns present in the table: x, y, z
df_duplicate = spark.table("workspace.ecommerce.Delta_oct_events").dropDuplicates(["x","y","z"])

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp


In [0]:
# Add ingestion metadata
bronze_df = oct_events.withColumn("ingestion_time", current_timestamp())

In [0]:
# Bronze Delta Path
bronze_path = "workspace.default.events_delta"


In [0]:
%sql
create volume if not exists workspace.ecommerce.bronze;
create volume if not exists workspace.ecommerce.silver;
create volume if not exists workspace.ecommerce.gold;

In [0]:
%sql
show volumes in workspace.ecommerce;

database,volume_name
ecommerce,bronze
ecommerce,ecommerce_data
ecommerce,gold
ecommerce,silver


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp
nov_events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    header=True,
    inferSchema=True
)
oct_events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True
)
raw = oct_events
bronze_df = raw.withColumn("ingestion_time", current_timestamp())

bronze_df.write\
    .format("delta")\
    .mode("append")\
    .save("/Volumes/workspace/ecommerce/bronze")

In [0]:
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/silver")


In [0]:
# SILVER: Cleaned data
bronze = spark.read.format("delta").load("/Volumes/workspace/ecommerce/bronze")
silver = bronze.filter(F.col("price") > 0) \
    .filter(F.col("price") < 10000) \
    .dropDuplicates(["user_session", "event_time"]) \
    .withColumn("event_date", F.to_date("event_time")) \
    .withColumn("price_tier",
        F.when(F.col("price") < 10, "budget")
         .when(F.col("price") < 50, "mid")
         .otherwise("premium"))
silver.write.format("delta")\
    .mode("append")\
    .save("/Volumes/workspace/ecommerce/silver")

In [0]:
# GOLD: Aggregates
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/silver")
product_perf = silver.groupBy("product_id", "brand") \
    .agg(
        F.countDistinct(F.when(F.col("event_type")=="view", "user_id")).alias("views"),
        F.countDistinct(F.when(F.col("event_type")=="purchase", "user_id")).alias("purchases"),
        F.sum(F.when(F.col("event_type")=="purchase", F.col("price").cast("double"))).alias("revenue")
    )\
    .withColumn("conversion_rate",
        F.when(F.col("views") > 0, F.col("purchases")/F.col("views")*100).otherwise(0)
    )
product_perf.write.format("delta").mode("append").save("/Volumes/workspace/ecommerce/gold")

In [0]:
gold = spark.read.format("delta").load("/Volumes/workspace/ecommerce/gold")
display(gold)

product_id,brand,views,purchases,revenue,conversion_rate
1004573,samsung,1,1,79034.34,100.0
12704683,nokian,1,1,6242.940000000001,100.0
3300488,redmond,1,1,13694.1,100.0
42000008,pt-group,1,1,875.2,100.0
8500290,,1,1,8143.460000000001,100.0
1307143,hp,1,1,8028.0,100.0
2800645,arg,1,1,4632.84,100.0
6800852,apacer,1,1,185.12,100.0
12717781,bfgoodrich,1,0,,0.0
20900533,xiaomi,1,1,163.72,100.0
