In [0]:
from pyspark.sql import functions as F


In [0]:
# Volume directory that holds both the raw CSV exports and the medallion
# tables. Defined once so the five locations below cannot drift apart.
VOLUME_ROOT = "dbfs:/Volumes/workspace/ecommerce/ecommerce_data"
MEDALLION_ROOT = f"{VOLUME_ROOT}/medallion"

# SOURCE: raw monthly event exports (CSV)
OCT_CSV = f"{VOLUME_ROOT}/2019-Oct.csv"
NOV_CSV = f"{VOLUME_ROOT}/2019-Nov.csv"  # read by the combined Oct + Nov bronze load

# MEDALLION PATHS: one Delta location per layer
BRONZE_PATH = f"{MEDALLION_ROOT}/bronze/events"
SILVER_PATH = f"{MEDALLION_ROOT}/silver/events"
GOLD_PRODUCTS_PATH = f"{MEDALLION_ROOT}/gold/products"


In [0]:
# BRONZE: raw ingestion of the October export.
# The CSV is read with its header row as column names and with Spark-inferred
# column types; no rows are filtered or altered at this layer.
raw = spark.read.options(header=True, inferSchema=True).csv(OCT_CSV)

# Tag every row with lineage columns: when it was loaded and from which file.
oct_tagged = raw.withColumn("ingestion_ts", F.current_timestamp())
oct_tagged = oct_tagged.withColumn("source_file", F.lit("2019-Oct.csv"))

# Replace whatever is currently stored at the bronze location.
oct_tagged.write.save(BRONZE_PATH, format="delta", mode="overwrite")

print("BRONZE saved:", BRONZE_PATH)


BRONZE saved: dbfs:/Volumes/workspace/ecommerce/ecommerce_data/medallion/bronze/events


In [0]:
def _read_month(csv_path, file_label):
    """Read one monthly CSV export and tag each row with its source file name.

    Args:
        csv_path: Location of the CSV file; its first row is used as the header
            and column types are inferred by Spark.
        file_label: Value stored in the added ``source_file`` column.

    Returns:
        The file's rows as a DataFrame with an extra ``source_file`` column.
    """
    month_df = spark.read.options(header=True, inferSchema=True).csv(csv_path)
    return month_df.withColumn("source_file", F.lit(file_label))


raw_oct = _read_month(OCT_CSV, "2019-Oct.csv")
raw_nov = _read_month(NOV_CSV, "2019-Nov.csv")

# Stack the two months, matching columns by name rather than by position.
raw = raw_oct.unionByName(raw_nov)

# Stamp the load time and overwrite the bronze Delta location with both months.
raw.withColumn("ingestion_ts", F.current_timestamp()).write.save(
    BRONZE_PATH, format="delta", mode="overwrite"
)

In [0]:
# SILVER: cleaned data
bronze = spark.read.format("delta").load(BRONZE_PATH)

# Columns that together identify one logical event. De-duplicating on
# (user_session, event_time) alone keeps a single arbitrary row whenever a
# session produces several events with the same timestamp (for example a view
# and a cart action, or two different products), which silently drops real
# events and makes the per-product gold aggregates vary between runs.
# NOTE(review): this deliberately departs from the narrower key used in the
# guide; silver row counts will be higher than with the original key.
SILVER_DEDUP_KEYS = ["user_session", "event_time", "event_type", "product_id", "user_id"]

silver = (bronze
    # Normalise event_time to a timestamp; values that cannot be parsed become
    # null and are removed by the next filter.
    .withColumn("event_time", F.to_timestamp("event_time"))
    .filter(F.col("event_time").isNotNull())
    # Keep only rows with a known price strictly between 0 and 10000.
    .filter(F.col("price").isNotNull())
    .filter(F.col("price") > 0)
    .filter(F.col("price") < 10000)
    # Remove repeated copies of the same event (see SILVER_DEDUP_KEYS above).
    .dropDuplicates(SILVER_DEDUP_KEYS)
    .withColumn("event_date", F.to_date("event_time"))
    # Price bucket: below 10 -> budget, 10 up to (not including) 50 -> mid,
    # everything else -> premium.
    .withColumn("price_tier",
        F.when(F.col("price") < 10, "budget")
         .when(F.col("price") < 50, "mid")
         .otherwise("premium"))
)

# Replace the silver Delta location with the cleaned result.
silver.write.format("delta").mode("overwrite").save(SILVER_PATH)

print("SILVER saved:", SILVER_PATH)


SILVER saved: dbfs:/Volumes/workspace/ecommerce/ecommerce_data/medallion/silver/events


In [0]:
# GOLD: per-product aggregates
silver = spark.read.format("delta").load(SILVER_PATH)

# Row-level predicates reused by the aggregate expressions below.
is_view_event = F.col("event_type") == "view"
is_purchase_event = F.col("event_type") == "purchase"

# "views" and "purchases" are counts of DISTINCT users with at least one such
# event for the product, not raw event counts: user_id is kept only on matching
# rows (null elsewhere) and nulls are ignored by the distinct count.
distinct_viewers = F.countDistinct(F.when(is_view_event, F.col("user_id"))).alias("views")
distinct_buyers = F.countDistinct(F.when(is_purchase_event, F.col("user_id"))).alias("purchases")

# Revenue adds up the price of purchase rows; every other row contributes 0.0.
purchase_revenue = F.sum(
    F.when(is_purchase_event, F.col("price")).otherwise(F.lit(0.0))
).alias("revenue")

# Buyers as a percentage of viewers.
conversion_pct = F.col("purchases") / F.col("views") * 100

product_perf = silver.groupBy("product_id").agg(
    distinct_viewers, distinct_buyers, purchase_revenue
)
# Products with no viewers get a conversion rate of 0.0 instead of a division by zero.
product_perf = product_perf.withColumn(
    "conversion_rate",
    F.when(F.col("views") > 0, conversion_pct).otherwise(F.lit(0.0)),
)

# Replace the gold Delta location with the fresh aggregates.
product_perf.write.save(GOLD_PRODUCTS_PATH, format="delta", mode="overwrite")

print("GOLD saved:", GOLD_PRODUCTS_PATH)


GOLD saved: dbfs:/Volumes/workspace/ecommerce/ecommerce_data/medallion/gold/products
