In [0]:
from pyspark.sql.functions import col, desc, count, sum as sum_, avg, max, row_number
from pyspark.sql.window import Window

In [0]:
# Make sure the gold schema exists before anything is written into it.
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

# Load the silver-layer table that every aggregation below starts from.
silver_reader = spark.read.format("delta")
df_silver = silver_reader.table("silver.crypto_cleaned")

# Show the available column names as a quick sanity check.
print("Colunas df_silver:", df_silver.columns)

In [0]:
# Daily market summary: one output row per ingestion_date.
# NOTE: `max` here is the Spark aggregate imported from pyspark.sql.functions,
# not the Python builtin.
daily_metrics = [
    count("crypto_id").alias("total_cryptos"),            # rows with a non-null crypto_id
    sum_("market_cap_usd").alias("total_market_cap"),
    sum_("volume_24h").alias("total_volume_24h"),
    avg("price_change_pct_24h").alias("avg_price_change"),
    max("market_cap_usd").alias("max_market_cap"),
]
silver_by_day = df_silver.groupBy("ingestion_date")
df_daily_summary = silver_by_day.agg(*daily_metrics)

In [0]:
# Top performers: for each ingestion_date keep the 10 rows with the highest
# 24h price change.
window_spec = (
    Window
    .partitionBy("ingestion_date")
    .orderBy(desc("price_change_pct_24h"))
)

# row_number() assigns 1..N inside each date partition, so ties are broken
# arbitrarily and exactly 10 rows (or fewer) survive per date.
ranked_by_gain = df_silver.withColumn("rank", row_number().over(window_spec))

# The helper "rank" column is only used for filtering and is not kept.
top_gainer_columns = [
    "crypto_name",
    "crypto_symbol",
    "price_usd",
    "price_change_pct_24h",
    "market_cap_rank",
    "ingestion_date",
]
df_top_gainers = (
    ranked_by_gain
    .where(col("rank") <= 10)
    .select(*top_gainer_columns)
)

In [0]:
# Volume leaders: aggregate per (crypto, ingestion_date), then keep the 20
# groups with the largest 24h volume.
# NOTE: the ordering and limit(20) apply to the whole result, not per
# ingestion_date, so the 20 rows may come from any mix of dates.
volume_by_crypto_day = df_silver.groupBy(
    "crypto_id", "crypto_name", "ingestion_date"
).agg(
    max("volume_24h").alias("volume_24h"),   # Spark aggregate max, not the builtin
    avg("price_usd").alias("avg_price"),
)

df_volume_leaders = volume_by_crypto_day.orderBy(desc("volume_24h")).limit(20)

In [0]:
# Persist the gold tables as Delta, with mergeSchema enabled on every write.
# Each entry is (DataFrame, save mode, target table); the daily summary is
# fully replaced, the other two tables are appended to.
# NOTE(review): append mode adds the full contents of the DataFrame on every
# run; if those DataFrames cover dates that were already written, re-running
# this cell duplicates rows — confirm this is intended.
gold_writes = [
    (df_daily_summary, "overwrite", "gold.daily_market_summary"),
    (df_top_gainers, "append", "gold.top_gainers_daily"),
    (df_volume_leaders, "append", "gold.volume_leaders"),
]

for gold_df, save_mode, table_name in gold_writes:
    (
        gold_df.write.format("delta")
        .mode(save_mode)
        .option("mergeSchema", "true")
        .saveAsTable(table_name)
    )

print("Tabelas gold criadas com sucesso!")

In [0]:
# Export two of the gold DataFrames as CSV reports (with a header row),
# replacing any previous export at the same location.
# NOTE(review): the base path is hard-coded to a single user's DBFS folder;
# consider moving it to a notebook parameter/widget.
report_path = "dbfs:/Users/g.kaahara@live.com/extractions"

# The "/" separator is required: report_path has no trailing slash, so without
# it the outputs would land in sibling paths such as
# ".../extractionstop_gainers_daily" instead of inside ".../extractions/".
df_top_gainers.write.mode("overwrite").csv(
    f"{report_path}/top_gainers_daily",
    header=True
)
df_daily_summary.write.mode("overwrite").csv(
    f"{report_path}/daily_market_summary",
    header=True
)