## Workflow Description

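This notebook implements a stateful streaming workflow with Apache Spark Structured Streaming and Delta Lake. It reads order events from a Delta table as a stream, maintains two running counters, and writes each counter to its own Delta table on S3.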

### Cell 0: Spark Session Initialization
This cell imports the required PySpark modules and creates the Spark session, or reuses the active one through `getOrCreate()`.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Statefull_Streaming").getOrCreate()

### Cell 1: Create Database
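This cell creates the `statefull_streaming` database (schema) under `orders` if it does not already exist. In the two-part name `orders.statefull_streaming`, `orders` is treated as the catalog name, so this assumes a catalog named `orders` is available (for example, in Unity Catalog). The streams below write to S3 paths rather than to tables registered in this database.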

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS orders.statefull_streaming")

### Cell 2: Define File Paths
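This cell defines the S3 paths for the source Delta table and, for each of the two output streams, a data location and a checkpoint location. Each streaming query needs its own checkpoint directory, because the checkpoint stores that query's progress and aggregation state.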

In [None]:
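# Source Delta table containing the streamed order events
DELTA_TABLE = "s3://labdataset/delta/orders"
# Data and checkpoint locations for the high-value order counter
HIGH_VALUE_CHECKPOINT_LOCATION = "s3://labdataset/delta/high_value_orders/checkpoint"
HIGH_VALUE_LOCATION = "s3://labdataset/delta/high_value_orders/data"
# Data and checkpoint locations for the items-per-customer counter
ITEM_CUSTOMER_CHECKPOINT_LOCATION = "s3://labdataset/delta/item_customer_count/checkpoint"
ITEM_CUSTOMER_LOCATION = "s3://labdataset/delta/item_customer_count/data"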

### Cell 3: Reading Streaming Data
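This cell opens the Delta table at `DELTA_TABLE` as a streaming source and previews it with `display()`. In Databricks, calling `display()` on a streaming DataFrame starts a separate streaming query that keeps running until it is cancelled.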

In [None]:
stat_df = spark.readStream.format("delta").load(DELTA_TABLE)
display(stat_df)

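### Cell 4: Select Fields for the Stateful Counters
This cell flattens the fields needed for the two stateful counters and registers the result as the temporary view `stat_view`. The counters are:

- Count orders whose total amount is greater than 4000.
- Count items per `customerId`.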

In [None]:
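# Flatten the nested parsedValue struct into top-level columns
stat_df = stat_df.select(
    col('batch_id'),
    col('parsedValue.totalAmount').alias('totalAmount'),
    col('parsedValue.customerId').alias('customerId'),
    col('parsedValue.items').alias('items'),
)

# Register the streaming DataFrame as a temp view so the counters can be written in SQL
stat_df.createOrReplaceTempView("stat_view")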
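In plain-Python terms, the `select` above lifts fields of the nested `parsedValue` struct up to top-level columns. The sketch below models that projection on two hypothetical records. The field names mirror the selected columns, but the values are invented for illustration, and `flatten` is an illustrative helper, not Spark code.

```python
from typing import Any

# Hypothetical records shaped like rows of the source Delta table; the field
# names follow the columns selected above, the values are invented.
raw_rows: list[dict[str, Any]] = [
    {"batch_id": 1, "parsedValue": {"totalAmount": 5200.0, "customerId": "c1", "items": ["a", "b"]}},
    {"batch_id": 1, "parsedValue": {"totalAmount": 80.0, "customerId": "c2", "items": ["c"]}},
]


def flatten(row: dict[str, Any]) -> dict[str, Any]:
    """Mirror the select(): keep batch_id and lift three parsedValue fields to the top level."""
    parsed = row["parsedValue"]
    return {
        "batch_id": row["batch_id"],
        "totalAmount": parsed["totalAmount"],
        "customerId": parsed["customerId"],
        "items": parsed["items"],
    }


flat_rows = [flatten(row) for row in raw_rows]
print(flat_rows[1])  # {'batch_id': 1, 'totalAmount': 80.0, 'customerId': 'c2', 'items': ['c']}
```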

### Cell 6: First Counter Aggregation
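This cell defines the first counter: a running count of high-value orders, that is, orders whose `totalAmount` is greater than 4000. `spark.sql` only builds the streaming DataFrame here; nothing runs until the query is started with `writeStream`. Because this is a global aggregation over a stream, Spark keeps the running count as state and updates it with every micro-batch.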

In [None]:
high_value_orders_query = """
SELECT COUNT(*) AS highValueOrderCount
FROM stat_view
WHERE totalAmount > 4000
"""

high_value_orders_df = spark.sql(high_value_orders_query)
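The state behind this counter can be modeled in plain Python. Spark keeps a single running count, and each micro-batch adds the number of its rows that pass the `totalAmount > 4000` filter. The sketch below is a simplified model, not Spark's implementation. The micro-batches are hypothetical `totalAmount` values, and `update_high_value_count` is an illustrative helper. Note that `4000.0` is not counted, because the comparison is strict.

```python
HIGH_VALUE_THRESHOLD = 4000  # same threshold as the WHERE clause above


def update_high_value_count(state: int, batch: list[float]) -> int:
    """Return the running count after folding in one micro-batch of totalAmount values."""
    return state + sum(1 for amount in batch if amount > HIGH_VALUE_THRESHOLD)


# Hypothetical micro-batches of totalAmount values.
micro_batches = [[5200.0, 80.0], [4000.0, 9100.5, 4300.0], []]

state = 0
history = []  # the value the counter reports after each micro-batch
for batch in micro_batches:
    state = update_high_value_count(state, batch)
    history.append(state)

print(history)  # [1, 3, 3]
```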

### Cell 8: Save First Counter Output
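This cell starts the streaming query that writes the first counter to a Delta table at `HIGH_VALUE_LOCATION`, storing its progress and aggregation state in `HIGH_VALUE_CHECKPOINT_LOCATION`. In `complete` output mode the whole result table is rewritten on every trigger, so the table always holds a single row with the latest count.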

In [None]:
high_value_orders_output_query = high_value_orders_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", HIGH_VALUE_CHECKPOINT_LOCATION) \
    .option("path", HIGH_VALUE_LOCATION) \
    .outputMode("complete") \
    .start()
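Complete mode can be contrasted with appending each trigger's result, which would accumulate one row per micro-batch. Spark rejects append mode for this query in any case, because append mode is not supported for streaming aggregations without a watermark. The sketch below uses two lists as stand-ins for the Delta table and hypothetical running counts. It models only the sink's behavior and is not Spark code.

```python
from typing import Any

Row = dict[str, Any]

# Hypothetical running counts produced by three consecutive triggers.
running_counts = [1, 3, 3]

complete_sink: list[Row] = []  # models the Delta table under outputMode("complete")
append_sink: list[Row] = []    # contrast only: what appending each result would do

for count in running_counts:
    result_table: list[Row] = [{"highValueOrderCount": count}]
    complete_sink = list(result_table)  # complete mode: overwrite with the full result
    append_sink.extend(result_table)    # append-style: accumulate one row per trigger

print(complete_sink)     # [{'highValueOrderCount': 3}]
print(len(append_sink))  # 3
```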


### Cell 10: Second Counter Aggregation
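This cell defines the second counter: the total number of items per customer across all of that customer's orders. `EXPLODE(items)` turns each element of the `items` array into its own row, and `COUNT(*)` then counts those rows per `customerId`. Orders whose `items` array is empty or null produce no rows, so they do not contribute.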

In [None]:
items_per_customer_query = """
SELECT customerId, COUNT(*) AS itemCount
FROM (
    SELECT customerId, EXPLODE(items) AS item
    FROM stat_view
) exploded_view
GROUP BY customerId
"""

items_per_customer_df = spark.sql(items_per_customer_query)
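The explode-then-count logic and its running state can be sketched in plain Python. Each order adds `len(items)` to its customer's running total, and orders with an empty or null array add nothing. The micro-batches are hypothetical, and `update_item_counts` is an illustrative helper, not part of Spark.

```python
from collections import Counter
from typing import Any


def update_item_counts(state: Counter, batch: list[dict[str, Any]]) -> Counter:
    """Fold one micro-batch into the running per-customer item counts.

    Models EXPLODE(items) followed by GROUP BY customerId / COUNT(*): every
    array element becomes one row, so an order adds len(items) to its
    customer's count. EXPLODE yields no rows for an empty or null array,
    so such orders add nothing and do not create a new customer entry.
    """
    new_state = Counter(state)
    for order in batch:
        items = order.get("items") or []  # treat a null array like an empty one
        if items:
            new_state[order["customerId"]] += len(items)
    return new_state


# Hypothetical micro-batches of orders (values invented for illustration).
micro_batches = [
    [{"customerId": "c1", "items": ["a", "b"]}, {"customerId": "c2", "items": ["c"]}],
    [{"customerId": "c1", "items": ["d"]}, {"customerId": "c3", "items": []}],
]

item_counts: Counter = Counter()
for batch in micro_batches:
    item_counts = update_item_counts(item_counts, batch)

print(dict(item_counts))  # {'c1': 3, 'c2': 1}
```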

### Cell 12: Save Second Counter Output
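This cell starts the second streaming query, which writes the per-customer counts to a Delta table at `ITEM_CUSTOMER_LOCATION` with its own checkpoint at `ITEM_CUSTOMER_CHECKPOINT_LOCATION`. After each trigger, the table holds one row per customer seen so far, with that customer's current item count.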

In [None]:
items_per_customer_output_query = items_per_customer_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", ITEM_CUSTOMER_CHECKPOINT_LOCATION) \
    .option("path", ITEM_CUSTOMER_LOCATION) \
    .outputMode("complete") \
    .start()


### Cell 14: Await Termination
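This cell blocks the notebook while the streaming queries run. Both queries process data concurrently in the background, and on an unbounded source they run until they are stopped or fail. Because `awaitTermination()` blocks, the second call is only reached after `high_value_orders_output_query` terminates, so a failure in the second query is not raised by this cell until then. `spark.streams.awaitAnyTermination()` instead returns, or raises the failed query's exception, as soon as any of the session's streaming queries terminates.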

In [None]:
high_value_orders_output_query.awaitTermination()
items_per_customer_output_query.awaitTermination()
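The difference between waiting on queries one after another and waiting for whichever stops first can be illustrated with a plain-Python analogy built on `concurrent.futures`. `fake_query` is a hypothetical stand-in for a streaming query, and nothing here is Spark code. In PySpark, the "first to stop" behavior corresponds to `spark.streams.awaitAnyTermination()`.

```python
import concurrent.futures
import time


def fake_query(name: str, runtime_s: float, fail: bool = False) -> str:
    """Hypothetical stand-in for a streaming query: runs for a while, then stops or fails."""
    time.sleep(runtime_s)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    slow = pool.submit(fake_query, "high_value_orders", 0.5)
    failing = pool.submit(fake_query, "items_per_customer", 0.1, fail=True)

    # Sequential waiting (like two awaitTermination() calls) would block on `slow`
    # for the full 0.5 s before noticing that `failing` had already failed.
    # Waiting for the first completion mirrors spark.streams.awaitAnyTermination().
    done, _ = concurrent.futures.wait(
        [slow, failing], return_when=concurrent.futures.FIRST_COMPLETED
    )
    first = next(iter(done))
    print(first is failing, repr(first.exception()))
```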