## Demo 2: Understanding Incremental Data Ingestion with PySpark Structured Streaming
Spark Structured Streaming is a feature of Apache Spark that simplifies the configuration required to process incremental datasets. Historically, streaming big data has been driven by the need to reduce latency for the sake of providing near real-time analytics; however, this lesson focuses more on implementing incremental data processing.

While not absolutely required when working with a *data lakehouse*, many enterprise implementations derive substantial benefit from incremental data processing. Incremental ETL is important because it allows us to deal solely with new data that has arrived since the last ingestion. Reliably processing only the new data reduces redundant processing and helps enterprises scale data pipelines.

Consider the following datasets and use cases:
* Data scientists need secure, de-identified, versioned access to frequently updated records in an operational database
* Credit card transactions need to be compared to past customer behavior to identify and flag fraud
* A multi-national retailer seeks to serve custom product recommendations using purchase history
* Log files from distributed systems need to be analyzed to detect and respond to instabilities
* Clickstream data from millions of online shoppers needs to be leveraged for A/B testing of UX

These are just a small sample of datasets that grow incrementally and infinitely over time.  Here, we demonstrate the basics of using Spark Structured Streaming for incremental data processing.

#### Treating Infinite Data as a Table
The main benefit of Spark Structured Streaming is that it enables users to interact with ever-growing data sources as if they were simply a static table of records.

<img src="http://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png" width="800"/>

In the graphic above, a **data stream** describes any data source that grows over time. New data in a data stream might correspond to:
* A new JSON log file landing in cloud storage
* Updates to a database captured in a CDC feed
* Events queued in a pub/sub messaging feed
* A CSV file of sales closed the previous day

Historically, to update the results of a continuous stream of real-time data, either the entire source dataset had to be completely reprocessed, or custom logic had to be implemented to identify and process only those files or records that had been added since the previous update was executed.  Structured Streaming enables defining a query against the data source to automatically detect new records and propagate them through previously defined logic.
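To make the contrast concrete, here is a minimal pure-Python sketch of the kind of custom bookkeeping that incremental processing used to require. The function names `find_new_files` and `process_incrementally` are hypothetical, and an in-memory set stands in for the durable state a real pipeline would have to persist between runs. Structured Streaming's file source keeps a comparable record of already-seen files in the query's checkpoint, so you don't have to write this yourself.

```python
import os
from typing import List, Set


def find_new_files(directory: str, already_processed: Set[str]) -> List[str]:
    """Return, in sorted order, the files in `directory` not yet processed."""
    current = {
        name
        for name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, name))  # skip subdirectories
    }
    return sorted(current - already_processed)


def process_incrementally(directory: str, already_processed: Set[str]) -> List[str]:
    """Handle only the new files, then remember them so they are skipped next time."""
    new_files = find_new_files(directory, already_processed)
    for name in new_files:
        # A real pipeline would parse `name` and load its rows into the target table here.
        already_processed.add(name)
    return new_files
```

Calling `process_incrementally()` twice on an unchanged directory returns an empty list the second time; the hard part in practice is persisting `already_processed` reliably, which is exactly what the checkpoint handles.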

#### Basic Concepts
- The developer defines an **input table** by configuring a streaming read against a **source**. The syntax for doing this is similar to working with static data.
- A **query** is defined against the input table. Both the DataFrames API and Spark SQL can be used to easily define transformations and actions against the input table.
- This logical query on the input table generates the **results table**. The results table contains the incremental state information of the stream.
- The **output** of a streaming pipeline will persist updates to the results table by writing to an external **sink**. Generally, a sink will be a durable system such as files or a pub/sub messaging bus.
- New rows are appended to the input table for each **trigger interval**. These new rows are essentially analogous to micro-batch transactions and will be automatically propagated through the results table to the sink.

<img src="http://spark.apache.org/docs/latest/img/structured-streaming-model.png" width="800"/>

For more information, see the section in the <a href="http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts" target="_blank">Structured Streaming Programming Guide</a>.
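As a rough mental model, the toy class below (hypothetical, pure Python, not Spark code) mirrors the basic concepts: an input table that only grows, an offset marking what has already been processed, a result table updated by each trigger, and a sink that receives the updated results. Writing the full result table on every trigger resembles Spark's Complete output mode; the `heartbeat_tracker` query later in this demo uses Append mode instead.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical record shape for the illustration: (device_id, heartrate).
Row = Tuple[int, float]


class ToyStream:
    """Pure-Python model of the Structured Streaming concepts (not Spark code)."""

    def __init__(self) -> None:
        self.input_table: List[Row] = []        # unbounded input table; only grows
        self.offset = 0                         # index of the first unprocessed row
        self.result_table: Counter = Counter()  # query result: rows per device_id
        self.sink: List[Dict[int, int]] = []    # what each trigger wrote out

    def arrive(self, rows: List[Row]) -> None:
        """New data lands at the source and is appended to the input table."""
        self.input_table.extend(rows)

    def trigger(self) -> int:
        """Run one micro-batch over the rows added since the previous trigger."""
        new_rows = self.input_table[self.offset:]
        for device_id, _heartrate in new_rows:
            self.result_table[device_id] += 1
        self.offset = len(self.input_table)
        # Complete-mode style: write a snapshot of the whole result table.
        self.sink.append(dict(self.result_table))
        return len(new_rows)
```

Each `trigger()` touches only the rows after `offset`, yet the result table always reflects everything that has arrived, which is the "infinite data as a table" idea in miniature.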

#### End-to-End Fault Tolerance
Structured Streaming ensures end-to-end, exactly-once fault-tolerance guarantees through _checkpointing_ (discussed below) and <a href="https://en.wikipedia.org/wiki/Write-ahead_logging" target="_blank">write-ahead logs</a>. Structured Streaming sources, sinks, and the underlying execution engine work together to track the progress of stream processing. If a failure occurs, the streaming engine attempts to restart and/or reprocess the data. For best practices on recovering from a failed streaming query, see the Databricks <a href="https://docs.databricks.com/spark/latest/structured-streaming/production.html#recover-from-query-failures" target="_blank">docs</a>. Note: This approach _only_ works if the streaming source is replayable; replayable sources include cloud-based object storage and pub/sub messaging services.

At a high level, the underlying streaming mechanism relies on two approaches:
* First, Structured Streaming uses checkpointing and write-ahead logs to record the offset range of data being processed during each trigger interval.
* Next, the streaming sinks are designed to be _idempotent_: multiple writes of the same data (as identified by the offset) do _not_ result in duplicates being written to the sink.

Taken together, replayable data sources and idempotent sinks allow Structured Streaming to ensure **end-to-end, exactly-once semantics** under any failure condition.
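The interplay between a replayable source, a checkpoint, and an idempotent sink can be sketched in plain Python. Everything here is a hypothetical illustration rather than Spark's implementation: `ReplayableSource`, `ToyCheckpoint`, `IdempotentSink`, and `run_batch` are invented names, and the checkpoint lives in memory instead of on durable storage. The sketch follows the two approaches described above: the offset range is logged before a batch is processed, the batch is marked committed only after the sink write succeeds, and on restart an uncommitted batch is replayed with exactly the same range, which the sink recognizes by its batch ID and skips.

```python
from typing import Dict, List, Optional, Set, Tuple


class ReplayableSource:
    """A source that can re-serve any offset range (like files or a pub/sub log)."""

    def __init__(self, records: List[str]) -> None:
        self.records = records

    def latest_offset(self) -> int:
        return len(self.records)

    def read(self, start: int, end: int) -> List[str]:
        return self.records[start:end]


class IdempotentSink:
    """Remembers which batch IDs it has written, so a replayed batch is skipped."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.written_batches: Set[int] = set()

    def write(self, batch_id: int, rows: List[str]) -> None:
        if batch_id in self.written_batches:
            return  # replay of a batch that already landed: no duplicates
        self.rows.extend(rows)
        self.written_batches.add(batch_id)


class ToyCheckpoint:
    """In-memory stand-in for a checkpoint's offset log and commit log."""

    def __init__(self) -> None:
        self.offsets: Dict[int, Tuple[int, int]] = {}  # batch_id -> (start, end)
        self.commits: Set[int] = set()


def run_batch(source: ReplayableSource, sink: IdempotentSink, ckpt: ToyCheckpoint,
              crash_before_commit: bool = False) -> Optional[int]:
    """Process one micro-batch; return its batch ID, or None if there is no new data."""
    last = max(ckpt.offsets, default=None)
    if last is not None and last not in ckpt.commits:
        # Recovery: the last planned batch never committed, so replay the same range.
        batch_id = last
        start, end = ckpt.offsets[batch_id]
    else:
        batch_id = 0 if last is None else last + 1
        start = 0 if last is None else ckpt.offsets[last][1]
        end = source.latest_offset()
        if start == end:
            return None
        ckpt.offsets[batch_id] = (start, end)  # log the range before processing
    sink.write(batch_id, source.read(start, end))
    if crash_before_commit:
        raise RuntimeError("simulated failure after the sink write")
    ckpt.commits.add(batch_id)  # mark the batch complete
    return batch_id
```

Simulating a crash right after the sink write shows why both halves are needed: the restart re-reads the same offsets from the replayable source, and the idempotent sink prevents those rows from being written twice.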

#### Unsupported Operations
Most operations on a streaming DataFrame are identical to those on a static DataFrame. There are <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations" target="_blank">some exceptions to this</a>. Consider the model of the data as a constantly appending table: sorting is one of a handful of operations that are either too complex or logically impossible to perform on unbounded streaming data (Spark supports sorting a stream only after an aggregation and in Complete output mode). While a full discussion of these exceptions is out of scope for this demonstration, note that advanced streaming methods like windowing and watermarking can be used to add additional functionality to incremental workloads.
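Windowing and watermarking are easier to reason about with a small model. The function below is a hypothetical, pure-Python approximation, not Spark's API: it groups events into tumbling windows and tracks a watermark equal to the latest event time seen minus an allowed delay, dropping events whose window closed before the watermark. One simplification to keep in mind is that this sketch advances the watermark after every event, whereas Spark advances it between micro-batches.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical event shape: (event_time_in_seconds, device_id).
Event = Tuple[float, int]


def windowed_counts(events: Iterable[Event], window_s: float,
                    delay_s: float) -> Tuple[Dict[Tuple[float, int], int], int]:
    """Count events per (window_start, device_id) using tumbling windows.

    Returns the counts and the number of events dropped as too late.
    """
    counts: Dict[Tuple[float, int], int] = defaultdict(int)
    max_event_time = float("-inf")
    dropped = 0
    for event_time, device_id in events:
        # Watermark: latest event time seen so far minus the allowed lateness.
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - delay_s
        # Tumbling windows are aligned to multiples of window_s.
        window_start = event_time - (event_time % window_s)
        if window_start + window_s <= watermark:
            dropped += 1  # this event's window is already finalized
            continue
        counts[(window_start, device_id)] += 1
    return dict(counts), dropped
```

The watermark is what lets a streaming engine bound its state: once a window falls behind it, the window's count can be treated as final and discarded from memory, at the cost of ignoring sufficiently late events.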

### 1.0. Implementing a Structured Streaming Data Pipeline
The first step for any successful data lakehouse implementation is ingesting into a Delta Lake table from a storage location. Historically, ingesting files from a data lake into a database has been a complicated process; however, Spark Structured Streaming provides an easy-to-use mechanism for incrementally and efficiently processing new data files as they arrive in a file storage location. Due to the benefits and scalability that Spark Structured Streaming delivers, its use is a general **best practice** when ingesting data from storage locations such as cloud object storage.

Learning objectives include: 
* Describing the programming model used by Spark Structured Streaming
* Describing the requirements for end-to-end fault tolerance
* Configuring required options to perform a streaming read on a source
* Configuring required options to perform a streaming write to a sink

#### Tasks
For the sake of illustrating how continuously generated (unbounded) data is incrementally ingested into a data lakehouse, this demo will execute the following job steps: 
* Execute Spark Structured Streaming code to incrementally ingest data from storage to Delta Lake
* Observe what happens when a new file arrives in a directory
* Query a table fed by a streaming query

#### Data
This lab will demonstrate ingesting artificially generated medical data, in JSON format, that simulates heart rate monitor signals captured from numerous devices; therefore, this data represents what would be expected from a *Streaming* data source.

| Field | Type |
| --- | --- |
| device_id | int |
| mrn | long |
| time | double |
| heartrate | double |
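To picture what one source file might contain, the sketch below builds a small pretty-printed JSON array of records matching the schema table and validates the field types with Python's `json` module. The record values, the `parse_tracker_file` helper, and the array layout are assumptions for illustration, since the lab files themselves aren't shown. The layout is chosen because the streaming read later sets `multiLine` to `true`, which lets Spark parse a JSON document spanning multiple lines (a top-level array becomes one row per element).

```python
import json
from typing import Dict, List

# Python types corresponding to the schema table (int/long -> int, double -> float).
EXPECTED_TYPES = {"device_id": int, "mrn": int, "time": float, "heartrate": float}


def parse_tracker_file(text: str) -> List[Dict]:
    """Parse one multi-line JSON document holding an array of tracker records."""
    records = json.loads(text)
    for record in records:
        for field, py_type in EXPECTED_TYPES.items():
            if not isinstance(record.get(field), py_type):
                raise TypeError(f"field {field!r} is not {py_type.__name__}: {record}")
    return records


# Hypothetical sample values, serialized the way a multi-line file might look.
sample = [{"device_id": 1, "mrn": 40580129, "time": 1581951600.0, "heartrate": 76.1}]
sample_text = json.dumps(sample, indent=2)
```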

#### 1.1. Import Shared Libraries

In [None]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import shutil
import pandas as pd
import time

import pyspark
from delta import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

#### 1.2. Instantiate Global Variables

In [None]:
# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'healthcare')
stream_dir = os.path.join(data_dir, 'streaming')
tracker_source_dir = os.path.join(stream_dir, 'tracker')
tracker_files_processed_dir = os.path.join(stream_dir, 'tracker_files_processed')

# --------------------------------------------------------------------------------
# Define Directory Paths for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "healthcare_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
database_dir = os.path.join(sql_warehouse_dir, dest_database)

tracker_output = os.path.join(database_dir, 'heartbeat_tracker')

#### 1.3. Define Utilities

In [None]:
def create_directory(path: str):
    '''If the destination directory doesn't exist then create it (including any missing parents)'''
    try:
        if not os.path.exists(path):
            os.makedirs(path)
            new_dir = os.path.split(path)[1]
            print(f"The directory '{new_dir}' has been created successfully.")
            
    except Exception as e:
        return f"An error occurred while attempting to create the directory: {e}"


def get_file_info(path: str):
    '''Return a DataFrame listing the name, size, and last modification time of each file in a directory'''
    file_sizes = []
    modification_times = []

    # Fetch each item in the directory, and filter out any directories.
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    # Populate lists with the size and last modification datetime for each file in the directory.
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name', 'size', 'modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def load_file(src_path: str, dst_path: str):
    '''Get the first file in the source directory, and move it to the destination directory'''
    try:
        # First, ensure the destination directory exists.
        create_directory(dst_path)
        
        # Select the first file (in sorted order) from the source directory.
        file = sorted(os.listdir(src_path))[0]
        src_file_path = os.path.join(src_path, file)
        dst_file_path = os.path.join(dst_path, file)
        
        retval = shutil.move(src_file_path, dst_file_path)

        dst_dir = os.path.split(dst_path)[1]
        if retval:
            return f"The file '{file}' has been successfully moved to the directory '{dst_dir}'."
        else:
            return f"The file '{file}' was not moved to the directory '{dst_dir}'."

    except Exception as e:
        return f"An error occurred: {e}"


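def wait_until_stream_is_ready(query, min_batches=1):
    '''Block until the streaming query has reported at least `min_batches` progress updates'''
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has reported {len(query.recentProgress)} progress updates")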


def restore_files(src_path: str, dst_path: str):
    '''Move all files in the source directory to the destination directory'''
    try:
        dst_dir = os.path.split(dst_path)[1]
        for file in os.listdir(src_path):
            src_file_path = os.path.join(src_path, file)
            dst_file_path = os.path.join(dst_path, file)
            retval = shutil.move(src_file_path, dst_file_path)
            if retval:
                print(f"The file '{file}' has been successfully moved to the directory '{dst_dir}'.")

    except Exception as e:
        return f"An error occurred: {e}"


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"

#### 1.4. Prepare the File System Environment
- Remove the data lakehouse database directory structure to ensure idempotency
- List all files in the specified directory to ensure their existence and location

In [None]:
remove_directory_tree(database_dir)

In [None]:
get_file_info(tracker_source_dir)

#### 1.5. Create a New Spark Session

In [None]:
# Use half the available cores (at least one) as local worker threads
worker_threads = f"local[{max(1, os.cpu_count() // 2)}]"
shuffle_partitions = int(os.cpu_count())

builder = pyspark.sql.SparkSession.builder \
    .appName('PySpark Heartbeat Tracker Streaming in Jupyter')\
    .master(worker_threads)\
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '2g')\
    .config('spark.sql.adaptive.enabled', 'false') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.shuffle.partitions', shuffle_partitions) \
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .config('spark.sql.streaming.schemaInference', 'true') \
    .config('spark.sql.warehouse.dir', database_dir) \
    .config('spark.streaming.stopGracefullyOnShutdown', 'true')

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark

### 2.0. Using Spark Structured Streaming for Incremental Ingestion
In the cell below, a function is defined to demonstrate using Apache Spark Structured Streaming with the PySpark API. This code includes both a Structured Streaming read and write. If you wish to learn more about options for creating streaming dataframes and datasets, refer to the <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets" target="_blank">documentation</a>.

Note that when using Structured Streaming with automatic <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#schema-inference-and-partition-of-streaming-dataframesdatasets" target="_blank">schema inference and evolution</a>, the 4 arguments shown here should allow ingestion of most datasets. These arguments are explained below.

| argument | what it is | how it's used |
| --- | --- | --- |
| **`data_source`** | The directory of the source data | Spark will detect new files as they arrive in this location and queue them for ingestion; passed to the **`.load()`** method |
| **`source_format`** | The format of the source data | Passed to the **`.format()`** method; the format of the source data should always be specified |
| **`table_name`** | The name of the target table | Spark Structured Streaming supports writing directly to Delta Lake tables by passing a table name as a string to the **`.toTable()`** method. Note that you can either append to an existing table or create a new table |
| **`checkpoint_directory`** | The location for storing metadata about the stream | This argument is passed to the **`checkpointLocation`** and **`schemaLocation`** options. Checkpoints keep track of streaming progress, while a schema location tracks updates to the fields in the source dataset (**`schemaLocation`** is used by Databricks Auto Loader; the open-source file source used here does not read it) |

**NOTE**: The code below has been streamlined to demonstrate Structured Streaming functionality. Later we'll see how additional transformations can be applied to source data before saving them to Delta Lake.

In [None]:
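def load_to_table(data_source, source_format, table_name, checkpoint_directory):
    '''Start a streaming query that incrementally loads files from data_source into a Delta table'''
    query = (spark.readStream
             .format(source_format)
             # Used by Databricks Auto Loader; ignored by open-source Spark's file source
             .option("schemaLocation", checkpoint_directory)
             # Read at most one new file per micro-batch
             .option("maxFilesPerTrigger", 1)
             # Parse each file as one JSON document that may span multiple lines
             .option("multiLine", "true")
             # For streaming file sources, inference also requires spark.sql.streaming.schemaInference=true (set in 1.5)
             .option("inferSchema", "true")
             .load(data_source)
             .writeStream
             .format("delta")
             .outputMode("append")
             # Offsets and commit logs used for exactly-once recovery are stored here
             .option("checkpointLocation", checkpoint_directory)
             # Let the Delta table schema evolve if new columns appear
             .option("mergeSchema", "true")
             # toTable() starts the query and immediately returns a StreamingQuery handle
             .toTable(table_name))
    return query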

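We will now use the previously defined function, together with the path variables defined in section 1.2, to begin a Structured Streaming process that reads from a source directory of JSON files. First, we land one file in the source directory: the file source expects the path to exist when the query starts, and it infers the schema from the files already present.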

In [None]:
load_file(tracker_source_dir, tracker_files_processed_dir)

In [None]:
tracker_checkpoint = os.path.join(tracker_output, '_checkpoint')

query = load_to_table(data_source = tracker_files_processed_dir, \
                      source_format = "json", \
                      table_name = "heartbeat_tracker", \
                      checkpoint_directory = tracker_checkpoint)

Because Spark Structured Streaming loads data incrementally, the query started above keeps running after the cell finishes: **`toTable()`** returns a **`StreamingQuery`** handle right away, while the query itself is a **continuously active query**. As soon as new data arrives in our data source, it will be processed through our logic and loaded into our target table.

#### 2.1. Helper Function for Streaming Lessons
This lesson combines streaming functions with batch and streaming queries against the results of their operations. This notebook is for instructional purposes and is intended for interactive, cell-by-cell execution. For that reason, we call the **`wait_until_stream_is_ready()`** helper defined in section 1.3, which pauses the notebook just long enough to ensure a given streaming query has written data out before the next cell runs. This code would not be necessary in a production job.

In [None]:
wait_until_stream_is_ready(query, 1)
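Because `wait_until_stream_is_ready()` counts every entry in `query.recentProgress`, it can return before new data has actually been written: idle triggers can also emit progress events whose **`numInputRows`** is 0, and `recentProgress` retains only the most recent updates (controlled by `spark.sql.streaming.numRecentProgressUpdates`). The hypothetical helpers below count only the micro-batches that read rows. They assume each progress entry behaves like a dict, as the list of dicts returned by `recentProgress` in PySpark 3.x does.

```python
from typing import Dict, List


def count_data_batches(progress: List[Dict]) -> int:
    """Count progress entries for micro-batches that actually read input rows."""
    return sum(1 for p in progress if p.get("numInputRows", 0) > 0)


def rows_ingested(progress: List[Dict]) -> int:
    """Total input rows across the given progress entries."""
    return sum(p.get("numInputRows", 0) for p in progress)
```

In this notebook you might call `count_data_batches(query.recentProgress)`; with `maxFilesPerTrigger` set to 1, it would be expected to grow by one for each file landed in the source directory.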

### 3.0. Query Target Table
Once data has been ingested into Delta Lake, it can be interacted with in just the same manner as any SQL database table.

In [None]:
spark.sql("SELECT * FROM heartbeat_tracker").toPandas().head(5)

#### 3.1. Inspect the Streaming Table
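While the field names for our data have been captured correctly, check the data type assigned to each field. Spark's JSON reader infers whole numbers as **`BIGINT`** and fractional numbers as **`DOUBLE`**, so the types may differ slightly from those listed in the Data table above (for example, **`device_id`** may appear as **`BIGINT`** rather than **`INT`**). By contrast, Databricks Auto Loader infers every JSON field as **`STRING`** by default: because JSON is a text-based format, that is the safest and most permissive type, ensuring that the least amount of data is dropped or ignored at ingestion due to type mismatch.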

In [None]:
spark.sql("DESCRIBE TABLE heartbeat_tracker").toPandas()

#### 3.2. Create a Temporary View
Here we'll define a temporary view that summarizes the recordings in our target table.  This view will be used to demonstrate how new data is automatically ingested.

In [None]:
sql_tempview = """
CREATE OR REPLACE TEMP VIEW device_counts AS
  SELECT device_id, COUNT(*) AS total_recordings
  FROM heartbeat_tracker
  GROUP BY device_id;
"""
spark.sql(sql_tempview)

#### 3.3. Query the Temporary View
Now we can query the temporary view to inspect the progress of incremental data ingestion by our structured streaming query.

In [None]:
def display_device_counts(records: int):
    sql_device_counts = f"""
        SELECT device_id AS `Device ID`
            , total_recordings AS `Total Recordings`
        FROM device_counts
        ORDER BY total_recordings DESC
        LIMIT {records}
    """
    dframe = spark.sql(sql_device_counts).toPandas()
    
    return dframe

display_device_counts(10)

### 4.0. Land New Data

As mentioned previously, Spark Structured Streaming is configured to incrementally process files from a source directory (a local directory in this lab, standing in for cloud object storage) into a Delta Lake table. We have configured and are currently executing a query to process JSON files from the location specified by **`tracker_files_processed_dir`** into a table named **`heartbeat_tracker`**.

#### 4.1. Inspect Data Source
First, let's review the contents of the **`tracker_files_processed_dir`** directory.

In [None]:
get_file_info(tracker_files_processed_dir)

#### 4.2. Load Another JSON Data File
At present, you should see a single JSON file listed in this location. The **`load_file()`** function invoked in the following cell (defined in section 1.3) mimics an external system writing new data to this directory by moving the next file from **`tracker_source_dir`**. Each time you execute the cell below, a new file will land in the **`tracker_files_processed_dir`** directory.

In [None]:
load_file(tracker_source_dir, tracker_files_processed_dir)

#### 4.3. Confirm the New JSON File
When we list the contents of **`tracker_files_processed_dir`** again using the cell below, we should see one additional JSON file for each time you ran the previous cell.

In [None]:
get_file_info(tracker_files_processed_dir)

### 5.0. Tracking Ingestion Progress
Historically, many systems have been configured to either reprocess all records in a source directory to calculate current results or require data engineers to implement custom logic to identify new data that's arrived since the last time a table was updated.  With Spark Structured Streaming, your table has already been updated. Run the query below to confirm that new data has been ingested.

In [None]:
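# Block until every file currently in the source directory has been processed and committed.
# (processAllAvailable() is intended for testing and interactive use such as this notebook.)
query.processAllAvailable()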

display_device_counts(10)

The Spark streaming query we configured earlier automatically detects and processes records from the source directory into the target table. While there is a slight delay as records are ingested, a streaming query executing with default streaming configuration should update results in near real time.  The query below shows the table history. A new table version should be indicated for each **`STREAMING UPDATE`**. These update events coincide with new batches of data arriving at the source.

In [None]:
spark.sql("DESCRIBE HISTORY heartbeat_tracker").toPandas()
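To pick out the streaming writes from that history programmatically, a small hypothetical helper such as `streaming_update_versions()` can filter the rows by their **`operation`** column. It assumes each row supports `row["column"]` lookup, which holds for the `Row` objects returned by `spark.sql("DESCRIBE HISTORY heartbeat_tracker").collect()` as well as for plain dicts.

```python
from typing import Iterable, List


def streaming_update_versions(history_rows: Iterable) -> List[int]:
    """Return the table versions written by streaming micro-batches, oldest first."""
    return sorted(
        row["version"]
        for row in history_rows
        if row["operation"] == "STREAMING UPDATE"  # skip other operation types
    )
```

Each version it returns should correspond to one micro-batch committed by the streaming query, so the list should grow as new files land in the source directory.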

### 6.0. Clean Up
- Stop the streaming query
- Restore JSON files to their original storage location
- Stop the Spark session

In [None]:
query.stop()
restore_files(tracker_files_processed_dir, tracker_source_dir)

In [None]:
spark.stop()