# Panda Databricks Notebook: Structured Streaming Workshop

# Part 0: Workshop functions

In [0]:
import os

# All workshop artefacts (input JSON, output tables, streaming checkpoints)
# live in a "data" folder next to this notebook.
# NOTE(review): os.makedirs creates these folders via the local /Workspace mount,
# while later Spark/dbutils calls use the same strings without a URI scheme —
# confirm both resolve to the same storage location on your cluster.
notebook_path = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)
notebook_dir = "/".join(notebook_path.split("/")[:-1])  # parent folder of this notebook
base_path = "/Workspace" + notebook_dir

data_path = base_path + "/data"
checkpoint_path = data_path + "/checkpoints"
input_data_json_path = data_path + "/input_data.json"

# exist_ok keeps the cell safe to re-run.
for required_dir in (data_path, checkpoint_path):
    os.makedirs(required_dir, exist_ok=True)

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp, sha1

# Each delivery is a (rows, column_names) pair: the rows are passed to
# spark.createDataFrame and column_names is used as its schema.
# Columns: name, datum (date string in yyyy-MM-dd), wert (integer value).

# First delivery: written to the input folder at the start of each part.
initial_data_delivery = (
    [
        ("Software Lizenzen", "2023-01-01", 100),
        ("Betriebskosten", "2023-01-02", 200),
        ("Personalkosten", "2023-01-03", 300),
    ],
    ["name", "datum", "wert"],
)

# Second delivery: appended to the input folder later to simulate new data arriving.
follow_up_data_delivery = (
    [
        ("Zusatzkosten", "2023-01-04", 400),
        ("Zusaetzliche Zusatzkosten", "2023-01-05", 500),
    ],
    ["name", "datum", "wert"],
)

def _save_delivery_as_json(data_delivery, target_path, mode):
    """Build a DataFrame from a (rows, column_names) delivery and write it as JSON with the given save mode."""
    df = spark.createDataFrame(data_delivery[0], schema=data_delivery[1])
    df.write.mode(mode).json(target_path)


def write_data_as_json(data_delivery, input_data_json_path):
    """Replace the contents of the JSON input folder with a delivery.

    Args:
        data_delivery: ``(rows, column_names)`` pair, e.g. ``initial_data_delivery``.
        input_data_json_path: Folder Spark writes the JSON part-files to.
    """
    _save_delivery_as_json(data_delivery, input_data_json_path, "overwrite")
    print(f"Data written to {input_data_json_path}")


def append_data_as_json(data_delivery, input_data_json_path):
    """Add a delivery as additional JSON part-files next to the existing input.

    Args:
        data_delivery: ``(rows, column_names)`` pair, e.g. ``follow_up_data_delivery``.
        input_data_json_path: Folder Spark writes the JSON part-files to.
    """
    _save_delivery_as_json(data_delivery, input_data_json_path, "append")
    print(f"Data appended to {input_data_json_path}")

# Transformation function
def panda_ws_transform(df, tax_rate=0.1, date_pattern="yyyy-MM-dd"):
    """Enrich a delivery with a tax column, a parsed timestamp and a merge key.

    Only column expressions are chained, so the same function is used for the
    batch DataFrames (Parts 1-3) and the streaming DataFrames (Parts 4-5).

    Args:
        df: DataFrame with at least the columns ``name``, ``datum`` (date string)
            and ``wert`` (numeric).
        tax_rate: Factor applied to ``wert`` to derive ``steuer``. Defaults to 0.1,
            the value previously hard-coded.
        date_pattern: Spark datetime pattern used to parse ``datum``.

    Returns:
        ``df`` with ``steuer`` added, ``datum`` converted to a timestamp, and a
        ``hash_key`` column (SHA-1 of ``name``). ``hash_key`` is the MERGE key
        used by ``panda_ws_write``.
    """
    # col() resolves names against the current plan and avoids clashes between
    # column names and DataFrame attributes (df.<name> style access).
    return (
        df
        .withColumn("steuer", col("wert") * tax_rate)
        .withColumn("datum", to_timestamp(col("datum"), date_pattern))
        .withColumn("hash_key", sha1(col("name")))
    )


# Part 1: Basic Write

In [0]:
# Reset: fresh JSON input and no output from a previous run.
write_data_as_json(initial_data_delivery, input_data_json_path)
output_table_path = f"{data_path}/simple_table_write"
dbutils.fs.rm(output_table_path, recurse=True)

# Step 1: Read data
# .json() sets the source format itself; a preceding .format("delta") would be
# silently overridden, so it is not used here.
input_df = spark.read.json(input_data_json_path)
print("Incoming data:")
input_df.show()

# Step 2: Transform
transformed_df = panda_ws_transform(input_df)

# Step 3: Write Output (plain Parquet files, appended)
transformed_df.write.mode("append").parquet(output_table_path)

print(f"Data written to {output_table_path}")
display(spark.read.parquet(output_table_path))

# Part 2: Basic ETL

In [0]:
# Reset: fresh JSON input and no output from a previous run.
write_data_as_json(initial_data_delivery, input_data_json_path)
output_table_path = f"{data_path}/simple_table_write"
dbutils.fs.rm(output_table_path, recurse=True)

# Each iteration re-reads the entire input folder and appends it again, so the
# output accumulates duplicate rows — the limitation the MERGE in Part 3 removes.
for write_iteration in ["First write", "Second write", "Third Update"]:
    print(f"\n{write_iteration}:")
    # Step 1: Read data (.json() sets the source format; no .format(...) needed)
    input_df = spark.read.json(input_data_json_path)
    print("Incoming data:")
    input_df.show()

    # Step 2: Transform
    transformed_df = panda_ws_transform(input_df)

    # Step 3: Write Output
    transformed_df.write.mode("append").parquet(output_table_path)

    print(f"Data written to {output_table_path}")
    display(spark.read.parquet(output_table_path))

# Part 3: Basic MERGE INTO

In [0]:
from delta.tables import DeltaTable

# Write functions
def ensure_table_exists(df, path):
    """Create an empty Delta table at ``path`` with ``df``'s schema unless one exists.

    ``panda_ws_write`` opens the target with ``DeltaTable.forPath``, which needs an
    existing Delta table, so call this before the first merge.

    Args:
        df: Batch or streaming DataFrame; only its schema is used.
        path: Storage path of the target Delta table.
    """
    # isDeltaTable resolves the path the same way the Delta reader/writer do. It
    # returns False both for a missing path and for a folder without a Delta log
    # (e.g. a leftover from a partial delete). The previous catch-all
    # `except Exception` around a directory listing treated any listing failure
    # as "missing" and skipped non-Delta folders.
    if not DeltaTable.isDeltaTable(spark, path):
        print("table does not exist, creating empty table")
        spark.createDataFrame([], df.schema).write.format("delta").mode("overwrite").save(path)
        
def panda_ws_write(df, batch_id, target_table_path, throwError = 0):
    """Upsert one (micro-)batch into the Delta table at ``target_table_path``.

    The signature matches a ``foreachBatch`` callback, so the function is used both
    directly (batch, Part 3) and from a stream (Parts 4-5).

    Rows are matched on ``hash_key``: matching target rows are fully updated and
    unmatched source rows are inserted.

    Args:
        df: Batch to merge; must contain a ``hash_key`` column.
        batch_id: Micro-batch id passed by foreachBatch (not used).
        target_table_path: Path of an existing Delta table.
        throwError: When equal to 1, raise after the merge to simulate a failing batch.

    Raises:
        Exception: if ``throwError == 1``.
    """
    merge_key = "hash_key"
    target = DeltaTable.forPath(spark, target_table_path).alias("t")
    merge_builder = target.merge(
        df.alias("s"),
        "t.{0} = s.{0}".format(merge_key),
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    # The error is raised only after the MERGE has executed, so the data is
    # already written when the batch fails.
    if throwError == 1:
        raise Exception("Artificial error for testing error handling in foreachBatch.")


In [0]:
# Reset: three-row input and no target table, so the demo starts from scratch.
write_data_as_json(initial_data_delivery, input_data_json_path)
output_table_path = f"{data_path}/simple_merge_write"
dbutils.fs.rm(output_table_path, recurse=True)

# Three runs over the same input folder; new rows arrive after the second run.
# Because the write is a MERGE on hash_key, re-reading old rows does not duplicate them.
for write_iteration in ("First write", "Second write", "Third Update"):
    print(f"\n{write_iteration}:")

    # Step 1: Read the whole JSON input as a batch DataFrame
    input_df = spark.read.json(input_data_json_path)
    print("incoming data:")
    input_df.show()

    # Step 2: Transform
    transformed_df = panda_ws_transform(input_df)

    # Step 3: Upsert into the Delta target (created on first use)
    ensure_table_exists(transformed_df, output_table_path)
    panda_ws_write(transformed_df, 0, output_table_path)  # batch_id is unused outside a stream
    print(f"data written to {output_table_path}")
    display(spark.read.format("delta").load(output_table_path))

    # Step 4: Deliver the follow-up rows once, between the second and third run
    if write_iteration == "Second write":
        append_data_as_json(follow_up_data_delivery, input_data_json_path)

# Part 4: Structured Streaming ETL

In [0]:
def delete_streaming_table(output_table_path, output_checkpoint_path):
    """Recursively remove a streaming target table and its checkpoint folder.

    Removing the checkpoint as well makes the next stream start from scratch
    instead of resuming from its recorded progress.

    Args:
        output_table_path: Folder of the target Delta table.
        output_checkpoint_path: Checkpoint folder of the stream writing to it.
    """
    for path_to_remove in (output_table_path, output_checkpoint_path):
        dbutils.fs.rm(path_to_remove, recurse=True)

def get_schema_from_json(input_data_path):
    """Infer the schema of the JSON data at ``input_data_path`` with a batch read.

    File-based streaming sources need an explicit schema by default, so the schema
    is inferred from the data already on disk and passed to ``readStream.schema``.

    Args:
        input_data_path: Folder (or file) containing JSON data.

    Returns:
        The inferred ``StructType``.
    """
    # Previously this read the global input_data_json_path and ignored the argument.
    return spark.read.json(input_data_path).schema

In [0]:
# Reset: fresh input, and no target table or checkpoint from an earlier run.
write_data_as_json(initial_data_delivery, input_data_json_path)
output_table_path = f"{data_path}/merge_stream"
output_checkpoint_path = output_table_path.replace(data_path, checkpoint_path)
delete_streaming_table(output_table_path, output_checkpoint_path)

for write_iteration in ["First write", "Second write", "Third Update"]:
    print(f"\n{write_iteration}:")

    # Step 1: Stream Read (compare with the batch read: spark.read.json(input_data_json_path))
    streaming_df = spark.readStream.schema(get_schema_from_json(input_data_json_path)).json(input_data_json_path)

    # Step 2: Stream Transform
    transformed_df = panda_ws_transform(streaming_df)

    # # Any attempt to materialise the streaming data before writeStream.start() throws an error:
    # transformed_df.show()

    # Step 3: Stream Write with foreachBatch.
    # availableNow processes the input available when the query starts and then
    # stops, so each loop iteration behaves like one incremental batch job.
    ensure_table_exists(transformed_df, output_table_path)
    write_operation = (
        transformed_df
        .writeStream
        .queryName("panda_stream_demo")
        .option("checkpointLocation", output_checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(lambda df, batch_id: panda_ws_write(df, batch_id, output_table_path))
        .start()
    )
    # Keep the StreamingQuery handle (awaitTermination() itself returns None)
    # and block until the availableNow run has finished.
    write_operation.awaitTermination()

    print(f"data written to {output_table_path}")
    display(spark.read.format('delta').load(output_table_path))

    # Step 4: Deliver the follow-up rows once, between the second and third run
    if write_iteration == "Second write":
        append_data_as_json(follow_up_data_delivery, input_data_json_path)

# Part 5: Error handling

In [0]:
# StreamingQueryException moved to pyspark.errors in PySpark 3.4; fall back for older runtimes.
try:
    from pyspark.errors import StreamingQueryException
except ImportError:  # PySpark < 3.4
    from pyspark.sql.utils import StreamingQueryException

# Reset the target table and checkpoint. The input files are reused as left by
# the previous part. Experiment: skip this reset (i.e. do not re-initialise the
# streaming table) and re-run the cell to see what changes.
output_table_path = f"{data_path}/merge_stream"
output_checkpoint_path = output_table_path.replace(data_path, checkpoint_path)
delete_streaming_table(output_table_path, output_checkpoint_path)

# Step 1: Stream Read
streaming_df = spark.readStream.schema(get_schema_from_json(input_data_json_path)).json(input_data_json_path)

# Step 2: Stream Transform
transformed_df = panda_ws_transform(streaming_df)

# Step 3: Stream Write with foreachBatch.
# throwError=1 makes panda_ws_write raise after its MERGE, so the query fails on purpose.
ensure_table_exists(transformed_df, output_table_path)
write_operation = (
    transformed_df
    .writeStream
    .queryName("panda_stream_demo")
    .option("checkpointLocation", output_checkpoint_path)
    .trigger(availableNow=True)
    .foreachBatch(lambda df, batch_id: panda_ws_write(df, batch_id, output_table_path, throwError = 1))
    .start()
)
try:
    write_operation.awaitTermination()
except StreamingQueryException as err:
    # Only accept the intentional failure; anything else is a real problem.
    if "Artificial error" not in str(err):
        raise
    print(f"Streaming query failed as expected:\n{err}")

# The MERGE ran before the artificial error was raised, so its rows are already in
# the table even though the streaming batch failed. Spark should not mark the failed
# batch as committed in the checkpoint, so a rerun would replay it; the MERGE on
# hash_key makes that replay harmless.
display(spark.read.format('delta').load(output_table_path))