In [0]:
"""
WARNING
    > Before scheduling a notebook, remove code that only returns results for interactive inspection, such as display() and count().
    > Do not run Structured Streaming workloads on interactive clusters; always schedule streams as jobs.
    > To help streaming jobs recover automatically, configure jobs with infinite retries.
    > Do not use auto-scaling for workloads with Structured Streaming.
"""

In [0]:
from datetime import timedelta, datetime

import pandas as pd
import pyspark.sql.functions as f
import pyspark.sql.utils
from pyspark.sql import SparkSession

In [0]:
# Reference timestamps for this run. The clock is read once so every derived
# value refers to the same instant; separate datetime.now() calls could
# straddle midnight and disagree about which day "yesterday" is.
# NOTE(review): datetime.now() is the driver's local time — confirm it matches
# the timezone used for the `dt` partitions.
now = datetime.now()
yesterday = (now - timedelta(days=1)).strftime("%Y-%m-%d")

# Inclusive [start, end] window on the `dt` column; currently just yesterday.
startdate = yesterday
enddate = yesterday
dates = [startdate, enddate]

#### Streaming with Delta format data source

In [0]:
# S3 locations used by the Delta-source example below:
#   source_path            - Delta table read as a stream
#   target_path            - Delta location the stream writes to
#   target_checkpoint_path - checkpoint directory of the writing stream
source_path, target_path, target_checkpoint_path = (
    "s3://datalake-binary/event-types/data/shop.tracking.outfit-card.click",
    "s3://tracking-analytics/stream",
    "s3://tracking-analytics/checkpoints/target",
)

In [0]:
# List the source directory and show its second-to-last entry (as a
# one-element list).
# NOTE(review): the intent is "latest delta file" — confirm the [-2:-1] slice
# picks the entry you expect given the listing order.
source_listing = dbutils.fs.ls(source_path)
source_listing[-2:-1]

In [0]:
# Streaming read of the source Delta table, restricted to the [start, end]
# `dt` window and capped at 5 rows for a quick look.
source_query = (
    spark.readStream.format("delta")
    .load(source_path)
    .filter(f.col("dt").between(dates[0], dates[1]))
    .limit(5)
)

In [0]:
# Preview the stream. Calling display() on a streaming DataFrame launches a
# streaming query, named "read_stream" here so the next cell can stop it.
display(source_query, streamName="read_stream")

In [0]:
# Stop the "read_stream" preview query started by display().
# Entries of spark.streams.active are already StreamingQuery handles, so they
# are stopped directly instead of being re-fetched by id.
for active_query in spark.streams.active:
    if active_query.name == "read_stream":
        active_query.stop()

In [0]:
# Write the stream to Delta Lake.
# - trigger(availableNow=True) processes all data available when the query
#   starts, possibly in several micro-batches, and then terminates.
#   Alternative for a continuous run: trigger(processingTime='60 seconds'),
#   which starts a micro-batch every minute.
# - checkpointLocation must be unique to each streaming writer.
#   NOTE(review): the same S3 locations appear again later in this notebook
#   for other streams — confirm before running more than one of them.
# - etl_date stamps every row with the notebook's `now` timestamp.
target_query = (
    source_query.withColumn('etl_date', f.lit(now))
    .writeStream.format("delta")
    .outputMode("append")
    .option("path", target_path)
    .option("checkpointLocation", target_checkpoint_path)
    .trigger(availableNow=True)
    .start()
)

In [0]:
# Wait up to 10 seconds for the write to finish. Returns True if the query
# terminated within the timeout and False if it is still running.
target_query.awaitTermination(10)

True

In [0]:
# Manually stop the write query started above, if needed.
target_query.stop()

In [0]:
# Create a Stream Function
def stream_delta(source_path: str, source_checkpoint: str, target_path: str, target_checkpoint: str):
    """Start a streaming copy from one Delta location to another.

    Reads ``source_path`` as a Delta stream and appends it to the Delta
    location ``target_path``, partitioned by ``dt``, starting a micro-batch
    every 60 seconds.

    Args:
        source_path: Location of the source Delta table.
        source_checkpoint: Not used by this function; accepted so the
            signature matches ``stream_json``.
        target_path: Location the stream writes to (Delta format).
        target_checkpoint: Checkpoint directory of the writing stream. It
            should not be shared with any other streaming writer.

    Returns:
        The started ``StreamingQuery``. Returning it lets callers ``stop()``
        or ``awaitTermination()`` this long-running query without searching
        ``spark.streams.active``.
    """
    spark = SparkSession.builder.getOrCreate()

    source_query = (
        spark.readStream
        .format("delta")
        .load(source_path)
    )

    target_query = (
        source_query
        .writeStream
        .format("delta")
        .option("checkpointLocation", target_checkpoint)
        .option("path", target_path)
        .outputMode("append")
        .trigger(processingTime='60 seconds')
        # Requires a `dt` column in the source (the cell above filters on it).
        .partitionBy('dt')
        .start()
    )
    return target_query
    
# Run the stream.
# stream_delta does not use its source_checkpoint argument, so None is passed.
# `source_checkpoint` / `target_checkpoint` are only assigned later (in the
# JSON section), so this cell uses this section's target_checkpoint_path.
# NOTE(review): this is the same checkpoint and target as the availableNow
# write above, but with a different query (no etl_date, partitioned by dt) —
# confirm that sharing is intended.
stream_delta(
    source_path=source_path,
    source_checkpoint=None,
    target_path=target_path,
    target_checkpoint=target_checkpoint_path,
)

In [0]:
# Batch-read the target Delta table to inspect what the stream wrote.
target_df = spark.read.format("delta").load(target_path)

target_df.display()

In [0]:
# Number of rows currently in the target table (a Spark action).
target_df.count()

In [0]:
# List what the stream has written under the target location.
files = dbutils.fs.ls(target_path)
display(files)

In [0]:
# Inspect the writing stream's checkpoint directory.
checkpoint_entries = dbutils.fs.ls(target_checkpoint_path)
checkpoint_entries

#### Streaming with JSON format data source

In [0]:
# Stream-read JSON files with Auto Loader (cloudFiles).
#   source_checkpoint - Auto Loader schemaLocation (inferred schema is stored here)
#   target_path / target_checkpoint - Delta sink and its checkpoint
# NOTE(review): target_path and target_checkpoint hold the same strings as the
# Delta section's target_path / target_checkpoint_path; a checkpoint should
# belong to a single streaming writer — confirm before running both sections.
source_path = "s3a://datalake-eu-central-1/data/eventqueue/shop.tracking.outfit-card.click"
source_checkpoint = "s3://tracking-analytics/checkpoints/source"
target_path = "s3://tracking-analytics/stream"
target_checkpoint = "s3://tracking-analytics/checkpoints/target"

# Preview only yesterday's partition directory, capped at 5 rows.
source_query = (
    spark.readStream.format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": source_checkpoint,
    })
    .load(f"{source_path}/dt={yesterday}")
    .limit(5)
)

In [0]:
# Preview the JSON stream. display() starts a streaming query, named
# "json_read_stream" so the next cell can stop it.
display(source_query, streamName="json_read_stream")

In [0]:
# Stop the "json_read_stream" preview query started by display().
# Entries of spark.streams.active are already StreamingQuery handles, so they
# are stopped directly instead of being re-fetched by id.
for active_query in spark.streams.active:
    if active_query.name == "json_read_stream":
        active_query.stop()

In [0]:
# Create a Stream Function
def stream_json(source_path: str, source_checkpoint: str, target_path: str, target_checkpoint: str):
    """Start a streaming load of JSON files into a Delta location.

    Uses Auto Loader (``cloudFiles``) to read JSON files under
    ``source_path`` and appends them to the Delta location ``target_path``,
    partitioned by ``dt``, starting a micro-batch every 60 seconds.

    Args:
        source_path: Directory containing the JSON files.
        source_checkpoint: Auto Loader ``cloudFiles.schemaLocation``, where
            the inferred schema is stored.
        target_path: Location the stream writes to (Delta format).
        target_checkpoint: Checkpoint directory of the writing stream. It
            should not be shared with any other streaming writer.

    Returns:
        The started ``StreamingQuery``. Returning it lets callers ``stop()``
        or ``awaitTermination()`` this long-running query without searching
        ``spark.streams.active``.
    """
    spark = SparkSession.builder.getOrCreate()

    source_query = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", source_checkpoint)
        .load(f"{source_path}/")
    )

    target_query = (
        source_query
        .writeStream
        .format("delta")
        .option("checkpointLocation", target_checkpoint)
        .option("path", target_path)
        .outputMode("append")
        .trigger(processingTime='60 seconds')
        # NOTE(review): assumes `dt` is present in the read schema (e.g. inferred
        # from Hive-style `dt=...` subdirectories) — confirm.
        .partitionBy('dt')
        .start()
    )
    return target_query
    
# Start the continuous JSON -> Delta stream with this section's locations.
stream_json(
    source_path=source_path,
    source_checkpoint=source_checkpoint,
    target_path=target_path,
    target_checkpoint=target_checkpoint,
)

In [0]:
# List what the JSON stream has written under the target location.
files = dbutils.fs.ls(target_path)
display(files)

In [0]:
# Peek at yesterday's partition of the target Delta table (first 5 rows).
target_df = (
    spark.read.format("delta")
    .load(target_path)
    .filter(f.col("dt") == yesterday)
    .limit(5)
)

target_df.display()