In [None]:
# Databricks Notebook
# ===================================
# 04_Gold_Layer
# ===================================

from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Start-of-run banner for the gold-layer build.
_rule = "=" * 60
print(_rule)
print("ゴールドレイヤー構築開始")
print(_rule)

# Load the silver-layer inputs by their registered (unqualified) table names:
# ad delivery metrics, transactions, and the item master.
silver_ads, silver_transactions, silver_items = (
    spark.table(table_name)
    for table_name in ("silver_digital_ads", "silver_transactions", "silver_items")
)

# ===================================
# 1. Campaign performance
# ===================================

print("\n[1/4] キャンペーン別パフォーマンス集計中...")

# Attach item attributes (category hierarchy, unit cost, margin) to every
# transaction row. The left join keeps transactions whose item_id has no match
# in silver_items; for those rows the item columns are NULL.
transactions_with_items = silver_transactions.alias("t") \
    .join(silver_items.alias("i"), col("t.item_id") == col("i.item_id"), "left") \
    .select(
        "t.*",
        "i.category_l1",
        "i.category_l2",
        "i.category_l3",
        "i.cost",
        "i.profit_margin"
    )

# Daily sales metrics per UTM campaign, using only campaign-attributed rows.
# A row with NULL item cost produces a NULL profit term, which sum() skips.
# NOTE(review): conversion_count counts transaction rows; if one transaction_id
# can span several rows (e.g. one per line item) this over-counts orders --
# confirm the grain of silver_transactions.
campaign_revenue = transactions_with_items \
    .filter(col("utm_campaign").isNotNull()) \
    .groupBy("utm_campaign", "transaction_date") \
    .agg(
        sum("total_amount").alias("total_revenue"),
        sum((col("price") - col("cost")) * col("quantity")).alias("total_profit"),
        count("transaction_id").alias("conversion_count"),
        countDistinct("user_id").alias("unique_customers"),
        avg("total_amount").alias("avg_order_value")
    )

# Join sales onto the ad rows (left join from the ad side, so ad rows with no
# attributed sales are kept and their sales metrics are filled with 0), then
# derive KPIs:
#   cvr  = conversions / clicks * 100            (%)
#   roas = revenue / ad_cost
#   cpa  = ad_cost / conversions
#   roi  = (profit - ad_cost) / ad_cost * 100    (%)
# Each KPI falls back to 0 when its denominator is not > 0 (including NULL).
# NOTE(review): if several ad rows share one (utm_campaign, date) -- e.g. one per
# platform -- each of them receives that day's full revenue; confirm ad grain.
gold_campaign_performance = silver_ads.alias("a") \
    .join(
        campaign_revenue.alias("r"),
        (col("a.utm_campaign") == col("r.utm_campaign")) &
        (col("a.date") == col("r.transaction_date")),
        "left"
    ) \
    .select(
        col("a.campaign_id"),
        col("a.campaign_name"),
        col("a.date"),
        col("a.year_month"),
        col("a.day_of_week"),
        col("a.ad_platform"),
        col("a.target_category"),
        col("a.impressions"),
        col("a.clicks"),
        col("a.cost").alias("ad_cost"),
        col("a.ctr"),
        col("a.cpc"),
        coalesce(col("r.total_revenue"), lit(0)).alias("revenue"),
        coalesce(col("r.total_profit"), lit(0)).alias("profit"),
        coalesce(col("r.conversion_count"), lit(0)).alias("conversions"),
        coalesce(col("r.unique_customers"), lit(0)).alias("unique_customers"),
        coalesce(col("r.avg_order_value"), lit(0)).alias("avg_order_value")
    ) \
    .withColumn("cvr",
               when(col("clicks") > 0, col("conversions") / col("clicks") * 100)
               .otherwise(0)) \
    .withColumn("roas",
               when(col("ad_cost") > 0, col("revenue") / col("ad_cost"))
               .otherwise(0)) \
    .withColumn("cpa",
               when(col("conversions") > 0, col("ad_cost") / col("conversions"))
               .otherwise(0)) \
    .withColumn("roi",
               when(col("ad_cost") > 0, (col("profit") - col("ad_cost")) / col("ad_cost") * 100)
               .otherwise(0)) \
    .withColumn("calculation_timestamp", current_timestamp())

# Persist as a Delta table partitioned by month (full overwrite, schema replaced),
# then register it in the metastore if it is not registered yet.
campaign_performance_path = f"{GOLD_PATH}/campaign_performance"
gold_campaign_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("year_month") \
    .save(campaign_performance_path)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS gold_campaign_performance
    USING DELTA
    LOCATION '{campaign_performance_path}'
""")

# Count what was actually persisted. gold_campaign_performance is not cached,
# so counting it directly would re-run both joins and the aggregation.
campaign_row_count = spark.read.format("delta").load(campaign_performance_path).count()
print(f"✓ キャンペーンパフォーマンス集計完了: {campaign_row_count:,}件")

# ===================================
# 2. Category performance
# ===================================

print("\n[2/4] カテゴリ別パフォーマンス集計中...")

# Daily sales metrics per (category_l1, category_l2). Transactions whose item
# had no match in silver_items carry NULL categories and are grouped under NULL.
# NOTE(review): only campaign-attributed rows (utm_campaign not NULL) are
# included, so this is ad-driven category performance, not total category
# sales -- confirm that is the intent.
gold_category_performance = transactions_with_items \
    .filter(col("utm_campaign").isNotNull()) \
    .groupBy("category_l1", "category_l2", "transaction_date", "year_month") \
    .agg(
        sum("total_amount").alias("total_revenue"),
        sum((col("price") - col("cost")) * col("quantity")).alias("total_profit"),
        count("transaction_id").alias("order_count"),
        countDistinct("user_id").alias("unique_customers"),
        avg("total_amount").alias("avg_order_value"),
        sum("quantity").alias("total_quantity")
    ) \
    .withColumn("calculation_timestamp", current_timestamp())

# Persist as a month-partitioned Delta table (full overwrite, schema replaced)
# and register it if not already registered.
category_performance_path = f"{GOLD_PATH}/category_performance"
gold_category_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("year_month") \
    .save(category_performance_path)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS gold_category_performance
    USING DELTA
    LOCATION '{category_performance_path}'
""")

# Count the persisted table instead of re-running the uncached join/aggregation.
category_row_count = spark.read.format("delta").load(category_performance_path).count()
print(f"✓ カテゴリパフォーマンス集計完了: {category_row_count:,}件")

# ===================================
# 3. Daily summary
# ===================================

print("\n[3/4] 日次サマリ集計中...")

# Per-day ad totals joined with per-day sales totals.
# - avg_ctr / avg_cpc are simple (unweighted) means of the row-level ctr/cpc
#   values in silver_ads, not total_clicks / total_impressions.
# - Left join from the ad side: days that have transactions but no ad rows are
#   not included; ad days without sales get 0 for the sales metrics.
# - roas falls back to 0 when total_ad_cost is not > 0.
# No orderBy here: a sort before a Delta write only adds a global shuffle, and
# the stored table gives readers no row-order guarantee -- sort at query time.
gold_daily_summary = silver_ads \
    .groupBy("date", "year_month") \
    .agg(
        sum("impressions").alias("total_impressions"),
        sum("clicks").alias("total_clicks"),
        sum("cost").alias("total_ad_cost"),
        avg("ctr").alias("avg_ctr"),
        avg("cpc").alias("avg_cpc"),
        countDistinct("campaign_id").alias("active_campaigns")
    ) \
    .join(
        silver_transactions
            .groupBy(col("transaction_date").alias("date"))
            .agg(
                sum("total_amount").alias("total_revenue"),
                count("transaction_id").alias("total_orders"),
                countDistinct("user_id").alias("unique_customers")
            ),
        "date",
        "left"
    ) \
    .withColumn("total_revenue", coalesce(col("total_revenue"), lit(0))) \
    .withColumn("total_orders", coalesce(col("total_orders"), lit(0))) \
    .withColumn("unique_customers", coalesce(col("unique_customers"), lit(0))) \
    .withColumn("roas",
               when(col("total_ad_cost") > 0, col("total_revenue") / col("total_ad_cost"))
               .otherwise(0)) \
    .withColumn("calculation_timestamp", current_timestamp())

# Persist as a month-partitioned Delta table (full overwrite, schema replaced)
# and register it if not already registered.
daily_summary_path = f"{GOLD_PATH}/daily_summary"
gold_daily_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("year_month") \
    .save(daily_summary_path)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS gold_daily_summary
    USING DELTA
    LOCATION '{daily_summary_path}'
""")

# Count the persisted table instead of re-running the uncached aggregations/join.
daily_row_count = spark.read.format("delta").load(daily_summary_path).count()
print(f"✓ 日次サマリ集計完了: {daily_row_count:,}件")

# ===================================
# 4. Platform performance
# ===================================

print("\n[4/4] プラットフォーム別パフォーマンス集計中...")

# Aggregate from the campaign-performance table persisted in step 1 rather than
# from the in-memory gold_campaign_performance DataFrame. That DataFrame is not
# cached, so reusing it would recompute the whole step-1 lineage. Reading the
# stored table also keeps these totals consistent with what was written there.
campaign_performance_stored = spark.read.format("delta").load(f"{GOLD_PATH}/campaign_performance")

# Monthly totals per ad platform.
# - avg_ctr / avg_cpc / avg_cvr are unweighted means over campaign-day rows;
#   rows with zero clicks contribute cvr = 0 to avg_cvr.
# - roas / cpa are computed from the monthly totals and fall back to 0 when the
#   denominator is not > 0.
gold_platform_performance = campaign_performance_stored \
    .groupBy("ad_platform", "year_month") \
    .agg(
        sum("impressions").alias("total_impressions"),
        sum("clicks").alias("total_clicks"),
        sum("ad_cost").alias("total_ad_cost"),
        sum("revenue").alias("total_revenue"),
        sum("conversions").alias("total_conversions"),
        avg("ctr").alias("avg_ctr"),
        avg("cpc").alias("avg_cpc"),
        avg("cvr").alias("avg_cvr"),
        countDistinct("campaign_id").alias("active_campaigns")
    ) \
    .withColumn("roas",
               when(col("total_ad_cost") > 0, col("total_revenue") / col("total_ad_cost"))
               .otherwise(0)) \
    .withColumn("cpa",
               when(col("total_conversions") > 0, col("total_ad_cost") / col("total_conversions"))
               .otherwise(0)) \
    .withColumn("calculation_timestamp", current_timestamp())

# Persist as a month-partitioned Delta table (full overwrite, schema replaced)
# and register it if not already registered.
platform_performance_path = f"{GOLD_PATH}/platform_performance"
gold_platform_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("year_month") \
    .save(platform_performance_path)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS gold_platform_performance
    USING DELTA
    LOCATION '{platform_performance_path}'
""")

# Count the persisted table instead of re-running the aggregation.
platform_row_count = spark.read.format("delta").load(platform_performance_path).count()
print(f"✓ プラットフォームパフォーマンス集計完了: {platform_row_count:,}件")

# ===================================
# Gold layer completion check
# ===================================

_banner = "=" * 60
print("\n" + _banner)
print("ゴールドレイヤー構築完了")
print(_banner)

# Previews shown in order: (heading, zero-argument DataFrame builder).
# Builders are lambdas so each query is created only right after its heading
# is printed, exactly as when the calls were written out one by one.
_previews = [
    # All tables named gold* in the ad_analytics database.
    ("\n【作成されたゴールドテーブル】",
     lambda: spark.sql("SHOW TABLES IN ad_analytics LIKE 'gold*'")),
    # Campaign-day rows with the highest ROAS.
    ("\n【ゴールド: キャンペーンパフォーマンス（ROAS上位10件）】",
     lambda: spark.table("gold_campaign_performance").orderBy(col("roas").desc()).limit(10)),
    # Most recent 10 days of the daily summary.
    ("\n【ゴールド: 日次サマリ（最新10日）】",
     lambda: spark.table("gold_daily_summary").orderBy(col("date").desc()).limit(10)),
    # Every platform/month row, highest revenue first.
    ("\n【ゴールド: プラットフォーム別パフォーマンス】",
     lambda: spark.table("gold_platform_performance").orderBy(col("total_revenue").desc())),
]

for _heading, _build_preview in _previews:
    print(_heading)
    display(_build_preview())