In [0]:
%sql
SELECT * FROM default.gold_product_performance

In [0]:
df = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
df.show()

In [0]:
from pyspark.sql.functions import to_date, col

df = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True
)

df = df.withColumn(
    "event_date",
    to_date(col("event_time"))
)


In [0]:
df.select("event_time", "event_date").show(5, truncate=False)


In [0]:
df.createOrReplaceTempView("df")


In [0]:
%sql
ALTER TABLE default.gold_product_performance
ADD COLUMNS (event_date DATE);


In [0]:
%sql
ALTER TABLE default.gold_product_performance
ADD COLUMNS (category_code STRING);


In [0]:
%sql
MERGE INTO default.gold_product_performance AS g
USING (
    SELECT
        product_id,
        MAX(category_code) AS category_code
    FROM df
    GROUP BY product_id
) AS d
ON g.product_id = d.product_id

WHEN MATCHED THEN
  UPDATE SET g.category_code = d.category_code;


In [0]:
%sql
select * from default.gold_product_performance

In [0]:
%sql
-- Revenue with 7-day moving average
WITH daily AS (
  SELECT event_date, SUM(revenue) as rev
  FROM  default.gold_product_performance GROUP BY event_date
)
SELECT event_date, rev,
  AVG(rev) OVER (ORDER BY event_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as ma7
FROM daily;

In [0]:
%sql
-- Conversion funnel
SELECT category_code,
  SUM(views) as views,
  SUM(purchases) as purchases,
  ROUND(SUM(purchases)*100.0/SUM(views), 2) as conversion_rate
FROM default.gold_product_performance
GROUP BY category_code;


In [0]:
%sql
-- Customer tiers
SELECT
  CASE WHEN cnt >= 10 THEN 'VIP'
       WHEN cnt >= 5 THEN 'Loyal'
       ELSE 'Regular' END as tier,
  COUNT(*) as customers,
  AVG(total_spent) as avg_ltv
FROM (SELECT user_id, COUNT(*) cnt, SUM(price) total_spent
      FROM default.silver_events WHERE event_type='purchase' GROUP BY user_id)
GROUP BY tier;