In [0]:
# 02_Silver_Transformation (Incremental / Streaming Version)
# Purpose: Process ONLY new data from Bronze and Merge to Silver

from pyspark.sql.functions import col, current_timestamp
from delta.tables import *

# --- 1. Configuration ---
# Column names referenced by the cleaning step and the merge keys.
DATE_COL = "timestamp"   # part of the dedup / merge key
QTY_COL = "qty"          # rows are kept only when this is non-null and > 0
VALUE_COL = "quoteQty"   # part of the dedup / merge key

# Silver target: catalog name of the table and the storage path backing it.
table_name = "crypto_cat.silver.trades"
table_path = "abfss://silver@sacryptotradesdata.dfs.core.windows.net/trades"

# Checkpoint location handed to the streaming writer; the stream's progress
# is persisted here, which is what makes each run incremental.
checkpoint_path = "abfss://silver@sacryptotradesdata.dfs.core.windows.net/_checkpoints/trades"

# --- 2. The Logic Function (Micro-Batch Processing) ---
# This function runs on every "batch" of new data found.
def upsert_to_silver(microBatchDF, batchId):
    """Clean one micro-batch and upsert it into the Silver table.

    Written to be passed to ``writeStream.foreachBatch``.

    Steps:
      1. Return immediately when the micro-batch has no rows.
      2. Keep rows whose ``QTY_COL`` is non-null and > 0, drop duplicates on
         the key columns, and stamp each row with ``ProcessedDate``.
      3. Create the Silver table from the batch if it does not exist yet;
         otherwise MERGE the batch into it on the same key columns.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch; used only in log messages.

    Returns:
        None.
    """
    # isEmpty() only needs to find one row; count() would scan the whole
    # batch just to compare the result with zero.
    if microBatchDF.isEmpty():
        return

    # Use the session that owns this micro-batch instead of the notebook
    # global `spark`, so the callback does not depend on outer notebook state.
    session = microBatchDF.sparkSession

    # A. Clean & deduplicate the incoming batch.
    # The same key list drives both the in-batch dedup and the merge
    # predicate below, so the two can never drift apart.
    unique_keys = [DATE_COL, "exchange", QTY_COL, VALUE_COL]

    df_clean = (microBatchDF
        .filter(col(QTY_COL).isNotNull() & (col(QTY_COL) > 0))
        .dropDuplicates(unique_keys)
        .withColumn("ProcessedDate", current_timestamp())
    )

    # B. Write to Silver: create the table on first use, merge afterwards.
    if not session.catalog.tableExists(table_name):
        print(f"Initializing Silver Table with Batch {batchId}...")
        (df_clean.write.format("delta")
            .mode("overwrite")
            .option("path", table_path)
            .partitionBy("exchange")
            .saveAsTable(table_name)
        )
    else:
        print(f"Merging Batch {batchId} into Silver...")
        deltaTable = DeltaTable.forName(session, table_name)

        # Null-safe equality (<=>): dropDuplicates treats NULL keys as equal,
        # but a plain `=` never matches NULL to NULL, so a row with a NULL
        # key would be inserted again every time it is reprocessed.
        merge_condition = " AND ".join(
            f"t.{key} <=> s.{key}" for key in unique_keys
        )

        (deltaTable.alias("t")
            .merge(df_clean.alias("s"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

# --- 3. The Trigger (Read Stream -> Process -> Stop) ---
print("Checking for NEW data in Bronze...")

# Streaming (not batch) read of the Bronze table: together with the
# checkpoint location set below, only rows not yet processed are delivered.
bronze_stream = spark.readStream.table("crypto_cat.bronze.sales_raw")

# Every micro-batch is handed to upsert_to_silver; progress is recorded
# under checkpoint_path.
silver_writer = (
    bronze_stream.writeStream
    .format("delta")
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
)

# Start the query and block until it terminates, so that anything run after
# this cell (e.g. the Gold step) sees the finished Silver table.
silver_query = silver_writer.start()
silver_query.awaitTermination()

print("✅ Incremental Silver Processing Complete!")

Checking for NEW data in Bronze...


25/11/24 17:55:15 Spark Server has not sent updates for Streaming Query 8d782511-29a7-49fd-9d25-1ba132373398 in 60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is eec981ca-919b-4a10-bad5-78f39369e615. This is typically not a problem.
25/11/24 17:55:40 Spark Server has not sent updates for Streaming Query 8d782511-29a7-49fd-9d25-1ba132373398 in 60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is eec981ca-919b-4a10-bad5-78f39369e615. This is typically not a problem.


✅ Incremental Silver Processing Complete!


In [0]:
%sql
-- Rank exchanges by total volume (sum of quoteQty), with their trade counts.
SELECT
  exchange,
  COUNT(*)      AS trade_count,
  SUM(quoteQty) AS total_volume
FROM crypto_cat.silver.trades
GROUP BY exchange
ORDER BY total_volume DESC;

exchange,trade_count,total_volume
bitmex,24010800,1263016861394.0
bybit,5345356,238600245968.0
binance_futures,8093899,170609381714.91632
okex,9650464,170044460895.72058
hitbtc,2404576,151443397260.87708
huobi,7845819,127930825527.974
deribit,3329029,87454992498.0
binance,5597465,82691338375.65114
ftx,1198430,42955168625.83201
gdax,2693915,41797453680.62817
