### Lab 5: Processing Incremental Updates with PySpark Structured Streaming and Delta tables
In this lab you'll apply your knowledge of PySpark and Structured Streaming to implement a simple multi-hop (medallion) architecture.

#### 1.0. Import Required Libraries

In [1]:
%pip install findspark pyspark delta-spark==3.3.0


Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import shutil
import time

import pyspark
from delta import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

C:\spark-3.5.4-bin-hadoop3


#### 2.0. Instantiate Global Variables

In [3]:
# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = "C:\\lab_data"
data_dir = os.path.join(base_dir, 'retail-org')
customers_stream_dir = os.path.join(data_dir, 'customers')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "customers_dlh"
sql_warehouse_dir = "C:\\spark-warehouse"
database_dir = os.path.join(sql_warehouse_dir, dest_database)
print(dest_database, sql_warehouse_dir, database_dir, sep="\n")

customers_output_bronze = os.path.join(database_dir, 'customers_bronze')
customers_output_silver = os.path.join(database_dir, 'customers_silver')
customers_output_gold = os.path.join(database_dir, 'customers_gold')

customers_dlh
C:\spark-warehouse
C:\spark-warehouse\customers_dlh


#### 3.0. Define Global Functions

In [4]:
def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
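
A quick way to see why this helper makes the notebook safe to re-run: calling it twice on the same path should succeed both times. The sketch below repeats a compact copy of the function so that it runs on its own, and it uses a throwaway directory from `tempfile` (an assumption for illustration; the lab itself points the function at `database_dir`).

```python
import os
import shutil
import tempfile


def remove_directory_tree(path: str) -> str:
    """Remove the directory tree at 'path' if it exists and report the outcome."""
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        return f"Directory '{path}' does not exist."
    except Exception as e:
        return f"An error occurred: {e}"


# Build a throwaway tree that mimics a table folder with a checkpoint inside it.
scratch_dir = tempfile.mkdtemp()
table_dir = os.path.join(scratch_dir, "customers_bronze")
os.makedirs(os.path.join(table_dir, "_checkpoint"))

first_call = remove_directory_tree(table_dir)   # the tree exists, so it is deleted
second_call = remove_directory_tree(table_dir)  # nothing left to delete, still no error

print(first_call)
print(second_call)

shutil.rmtree(scratch_dir)  # clean up the scratch directory itself
```

Because the function returns a message instead of raising, the cleanup cell can sit at the top of the pipeline without a guard around it.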

#### 4.0. Create a New Spark Session

In [5]:



# Use half of the available cores, but never fewer than one worker thread.
cpu_count = os.cpu_count() or 2
worker_threads = f"local[{max(1, cpu_count // 2)}]"
shuffle_partitions = cpu_count

builder = pyspark.sql.SparkSession.builder \
    .appName('PySpark Customers Delta Table in Jupyter')\
    .master(worker_threads)\
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '2g')\
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.adaptive.enabled', 'false') \
    .config('spark.sql.debug.maxToStringFields', 50) \
    .config('spark.sql.shuffle.partitions', shuffle_partitions) \
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .config('spark.sql.streaming.schemaInference', 'true') \
    .config('spark.sql.warehouse.dir', database_dir) \
    .config('spark.streaming.stopGracefullyOnShutdown', 'true')

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark

#### 5.0. Initialize Data Lakehouse Directory Structure
Remove the data lakehouse database directory structure to ensure idempotency.

In [6]:
remove_directory_tree(database_dir)

"Directory 'C:\\spark-warehouse\\customers_dlh' has been removed successfully."

#### 6.0. Bronze Table: Ingest and Stage Data
This lab uses a collection of customer-related CSV data found in *`../04-PySpark/lab_data/retail-org/customers/`*. 
<br>This is available to you by way of the `customers_stream_dir` variable.

##### 6.1. Read this data into a Stream using schema inference
- Use a **`_checkpoint`** folder and the **`schemaLocation`** option to store the schema info in a dedicated folder for **`customers`**.
- Set the **`maxFilesPerTrigger`** option to **`1`**.
- Set the **`inferSchema`** and **`header`** options to **`true`**.
- Use **`.csv()`** to specify the source directory.

In [7]:
customers_csv = os.path.join(customers_stream_dir, "customers.csv")
print(customers_csv)

C:\lab_data\retail-org\customers\customers.csv


In [8]:
df_customers = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema=True)
    .load(customers_csv)
)

# Unit Test -------------
print(f"The 'df_customers' object is of type: {type(df_customers)}.")
df_customers.printSchema()

print(f"The 'df_customers' DataFrame contains {df_customers.count()} rows.")
df_customers.toPandas().head(5)


The 'df_customers' object is of type: <class 'pyspark.sql.dataframe.DataFrame'>.
root
 |-- customer_id: integer (nullable = true)
 |-- tax_id: double (nullable = true)
 |-- tax_code: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- street: string (nullable = true)
 |-- number: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- ship_to_address: string (nullable = true)
 |-- valid_from: integer (nullable = true)
 |-- valid_to: double (nullable = true)
 |-- units_purchased: double (nullable = true)
 |-- loyalty_segment: integer (nullable = true)

The 'df_customers' DataFrame contains 28813 rows.


| | customer_id | tax_id | tax_code | customer_name | state | city | postcode | street | number | unit | region | district | lon | lat | ship_to_address | valid_from | valid_to | units_purchased | loyalty_segment |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 11123757 | | | SMITH, SHIRLEY | IN | BREMEN | 46506.0 | N CENTER ST | 521.0 | | Indiana | 50.0 | -86.146582 | 41.450763 | IN, 46506.0, N CENTER ST, 521.0 | 1532824233 | 1548137000.0 | 34.0 | 3 |
| 1 | 30585978 | | | STEPHENS, GERALDINE M | OR | ADDRESS | 0.0 | NO SITUS | | | | | -122.105516 | 45.374317 | OR, 0, NO SITUS, nan | 1523100473 | | 18.0 | 3 |
| 2 | 349822 | | | GUZMAN, CARMEN | VA | VIENNA | 22181.0 | HILL RD | 2860 | | VA | | -77.294126 | 38.883033 | VA, 22181, HILL RD, 2860 | 1522922493 | | 5.0 | 0 |
| 3 | 27652636 | | | HASSETT, PATRICK J | WI | VILLAGE OF NASHOTAH | 53058.0 | IVY LANE | W333N 5591 | | | | -88.409517 | 43.121379 | WI, 53058.0, IVY LANE, W333N 5591 | 1531834357 | 1558052000.0 | 7.0 | 1 |
| 4 | 14437343 | | | HENTZ, DIANA L | OH | COLUMBUS | 43228.0 | ALLIANCE WAY | 5706 | | OH | FRA | -83.158438 | 39.978218 | OH, 43228.0, ALLIANCE WAY, 5706 | 1517227530 | | 0.0 | 0 |


In [9]:
# Lab code: do not delete.
customers_checkpoint_bronze = os.path.join(customers_output_bronze, '_checkpoint')

df_customers_bronze = (
    spark.readStream \
        .option("schemaLocation", customers_checkpoint_bronze) \
        .option("maxFilesPerTrigger", 1) \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .csv(customers_stream_dir)
)

df_customers_bronze.isStreaming


True

In [10]:
df_customers_bronze.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- tax_id: double (nullable = true)
 |-- tax_code: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- street: string (nullable = true)
 |-- number: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- ship_to_address: string (nullable = true)
 |-- valid_from: integer (nullable = true)
 |-- valid_to: double (nullable = true)
 |-- units_purchased: double (nullable = true)
 |-- loyalty_segment: integer (nullable = true)
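
Schema inference requires Spark to scan the source files before the stream starts. As a hedged alternative, the schema printed above could be written down once as a DDL string and handed to the reader with `.schema(...)`, which accepts either a `StructType` or a DDL-formatted string. The sketch below only builds that string in plain Python: the `inferred_fields` list is transcribed from the `printSchema()` output, and `to_ddl` is a hypothetical helper name, not part of the lab.

```python
# Column names and types transcribed from the printSchema() output above.
inferred_fields = [
    ("customer_id", "integer"), ("tax_id", "double"), ("tax_code", "string"),
    ("customer_name", "string"), ("state", "string"), ("city", "string"),
    ("postcode", "string"), ("street", "string"), ("number", "string"),
    ("unit", "string"), ("region", "string"), ("district", "string"),
    ("lon", "double"), ("lat", "double"), ("ship_to_address", "string"),
    ("valid_from", "integer"), ("valid_to", "double"),
    ("units_purchased", "double"), ("loyalty_segment", "integer"),
]

# Map printSchema() type names to Spark SQL DDL type names.
DDL_TYPES = {"integer": "INT", "double": "DOUBLE", "string": "STRING"}


def to_ddl(fields):
    """Hypothetical helper: render (name, type) pairs as a DDL schema string."""
    return ", ".join(f"`{name}` {DDL_TYPES[dtype]}" for name, dtype in fields)


customers_ddl = to_ddl(inferred_fields)
print(customers_ddl)

# Assumed usage inside the lab's Spark session (not executed here):
#   spark.readStream.schema(customers_ddl).option("header", "true").csv(customers_stream_dir)
```

The lab steps themselves still call for inference; an explicit schema is simply a way to keep column types stable if new files arrive with unexpected values.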



##### 6.2. Stream the raw data to a Delta table.
 - Use the **`delta`** format.
 - Use the **`append`** output mode.
 - Use **`customers_bronze`** as the **`queryName`**.
 - Use **`availableNow = True`** for the **`trigger`**
 - Use the **`_checkpoint`** folder with the **`checkpointLocation`** option.

In [11]:
customers_bronze_query = (
    df_customers_bronze \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .queryName("customers_bronze") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", customers_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(customers_output_bronze)
    # TODO: Configurations
)

In [12]:
customers_bronze_query.awaitTermination()
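
To make the roles of `maxFilesPerTrigger`, `checkpointLocation`, and `availableNow` concrete, here is a toy model in plain Python. It is not how Spark implements checkpoints; it only mimics the observable behaviour: each micro-batch picks up a bounded number of unseen files, progress is recorded on disk, and a second run finds nothing new to process. The function `run_available_now`, the `processed.json` file, and the sample file names are all hypothetical.

```python
import json
import os
import tempfile


def run_available_now(source_dir, checkpoint_dir, max_files_per_trigger=1):
    """Toy model: process all unseen files in bounded micro-batches, then stop."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    state_file = os.path.join(checkpoint_dir, "processed.json")

    # Restore progress from the previous run, if any.
    processed = []
    if os.path.exists(state_file):
        with open(state_file) as f:
            processed = json.load(f)

    pending = sorted(name for name in os.listdir(source_dir) if name not in processed)
    batches = []
    while pending:
        batch, pending = pending[:max_files_per_trigger], pending[max_files_per_trigger:]
        batches.append(batch)
        processed.extend(batch)
        # Persist progress after every micro-batch so a restart resumes here.
        with open(state_file, "w") as f:
            json.dump(processed, f)
    return batches


with tempfile.TemporaryDirectory() as root:
    source = os.path.join(root, "customers")
    checkpoint = os.path.join(root, "customers_bronze", "_checkpoint")
    os.makedirs(source)
    for name in ("part-0.csv", "part-1.csv", "part-2.csv"):
        open(os.path.join(source, name), "w").close()

    first_run = run_available_now(source, checkpoint)   # three batches, one file each
    second_run = run_available_now(source, checkpoint)  # no new files, so no batches

print(first_run)
print(second_run)
```

The second run returning no batches is the property that lets the bronze cell be re-executed without duplicating rows, as long as the checkpoint folder is left in place.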

##### 6.3. Create a Streaming Temporary View named **`customers_bronze_streaming`**
- Use the **`delta`** format.
- Set the **`inferSchema`** option to **`true`**
- Load the data from the output of the **`bronze`** delta table (**`customers_output_bronze`**)

In [13]:
(spark.readStream \
    .format("delta") \
    .option("InferSchema", "true") \
    .load(customers_output_bronze) \
    .createOrReplaceTempView("customers_bronze_streaming")
    # TODO: Configurations
)

##### 6.4. Clean and Enhance the Data
Use the CTAS syntax to define a new streaming view called **`bronze_enhanced_temp`** that does the following:
* Omits records whose **`postcode`** is null or set to zero.
* Inserts a column called **`receipt_time`** containing a current timestamp using the **`current_timestamp()`** function.
* Inserts a column called **`source_file`** containing the input filename using the **`input_file_name()`** function.

In [14]:
sql_bronze_temp = """
   -- TODO: author SQL Statement
   CREATE OR REPLACE TEMPORARY VIEW bronze_enhanced_temp AS
      SELECT *
         , current_timestamp() AS receipt_time
         , input_file_name() AS source_file
   FROM customers_bronze_streaming
   WHERE postcode IS NOT NULL AND postcode != '0'
"""
spark.sql(sql_bronze_temp)

DataFrame[]
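
The `WHERE` clause compares `postcode` as a string, so only the exact text `'0'` is treated as zero. The preview earlier in this notebook displays values such as `0.0`, so it may be worth checking which spelling the data actually uses. The plain-Python sketch below contrasts the exact string test with a numeric one; the sample values and both helper names are illustrative assumptions, not rows taken from the table.

```python
def keep_exact(postcode):
    """Mirror of the SQL predicate: postcode IS NOT NULL AND postcode != '0'."""
    return postcode is not None and postcode != "0"


def keep_numeric(postcode):
    """Stricter variant: also drop any postcode that parses to the number zero."""
    if postcode is None:
        return False
    try:
        return float(postcode) != 0.0
    except ValueError:
        return True  # non-numeric postcodes are kept as-is


samples = [None, "0", "0.0", "46506.0", "AB1 2CD"]
exact = [p for p in samples if keep_exact(p)]
numeric = [p for p in samples if keep_numeric(p)]

print(exact)
print(numeric)
```

Which variant is appropriate depends on how the source files encode a missing postcode.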

#### 7.0. Silver Table
##### 7.1. Stream the data from **`bronze_enhanced_temp`** to a **`Delta`** table named **`customers_silver`**.
 - Use the **`append`** output mode.
 - Use **`customers_silver`** as the **`queryName`**.
 - Use **`availableNow = True`** for the **`trigger`**
 - Use a **`_checkpoint`** folder with the **`checkpointLocation`** option to store the stream's progress in a dedicated folder for **`customers_silver`**.

In [15]:
customers_checkpoint_silver = os.path.join(customers_output_silver, '_checkpoint')

customers_silver_query = \
(spark.table("bronze_enhanced_temp") \
     .writeStream \
     .format("delta") \
     .outputMode("append") \
     .queryName("customers_silver") \
     .trigger(availableNow=True) \
     .option("checkpointLocation", customers_checkpoint_silver) \
     .option("compression", "snappy") \
     .start(customers_output_silver)
     # TODO: Configurations
)

# Note: this query fails unless the bronze stream (6.2) and the bronze_enhanced_temp view (6.4) were created first.

In [16]:
customers_silver_query.awaitTermination()

##### 7.2. Create a Streaming Temporary View
- Create another streaming temporary view named **`customers_silver_temp`** for the **`customers_silver`** table so we can perform business-level queries using SQL.

In [17]:
(spark.readStream
     # TODO: Configurations
     .format("delta")
     .option("inferSchema", "true")
     .load(customers_output_silver)
     .createOrReplaceTempView("customers_silver_temp")
)

#### 8.0. Gold Table
##### 8.1. Use the CTAS syntax to define a new streaming view called **`customer_count_by_state_temp`** that does the following:
- Reads data from the **`customers_silver_temp`** temporary view created in the preceding step.
- Selects the **`state`** along with the number of customers per (grouped by) state.

In [18]:
sql_gold_temp = """
CREATE OR REPLACE TEMPORARY VIEW customer_count_by_state_temp AS
    SELECT state, count(customer_id) AS customer_count
    FROM customers_silver_temp
    GROUP BY state
"""
spark.sql(sql_gold_temp)

DataFrame[]

##### 8.2. Stream the data from the **`customer_count_by_state_temp`** view to a Delta table called **`customer_count_by_state_gold`**.
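- Use the **`complete`** output mode: a streaming aggregation such as **`count()`** with no watermark cannot be written in **`append`** mode, so the entire result table is rewritten on each trigger. Sorting a stream is likewise only supported after an aggregation in **`complete`** mode.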
- Use a **`_checkpoint`** folder with the **`checkpointLocation`** option and a dedicated folder for **`customers`** as the checkpoint path.
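
The difference between `append` and `complete` output is easiest to see on a running count. The toy model below (plain Python, not Spark code) feeds two micro-batches of `state` values through a running aggregation and emits the full result table after each batch, which is what `complete` mode does. The batch contents are made up for illustration.

```python
from collections import Counter

# Two hypothetical micro-batches of customer states arriving over time.
micro_batches = [["NY", "CA", "NY"], ["CA", "FL", "NY"]]

running_counts = Counter()  # aggregation state carried across micro-batches
emitted_tables = []

for batch in micro_batches:
    running_counts.update(batch)
    # Complete mode: rewrite the whole result table, not just the new rows.
    emitted_tables.append(dict(running_counts))

for table in emitted_tables:
    print(table)
```

The counts emitted after the first batch are already out of date once the second batch arrives, which is why an append-only write does not fit this aggregation.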

In [19]:
customer_count_checkpoint_gold = os.path.join(customers_output_gold, '_checkpoint')

customer_count_by_state_gold_query = \
(spark.table("customer_count_by_state_temp") \
     .writeStream \
     .format("delta") \
     .outputMode("complete") \
     .queryName("customer_count_by_state_gold") \
     .trigger(availableNow=True) \
     .option("checkpointLocation", customer_count_checkpoint_gold) \
     .toTable("customer_count_by_state_gold")
     # TODO: Configurations
)

In [20]:
customer_count_by_state_gold_query.awaitTermination()

#### 9.0. Query the Results
- Query the **`customer_count_by_state_gold`** table (this will not be a streaming query).
- Select the **`state`** and **`customer_count`** columns.
- Sort the results by **`customer_count`** in descending order (i.e., from highest to lowest).

In [21]:
sql_customer_count_query = """
    -- TODO: Author SQL query
    SELECT state AS `State`
        , customer_count AS `Customer Count`
    FROM customer_count_by_state_gold
    ORDER BY `Customer Count` DESC
"""
spark.sql(sql_customer_count_query).toPandas()

| | State | Customer Count |
|---|---|---|
| 0 | NY | 3417 |
| 1 | CA | 2900 |
| 2 | FL | 2525 |
| 3 | OH | 1914 |
| 4 | MA | 1889 |
| 5 | NJ | 1503 |
| 6 | IN | 1105 |
| 7 | MI | 1090 |
| 8 | NC | 1016 |
| 9 | WI | 992 |


In [22]:
spark.stop()