# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# Build (or reuse) the Spark session for the silver-layer job, with the
# Delta Lake package, SQL extension and catalog implementation configured.
delta_settings = {
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.1.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}
session_builder = SparkSession.builder.appName("SilverLayer")
for setting, value in delta_settings.items():
    session_builder = session_builder.config(setting, value)
spark = session_builder.getOrCreate()

# Open the bronze Delta table as a streaming source.
bronze_path = "/opt/bitnami/spark/data/bronze"
bronze_reader = spark.readStream.format("delta")
df_bronze = bronze_reader.load(bronze_path)

# Shape bronze records into the silver schema: cast "amount" to DECIMAL(18, 2)
# and "timestamp" to a timestamp, rename base/currency to
# crypto_base/quote_currency, then keep a single row per
# (crypto_base, quote_currency, timestamp).
# NOTE(review): df_bronze is a streaming DataFrame and no watermark is set, so
# the deduplication state presumably grows without bound -- confirm whether a
# withWatermark("timestamp", ...) with a suitable lateness bound is acceptable.
amount_as_decimal = col("amount").cast(DecimalType(18, 2))
timestamp_as_ts = col("timestamp").cast(TimestampType())
silver_key_columns = ["crypto_base", "quote_currency", "timestamp"]

df_silver = (
    df_bronze.withColumn("amount", amount_as_decimal)
    .withColumnRenamed("base", "crypto_base")
    .withColumnRenamed("currency", "quote_currency")
    .withColumn("timestamp", timestamp_as_ts)
    .dropDuplicates(silver_key_columns)
)

# Write to silver layer (upsert)
# Location of the silver Delta table that upsert_to_delta merges into.
silver_path = "/opt/bitnami/spark/data/silver"
# Directory handed to the streaming writer as its "checkpointLocation".
checkpoint_path = "/opt/bitnami/spark/data/silver_checkpoint"

def upsert_to_delta(micro_batch_df, batch_id):
    """Upsert one streaming micro-batch into the silver Delta table.

    Rows are matched on (crypto_base, quote_currency, timestamp): matching
    target rows are overwritten with the incoming values, and unmatched
    incoming rows are inserted.

    If ``silver_path`` does not hold a Delta table yet (first run of the
    pipeline), the table is created from this micro-batch instead of merging,
    because ``DeltaTable.forPath`` raises on a path that is not a Delta table.

    Args:
        micro_batch_df: DataFrame with the rows of the current micro-batch.
        batch_id: Identifier supplied by ``foreachBatch``; not used here.
    """
    # Use the session that owns the micro-batch rather than the module-level
    # one, as recommended for functions passed to foreachBatch.
    session = micro_batch_df.sparkSession

    if not DeltaTable.isDeltaTable(session, silver_path):
        # Nothing to merge into yet: bootstrap the table with this batch.
        # A retried batch after a successful bootstrap takes the merge path
        # below, so re-execution does not duplicate rows.
        micro_batch_df.write.format("delta").mode("append").save(silver_path)
        return

    merge_condition = (
        "t.crypto_base = s.crypto_base AND t.quote_currency = s.quote_currency AND t.timestamp = s.timestamp"
    )
    (
        DeltaTable.forPath(session, silver_path)
        .alias("t")
        .merge(micro_batch_df.alias("s"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Start the streaming job: every micro-batch of df_silver is handed to
# upsert_to_delta, with progress tracked under checkpoint_path.
stream_writer = df_silver.writeStream.foreachBatch(upsert_to_delta)
stream_writer = stream_writer.outputMode("update")
stream_writer = stream_writer.option("checkpointLocation", checkpoint_path)
query = stream_writer.start()

# Block the driver until the streaming query stops.
query.awaitTermination()