In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, when, avg, stddev, lag
import dlt

In [0]:
@dlt.table(
    name="gold_crypto_leaderboard",
    comment="Gold: latest snapshot per crypto, ranked by market cap.",
    table_properties={"quality": "gold"}
)
def gold_crypto_leaderboard():
    """Return the most recent snapshot of every crypto with an ATH bucket.

    Reads ``silver_crypto_market``, keeps one row per ``crypto_id`` (the one
    with the greatest ``snapshot_ts``) and labels it with
    ``ath_distance_bucket`` derived from ``pct_from_ath``:

    * ``<= 10``  -> "Near ATH"
    * ``<= 30``  -> "Moderate Drawdown"
    * otherwise  -> "Deep Discount"

    NOTE(review): a null ``pct_from_ath`` matches neither comparison and so
    ends up in "Deep Discount"; confirm the silver table never emits nulls
    there, or that this is the intended label.
    """
    # Newest snapshot first inside each crypto_id partition.
    # NOTE(review): if two rows share the same snapshot_ts, which of them
    # receives row number 1 is not determined by this ordering.
    newest_first = Window.partitionBy("crypto_id").orderBy(
        col("snapshot_ts").desc()
    )

    # Thresholds are checked in order, so the first matching branch wins.
    bucket = (
        when(col("pct_from_ath") <= 10, "Near ATH")
        .when(col("pct_from_ath") <= 30, "Moderate Drawdown")
        .otherwise("Deep Discount")
    )

    output_columns = [
        "crypto_id",
        "symbol",
        "name",
        "market_cap_rank",
        "price_usd",
        "market_cap",
        "volume_24h",
        "price_change_pct_24h",
        "pct_from_ath",
        "ath_distance_bucket",
        "snapshot_ts",
    ]

    return (
        dlt.read("silver_crypto_market")
        # Number the rows of each crypto, then keep only the newest one;
        # the helper column is dropped once it has served as the filter.
        .withColumn("row_num", row_number().over(newest_first))
        .where(col("row_num") == 1)
        .drop("row_num")
        .withColumn("ath_distance_bucket", bucket)
        .select(*output_columns)
    )


@dlt.table(
    name="gold_crypto_trends",
    comment="Gold: time-series trend metrics per crypto over time.",
    table_properties={"quality": "gold"}
)
def gold_crypto_trends():
    """Build per-crypto trend metrics over the snapshot history.

    Reads ``silver_crypto_market`` and, for every row, adds metrics computed
    over that crypto's snapshots ordered by ``snapshot_ts``:

    * ``ma_3``           - average ``price_usd`` over the current row and up
      to two preceding snapshots.
    * ``vol_3``          - ``stddev`` of ``price_usd`` over the same rows.
    * ``prev_price``     - ``price_usd`` of the immediately preceding
      snapshot (null for the first snapshot of a crypto).
    * ``price_momentum`` - "UP", "DOWN" or "FLAT" from comparing
      ``price_usd`` with ``prev_price``; null when either value is null, so
      the first snapshot of a crypto is not reported as "FLAT".

    Returns:
        DataFrame with one row per input row, limited to the identifying
        columns, the trend metrics and a few pass-through market columns.
    """
    df = dlt.read("silver_crypto_market")

    # Snapshots of a single crypto in chronological order.
    w = Window.partitionBy("crypto_id").orderBy(col("snapshot_ts"))
    # Current row plus at most the two snapshots before it; shared by the
    # moving average and the rolling standard deviation.
    rolling = w.rowsBetween(-2, Window.currentRow)

    trends = (
        df.withColumn("ma_3", avg("price_usd").over(rolling))
        .withColumn("vol_3", stddev("price_usd").over(rolling))
        .withColumn("prev_price", lag("price_usd", 1).over(w))
    )

    price = col("price_usd")
    prev = col("prev_price")
    # Every label is an explicit comparison and there is deliberately no
    # .otherwise(): a comparison against a null evaluates to null, so rows
    # without a comparable previous price get a null momentum rather than a
    # misleading "FLAT".
    trends = trends.withColumn(
        "price_momentum",
        when(price > prev, "UP")
        .when(price < prev, "DOWN")
        .when(price == prev, "FLAT"),
    )

    return trends.select(
        "crypto_id",
        "symbol",
        "name",
        "snapshot_ts",
        "price_usd",
        "ma_3",
        "vol_3",
        "prev_price",
        "price_momentum",
        "market_cap_rank",
        "volume_24h",
        "pct_from_ath",
    )
