In [0]:
# Create the landing directory that the streaming reader further down watches for JSON files.
dbutils.fs.mkdirs("dbfs:/FileStore/streaming_input/input1")

Out[2]: True

In [0]:
# List the landing directory's current contents (sanity check before the stream starts).
dbutils.fs.ls("dbfs:/FileStore/streaming_input/input1")

Out[3]: []

In [0]:
# Explicit schema for the incoming order documents, handed to `.schema(...)` on the
# streaming reader. Despite the variable name, this is a DDL-style column list, not JSON.
# NOTE(review): `line_iems` looks like a misspelling of `line_items`; it is kept as-is
# because it presumably has to match the field name in the incoming files - confirm.
schema_json = (
    "order_id long, customer_id long, "
    "customer_fname string, customer_lname string,  "
    "city string, state string, pincode long, "
    "line_iems array<struct<"
    "order_item_id:long, "
    "order_item_product_id:long, "
    "order_item_quantity:long, "
    "order_item_subtotal:float, "
    "order_item_product_price:float"
    ">>"
)

In [0]:
# Open a streaming read over the JSON landing directory, parsed with the explicit
# schema held in `schema_json`.
order_data = (
    spark.readStream
    .format("json")
    .schema(schema_json)
    .option("path", "dbfs:/FileStore/streaming_input/input1")
    .load()
)


In [0]:
# Register the streaming DataFrame as temp view `orders` so the next steps can query it with SQL.
order_data.createOrReplaceTempView("orders")

In [0]:
# Produce one row per element of the `line_iems` array; the exploded struct is
# exposed as column `lines` alongside the order-level columns.
exploded_orders = spark.sql(
    "select order_id, customer_id, city, state, pincode, "
    "explode(line_iems) as lines "
    "from orders"
)

In [0]:
# Register the exploded rows as temp view `exploded_orders` for the flattening query.
exploded_orders.createOrReplaceTempView("exploded_orders")

In [0]:
# Flatten the `lines` struct into top-level columns (item_id, product_id, quantity,
# product_price, subtotal) next to the order-level columns.
flattend_orders = spark.sql(
    """select order_id, customer_id, city, state, pincode, 
                            lines.order_item_id as item_id,
                            lines.order_item_product_id as product_id, 
                            lines.order_item_quantity as quantity,
                            lines.order_item_product_price as product_price, 
                            lines.order_item_subtotal as subtotal
                            from exploded_orders"""
)

In [0]:
# Register the flattened rows as temp view `orders_flattend`, the input of the aggregation.
flattend_orders.createOrReplaceTempView("orders_flattend")

In [0]:
# Per-customer running totals over the flattened order lines.
# - orders_placed uses approx_count_distinct because Spark rejects exact distinct
#   aggregations on streaming DataFrames.
# - products_purchased counts order-line rows (count(item_id)), not summed quantities.
# The query goes through `spark.sql` like every other cell instead of the bare `sql`
# global, which only exists as a Databricks notebook convenience.
aggregated_orders = spark.sql("""select customer_id, approx_count_distinct(order_id) as orders_placed, count(item_id) as products_purchased, sum(subtotal) 
                              as amount_spent
                              from orders_flattend
                              group By customer_id
                              """)
    


In [0]:
def myfunction(orders_result, batch_id):
    """Upsert one micro-batch of per-customer aggregates into ``orders_result_final``.

    Intended as the ``foreachBatch`` callback of the streaming write. The batch is
    registered as temp view ``orders_result`` and merged into the target table:
    rows whose ``customer_id`` already exists are updated, all others are inserted.

    Args:
        orders_result: The micro-batch DataFrame holding the aggregated rows.
        batch_id: Identifier of the micro-batch; accepted to satisfy the
            ``foreachBatch`` callback signature but not used here.

    Returns:
        None.
    """
    orders_result.createOrReplaceTempView("orders_result")

    merge_statement = """merge into orders_result_final t using orders_result s 
    on t.customer_id = s.customer_id
    when matched 
    then update set t.products_purchased = s.products_purchased, t.orders_placed = s.orders_placed, t.amount_spent = s.amount_spent
    when not matched then 
    insert * 
    """

    # The MERGE must run on the session that owns the micro-batch DataFrame, because
    # that is where the temp view above was registered. Prefer the public
    # `DataFrame.sparkSession` attribute (PySpark 3.3+); only fall back to the
    # private Py4J handle on older versions that do not expose it.
    session = getattr(orders_result, "sparkSession", None)
    if session is None:
        session = orders_result._jdf.sparkSession()
    session.sql(merge_statement)

In [0]:
# Start from a clean slate: drop any target table left over from an earlier run.
# `if exists` keeps this cell from failing on a fresh workspace where the table
# was never created, so the notebook survives Restart & Run All.

spark.sql("drop table if exists orders_result_final")

Out[36]: DataFrame[]

In [0]:
# Also remove the table's storage directory so no files from an earlier run remain.
# The second argument (True) is the recurse flag.
# NOTE(review): this assumes the table lives under the default Hive warehouse path - confirm.
dbutils.fs.rm("dbfs:/user/hive/warehouse/orders_result_final", True)

Out[37]: False

In [0]:
# Create the target table that the per-batch MERGE writes into.
# The Delta format is requested explicitly: MERGE INTO needs a Delta target, and
# spelling it out avoids depending on the runtime's default table format.

spark.sql("create table orders_result_final (customer_id long, orders_placed long, products_purchased long, amount_spent float) using delta")

Out[38]: DataFrame[]

In [0]:
# Start the streaming write in "update" output mode; every micro-batch is handed
# to `myfunction`. The builder calls are kept in their original order.
# NOTE(review): the checkpoint location is a relative path - confirm where it resolves.
streaming_query = (
    aggregated_orders.writeStream
    .format("delta")
    .outputMode("update")
    .option("checkpointLocation", "Checkpointlocation105")
    .foreachBatch(myfunction)
    .start()
)

In [0]:
# Print the current contents of the target table.
# NOTE(review): the streaming query runs in the background, so what is shown
# presumably depends on how many micro-batches have been merged so far.
(
    spark
    .sql("select * from orders_result_final")
    .show()
)

+-----------+-------------+------------------+------------+
|customer_id|orders_placed|products_purchased|amount_spent|
+-----------+-------------+------------------+------------+
|        256|            1|                 6|     1159.96|
|       8827|            1|                 8|   1399.7001|
|       9557|            8|                56|     15564.5|
|       8355|            6|                28|      5939.2|
|       6272|            3|                16|   3419.6802|
|      10280|            3|                20|     5319.52|
|      11318|            1|                10|     2259.72|
|      11599|            2|                10|   2219.9001|
|       5882|            3|                 5|   674.91003|
+-----------+-------------+------------------+------------+



In [0]:
# Print the plan of the running streaming query for debugging.
streaming_query.explain()

