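### Lab 5: Processing Incremental Updates with PySpark Structured Streaming and Delta Tables
In this lab, you'll apply your knowledge of PySpark and Structured Streaming to implement a simple multi-hop (medallion) architecture. Raw CSV data is ingested into a bronze Delta table, cleaned and enriched into a silver table, and aggregated into a gold table.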

#### 1.0. Import Required Libraries

In [2]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import shutil
import time

import pyspark
from delta import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

/opt/anaconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark


#### 2.0. Instantiate Global Variables

In [4]:
# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'retail-org')
customers_stream_dir = os.path.join(data_dir, 'customers')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "customers_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
database_dir = os.path.join(sql_warehouse_dir, dest_database)

customers_output_bronze = os.path.join(database_dir, 'customers_bronze')
customers_output_silver = os.path.join(database_dir, 'customers_silver')
customers_output_gold = os.path.join(database_dir, 'customers_gold')

#### 3.0. Define Global Functions

In [6]:
def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"

#### 4.0. Create a New Spark Session

In [8]:
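# Use half of the available CPU cores (at least one) as local worker threads.
# os.cpu_count() can return None, so fall back to 2.
worker_threads = f"local[{max(1, (os.cpu_count() or 2) // 2)}]"
shuffle_partitions = os.cpu_count() or 2

builder = pyspark.sql.SparkSession.builder \
    .appName('PySpark Customers Delta Table in Jupyter')\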
    .master(worker_threads)\
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '2g')\
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.adaptive.enabled', 'false') \
    .config('spark.sql.debug.maxToStringFields', 50) \
    .config('spark.sql.shuffle.partitions', shuffle_partitions) \
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .config('spark.sql.streaming.schemaInference', 'true') \
    .config('spark.sql.warehouse.dir', database_dir) \
    .config('spark.streaming.stopGracefullyOnShutdown', 'true')

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark

25/04/07 23:45:41 WARN Utils: Your hostname, Sonias-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.26.76.137 instead (on interface en0)
25/04/07 23:45:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/soniasadani/.ivy2/cache
The jars for the packages stored in: /Users/soniasadani/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a597ecf5-b5fa-4bb5-af2d-ad8aedd13195;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central


:: loading settings :: url = jar:file:/opt/anaconda3/envs/pysparkenv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 79ms :: artifacts dl 2ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.3.0 from central in [default]
	io.delta#delta-storage;3.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-a597ecf5-b5fa-4bb5-af2d-ad8aedd13195
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/2ms)
25/04/07 23:45:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-ja

#### 5.0. Initialize Data Lakehouse Directory Structure
Remove the data lakehouse database directory so that rerunning the notebook always starts from a clean state (idempotency).

In [10]:
remove_directory_tree(database_dir)

"Directory '/Users/soniasadani/Documents/04-PySpark/spark-warehouse/customers_dlh' has been removed successfully."

#### 6.0. Bronze Table: Ingest and Stage Data
This lab uses a collection of customer-related CSV data found in *`../04-PySpark/lab_data/retail-org/customers/`*. 
<br>This is available to you by way of the `customers_stream_dir` variable.

##### 6.1. Read this data into a stream using schema inference
- Use a **`_checkpoint`** folder and the **`schemaLocation`** option to store the schema info in a dedicated folder for **`customers`**.
- Set the **`maxFilesPerTrigger`** option to **`1`**.
- Set the **`inferSchema`** and **`header`** options to **`true`**. For streaming file sources, schema inference itself is enabled by the **`spark.sql.streaming.schemaInference`** setting configured in section 4.0.
- Use **`.csv()`** to specify the source directory.
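`maxFilesPerTrigger` caps how many new files a single micro-batch may read. The plain-Python sketch below models only that batching rule, not Spark itself. `plan_micro_batches` and the file names are hypothetical; this lab's folder holds a single `customers.csv`, so it runs as one batch. The sketch keeps the given order, whereas Spark's file source by default processes the oldest files first.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split newly discovered files into micro-batches of at most
    `max_files_per_trigger` files each, preserving the given order."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical landing-zone contents (not the lab's actual files).
new_files = ["customers_1.csv", "customers_2.csv", "customers_3.csv"]
print(plan_micro_batches(new_files, 1))
# → [['customers_1.csv'], ['customers_2.csv'], ['customers_3.csv']]
print(plan_micro_batches(new_files, 2))
# → [['customers_1.csv', 'customers_2.csv'], ['customers_3.csv']]
```

A lower limit means smaller, more frequent batches. That is handy for watching incremental processing happen in a lab, but it is slower for bulk loads.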

In [12]:
customers_checkpoint_bronze = os.path.join(customers_output_bronze, '_checkpoint')

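df_customers_bronze = (
    spark.readStream
        # NOTE (assumption): tracking schemas via 'schemaLocation' is a Databricks Auto Loader
        # feature (cloudFiles.schemaLocation); the open-source CSV source does not document
        # this option, so it most likely has no effect here.
        .option("schemaLocation", customers_checkpoint_bronze)
        .option("maxFilesPerTrigger", 1)
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(customers_stream_dir)
)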

In [13]:
df_customers_bronze.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- tax_id: double (nullable = true)
 |-- tax_code: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- street: string (nullable = true)
 |-- number: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- ship_to_address: string (nullable = true)
 |-- valid_from: integer (nullable = true)
 |-- valid_to: double (nullable = true)
 |-- units_purchased: double (nullable = true)
 |-- loyalty_segment: integer (nullable = true)



##### 6.2. Stream the raw data to a Delta table.
 - Use the **`delta`** format.
 - Use the **`append`** output mode.
 - Use **`customers_bronze`** as the **`queryName`**.
 - Use **`availableNow = True`** for the **`trigger`**
 - Use the **`_checkpoint`** folder with the **`checkpointLocation`** option.
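An `availableNow` trigger processes everything available when the query starts and then stops. It still honors `maxFilesPerTrigger`, so that may take several micro-batches. The checkpoint is what lets a rerun skip data that was already written. The plain-Python toy model below illustrates this interplay. `run_available_now` and the `committed` set are hypothetical stand-ins: Spark's real checkpoint holds offset and commit logs, and exactly-once delivery also relies on the Delta sink.

```python
from typing import Callable, List, Set


def run_available_now(
    source_files: List[str],
    committed: Set[str],
    max_files_per_trigger: int,
    sink: Callable[[List[str]], None],
) -> int:
    """Toy model of an availableNow trigger: process every file not yet recorded
    in the checkpoint, in micro-batches, then stop.

    `committed` stands in for the checkpoint directory.
    Returns the number of micro-batches that ran."""
    pending = [f for f in source_files if f not in committed]
    batches = 0
    for i in range(0, len(pending), max_files_per_trigger):
        batch = pending[i:i + max_files_per_trigger]
        sink(batch)               # write the batch to the table
        committed.update(batch)   # then record progress in the "checkpoint"
        batches += 1
    return batches                # the query terminates once it has caught up


written: List[str] = []
checkpoint: Set[str] = set()
files = ["customers.csv"]

first = run_available_now(files, checkpoint, 1, written.extend)
second = run_available_now(files, checkpoint, 1, written.extend)  # rerun with the same checkpoint
print(first, second, written)
# → 1 0 ['customers.csv']
```

Because the second run finds nothing pending, rerunning a query with the same `checkpointLocation` does not duplicate rows. Deleting the checkpoint (as section 5.0 does for the whole database directory) makes the next run start from scratch.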

In [15]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {dest_database}")
customers_bronze_query = (
    df_customers_bronze
        .writeStream
        .format("delta")
        .outputMode("append")
        .queryName("customers_bronze")
        .trigger(availableNow=True)
        .option("checkpointLocation", customers_checkpoint_bronze)
        .toTable(f"{dest_database}.customers_bronze")
)

In [16]:
customers_bronze_query.awaitTermination()

                                                                                

In [17]:
os.listdir(customers_stream_dir)

['customers.csv']

##### 6.3. Create a Streaming Temporary View named **`customers_bronze_temp`**
- Read the bronze table as a stream; it is stored in the **`delta`** format.
- No **`inferSchema`** option is needed, because a Delta table stores its own schema.
- Load the data from the **`bronze`** Delta table by its name, **`customers_dlh.customers_bronze`**.
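Although `customers_output_bronze` looks like the bronze table's folder, with this lab's configuration it holds only the bronze query's `_checkpoint`. `toTable()` creates a managed table, and Spark places a managed table's files under the database's `.db` folder inside `spark.sql.warehouse.dir`. That is why reading the table by name is the reliable option. The sketch below computes that default path. `managed_table_path` is a hypothetical helper, and `database_dir` is a relative stand-in for the absolute path built in section 2.0. On a live session, `DESCRIBE TABLE EXTENDED customers_dlh.customers_bronze` reports the actual location.

```python
import os


def managed_table_path(warehouse_dir: str, database: str, table: str) -> str:
    """Default location of a managed table in a database created without an
    explicit LOCATION: <warehouse>/<database>.db/<table>."""
    return os.path.join(warehouse_dir, f"{database}.db", table)


# Mirrors section 4.0, where spark.sql.warehouse.dir is set to database_dir.
database_dir = os.path.join("spark-warehouse", "customers_dlh")
print(managed_table_path(database_dir, "customers_dlh", "customers_bronze"))
# → spark-warehouse/customers_dlh/customers_dlh.db/customers_bronze   (on macOS/Linux)
```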

In [19]:
(
    spark.readStream
        .table(f"{dest_database}.customers_bronze")
        .createOrReplaceTempView("customers_bronze_temp")
)

##### 6.4. Clean and Enhance the Data
Use a CTAS-style **`CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT`** statement to define a new streaming view called **`bronze_enhanced_temp`** that does the following:
* Omits records whose **`postcode`** is null.
* Inserts a column called **`receipt_time`** containing the current timestamp, using the **`current_timestamp()`** function.
* Inserts a column called **`source_file`** containing the input file name, using the **`input_file_name()`** function.

In [21]:
# The view is a streaming view because its source, customers_bronze_temp, is streaming.
sql_bronze_enhanced_temp = """
    CREATE OR REPLACE TEMPORARY VIEW bronze_enhanced_temp AS
    SELECT
        *,
        current_timestamp() AS receipt_time,
        input_file_name() AS source_file
    FROM customers_bronze_temp
    WHERE postcode IS NOT NULL
"""
spark.sql(sql_bronze_enhanced_temp)
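To check what `bronze_enhanced_temp` should contain, its filtering and stamping logic can be mirrored in plain Python on a few hand-made rows. This is only an illustration, not Spark. `enhance` and the sample rows are hypothetical, and the file name is passed in rather than discovered. A single `receipt_time` is taken per call, mimicking how `current_timestamp()` returns the same value for all rows evaluated together.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def enhance(rows: List[Dict[str, Optional[str]]], source_file: str) -> List[Dict[str, object]]:
    """Plain-Python analogue of bronze_enhanced_temp: drop rows whose postcode is
    None and stamp the survivors with a receipt time and the source file name."""
    receipt_time = datetime.now(timezone.utc)  # one timestamp shared by the whole batch
    return [
        {**row, "receipt_time": receipt_time, "source_file": source_file}
        for row in rows
        if row.get("postcode") is not None
    ]


# Hypothetical sample rows (not taken from the lab's customers.csv).
rows = [
    {"customer_id": "1", "postcode": "10001"},
    {"customer_id": "2", "postcode": None},
]
out = enhance(rows, "customers.csv")
print([r["customer_id"] for r in out])
# → ['1']
```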

#### 7.0. Silver Table
##### 7.1. Stream the data from **`bronze_enhanced_temp`** to a **`Delta`** table named **`customers_silver`**.
 - Use the **`append`** output mode.
 - Use **`customers_silver`** as the **`queryName`**.
 - Use **`availableNow = True`** for the **`trigger`**
 - Use a **`_checkpoint`** folder in a dedicated **`customers_silver`** location with the **`checkpointLocation`** option. The checkpoint records the query's progress, so a rerun resumes where the previous run stopped.

In [23]:
customers_checkpoint_silver = os.path.join(customers_output_silver, '_checkpoint')

customers_silver_query = (
    spark.sql("SELECT * FROM bronze_enhanced_temp")
        .writeStream
        .format("delta")
        .outputMode("append")
        .queryName("customers_silver")
        .trigger(availableNow=True)
        .option("checkpointLocation", customers_checkpoint_silver)
        .toTable(f"{dest_database}.customers_silver")
)

customers_silver_query.awaitTermination()


##### 7.2. Create a Streaming Temporary View
- Create another streaming temporary view named **`customers_silver_temp`** for the **`customers_silver`** table so we can perform business-level queries using SQL.

In [None]:
(
    spark.readStream
        .table(f"{dest_database}.customers_silver")
        .createOrReplaceTempView("customers_silver_temp")
)

#### 8.0. Gold Table
##### 8.1. Use a CTAS-style **`CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT`** statement to define a new streaming view called **`customer_count_by_state_temp`** that does the following:
- Reads data from the **`customers_silver_temp`** temporary view created in the preceding step.
- Selects the **`state`** along with the number of customers in each state (grouped by **`state`**), named **`customer_count`** so that the query in section 9.0 can reference it.

In [None]:
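sql_gold_temp = """
    CREATE OR REPLACE TEMPORARY VIEW customer_count_by_state_temp AS
    SELECT state, COUNT(customer_id) AS customer_count
    FROM customers_silver_temp
    GROUP BY state
"""
spark.sql(sql_gold_temp)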

##### 8.2. Stream the data from the **`customer_count_by_state_temp`** view to a Delta table called **`customer_count_by_state_gold`**.
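- Use the **`complete`** output mode. Without a watermark, Structured Streaming does not allow a streaming aggregation such as **`count()`** to be written in **`append`** mode, because its counts are never final. Sorting a stream is supported only after an aggregation in **`complete`** mode. Complete mode rewrites the entire aggregated result on every trigger.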
- Use a **`_checkpoint`** folder with the **`checkpointLocation`** option and a dedicated folder for **`customers`** as the checkpoint path.
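Why complete mode fits here: each new micro-batch can change counts that were already reported. For example, a second batch containing CA customers updates the CA row. The toy model below, in plain Python, emits the full result table after every batch, which is what complete mode hands to the Delta sink. The state codes and the `complete_mode_outputs` helper are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List


def complete_mode_outputs(micro_batches: Iterable[List[str]]) -> List[Dict[str, int]]:
    """Toy model of a streaming count-by-state in complete output mode.

    Each micro-batch is a list of state codes. The running Counter plays the role
    of Spark's aggregation state; after every batch the *whole* result table is
    emitted, so the sink can simply replace the gold table's contents."""
    running: Counter = Counter()
    emitted: List[Dict[str, int]] = []
    for batch in micro_batches:
        running.update(batch)
        emitted.append(dict(running))  # full snapshot, not just the changed rows
    return emitted


batches = [["CA", "NY", "CA"], ["NY", "TX"]]
for snapshot in complete_mode_outputs(batches):
    # Highest count first, ties broken by state (like ORDER BY customer_count DESC).
    print(sorted(snapshot.items(), key=lambda kv: (-kv[1], kv[0])))
# → [('CA', 2), ('NY', 1)]
# → [('CA', 2), ('NY', 2), ('TX', 1)]
```

The NY count changes from 1 to 2 between snapshots. An append-only sink would have needed to know in advance that the first NY row was final.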

In [None]:
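customer_count_checkpoint_gold = os.path.join(customers_output_gold, '_checkpoint')
customer_count_by_state_gold_query = (
    spark.table("customer_count_by_state_temp")
        .writeStream
        .format("delta")
        .outputMode("complete")
        .queryName("customer_count_by_state_gold")
        .trigger(availableNow=True)  # stop once caught up so awaitTermination() returns
        .option("checkpointLocation", customer_count_checkpoint_gold)
        .toTable(f"{dest_database}.customer_count_by_state_gold")
)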

In [None]:
customer_count_by_state_gold_query.awaitTermination()

#### 9.0. Query the Results
- Query the **`customer_count_by_state_gold`** table (this will not be a streaming query).
- Select the **`state`** and **`customer_count`** columns.
- Sort the results by **`customer_count`** in descending order (i.e., from highest to lowest).

In [None]:
sql_customer_count_query = f"""
    SELECT state, customer_count
    FROM {dest_database}.customer_count_by_state_gold
    ORDER BY customer_count DESC
"""
spark.sql(sql_customer_count_query).toPandas()

In [None]:
spark.stop()