### Purpose:
Transform Bronze raw records into a clean Silver table:
- one row per (asset, pull)
- typed columns (price as double, timestamps)
- basic quality filters

### Set catalog and schema

In [0]:
# Point the session at the project catalog and schema so that later cells can
# refer to tables by their unqualified names.
for statement in ("USE CATALOG databricks_cata", "USE SCHEMA price_movers"):
    spark.sql(statement)

# Echo the active catalog/schema to confirm the switch took effect.
active_namespace = spark.sql("SELECT current_catalog(), current_schema()")
active_namespace.show(truncate=False)

### Read Bronze and Inspect Schema


In [0]:
# Load the raw Bronze table and print its full schema.
bronze_table = "bronze_prices_raw"
bronze = spark.table(bronze_table)
bronze.printSchema()

# Show the five most recently ingested records (ingest time and source file only).
latest_ingests = (
    bronze
    .select("ingest_ts", "source_file")
    .sort("ingest_ts", ascending=False)
)
latest_ingests.show(5, truncate=False)



In [0]:
%sql
-- Peek at a few raw Bronze rows using the fully qualified table name.
SELECT *
FROM databricks_cata.price_movers.bronze_prices_raw
LIMIT 5;

### Transform: Inspect the `data` column before exploding it into rows

In [0]:
# Print the schema of just the nested `data` column, which the following cells
# turn into one row per asset.
bronze = spark.table("bronze_prices_raw")
data_only = bronze.select("data")
data_only.printSchema()

### Parse **data** into a MAP, then explode

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Load the Bronze table.
bronze = spark.table("bronze_prices_raw")

# Assets looked up by name under the nested `data` column.
assets = ["bitcoin", "cardano", "ethereum", "ripple", "solana"]

# Flat [key, value, key, value, ...] argument list for create_map:
# literal asset id -> that asset's entry under `data`.
data_map_expr = [
    expr
    for a in assets
    for expr in (F.lit(a), F.col(f"data.`{a}`"))
]

silver_step = bronze.select(
    "ingest_ts",
    "source_file",
    F.col("source").cast("string").alias("api_source"),
    F.col("vs_currency").cast("string").alias("vs_currency"),
    # Pull time converted to a timestamp column.
    F.to_timestamp("pulled_at_utc").alias("pulled_at_ts"),
    # One output row per map entry: key -> asset_id, value -> metrics.
    F.explode(F.create_map(*data_map_expr)).alias("asset_id", "metrics"),
)

silver_step.printSchema()
display(silver_step)

In [0]:
# Show the column names and types of the exploded frame (asset_id + metrics per row).
silver_step.printSchema()

### Extract price + last_updated_at safely

In [0]:
# Flatten each asset's metrics into typed columns, derive the event time, and
# drop rows that do not carry a usable price.
silver_base = (
    silver_step
    .select(
        "ingest_ts",
        "source_file",
        "api_source",
        "vs_currency",
        "pulled_at_ts",
        "asset_id",
        # NOTE(review): the price key is hard-coded to "zar" rather than taken
        # from vs_currency; presumably rows quoted in another currency would get
        # a null price and be removed by the filter below -- confirm.
        F.col("metrics.zar").cast("double").alias("price"),
        F.col("metrics.last_updated_at").cast("long").alias("last_updated_at_unix"),
    )
    # Source event time: convert epoch seconds straight to a timestamp (no
    # round trip through a session-time-zone formatted string), and fall back
    # to the pull time when the source did not report one.
    .withColumn(
        "event_ts",
        F.coalesce(F.timestamp_seconds("last_updated_at_unix"), F.col("pulled_at_ts")),
    )
    # Quality rules: price must be present and strictly positive.
    .filter(F.col("price").isNotNull() & (F.col("price") > 0))
)

display(silver_base)

### Add South African time columns

In [0]:
# Append South African local-time versions of the two timestamp columns,
# treating the existing values as UTC.
sa_tz = "Africa/Johannesburg"

silver = silver_base.select(
    "*",
    F.from_utc_timestamp("event_ts", sa_tz).alias("event_ts_sa"),
    F.from_utc_timestamp("pulled_at_ts", sa_tz).alias("pulled_at_ts_sa"),
)

# Compare the original and shifted event times side by side, newest first.
sa_preview = silver.select("asset_id", "event_ts", "event_ts_sa", "price")
display(sa_preview.orderBy(F.desc("event_ts")))


In [0]:
# Show the final column names and types before the frame is written out.
silver.printSchema()

### Write to Silver Table

In [0]:
# Replace the contents of the silver_prices Delta table with this run's result;
# the mergeSchema option is passed through to the writer as before.
silver.write.saveAsTable(
    "silver_prices",
    format="delta",
    mode="overwrite",
    mergeSchema="true",
)

### Validate Silver Table

In [0]:
# Total number of rows written to Silver.
spark.sql("SELECT COUNT(*) AS silver_rows FROM silver_prices").show()

# Row count per asset, largest first.
spark.sql("""
          SELECT asset_id, COUNT(*) AS rows_per_asset
          FROM silver_prices
          GROUP BY asset_id
          ORDER BY rows_per_asset DESC""").show(truncate=False)

# The ten most recent price events.
spark.sql("""
          SELECT asset_id, event_ts, price, vs_currency
          FROM silver_prices
          ORDER BY event_ts DESC 
          LIMIT 10""").show(truncate=False)

# Grain check: the table is meant to hold one row per (asset, pull). Any key
# listed here appears more than once; an empty result means the grain holds.
spark.sql("""
          SELECT asset_id, pulled_at_ts, COUNT(*) AS rows_per_key
          FROM silver_prices
          GROUP BY asset_id, pulled_at_ts
          HAVING COUNT(*) > 1
          ORDER BY rows_per_key DESC
          LIMIT 20""").show(truncate=False)