# Spark Structured Streaming with Iceberg: bronze to silver

In this notebook, we will read messages from Pulsar, then ingest to bronze Iceberg table, then readStream from the bronze Iceberg bronze table and write to other downstream tables. 

Considerations:
- The bronze table allows persisting all message so other downstream tables can be processed without losing messing
- Allow replayability when raw messages are saved in Iceberg tables to reprocess
- However, we cannot identify the latest changes readStream from the Iceberg table, so the spark.readStream(bronze_table) will have all the data in the table, potential perfomance issue with very large bronze table. The write to downstream table has checkpoint so we can still guarantee idempotent write 

In [1]:
# Use Databricks Connect to connect to Databricks and run notebook interactively from IDE on local machine, comment this cell if running on Databricks workspace
from databricks.connect import DatabricksSession

cluster_id = "0709-132523-cnhxf2p6"

# Create Databricks Connect session using profile
spark = DatabricksSession.builder \
    .profile("DEFAULT") \
    .remote(cluster_id=cluster_id) \
    .getOrCreate()

print(f"✅ Connected to Databricks! Spark version: {spark.version}")

# Test the connection
df = spark.sql("SELECT current_timestamp() as current_time")
df.show()

✅ Connected to Databricks! Spark version: 3.5.2
+--------------------+
|        current_time|
+--------------------+
|2025-07-23 01:23:...|
+--------------------+



# Bronze

In [19]:
service_url = "pulsar://44.247.85.233:6650"
topic = "financial-messages"

In [20]:
from schemas import schema, instrument_ref_schema, instrument_error_schema, instrument_risk_schema 
from pyspark.sql.functions import col, from_json, schema_of_json

In [21]:

fin_df = (
    spark.readStream
    .format("pulsar")
    .option("service.url", service_url)
    .option("topics", topic)
    .option("startingOffsets", "latest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("value"))
    .select("value.*")
)

In [None]:
display(fin_df)

Iceberg tables need to be created before writing

In [5]:
spark.sql(f"""
CREATE TABLE IF NOT EXISTS users.anhhoang_chu.bronze_fin_instrument ( {schema} )
USING ICEBERG
""")

In [17]:
(
    fin_df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("schemaLocation", "/Volumes/users/anhhoang_chu/iceberg/bronze1/_schema")
    .option("checkpointLocation", "/Volumes/users/anhhoang_chu/iceberg/bronze1/_checkpoint")
    .trigger(availableNow=True)
    .toTable("users.anhhoang_chu.bronze_fin_instrument")
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x308aa2fc0>

In [11]:
%sql
desc history users.anhhoang_chu.bronze_fin_instrument

Unnamed: 0,version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,4,2025-07-23 01:23:58.935,2917128087014142,anhhoang.chu@databricks.com,STREAMING UPDATE,"{'outputMode': 'Append', 'queryId': '8aadc9b1-d7a6-4c9f-9521-cb48e42dec4b', 'epochId': '3', 'statsOnLoad': 'false'}",,,0709-132523-cnhxf2p6,3.0,WriteSerializable,True,"{'numRemovedFiles': '0', 'numAddedFiles': '0', 'numOutputRows': '0', 'numOutputBytes': '0'}",,Databricks-Runtime/16.4.x-photon-scala2.13
1,3,2025-07-22 22:22:27.175,2917128087014142,anhhoang.chu@databricks.com,STREAMING UPDATE,"{'outputMode': 'Append', 'queryId': '8aadc9b1-d7a6-4c9f-9521-cb48e42dec4b', 'epochId': '2', 'statsOnLoad': 'false'}",,{'notebookId': '3205547712014776'},0722-214326-7mdrhwya,2.0,WriteSerializable,True,"{'numRemovedFiles': '0', 'numAddedFiles': '1', 'numOutputRows': '1', 'numOutputBytes': '74835'}",,Databricks-Runtime/17.0.x-scala2.13
2,2,2025-07-22 22:03:57.247,2917128087014142,anhhoang.chu@databricks.com,STREAMING UPDATE,"{'outputMode': 'Append', 'queryId': '8aadc9b1-d7a6-4c9f-9521-cb48e42dec4b', 'epochId': '1', 'statsOnLoad': 'false'}",,{'notebookId': '3205547712014776'},0722-214326-7mdrhwya,1.0,WriteSerializable,True,"{'numRemovedFiles': '0', 'numAddedFiles': '1', 'numOutputRows': '1', 'numOutputBytes': '74837'}",,Databricks-Runtime/17.0.x-scala2.13
3,1,2025-07-22 22:00:18.850,2917128087014142,anhhoang.chu@databricks.com,STREAMING UPDATE,"{'outputMode': 'Append', 'queryId': '8aadc9b1-d7a6-4c9f-9521-cb48e42dec4b', 'epochId': '0', 'statsOnLoad': 'false'}",,{'notebookId': '3205547712014776'},0722-214326-7mdrhwya,0.0,WriteSerializable,True,"{'numRemovedFiles': '0', 'numAddedFiles': '1', 'numOutputRows': '11', 'numOutputBytes': '77899'}",,Databricks-Runtime/17.0.x-scala2.13
4,0,2025-07-22 22:00:08.464,2917128087014142,anhhoang.chu@databricks.com,CREATE TABLE,"{'partitionBy': '[]', 'clusterBy': '[]', 'description': None, 'isManaged': 'true', 'properties': '{""delta.enableIcebergCompatV2"":""true"",""write.metadata.path"":""s3://databricks-e2demofieldengwest/b169b504-4c54-49f2-bc3a-adf4b128f36d/tables/520e699f-4bd9-48ca-bc00-458383a89d44/_iceberg/metadata"",""delta.universalFormat.enabledFormats"":""iceberg"",""write.parquet.compression-codec"":""zstd"",""delta.enableIcebergWriterCompatV1"":""true"",""write.summary.partition-limit"":""100"",""write.wap.enabled"":""false"",""delta.columnMapping.mode"":""id"",""delta.columnMapping.maxColumnId"":""150"",""history.expire.min-snapshots-to-keep"":""100"",""write.data.path"":""s3://databricks-e2demofieldengwest/b169b504-4c54-49f2-bc3a-adf4b128f36d/tables/520e699f-4bd9-48ca-bc00-458383a89d44"",""history.expire.max-snapshot-age-ms"":""0"",""delta.enableTypeWidening"":""true"",""write.metadata.compression-codec"":""gzip"",""delta.checkpointPolicy"":""v2"",""write.object-storage.enabled"":""true"",""gc.enabled"":""false"",""delta.enableInCommitTimestamps"":""true""}', 'statsOnLoad': 'false'}",,{'notebookId': '3205547712014776'},0722-214326-7mdrhwya,,WriteSerializable,True,{},,Databricks-Runtime/17.0.x-scala2.13


In [0]:
dbutils.fs.ls("s3://databricks-e2demofieldengwest/b169b504-4c54-49f2-bc3a-adf4b128f36d/tables/520e699f-4bd9-48ca-bc00-458383a89d44")

# Silver

## Instrument Reference

In [14]:
spark.sql(f"""
CREATE TABLE IF NOT EXISTS users.anhhoang_chu.silver_instrument_reference({instrument_ref_schema})
USING ICEBERG
""")

In [15]:
# We cannot identify the latest changes  when readStream from the Iceberg table, so exploded_df will have all the data in the table

from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import *

exploded_df = (
      spark.readStream
        .format("iceberg")
        .table("users.anhhoang_chu.bronze_fin_instrument")
        .select(
            col("jobidentifier"),
            col("analysisidentifier"),
            explode(col("data")).alias("data_item")
        )
        .filter(col("data_item.type") == "instrument")
    )

display(exploded_df)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
tahoe

JVM stacktrace:
org.apache.spark.sql.catalyst.ExtendedAnalysisException
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:694)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$2(UnsupportedOperationChecker.scala:65)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$2$adapted(UnsupportedOperationChecker.scala:63)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:303)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:302)
	at scala.collection.immutable.Vector.foreach(Vector.scala:1895)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:302)
	at scala.collection.immutable.Vector.foreach(Vector.scala:1895)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:302)
	at scala.collection.immutable.Vector.foreach(Vector.scala:1895)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:302)
	at scala.collection.immutable.Vector.foreach(Vector.scala:1895)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:302)
	at scala.collection.immutable.Vector.foreach(Vector.scala:1895)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:302)
	at scala.collection.immutable.Vector.foreach(Vector.scala:1895)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:302)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:63)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:57)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:281)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyWithCachedData$2(QueryExecution.scala:518)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1450)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyWithCachedData$1(QueryExecution.scala:516)
	at scala.util.Try$.apply(Try.scala:210)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1684)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1745)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:526)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getResultCacheStats(ResultCacheManager.scala:615)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.processAsArrowBatches(SparkConnectPlanExecution.scala:217)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:131)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:404)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:313)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:233)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:464)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1450)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:464)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:90)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:89)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:463)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:233)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:139)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:614)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:261)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:614)

DataFrame[jobidentifier: string, analysisidentifier: string, data_item: struct<type:string,instrumentreference:struct<analysisidentifier:string,instrumentidentifier:string,asofdate:string,accountidentifier:string,accountname:string,instrumentname:string,description:string,instrumenttype:string,instrumentsubtype:string,consumerproductcategory:string,originationdate:string,maturitydate:string,amortizationtype:string,amortizationenddate:string,isinterestonly:string,cashflowtype:string,instrumentcurrency:string,notionalportion:double,unpaidprincipalbalance:string,currentcommitmentamount:double,marketpriceoverride:string,fixedpaymentamount:double,currentbookpriceoverride:string,interestratetype:string,interestpaymentfrequency:string,curerate:double,fixedrate:double,currentrate:double,portfolioidentifier:string,interestratespread:double,interestrateindexmultiplier:double,interestrateindex:string,lifetimeinterestratecap:double,lifetimeinterestratefloor:double,periodicinterestratecap:double,pe

In [0]:
# However, the checkpointLocatiion allows us to only write the new records to silver table
ref_df = (
    exploded_df.select(
        col("data_item.instrumentreference.*")
    )
)
(
ref_df.writeStream
    .queryName("silver_instrument_reference_stream")
    .format("iceberg")
    .outputMode("append")
    .option("schemaLocation", "/Volumes/users/anhhoang_chu/iceberg/silver/ref_schema")
    .option("checkpointLocation", "/Volumes/users/anhhoang_chu/iceberg/silver/_ref_checkpoint")
    .trigger(availableNow=True)
    .toTable("users.anhhoang_chu.silver_instrument_reference")
)

In [0]:
%sql
select * from users.anhhoang_chu.silver_instrument_reference

## Instrument Error

In [0]:
spark.sql(f"""
CREATE TABLE IF NOT EXISTS users.anhhoang_chu.silver_instrument_error({instrument_error_schema})
USING ICEBERG
""")

In [0]:
from pyspark.sql.functions import explode, col 

error_df = exploded_df.withColumn("errors", explode(col("data_item.instrumenterror"))).select("errors.*")

(
error_df.writeStream
    .queryName("silver_instrument_error_stream")
    .format("iceberg")
    .outputMode("append")
    .option("schemaLocation", "/Volumes/users/anhhoang_chu/iceberg/silver/error_schema")
    .option("checkpointLocation", "/Volumes/users/anhhoang_chu/iceberg/silver/_error_checkpoint")
    .trigger(availableNow=True)
    .toTable("users.anhhoang_chu.silver_instrument_error")
)

In [0]:
%sql
select * from users.anhhoang_chu.silver_instrument_error

## Instrument Risk Metrics

In [0]:
spark.sql(f"""
CREATE TABLE IF NOT EXISTS users.anhhoang_chu.silver_instrument_risk_metric({instrument_risk_schema})
USING ICEBERG
""")

In [0]:
from pyspark.sql.functions import explode, col 

error_df = exploded_df.withColumn("riskmetric", explode(col("data_item.instrumentriskmetric"))).select("riskmetric.*")

(
error_df.writeStream
    .queryName("silver_instrument_risk_stream")
    .format("iceberg")
    .outputMode("append")
    .option("schemaLocation", "/Volumes/users/anhhoang_chu/iceberg/silver/risk_schema")
    .option("checkpointLocation", "/Volumes/users/anhhoang_chu/iceberg/silver/_risk_checkpoint")
    .trigger(availableNow=True)
    .toTable("users.anhhoang_chu.silver_instrument_risk_metric")
)

In [0]:
%sql
select * from users.anhhoang_chu.silver_instrument_risk_metric

# Monitoring
Status of each streaming queries can be viewed in SparkUI/Structured Streaming

![pulsar-streaming-queries](./pulsar-streaming-queries.jpg)