# Add widgets for parameters

In [0]:
from pyspark.sql import functions as F

In [0]:
# Free-text widget holding the path of the source CSV file.
dbutils.widgets.text(
    name="src_path",
    defaultValue="/Volumes/idc/idc_kaggle/ecom_data/2019-Nov.csv",
)

# Dropdown widget selecting which layer to run; "bronze" is the default choice.
dbutils.widgets.dropdown(
    name="layer",
    defaultValue="bronze",
    choices=["bronze", "silver", "gold"],
)


# Use parameters

In [0]:
# Read the current widget values: `src` is the CSV path, `layer` the stage to run.
src, layer = (dbutils.widgets.get(key) for key in ("src_path", "layer"))


In [0]:
# Storage locations of the Delta data for each layer.
# NOTE(review): these paths live under `ecom_oct` while the default `src_path`
# widget value points at `2019-Nov.csv` - confirm the naming is intentional.
bronze, silver, gold = (
    "/Volumes/idc/idc_kaggle/ecom_data/ecom_oct/bronze/",
    "/Volumes/idc/idc_kaggle/ecom_data/ecom_oct/silver/",
    "/Volumes/idc/idc_kaggle/ecom_data/ecom_oct/gold/product/",
)

# Bronze layer

In [0]:
def bronze_run():
    """Load the CSV at `src` and overwrite the bronze Delta data with it.

    The file is read with a header row and an inferred schema, and every row
    is stamped with an `ingestion_time` column holding the current timestamp
    before being written to the `bronze` path.
    """
    csv_reader = spark.read.options(header=True, inferSchema=True)
    stamped = csv_reader.csv(src).withColumn(
        "ingestion_time", F.current_timestamp()
    )

    # Full refresh: any existing bronze data at this path is replaced.
    writer = stamped.write.format("delta").mode("overwrite")
    writer.save(bronze)

# Silver layer

In [0]:
def silver_run():
    """Clean the bronze data and overwrite the silver Delta data.

    Steps, applied in order to the data loaded from the `bronze` path:
      1. keep rows whose `price` is between 0 and 10000 (inclusive);
      2. drop duplicates on the (`user_session`, `event_time`) pair;
      3. add `event_date`, the date part of `event_time`;
      4. add `price_tier`, a label derived from `price`.
    """
    price = F.col("price")

    # Thresholds are checked top-down, so each label covers the range
    # between the previous threshold and its own.
    tier_label = (
        F.when(price < 10, "budget")
        .when(price < 50, "affordable")
        .when(price < 100, "moderate")
        .when(price < 500, "expensive")
        .otherwise("luxury")
    )

    source_df = spark.read.format("delta").load(bronze)
    in_range = source_df.filter(price.between(0, 10000))
    deduped = in_range.dropDuplicates(["user_session", "event_time"])
    cleaned = (
        deduped
        .withColumn("event_date", F.to_date(F.col("event_time")))
        .withColumn("price_tier", tier_label)
    )

    # Full refresh: any existing silver data at this path is replaced.
    cleaned.write.format("delta").mode("overwrite").save(silver)

# Gold layer

In [0]:
def gold_run():
    """Aggregate the silver data per product and overwrite the gold Delta data.

    For every `product_id` the output holds:
      - `views`: number of distinct `user_id` values with a "view" event;
      - `purchases`: number of distinct `user_id` values with a "purchase" event;
      - `revenue`: sum of `price` over "purchase" events;
      - `converstion_rate`: purchases / views * 100, or null when `views` is 0.
    """
    event_type = F.col("event_type")
    is_view = event_type == "view"
    is_purchase = event_type == "purchase"

    # `when` without `otherwise` yields null for non-matching rows, so those
    # rows are ignored by countDistinct and sum.
    per_product = [
        F.countDistinct(F.when(is_view, F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(is_purchase, F.col("user_id"))).alias("purchases"),
        F.sum(F.when(is_purchase, F.col("price"))).alias("revenue"),
    ]

    # Guard against division by zero: products without views get a null rate.
    rate = F.when(
        F.col("views") > 0,
        F.col("purchases") / F.col("views") * 100,
    ).otherwise(None)

    products = spark.read.format("delta").load(silver)
    summary = products.groupBy("product_id").agg(*per_product)
    # NOTE(review): the column name is spelled "converstion_rate" (sic). It is
    # kept unchanged because it is part of the schema written to the gold path.
    summary = summary.withColumn("converstion_rate", rate)

    # Full refresh: any existing gold data at this path is replaced.
    summary.write.format("delta").mode("overwrite").save(gold)

In [0]:
def run_layers(layer_name):
    """Run the pipeline stage selected by `layer_name`.

    Args:
        layer_name: One of "bronze", "silver" or "gold".

    Raises:
        ValueError: If `layer_name` is not one of the supported layers.
            ValueError is a subclass of Exception, so callers that catch
            Exception keep working.
    """
    supported = ("bronze", "silver", "gold")

    # Validate first so a bad name fails with a message that reports both the
    # value received and the accepted options.
    if layer_name not in supported:
        raise ValueError(
            f"Invalid layer name: {layer_name!r} "
            f"(expected one of: {', '.join(supported)})"
        )

    if layer_name == "bronze":
        bronze_run()
    elif layer_name == "silver":
        silver_run()
    else:
        gold_run()


In [0]:
# Entry point: run the stage chosen through the "layer" widget.
run_layers(layer_name=layer)