
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

## Promoting to Silver

Here we'll combine the concepts of streaming from Delta tables, deduplication, and quality enforcement to finalize our approach to the silver table.

<img src="https://files.training.databricks.com/images/ade/ADE_arch_heartrate_silver.png" width="60%" />

## Learning Objectives
By the end of this lesson, students will be able to:
- Apply table constraints to Delta Lake tables
- Use flagging to identify records failing to meet certain conditions
- Apply deduplication within an incremental microbatch
- Use **`MERGE`** to avoid inserting duplicate records into a Delta Lake table

## Setup

In [0]:
%run ../Includes/Classroom-Setup-4.3

Start by creating the **`heart_rate_silver`** table.

In [0]:
%sql
CREATE TABLE IF NOT EXISTS heart_rate_silver
  (device_id LONG, time TIMESTAMP, heartrate DOUBLE, bpm_check STRING)
USING DELTA
LOCATION '${da.paths.user_db}/heart_rate_silver'

## Table Constraint
Add a table constraint before inserting data. Name this constraint **`dateWithinRange`** and make sure that the time is greater than January 1, 2017.
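
Delta Lake enforces a CHECK constraint on every write: if any incoming row violates it, the write fails and nothing from that write is committed. The snippet below is a minimal pure-Python model of that all-or-nothing behavior, not Delta Lake code. `MIN_TIME`, `satisfies_date_range`, `write_batch`, and the list-of-dicts "table" are hypothetical stand-ins for the **`dateWithinRange`** rule and the silver table.

```python
from datetime import datetime

# Hypothetical stand-in for the dateWithinRange rule: time must be after 2017-01-01
MIN_TIME = datetime(2017, 1, 1)


def satisfies_date_range(row):
    """Return True if the row passes a `time > 2017-01-01` check."""
    return row["time"] > MIN_TIME


def write_batch(table, batch):
    """Append `batch` to `table` only if every row passes the check.

    Models a transactional write: one violating row rejects the whole batch.
    """
    violations = [row for row in batch if not satisfies_date_range(row)]
    if violations:
        raise ValueError(
            f"dateWithinRange violated by {len(violations)} row(s); batch rejected"
        )
    table.extend(batch)


silver = []
write_batch(silver, [{"device_id": 1, "time": datetime(2020, 5, 1), "heartrate": 72.0}])

try:
    write_batch(silver, [
        {"device_id": 2, "time": datetime(2021, 1, 1), "heartrate": 80.0},
        {"device_id": 3, "time": datetime(2016, 12, 31), "heartrate": 65.0},
    ])
except ValueError as err:
    print(err)

print(len(silver))  # 1: the valid row in the rejected batch was not written either
```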

In [0]:
%sql
-- TODO
ALTER TABLE -- <FILL-IN>

Note that adding and removing constraints is recorded in the transaction log.

In [0]:
%sql
DESCRIBE HISTORY heart_rate_silver

## Define a Streaming Read and Transformation
Using the cell below, we will create a streaming read that includes:
1. A filter for the topic **`bpm`**
2. Logic to flatten the JSON payload and cast data to the appropriate schema
3. A **`bpm_check`** column that flags records with a non-positive heart rate (zero or below) as **`Negative BPM`**
4. A duplicate check on **`device_id`** and **`time`** with a 30-second watermark on **`time`**
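
To make steps 1 to 3 concrete outside Spark, here is a small pure-Python model of the same logic applied to a list of bronze-like records. It assumes each record is a dict with a `topic` string and a bytes `value` holding JSON, and that `time` is an ISO-8601 string; `parse_bpm` is a hypothetical helper, not part of the course code. As with `F.when(...).otherwise("OK")`, a missing heart rate falls through to `"OK"`.

```python
import json
from datetime import datetime


def parse_bpm(records):
    """Filter to bpm records, flatten the JSON payload, cast types, add bpm_check."""
    rows = []
    for rec in records:
        # Step 1: keep only heart rate records
        if rec["topic"] != "bpm":
            continue
        # Step 2: decode the bytes payload, parse JSON, and cast to the target types
        payload = json.loads(rec["value"].decode("utf-8"))
        heartrate = payload.get("heartrate")
        row = {
            "device_id": int(payload["device_id"]),
            "time": datetime.fromisoformat(payload["time"]),
            "heartrate": None if heartrate is None else float(heartrate),
        }
        # Step 3: flag non-positive readings; missing values fall through to "OK"
        hr = row["heartrate"]
        row["bpm_check"] = "Negative BPM" if hr is not None and hr <= 0 else "OK"
        rows.append(row)
    return rows


records = [
    {"topic": "bpm", "value": b'{"device_id": 1, "time": "2020-01-01 00:00:00", "heartrate": 72.5}'},
    {"topic": "workout", "value": b'{"user_id": 7}'},
    {"topic": "bpm", "value": b'{"device_id": 2, "time": "2020-01-01 00:00:05", "heartrate": -3.0}'},
]
for row in parse_bpm(records):
    print(row["device_id"], row["bpm_check"])
```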

In [0]:
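from pyspark.sql import functions as F

# Schema of the JSON payload stored in the bronze `value` column
json_schema = "device_id LONG, time TIMESTAMP, heartrate DOUBLE"

streaming_df = (spark.readStream
                     .table("bronze")
                     # Keep only heart rate (bpm) records
                     .filter("topic = 'bpm'")
                     # Cast the payload to a string and parse it as JSON with the schema above
                     .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
                     # Flatten the struct and flag non-positive heart rates
                     .select("v.*", F.when(F.col("v.heartrate") <= 0, "Negative BPM")
                                     .otherwise("OK")
                                     .alias("bpm_check"))
                     # Bound the deduplication state: keys older than the watermark are evicted
                     .withWatermark("time", "30 seconds")
                     .dropDuplicates(["device_id", "time"]))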
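
The watermark is what keeps **`dropDuplicates`** from holding state forever: once the maximum event time seen has moved more than 30 seconds past a key's **`time`**, that key can be evicted, and rows that old are treated as too late. The function below is a simplified pure-Python model of this over a sequence of micro-batches, not Spark's implementation. Spark only guarantees that rows within the delay are handled; rows older than the watermark may or may not be dropped, whereas this model always drops them. `dedup_with_watermark` and the tuple layout are hypothetical.

```python
from datetime import datetime, timedelta


def dedup_with_watermark(batches, delay=timedelta(seconds=30)):
    """Simplified model of withWatermark("time", delay) + dropDuplicates(["device_id", "time"]).

    Each batch is a list of (device_id, time, heartrate) tuples.
    """
    seen = set()      # dedup state: (device_id, time) keys already emitted
    watermark = None  # no watermark until the first batch completes
    output = []
    for batch in batches:
        max_time = None
        for device_id, time, heartrate in batch:
            # Event-time statistics include every row, late or not
            max_time = time if max_time is None else max(max_time, time)
            # Rows at or before the current watermark are dropped as too late
            if watermark is not None and time <= watermark:
                continue
            key = (device_id, time)
            if key not in seen:
                seen.add(key)
                output.append((device_id, time, heartrate))
        # The watermark advances only between micro-batches and never moves backward
        if max_time is not None:
            candidate = max_time - delay
            watermark = candidate if watermark is None else max(watermark, candidate)
            # Evict keys that can no longer be matched by a non-late duplicate
            seen = {key for key in seen if key[1] > watermark}
    return output


t0 = datetime(2020, 1, 1, 0, 0, 0)
batches = [
    [(1, t0, 70.0), (1, t0, 70.0), (2, t0 + timedelta(seconds=10), 80.0)],  # in-batch duplicate
    [(1, t0, 70.0), (3, t0 + timedelta(seconds=60), 90.0)],                  # cross-batch duplicate
    [(1, t0, 70.0), (4, t0 + timedelta(seconds=40), 60.0)],                  # now too late
]
result = dedup_with_watermark(batches)
print(len(result))
```

The in-batch and cross-batch duplicates are removed through state, while the third copy of device 1 is discarded because it falls behind the watermark.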

## Define Upsert Query
The **`Upsert`** class used in previous notebooks is provided below.

In [0]:
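class Upsert:
    """Run a SQL statement against each streaming micro-batch via foreachBatch."""
    def __init__(self, sql_query, update_temp="stream_updates"):
        self.sql_query = sql_query      # e.g. a MERGE that reads from the temp view
        self.update_temp = update_temp  # name of the temp view exposing the micro-batch

    def upsert_to_delta(self, micro_batch_df, batch):
        # Register the micro-batch as a temp view so the SQL query can reference it
        micro_batch_df.createOrReplaceTempView(self.update_temp)
        # Run the query in the micro-batch's own Spark session, where the view is registered
        micro_batch_df._jdf.sparkSession().sql(self.sql_query)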

Use the cell below to define the upsert query, then use it to instantiate the class.

Alternatively, <a href="https://docs.databricks.com/delta/delta-update.html#upsert-into-a-table-using-merge&language-python" target="_blank">consult the documentation</a> and try implementing this using the **`DeltaTable`** Python class.
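
One detail to keep in mind when writing the query: an insert-only merge (insert when no target row matches on **`device_id`** and **`time`**) compares each source row against the target as it was before the merge, so duplicates inside the micro-batch itself would all be inserted. This is why the stream deduplicates before the upsert. The pure-Python function below models only that matching rule; `merge_insert_not_matched` is a hypothetical helper, not the Delta Lake API.

```python
def merge_insert_not_matched(target, updates, keys=("device_id", "time")):
    """Append rows from `updates` whose key matches no row already in `target`.

    Matching uses the target as it was before the merge, so duplicate keys
    within `updates` are all inserted, as with an insert-only MERGE.
    """
    existing = {tuple(row[k] for k in keys) for row in target}
    new_rows = [dict(row) for row in updates
                if tuple(row[k] for k in keys) not in existing]
    target.extend(new_rows)
    return len(new_rows)


silver = [{"device_id": 1, "time": "2020-01-01 00:00:00", "heartrate": 70.0}]
batch = [
    {"device_id": 1, "time": "2020-01-01 00:00:00", "heartrate": 70.0},  # already in target
    {"device_id": 2, "time": "2020-01-01 00:00:05", "heartrate": 81.0},
    {"device_id": 2, "time": "2020-01-01 00:00:05", "heartrate": 81.0},  # in-batch duplicate
]
print(merge_insert_not_matched(silver, batch))  # 2: both copies of device 2 are inserted
```

Deduplicating the batch first, as **`dropDuplicates`** does upstream, would leave a single device 2 row to insert.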

In [0]:
# TODO
sql_query = """<FILL-IN>"""

streaming_merge = Upsert(sql_query)

## Apply Upsert and Write
Now execute a write with **`trigger(availableNow=True)`** to process all data currently available in the bronze table.
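
With **`trigger(availableNow=True)`**, the query processes everything available when it starts (possibly across several micro-batches), calls the **`foreachBatch`** handler for each one, records progress in the checkpoint, and then stops; a later run resumes from the checkpoint and sees only new data. The sketch below models that loop in plain Python. `run_available_now`, the list-based source, the dict checkpoint, and `max_rows_per_batch` are hypothetical simplifications, not Spark APIs.

```python
def run_available_now(source, checkpoint, handler, max_rows_per_batch=2):
    """Simplified model of a trigger(availableNow=True) query using foreachBatch.

    source: append-only list standing in for the bronze table
    checkpoint: dict holding the last committed offset and batch id
    handler: callable(rows, batch_id), in the role of Upsert.upsert_to_delta
    """
    start = checkpoint.get("offset", 0)
    end = len(source)  # only data present when the query starts is processed
    batch_id = checkpoint.get("batch_id", -1)
    while start < end:
        stop = min(start + max_rows_per_batch, end)
        batch_id += 1
        handler(source[start:stop], batch_id)
        # Commit progress only after the handler returns successfully
        checkpoint["offset"] = stop
        checkpoint["batch_id"] = batch_id
        start = stop
    # The query stops here instead of waiting for new data


bronze = ["r1", "r2", "r3"]
checkpoint = {}
calls = []
run_available_now(bronze, checkpoint, lambda rows, batch_id: calls.append((batch_id, rows)))

bronze.extend(["r4", "r5"])  # a new day's data lands in bronze
run_available_now(bronze, checkpoint, lambda rows, batch_id: calls.append((batch_id, rows)))
print(calls)
```

The second call only sees `r4` and `r5`, which mirrors how calling **`process_silver_heartrate()`** again after new data lands processes just the new records.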

In [0]:
def process_silver_heartrate():
    query = (streaming_df.writeStream
                         .foreachBatch(streaming_merge.upsert_to_delta)
                         .outputMode("update")
                         .option("checkpointLocation", f"{DA.paths.checkpoints}/recordings")
                         .trigger(availableNow=True)
                         .start())
    query.awaitTermination()
    
process_silver_heartrate()

The silver table should contain the same total number of records as the deduplicated count from lesson 2.5, and a small percentage of these should be flagged with "Negative BPM".

In [0]:
new_total = spark.read.table("heart_rate_silver").count()

print(f"Lesson #5: {731987:,}")
print(f"New Total: {new_total:,}")

In [0]:
%sql
SELECT COUNT(*)
FROM heart_rate_silver
WHERE bpm_check = "Negative BPM"

Now land a new batch of data and propagate changes through bronze into the silver table.

<img src="https://files.training.databricks.com/images/icon_note_32.png"> The following two methods were recreated from previous lessons and are provided for you.

In [0]:
DA.daily_stream.load() # Load a day's worth of data
DA.process_bronze()    # Execute 1 iteration of the daily to bronze stream

process_silver_heartrate()

In [0]:
end_total = spark.read.table("heart_rate_silver").count()

print(f"Lesson #5: {731987:,}")
print(f"New Total: {new_total:,}")
print(f"End Total: {end_total:,}")

Run the following cell to delete the tables and files associated with this lesson.

In [0]:
DA.cleanup()

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>