In [0]:
# Databricks notebook source
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType
from pyspark.sql.functions import col, expr

# Define the updated schema
transaction_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("transaction_type", StringType(), True),
    StructField("transaction_amount", DoubleType(), True),
    StructField("payment_method", StringType(), True)
])

connectionString = ""
ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    "eventhubs.consumerGroup": "$Default"  # Change if using a different consumer group
}
 
# Reading the stream from Azure Event Hub
df = spark.readStream.format("eventhubs").options(**ehConf).load()
 
# Transforming the data
df = df.selectExpr("CAST(body AS STRING) as json_data") \
       .withColumn("data", from_json(col("json_data"), transaction_schema)) \
       .select("data.*")  # Extract only required fields
 
display(df)
# Function to process each micro-batch and write to Delta
def process_batch(batchDF, batch_id):
    if batchDF.count() > 0:
        batchDF.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable("default.my_delta_table")  # Save to Delta table
 
# Start streaming and apply foreachBatch
query = df.writeStream \
    .outputMode("append") \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "dbfs:/user/hive/warehouse/checkpoint/") \
    .start()
 

transaction_id,customer_id,transaction_date,transaction_type,transaction_amount,payment_method
RT_TXN20000,CUST1070,2025-02-20 16:26:09,Deposit,150000.0,Credit Card
RT_TXN20001,CUST1077,2025-02-20 12:26:09,Deposit,150000.0,UPI
RT_TXN20002,CUST1024,2025-02-20 14:26:09,Insurance Premium,20000.0,Net Banking
RT_TXN20003,CUST1014,2025-02-20 02:26:09,Fund Transfer,5000.0,Net Banking
RT_TXN20004,CUST1066,2025-02-20 09:26:09,Insurance Premium,1000.0,Net Banking
RT_TXN20005,CUST1070,2025-02-19 23:26:09,Deposit,200000.0,UPI
RT_TXN20006,CUST1025,2025-02-20 12:26:09,Bill Payment,2000.0,Credit Card
RT_TXN20007,CUST1057,2025-02-20 08:26:09,Bill Payment,2000.0,Credit Card
RT_TXN20008,CUST1014,2025-02-20 02:26:09,Withdrawal,15000.0,Debit Card
RT_TXN20009,CUST1024,2025-02-19 21:26:09,Stock Investment,200000.0,Net Banking


In [0]:
cosmos_endpoint = ""
cosmos_key = ""
cosmos_database = ""
cosmos_container = "my_container"
 
cosmos_config = {
    "spark.cosmos.accountEndpoint": cosmos_endpoint,
    "spark.cosmos.accountKey": cosmos_key,
    "spark.cosmos.database": cosmos_database,
    "spark.cosmos.container": cosmos_container,
    "spark.cosmos.write.strategy": "ItemOverwrite"
}

In [0]:
from pyspark.sql.functions import col
df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/my_delta_table")
df = df.withColumn("id", col("transaction_id").cast("string"))

In [0]:
df.write \
    .format("cosmos.oltp") \
    .mode("APPEND") \
    .option("spark.synapse.linkedService", "your-linked-service") \
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint) \
    .option("spark.cosmos.accountKey", cosmos_key) \
    .option("spark.cosmos.database", cosmos_database) \
    .option("spark.cosmos.container", cosmos_container) \
    .save()