In [0]:
# Drop any previous mount so the mount cell below can re-create it.
# Only unmount when something is actually mounted there: an unconditional
# unmount fails on a fresh cluster / Restart & Run All when the path is not mounted.
if any(m.mountPoint.rstrip('/') == '/mnt/files' for m in dbutils.fs.mounts()):
    dbutils.fs.unmount('/mnt/files/')

In [0]:
# Storage account / container to mount. The account key is fetched through
# dbutils.secrets so it never appears in the notebook source.
storageAccountName = 'retailanalyticssa'
blobContainerName = 'prac'
storageAccountAccessKey = dbutils.secrets.get(scope="sa_key", key="sakey")

# Host part shared by the wasbs:// URI and the account-key config entry.
blob_endpoint = f'{storageAccountName}.blob.core.windows.net'

# Mount the container at /mnt/files/, authenticating with the account key.
dbutils.fs.mount(
    source=f'wasbs://{blobContainerName}@{blob_endpoint}',
    mount_point='/mnt/files/',
    extra_configs={f'fs.azure.account.key.{blob_endpoint}': storageAccountAccessKey},
)

In [0]:
from pyspark.sql.functions import *

In [0]:
# DDL-style schema string handed to .schema() on the streaming reader.
# Top-level order/customer columns plus `line_items`, an array of structs
# (one struct per order line) that later cells explode and flatten.
orders_schema = """order_id long, customer_id long, customer_fname string, customer_lname string, 
                    city string, state string, pincode long,
                    line_items array<struct<order_item_id: long,
                                            order_item_product_id: long,
                                            order_item_quantity: long,
                                            order_item_product_price: float,
                                            order_item_subtotal: float>>"""

In [0]:
# Create a DBFS directory for streaming input.
# NOTE(review): the stream defined in this notebook reads from
# dbfs:/mnt/files/orders_combined.json, not from this directory — confirm it is still needed.
streaming_input_dir = "dbfs:/FileStore/streaming_input/input1"
dbutils.fs.mkdirs(streaming_input_dir)

In [0]:
# Streaming JSON read of the orders file from the mounted container,
# parsed with the explicit orders_schema.
orders_df = (
    spark.readStream
    .format("json")
    .schema(orders_schema)
    .option("path", "dbfs:/mnt/files/orders_combined.json")
    .load()
)

In [0]:
# Register the streaming DataFrame as the temp view "orders" so it can be queried with Spark SQL.
orders_df.createOrReplaceTempView("orders")


In [0]:
# Explode the line_items array: one output row per order line, carried in the
# struct column `lines`. customer_fname / customer_lname are not selected here.
exploded_orders = spark.sql("""select order_id,customer_id,city,state,
pincode,explode(line_items) lines from orders""")

In [0]:
# Register the exploded rows as the temp view "exploded_orders" for the flattening query.
exploded_orders.createOrReplaceTempView("exploded_orders")

In [0]:
# Flatten the `lines` struct into top-level columns with shorter names
# (item_id, product_id, quantity, price, subtotal) alongside the order columns.
flattened_orders = spark.sql("""select order_id, customer_id, city, state, pincode, 
lines.order_item_id as item_id, lines.order_item_product_id as product_id,
lines.order_item_quantity as quantity,lines.order_item_product_price as price,
lines.order_item_subtotal as subtotal from exploded_orders""")

In [0]:
# Register the flattened rows as the temp view "orders_flattened" for the aggregation query.
flattened_orders.createOrReplaceTempView("orders_flattened")

In [0]:
# Per-customer aggregates over the flattened order lines:
#   orders_placed      - approximate distinct count of order_id
#   products_purchased - count of line items (count(item_id)); quantity is not summed
#   amount_spent       - sum of line subtotals
aggregated_orders = spark.sql("""select customer_id, approx_count_distinct(order_id) as orders_placed, 
count(item_id) as products_purchased,sum(subtotal) as amount_spent 
from orders_flattened group by customer_id""")

In [0]:
def myfunction(orders_result, batch_id):
    """Upsert one micro-batch of per-customer aggregates into ``orders_final_result``.

    Written as a ``foreachBatch`` callback: the batch is registered as the temp
    view ``orders_result`` and merged into the target table on ``customer_id``.
    Matching rows have their aggregate columns overwritten; new customers are
    inserted.

    Args:
        orders_result: micro-batch DataFrame. The MERGE reads its ``customer_id``,
            ``orders_placed``, ``products_purchased`` and ``amount_spent`` columns.
        batch_id: micro-batch identifier supplied by the caller; not used here.

    Returns:
        None.
    """
    orders_result.createOrReplaceTempView("orders_result")
    merge_statement = """merge into orders_final_result t using orders_result s
    on t.customer_id == s.customer_id
    when matched then
    update set t.products_purchased = s.products_purchased, t.orders_placed = s.orders_placed,
    t.amount_spent = s.amount_spent
    when not matched then
    insert *
    """

    # The temp view lives in the batch DataFrame's own session, so the MERGE must
    # run through that session. Prefer the public DataFrame.sparkSession property
    # (PySpark 3.3+); only fall back to the private JVM handle when it is missing.
    session = getattr(orders_result, "sparkSession", None)
    if session is None:
        session = orders_result._jdf.sparkSession()
    session.sql(merge_statement)



In [0]:
# Start the streaming write: every micro-batch of aggregated_orders is handed to
# myfunction, with progress tracked under the "checkpointdir108" checkpoint location.
# Keep the call order as is (format before foreachBatch).
# NOTE(review): with foreachBatch the batch is written by myfunction, so
# format("delta") is presumably not what performs the write — confirm before removing.
streaming_query = (
    aggregated_orders.writeStream
    .format("delta")
    .outputMode("update")
    .option("checkpointLocation", "checkpointdir108")
    .foreachBatch(myfunction)
    .start()
)


In [0]:
# Target table for the MERGE issued by myfunction. "if not exists" keeps this cell
# re-runnable: a plain "create table" fails once the table already exists.
# NOTE(review): this cell sits after the cell that starts the stream, but the
# table must exist before the first micro-batch merges into it — run it first.
spark.sql("create table if not exists orders_final_result (customer_id long, orders_placed long, products_purchased long, amount_spent float)")

In [0]:
# Print the current contents of the upsert target table.
final_result_df = spark.sql("select * from orders_final_result")
final_result_df.show()

In [0]:
streaming_query.