In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
# Obtain the SparkSession for this notebook; getOrCreate() hands back an already-active
# session when there is one, otherwise it builds a new one named "StructStreaming".
spark = (
    SparkSession.builder
    .appName("StructStreaming")
    .getOrCreate()
)
spark  # last expression: show the session summary

### A full loop: read from Kafka, transform, and write back to Kafka

In [3]:
# Illustrative end-to-end loop: read Kafka topic "input", count messages per key,
# and write the running counts to Kafka topic "output".
# NOTE(review): "..." values are placeholders (broker list, checkpoint path); this cell
# needs a reachable Kafka cluster to actually run.
(
    spark.readStream                                            # reading IN
    .format("kafka")
    .option("kafka.bootstrap.servers", "...")                   # required by the Kafka source
    .option("subscribe", "input")
    .load()
    # Watermark must be declared on the input DataFrame, before the aggregation.
    # It only bounds aggregation state when the grouping includes an event-time
    # window on `timestamp`; with a plain key grouping it has no effect.
    .withWatermark("timestamp", "2 minutes")
    .groupBy(F.col("value").cast("string").alias("key"))        # transforming
    # The Kafka sink requires `value` to be STRING or BINARY, so the count is cast.
    .agg(F.count("*").cast("string").alias("value"))
    .writeStream                                                # writing OUT (a property, not a method)
    .format("kafka")
    .option("kafka.bootstrap.servers", "...")                   # required by the Kafka sink
    .option("topic", "output")
    .trigger(processingTime="1 minute")                         # trigger
    .outputMode("update")
    .option("checkpointLocation", "...")                        # checkpoint
    .start()
)

# From raw data to bronze table

### Read the raw files from `streamingPath` as a `text` stream (one `STRING` column named `data`), then register a temp view



In [None]:
# Raw ingest: stream every line of the files under `streamingPath` as one STRING column `data`,
# and expose it to SQL as `recordings_raw_temp`.
# NOTE(review): `streamingPath` is not defined in this notebook; presumably a setup cell sets it — confirm.
(
    spark.readStream
    .format("text")
    .schema("data STRING")
    # Used for testing to simulate one file arriving at a time. Generally, don't set this in production.
    .option("maxFilesPerTrigger", 1)
    # The path can be passed here or set as another option, as in the Kafka example.
    .load(streamingPath)
    .createOrReplaceTempView("recordings_raw_temp")
)

### Add ingestion metadata to `recordings_raw_temp` and register the result as a temp view

In [None]:
%sql
-- Bronze view: the raw rows plus a receipt timestamp and a constant source-dataset tag.
CREATE OR REPLACE TEMPORARY VIEW recordings_bronze_temp AS
SELECT
  current_timestamp() AS receipt_time,  -- processing-time stamp for the row
  'recordings'        AS dataset,       -- lets later stages filter by source dataset
  *                                     -- all raw columns, unchanged
FROM recordings_raw_temp

### Write the transformed temp view out to the bronze Delta table

In [None]:
# Append every row of the bronze view to a Delta table at `bronzePath`.
# The checkpoint location stores stream progress for this query.
# NOTE(review): `bronzeCheckpoint` / `bronzePath` come from a setup step not shown here — confirm.
(
    spark.table("recordings_bronze_temp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", bronzeCheckpoint)
    .outputMode("append")
    .start(bronzePath)  # saving to bronzePath in Delta format
)

# From bronze to silver

### load data from bronze table

In [None]:
# Stream the bronze Delta table back in and expose it to SQL as `bronze_unparsed_temp`.
(
    spark.readStream
    .format("delta")
    .load(bronzePath)  # read from the bronze table location
    .createOrReplaceTempView("bronze_unparsed_temp")
)

### Transform the data in SQL
Parse the JSON payload of the rows whose source dataset is `"recordings"`.


In [None]:
%sql
-- Silver (parsed) view: decode the JSON payload of "recordings" rows into typed columns.
CREATE OR REPLACE TEMP VIEW recordings_parsed_temp AS
  SELECT
    parsed.json.device_id AS device_id,
    parsed.json.mrn       AS mrn,
    parsed.json.heartrate AS heartrate,
    parsed.json.time      AS time
  FROM (
    -- Parse the raw `data` string against a fixed schema, keeping only the recordings dataset.
    SELECT from_json(data, 'device_id INTEGER, mrn LONG, heartrate DOUBLE, time DOUBLE') AS json
    FROM bronze_unparsed_temp
    WHERE dataset = 'recordings'
  ) AS parsed

### Write the parsed JSON data (`recordings_parsed_temp`) to silver table 1
The output is written in Delta format and has its own checkpoint location.

In [None]:
# Append the parsed recordings to the first silver Delta table at `recordingsParsedPath`,
# with this query's own checkpoint location.
(
    spark.table("recordings_parsed_temp")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", recordingsParsedCheckpoint)
    .start(recordingsParsedPath)  # write to the silver (parsed) table location
)

### Load another table for enrichment

In [None]:
# Static (batch) lookup table of patient info, registered as `pii` for the enrichment join.
# NOTE(review): `mrn` is STRING here but LONG in the parsed recordings schema; the join
# relies on an implicit cast between the two — confirm this is intended.
(
    spark
    .read
    .format("csv")
    .schema("mrn STRING, name STRING")
    .option("header", True)  # first line of the CSV holds column names
    .load(f"{source_dir}/patient/patient_info.csv")
    .createOrReplaceTempView("pii")
)

# Alternatively, the same lookup table can be registered with SQL

### Load parsed data

In [None]:
# Stream the parsed silver table back in and expose it to SQL as `silver_recordings_temp`.
(
    spark.readStream
    .format("delta")
    .load(recordingsParsedPath)
    .createOrReplaceTempView("silver_recordings_temp")
)

## Enrich: join the recordings with patient info

In [None]:
%sql
-- Enriched view: attach the patient name to each recording, convert the epoch
-- `time` to a TIMESTAMP, and drop non-positive heart-rate readings.
CREATE OR REPLACE TEMP VIEW recordings_w_pii AS
SELECT
  r.device_id,
  r.mrn,
  p.name,
  -- NOTE(review): formats to a string and casts back, so the session time zone is applied twice — confirm intended.
  CAST(from_unixtime(r.time, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AS time,
  r.heartrate
FROM silver_recordings_temp AS r
JOIN pii AS p
  ON r.mrn = p.mrn
WHERE r.heartrate > 0

### Write out the Enriched data to silver_table_2(recordings_w_pii)

In [None]:
# Append the enriched (joined) recordings to the second silver Delta table
# at `recordingsEnrichedPath`, with this query's own checkpoint location.
(
    spark.table("recordings_w_pii")
    .writeStream
    .format("delta")
    .option("checkpointLocation", recordingsEnrichedCheckpoint)
    .outputMode("append")
    .start(recordingsEnrichedPath)
)

# From Silver to Gold

### Read in silver_table_2 from recordingsEnrichedPath

In [None]:
# Stream the enriched silver table back in and expose it to SQL as `recordings_enriched_temp`.
(
    spark.readStream
    .format("delta")
    .load(recordingsEnrichedPath)
    .createOrReplaceTempView("recordings_enriched_temp")
)

### Aggregate: daily average heart rate per patient

In [None]:
%sql
-- Gold view: mean heart rate per patient per calendar day.
CREATE OR REPLACE TEMP VIEW patient_avg AS
SELECT
  mrn,
  name,
  avg(heartrate)         AS avg_heartrate,  -- avg is the same aggregate as MEAN
  date_trunc('DD', time) AS date            -- truncate the timestamp to its day
FROM recordings_enriched_temp
GROUP BY mrn, name, date_trunc('DD', time)

### Write the aggregated table to `dailyAvgPath`

In [None]:
# Write the daily per-patient averages to the Gold Delta table at `dailyAvgPath`.
# "complete" mode rewrites the whole aggregate result on each trigger, and
# trigger(once=True) processes the available data as a single batch, then stops.
(
    spark.table("patient_avg")
    .writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", dailyAvgCheckpoint)
    .trigger(once=True)  # only run once, then stop this stream
    .start(dailyAvgPath)
)

Note that we're using `.trigger(once=True)` above. This lets us keep the strengths of Structured Streaming while triggering this job as a single batch. To recap, these strengths include:
- exactly once end-to-end fault tolerant processing
- automatic detection of changes in upstream data sources

### In other words, if every stream above is configured with `.trigger(once=True)`, a scheduler can run this notebook as a batch job (e.g. hourly or daily)
### One use case: collect data continuously with streaming, then process it in scheduled batches

## Register the Gold table (`patient_avg`) in the Hive metastore

In [None]:
# (Re-)register the Gold Delta output as the metastore table `daily_patient_avg`.
# The DROP ... IF EXISTS step makes the cell safe to re-run; the CREATE points the
# table at the Delta files already written under `dailyAvgPath`.
drop_table_sql = """
  DROP TABLE IF EXISTS daily_patient_avg
"""
create_table_sql = f"""
  CREATE TABLE daily_patient_avg
  USING DELTA
  LOCATION '{dailyAvgPath}'
"""
spark.sql(drop_table_sql)
spark.sql(create_table_sql)

# The saved Gold table can now be queried