In [0]:
from pyspark.sql.functions import col, expr, from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, IntegerType, ArrayType, DecimalType

# ==========================================
# 1. CONFIGURATION
# ==========================================
# Storage account and container that back the silver layer.
storage_account_name = "databricksete1995"
silver_container     = "silver"

# Both silver locations live under the same abfss:// root, so build it once.
_silver_root = f"abfss://{silver_container}@{storage_account_name}.dfs.core.windows.net"

# Data files and streaming checkpoint locations.
silver_path     = _silver_root + "/earthquake_clean"
checkpoint_path = _silver_root + "/_checkpoints/earthquake_silver"

# Fully qualified Unity Catalog table names (catalog.schema.table).
source_table = "databricks_cata.bronze.bronze_earthquake"
target_table = "databricks_cata.silver.silver_earthquake"

# ==========================================
# 2. DEFINE SCHEMA (Matches your Producer Data)
# ==========================================
# Explicit schema passed to from_json for the 'json_payload' column.
# Every field is declared nullable.

# "geometry" object: one array of doubles ordered [Lon, Lat, Depth].
geometry_schema = StructType([
    StructField("coordinates", ArrayType(DoubleType()), True),
])

# "properties" object, declared as (field name, Spark type) pairs.
property_fields = [
    ("mag", DoubleType()),
    ("place", StringType()),
    ("time", LongType()),        # Unix MS
    ("updated", LongType()),     # Unix MS
    ("tz", IntegerType()),
    ("url", StringType()),
    ("detail", StringType()),
    ("felt", IntegerType()),
    ("cdi", DoubleType()),
    ("mmi", DoubleType()),
    ("alert", StringType()),
    ("status", StringType()),
    ("tsunami", IntegerType()),
    ("sig", IntegerType()),
    ("net", StringType()),
    ("code", StringType()),
    ("ids", StringType()),
    ("sources", StringType()),
    ("types", StringType()),
    ("nst", IntegerType()),
    ("dmin", DoubleType()),
    ("rms", DoubleType()),
    ("gap", DoubleType()),
    ("magType", StringType()),
    ("type", StringType()),
]
properties_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in property_fields]
)

# Top-level document: identity fields plus the two nested objects.
json_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("geometry", geometry_schema),
    StructField("properties", properties_schema),
])

# Startup banner: report which table this run reads from and which it writes to.
# The emoji were previously stored as mis-decoded UTF-8 bytes (mojibake).
print("🚀 Silver Stream Initializing...")
print(f"📥 Reading from: {source_table}")
print(f"📤 Writing to:   {target_table}")

# ==========================================
# 3. READ STREAM (From Bronze Delta Table)
# ==========================================
# Streaming DataFrame over the bronze table named in source_table.
bronze_stream_reader = spark.readStream
df_bronze = bronze_stream_reader.table(source_table)

# ==========================================
# 4. TRANSFORMATION (Parse & Flatten)
# ==========================================
# Parse the raw JSON string in 'json_payload' into a struct column using the
# explicit schema, then flatten that struct into one column per attribute.
df_parsed = df_bronze.withColumn("parsed", from_json(col("json_payload"), json_schema))

# The coordinates array is indexed three times below; reference it once.
_coordinates = col("parsed.geometry.coordinates")


def _prop(name):
    """Return the column for the field `name` inside `parsed.properties`."""
    return col(f"parsed.properties.{name}")


df_silver = df_parsed.select(
    # Keep Metadata
    col("ingestion_time"),

    # 1. Identity
    col("parsed.id").alias("event_id"),
    col("parsed.type").alias("feature_type"),

    # 2. Geometry (Split Array into Columns) -- array order is [Lon, Lat, Depth]
    _coordinates[0].alias("longitude"),
    _coordinates[1].alias("latitude"),
    _coordinates[2].alias("depth"),

    # 3. Properties (Main)
    _prop("mag").alias("magnitude"),
    _prop("magType").alias("magnitude_type"),
    _prop("place").alias("place"),

    # Time Conversion (Unix MS -> Timestamp)
    # timestamp_millis converts the integer epoch-milliseconds exactly. The
    # previous `(ms / 1000).cast("timestamp")` went through a double, and the
    # cast truncates the scaled value, so it could land 1 microsecond early.
    expr("timestamp_millis(parsed.properties.`time`)").alias("event_time"),
    expr("timestamp_millis(parsed.properties.`updated`)").alias("updated_time"),
    _prop("tz").alias("timezone_offset"),

    # 4. Properties (Alerts & Status)
    _prop("status").alias("status"),
    _prop("alert").alias("alert_level"),
    _prop("tsunami").alias("tsunami_flag"),
    _prop("sig").alias("significance"),
    _prop("felt").alias("felt_reports"),
    _prop("cdi").alias("cdi_intensity"),
    _prop("mmi").alias("mmi_intensity"),

    # 5. Technical Data
    _prop("net").alias("network"),
    _prop("code").alias("source_code"),
    _prop("nst").alias("station_count"),
    _prop("dmin").alias("min_distance"),
    _prop("rms").alias("rms_error"),
    _prop("gap").alias("azimuth_gap"),

    # 6. URLs
    _prop("url").alias("usgs_url"),
)

# ==========================================
# 5. WRITE STREAM (To Silver Container)
# ==========================================
# Append the flattened rows to the silver Delta table. The availableNow
# trigger processes the data that is available when the query starts and
# then stops the query on its own.
query = (df_silver.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("path", silver_path)        # <--- Forces data to 'silver' container
    # Lets the table schema grow when df_silver gains columns. New fields in
    # the source JSON still have to be added to json_schema and the select.
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(target_table))             # documented API; starts the query

print(f"✅ Stream Started! Transforming data into {target_table}")

# Block until the run has finished so later cells that query the target
# table see the committed data, and so a failed stream raises here instead
# of failing silently in the background.
query.awaitTermination()

In [0]:
%sql
-- Ten rows from the silver earthquake table with the highest magnitude.
SELECT
    eq.event_time
  , eq.magnitude
  , eq.place
  , eq.latitude
  , eq.longitude
  , eq.tsunami_flag
FROM databricks_cata.silver.silver_earthquake AS eq
ORDER BY eq.magnitude DESC
LIMIT 10