#### Deserialize Avro with the writer schema, validate, apply a watermark, deduplicate, and route errors to a DLQ.

#### Avro decode helper

In [0]:
from pyspark.sql.avro.functions import from_avro
from pyspark.sql import functions as F

In [0]:
# `writer_schema` must already hold the Avro writer schema as a JSON string:
# either the one fetched earlier in the notebook, or a static schema pasted
# into that variable.

# Incrementally read the bronze Delta table as a streaming DataFrame.
bronze_stream = (
    spark.readStream
    .format('delta')
    .load(bronze_base)
)


In [0]:
# Decode each bronze `value_bytes` payload with the Avro writer schema into a
# `data` struct, carrying the Kafka metadata columns and the raw bytes along.
# NOTE(review): from_avro is called without options, so Spark's default parse
# mode applies (FAILFAST per the Spark docs): one malformed payload fails this
# streaming query. The foreachBatch cell below is where decode failures are
# caught and sent to the DLQ.
# NOTE(review): if producers use the Confluent wire format (magic byte +
# schema-id prefix), that prefix must be stripped before from_avro. Confirm
# what the bronze layer stores in value_bytes.
decoded = bronze_stream.select(
    *(F.col(name) for name in ('topic', 'partition', 'offset', 'kafka_ts', 'value_bytes')),
    from_avro('value_bytes', writer_schema).alias('data'),
)

In [0]:
# Project the decoded `data` struct onto the expected order columns (example
# order schema), keeping the Kafka metadata columns first.
# NOTE(review): casting a numeric event_time to timestamp treats it as seconds
# since the epoch. If the producer sends epoch milliseconds, event_ts will be
# wrong by 1000x. Confirm the Avro type of event_time.
# NOTE(review): amount is stored as a double. If the source field is an Avro
# decimal, confirm that losing exactness is acceptable for currency values.
data = F.col('data')
flat = decoded.select(
    F.col('topic'),
    F.col('partition'),
    F.col('offset'),
    F.col('kafka_ts'),
    data.getField('order_id').alias('order_id'),
    data.getField('event_time').cast('timestamp').alias('event_ts'),
    data.getField('customer_id').alias('customer_id'),
    data.getField('amount').cast('double').alias('amount'),
    data.getField('currency').alias('currency'),
)
del data


In [0]:
# Declare event_ts as the event-time column with a 15-minute lateness
# allowance, then apply basic data quality: keep only rows whose amount is
# present and non-negative. Rows failing the check are dropped here, not
# dead-lettered.
validated = flat.withWatermark('event_ts', '15 minutes').where(
    F.col('amount').isNotNull() & (F.col('amount') >= 0)
)


In [0]:
# Deduplicate by business key within the watermark.
# A plain dropDuplicates(['order_id']) never lets the watermark evict state,
# because the watermarked column (event_ts) is not part of the dedup key.
# Without it, the state store would retain every order_id ever seen.
if hasattr(validated, 'dropDuplicatesWithinWatermark'):
    # Spark 3.5+: drop repeats of an order_id that arrive within the watermark
    # delay configured on event_ts; state is evicted as the watermark advances.
    silver = validated.dropDuplicatesWithinWatermark(['order_id'])
else:
    # Older Spark: adding the event-time column to the key bounds the state,
    # but only exact (order_id, event_ts) repeats are removed.
    silver = validated.dropDuplicates(['order_id', 'event_ts'])

In [0]:
# Silver Delta table location and the checkpoint directory of its streaming writer.
silver_path, ckpt_silver = (
    'dbfs:/pipelines/orders/silver',
    'dbfs:/pipelines/orders/_ckpt_silver',
)

In [0]:
# Start the streaming append of deduplicated orders into the silver Delta
# table; progress is tracked in ckpt_silver.
# NOTE(review): the foreachBatch query further down also appends to
# silver_path from the same bronze source. Running both writes each order
# twice; confirm only one path is meant to be active.
query = (
    silver.writeStream
    .outputMode('append')
    .format('delta')
    .option('checkpointLocation', ckpt_silver)
    .start(silver_path)
)

#### DLQ handling pattern

In [0]:
# Dead-letter Delta table location and the checkpoint directory of the
# foreachBatch query started below.
dlq_path, ckpt_dlq = (
    'dbfs:/pipelines/orders/dlq',
    'dbfs:/pipelines/orders/_ckpt_dlq',
)

from pyspark.sql import DataFrame

def _append_delta(df: DataFrame, path: str) -> None:
    """Append ``df`` to the Delta table at ``path``."""
    df.write.mode('append').format('delta').save(path)


def _classify_batch(df: DataFrame) -> DataFrame:
    """Decode ``value_bytes`` into ``_data`` and add an ``_error`` reason column.

    ``_error`` is null for rows destined for silver and a short reason string
    for rows destined for the DLQ. All original columns of ``df`` are kept.
    """
    decoded = df.withColumn(
        '_data', from_avro('value_bytes', writer_schema, {'mode': 'PERMISSIVE'}))
    amount = F.col('_data.amount').cast('double')
    return decoded.withColumn(
        '_error',
        # NOTE(review): in PERMISSIVE mode a malformed payload appears to decode
        # to a struct whose fields are all null (a null value_bytes gives a
        # null struct), so a null order_id is used as the "could not decode"
        # signal. This also rejects records missing the business key. Confirm.
        F.when(F.col('_data.order_id').isNull(),
               F.lit('avro decode failed or order_id is null'))
        # Same amount rule as the streaming DQ filter (cast to double, >= 0).
        .when(amount.isNull() | (amount < 0),
              F.lit('dq failed: amount is null or negative')),
    )


def _to_silver_columns(df: DataFrame) -> DataFrame:
    """Project classified rows onto the silver column layout (same as the ``flat`` cell)."""
    return df.select(
        'topic', 'partition', 'offset', 'kafka_ts',
        F.col('_data.order_id').alias('order_id'),
        F.col('_data.event_time').cast('timestamp').alias('event_ts'),
        F.col('_data.customer_id').alias('customer_id'),
        F.col('_data.amount').cast('double').alias('amount'),
        F.col('_data.currency').alias('currency'),
    )


def process_batch(batch_df: DataFrame, batch_id: int):
    """Decode one bronze micro-batch and route each row to silver or the DLQ.

    Payloads are decoded with ``from_avro`` in PERMISSIVE mode, so a single bad
    record no longer fails (and dead-letters) the whole batch. Then:

    * valid rows are projected to the silver column layout (``event_ts``,
      double ``amount``, ...) and appended to ``silver_path``;
    * rows that could not be decoded, have no ``order_id``, or fail the amount
      check are appended to ``dlq_path`` with their original bronze columns
      plus an ``error`` string.

    If anything else raises, the entire batch is appended to the DLQ with the
    exception text as a last-resort fallback. If the silver write had already
    succeeded, its rows will also appear in the DLQ.

    Args:
        batch_df: One micro-batch of the bronze Delta stream.
        batch_id: Micro-batch id supplied by ``foreachBatch`` (unused).
    """
    # The batch feeds up to three writes (silver, DLQ, fallback); cache it so
    # the bronze source is not rescanned for each one.
    cached = batch_df.persist()
    try:
        classified = _classify_batch(cached)
        valid = classified.filter(F.col('_error').isNull())
        rejected = classified.filter(F.col('_error').isNotNull())
        # Spark is lazy: decode and projection errors only surface when these
        # writes execute, so the writes themselves must stay inside the try.
        _append_delta(_to_silver_columns(valid), silver_path)
        _append_delta(
            rejected.withColumn('error', F.col('_error')).drop('_data', '_error'),
            dlq_path)
    except Exception as e:
        # Fallback: dead-letter the whole batch with the error text. If this
        # write also fails, the exception propagates and the query stops.
        _append_delta(cached.withColumn('error', F.lit(str(e))), dlq_path)
    finally:
        cached.unpersist()

# Run process_batch on every bronze micro-batch. Keep the StreamingQuery
# handle so later cells can monitor, await, or stop the query.
# NOTE(review): the silver `query` started earlier also appends to silver_path
# from the same bronze source, so running both writes each order twice.
# Confirm only one of the two paths is meant to be active.
dlq_query = (bronze_stream.writeStream
  .foreachBatch(process_batch)
  .option('checkpointLocation', ckpt_dlq)
  .start())