In [0]:
# Show the SparkSession summary as this cell's output. `spark` is not created
# anywhere in this notebook; presumably it is provided by the Databricks runtime.
spark

In [0]:
from pyspark.sql.functions import *

In [0]:
# DDL schema for one incoming order record: customer/address fields plus the
# order's line items as an array of structs. Used as the explicit schema of the
# streaming JSON reader.
orders_schema = (
    "order_id long,customer_id long,customer_fname string,customer_lname string,"
    "city string,state string,pincode long,"
    "line_items array<struct<"
    "order_item_id: long,order_item_product_id: long,order_item_quantity: long,"
    "order_item_product_price: float,order_item_subtotal: float>>"
)

In [0]:
# Create the input directory that the streaming reader's `path` option points at;
# the call's return value is shown as the cell output.
dbutils.fs.mkdirs("dbfs:/FileStore/streaming_input/input1")

True

In [0]:
# Unbounded streaming DataFrame over the JSON order files in the input directory,
# parsed with the explicit `orders_schema` (one row per order record).
orders_df = (
    spark.readStream
    .format("json")
    .schema(orders_schema)
    .option("path", "dbfs:/FileStore/streaming_input/input1")
    .load()
)

In [0]:
# Register the streaming orders as the SQL view `orders` so the next cells can
# build the pipeline with spark.sql.
orders_df.createOrReplaceTempView("orders")


In [0]:
# One row per line item: explode() emits a row for each element of line_items
# (as the struct column `lines`), repeating the order-level columns.
# NOTE(review): explode() yields no rows for an empty or null line_items array, so
# such orders never reach the aggregates below — confirm that is intended
# (explode_outer would keep them).
exploded_orders = spark.sql("""select order_id,customer_id,city,state,
pincode,explode(line_items) lines from orders""")

In [0]:
# Expose the exploded rows to SQL as `exploded_orders`.
exploded_orders.createOrReplaceTempView("exploded_orders")

In [0]:
# Flatten the `lines` struct into top-level columns with shorter names
# (item_id, product_id, quantity, price, subtotal); order-level columns are kept.
flattened_orders = spark.sql("""select order_id, customer_id, city, state, pincode, 
lines.order_item_id as item_id, lines.order_item_product_id as product_id,
lines.order_item_quantity as quantity,lines.order_item_product_price as price,
lines.order_item_subtotal as subtotal from exploded_orders""")

In [0]:
# Expose the flattened line items to SQL as `orders_flattened`.
flattened_orders.createOrReplaceTempView("orders_flattened")

In [0]:
# Running per-customer aggregates over every line item seen so far by the stream:
# distinct orders, number of line items, and total of the line subtotals.
# NOTE(review): orders_placed comes from approx_count_distinct, so it is an
# estimate rather than an exact count — presumably chosen because an exact
# count(distinct ...) is not supported in a streaming aggregation; confirm this
# accuracy is acceptable.
aggregated_orders = spark.sql("""select customer_id, approx_count_distinct(order_id) as orders_placed, 
count(item_id) as products_purchased,sum(subtotal) as amount_spent 
from orders_flattened group by customer_id""")

In [0]:
def myfunction(orders_result, batch_id):
    """Upsert one micro-batch of per-customer aggregates into ``orders_final_result``.

    Used as the ``foreachBatch`` callback of the streaming query. The batch is
    registered as the temp view ``orders_result`` and MERGEd into the target
    table. Existing customers get their counters overwritten with the batch
    values, and new customers are inserted.

    Args:
        orders_result: the micro-batch DataFrame (columns customer_id,
            orders_placed, products_purchased, amount_spent).
        batch_id: micro-batch id passed in by ``foreachBatch``; unused here.
    """
    # The view is registered in the micro-batch DataFrame's own session, so the
    # MERGE must be run through that same session to see it.
    orders_result.createOrReplaceTempView("orders_result")
    merge_statement = """merge into orders_final_result t using orders_result s
    on t.customer_id == s.customer_id
    when matched then
    update set t.products_purchased = s.products_purchased, t.orders_placed = s.orders_placed,
    t.amount_spent = s.amount_spent
    when not matched then
    insert *
    """

    # Prefer the public DataFrame.sparkSession (PySpark >= 3.3). It is the only
    # option under Spark Connect, where DataFrames have no private `_jdf`. Fall
    # back to the JVM-side session on older PySpark versions.
    session = getattr(orders_result, "sparkSession", None)
    if session is not None:
        session.sql(merge_statement)
    else:
        orders_result._jdf.sparkSession().sql(merge_statement)



In [0]:
# Start the streaming aggregation. On each trigger, the changed per-customer rows
# are handed to `myfunction`, which MERGEs them into orders_final_result.
# No `.format(...)` here: a foreachBatch sink ignores it, and the previous
# `.format("delta")` only suggested a Delta sink that was never used.
# NOTE(review): orders_final_result is created in a later cell. It must exist
# before the first non-empty micro-batch runs, so on a fresh workspace run that
# cell before this one.
streaming_query = (
    aggregated_orders.writeStream
    .outputMode("update")  # emit only the aggregate rows that changed in a trigger
    .option("checkpointLocation", "checkpointdir108")
    .foreachBatch(myfunction)
    .start()
)


In [0]:
# Target table for the MERGE in myfunction.
# - IF NOT EXISTS keeps the cell re-runnable, because the table persists in the
#   metastore after the first run.
# - USING DELTA makes explicit that MERGE INTO needs a Delta table.
# NOTE(review): amount_spent is FLOAT, so totals show float rounding
# (e.g. 699.85004); consider DECIMAL for currency.
spark.sql(
    "create table if not exists orders_final_result "
    "(customer_id long, orders_placed long, products_purchased long, amount_spent float) "
    "using delta"
)

DataFrame[]

In [0]:
# Inspect the upserted per-customer results; show() prints up to 20 rows by default.
spark.sql("select * from orders_final_result").show()

+-----------+-------------+------------------+------------+
|customer_id|orders_placed|products_purchased|amount_spent|
+-----------+-------------+------------------+------------+
|        256|            1|                 3|      579.98|
|      11318|            1|                 5|     1129.86|
|       8827|            1|                 4|   699.85004|
|      11599|            2|                 5|   1109.9501|
+-----------+-------------+------------------+------------+



In [0]:
# Most recent progress report of the running query, as a dict (None until the
# first progress update). `streaming_query.` on its own was a syntax error.
streaming_query.lastProgress

{'id': 'c75d11ca-5a62-4402-90c0-7b4668b95209',
 'runId': '4bdf0b36-333b-4daa-ac54-0be264c75edc',
 'name': None,
 'timestamp': '2024-03-12T22:46:34.703Z',
 'batchId': 3,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 10, 'triggerExecution': 10},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 4,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 1196,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 24803,
   'memoryUsedBytes': 89544,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 800,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 23296}}],
 'sources': [{'description': 'FileStreamSource[dbfs:/FileStore/streaming_input/input1]',
   'startOffset': {'logOffset': 2},
   'endOffset': {'logOffset': 2},
   'latestOffset': None,
   'numInputRows': 0,
   'inputRows