In [0]:
# Gold layer parameter: the fully-qualified Silver table to aggregate.
# Declared as a text widget so a job run can override the default value.
dbutils.widgets.text("source_table", "default.silver_events")
source_table = dbutils.widgets.get("source_table")

# Log the resolved configuration (one value per line).
print("Running Gold layer", f"Source Table: {source_table}", sep="\n")

In [0]:
from pyspark.sql import functions as F

# Aggregate Silver events into one row of performance metrics per product:
#   views           - number of "view" events
#   purchases       - number of "purchase" events
#   revenue         - sum of price over purchase events (0 when the product has none)
#   conversion_rate - purchases / views * 100 (0 when the product has no views)
gold_df = (
    spark.read.table(source_table)
    .groupBy("product_id")
    .agg(
        F.count(F.when(F.col("event_type") == "view", True)).alias("views"),
        F.count(F.when(F.col("event_type") == "purchase", True)).alias("purchases"),
        # sum() over a group whose values are all NULL returns NULL, so products
        # without purchases would otherwise get NULL revenue. Coalesce to 0; an int
        # literal lets Spark keep the sum's own numeric type.
        F.coalesce(
            F.sum(F.when(F.col("event_type") == "purchase", F.col("price"))),
            F.lit(0),
        ).alias("revenue"),
    )
    .withColumn(
        "conversion_rate",
        F.when(F.col("views") > 0, (F.col("purchases") / F.col("views")) * 100)
         .otherwise(0)
    )
)

# Persist as a managed Delta table, replacing any previous Gold output.
gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.gold_product_performance")

print("Gold layer completed successfully")

In [0]:
# Quick look at the freshly written Gold table (first 20 rows, long values truncated).
spark.read.table("default.gold_product_performance").show(n=20, truncate=True)

In [0]:
from pyspark.sql import functions as F

# Preview the 10 highest-revenue products in the Gold table, untruncated.
# NOTE(review): the Gold cell writes conversion_rate with .otherwise(0), so this
# isNotNull() filter presumably never drops a row — confirm before relying on it.
(
    spark.read.table("default.gold_product_performance")
    .where(F.col("conversion_rate").isNotNull())
    .orderBy(F.desc("revenue"))
    .limit(10)
    .show(truncate=False)
)


In [0]:
from pyspark.sql import functions as F

# Preview the 20 highest-revenue products in the Gold table, untruncated.
(
    spark.read.table("default.gold_product_performance")
    .where(F.col("conversion_rate").isNotNull())
    .orderBy("revenue", ascending=False)
    .limit(20)
    .show(truncate=False)
)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

# Ten highest-revenue products from the Gold table (tiny, so safe to broadcast).
top10 = (
    spark.read.table("default.gold_product_performance")
    .filter(F.col("conversion_rate").isNotNull())
    .orderBy(F.col("revenue").desc())
    .limit(10)
)

# Schema of the raw October events CSV. Column types match the schema used when the
# Dell table is built from the same file (price: double, user_id: long); only
# product_id and brand are actually selected below.
schema = StructType([
    StructField("event_time", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", LongType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", LongType(), True),
    StructField("user_session", StringType(), True)
])

# product_id -> brand lookup with one row per product that has a non-null brand.
# dropDuplicates keeps an arbitrary row per key, so if a product appears under
# several brands, which one is kept is not deterministic.
product_brand = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
    .select("product_id", "brand")
    .filter(F.col("brand").isNotNull())
    .dropDuplicates(["product_id"])
)

# Attach brands to the top 10, keeping every top-10 product (brand NULL if unknown).
# Spark can only broadcast the right side of a LEFT OUTER join, so the equivalent
# RIGHT OUTER join is used with the small top-10 frame on the left; this lets the
# broadcast hint take effect instead of shuffling the large brand lookup.
final_output = (
    product_brand
    .join(broadcast(top10), "product_id", "right")
    .withColumn(
        "revenue_fmt",
        F.when(F.col("revenue") >= 1_000_000,
               F.concat(F.round(F.col("revenue") / 1_000_000, 2), F.lit(" M")))
         .when(F.col("revenue") >= 1_000,
               F.concat(F.round(F.col("revenue") / 1_000, 2), F.lit(" K")))
         .otherwise(F.round(F.col("revenue"), 2).cast("string"))
    )
    .orderBy(F.col("revenue").desc())
    .select(
        "product_id",
        "brand",
        "views",
        "purchases",
        # Rounded to 2 decimals for display only; the Gold table keeps full precision.
        F.round(F.col("conversion_rate"), 2).alias("conversion_rate"),
        "revenue_fmt"
    )
)

final_output.show(truncate=False)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# --------------------------------------------------
# 1. Explicit schema for the raw October events CSV
#    (DO NOT infer: inferSchema costs an extra full pass over a large file)
# --------------------------------------------------

schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in [
            ("event_time", StringType()),
            ("event_type", StringType()),
            ("product_id", LongType()),
            ("category_id", LongType()),
            ("category_code", StringType()),
            ("brand", StringType()),
            ("price", DoubleType()),
            ("user_id", LongType()),
            ("user_session", StringType()),
        ]
    ]
)

# --------------------------------------------------
# 2. Load 2019-Oct.csv (first line is the header)
# --------------------------------------------------

raw_oct = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    schema=schema,
    header=True,
)

# --------------------------------------------------
# 3. Keep only Dell events (brand compared case-insensitively;
#    rows with a NULL brand never match)
# --------------------------------------------------

dell_df = raw_oct.where(F.lower("brand") == "dell")

# --------------------------------------------------
# 4. Persist Dell events as a managed Delta table (replaces previous contents)
# --------------------------------------------------

dell_df.write.saveAsTable(
    "default.dell_brand_events",
    format="delta",
    mode="overwrite",
)

# --------------------------------------------------
# 5. Display the stored table, untruncated
# --------------------------------------------------

spark.table("default.dell_brand_events").show(truncate=False)


In [0]:
from pyspark.sql import functions as F

# --------------------------------------------------
# Calculate views, purchases, conversion rate & revenue per Dell product
# --------------------------------------------------

dell_metrics_by_product = (
    spark.read.table("default.dell_brand_events")
    # The Dell table is built with a case-insensitive brand filter, so rows of one
    # product may carry differently-cased brand strings ("dell", "Dell", ...).
    # Lower-case it so the groupBy yields exactly one row per product.
    .withColumn("brand", F.lower(F.col("brand")))
    .groupBy("product_id", "brand")
    .agg(
        # Views
        F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("views"),

        # Purchases
        F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),

        # Revenue (only from purchases; non-purchase rows contribute 0)
        F.sum(F.when(F.col("event_type") == "purchase", F.col("price")).otherwise(0)).alias("revenue")
    )
    # Conversion rate = purchases / views as a percentage, rounded to 2 dp (0.0 if no views)
    .withColumn(
        "conversion_rate",
        F.when(F.col("views") > 0,
               F.round((F.col("purchases") / F.col("views")) * 100, 2)
        ).otherwise(0.0)
    )
)

# --------------------------------------------------
# Format revenue for display (>= 1M -> "x M", >= 1K -> "x K", else plain, 2 dp)
# --------------------------------------------------

final_output = (
    dell_metrics_by_product
    .withColumn(
        "revenue_fmt",
        F.when(F.col("revenue") >= 1_000_000,
               F.concat(F.round(F.col("revenue") / 1_000_000, 2), F.lit(" M")))
         .when(F.col("revenue") >= 1_000,
               F.concat(F.round(F.col("revenue") / 1_000, 2), F.lit(" K")))
         .otherwise(F.round(F.col("revenue"), 2).cast("string"))
    )
    # product_id breaks revenue ties so the displayed order is deterministic
    .orderBy(F.col("revenue").desc(), F.col("product_id"))
    .select(
        "product_id",
        "brand",
        "views",
        "purchases",
        "conversion_rate",
        "revenue_fmt"
    )
)

# --------------------------------------------------
# Show final result
# --------------------------------------------------

final_output.show(truncate=False)

