# COURSE SETUP

In [0]:
# Write a bash init script to DBFS that installs wfdb, mlflow and tqdm (conda-forge)
# into the /databricks/python conda environment.
# NOTE(review): nothing else in this notebook uses these packages — looks like leftover
# setup from another project; confirm it is still needed.
wfdb_script_path = "/hls/wfdb.sh"
wfdb_script_body = '''#!/bin/bash
set -ex
/databricks/python/bin/python -V
. /databricks/conda/etc/profile.d/conda.sh
conda activate /databricks/python
conda install -c conda-forge -y wfdb mlflow tqdm'''

# Remove any previous copy first, presumably because put() will not overwrite an
# existing file unless asked to.
dbutils.fs.rm(wfdb_script_path)
dbutils.fs.put(wfdb_script_path, wfdb_script_body)

In [0]:
# Creates this database if it does not exist
# Sets the database for use in this Spark session
# Defines a path variable for the location of the Delta files to be used throughout the course
#
# The %sql cells substitute `$username` from the "username" widget, while the Python cells
# use the `username` variable. Reading the value back from the widget keeps both in sync
# even when the widget value is edited in the notebook's widget bar.
import re

default_username = "martoso"
dbutils.widgets.text("username", default_username)
username = dbutils.widgets.get("username")

# The value is interpolated into SQL identifiers and paths, so only allow safe characters.
if not re.fullmatch(r"[A-Za-z0-9_]+", username):
  raise ValueError(f"username must contain only letters, digits and underscores: {username!r}")

spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [0]:
# Configure the number of shuffle partitions.
# Eight partitions are the recommended setting for this small dataset.
SHUFFLE_PARTITIONS = 8
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

In [0]:
# Step 1: Download the data to the driver
# First, download the data to the driver using the following shell script.

In [0]:
%sh

# Stop at the first failed download. Otherwise only the last command's exit status
# decides whether the cell succeeds, and an earlier failed wget goes unnoticed until
# the move step cannot find the file.
set -eu

BASE_URL="https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker"

for dataset in health_tracker_data_2020_1 health_tracker_data_2020_2 health_tracker_data_2020_2_late health_tracker_data_2020_3; do
  # -O writes to a fixed file name, so re-running the cell overwrites instead of creating *.json.1 copies.
  wget -O "${dataset}.json" "${BASE_URL}/${dataset}.json"
done

In [0]:
# Step 2: Verify the downloads
# Use an ls command to view the files that have been downloaded. You should see four JSON files.

In [0]:
%sh
# List the current working directory; the downloaded JSON files should appear here.
ls

In [0]:
# Step 3: Move the data to the raw directory
# Move each downloaded file from the driver's local disk into <health_tracker>raw/.
raw_file_names = [
  "health_tracker_data_2020_1.json",
  "health_tracker_data_2020_2.json",
  "health_tracker_data_2020_2_late.json",
  "health_tracker_data_2020_3.json",
]
for raw_file_name in raw_file_names:
  dbutils.fs.mv(f"file:/databricks/driver/{raw_file_name}",
                f"{health_tracker}raw/{raw_file_name}")

In [0]:
# Step 4: Load the data
# Read the January file from the raw directory into a Spark DataFrame using the JSON reader.
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"

json_reader = spark.read.format("json")
health_tracker_data_2020_1_df = json_reader.load(file_path)

In [0]:
# Preview the raw January records as a pandas DataFrame (toPandas collects every row to the driver).
health_tracker_data_2020_1_df.toPandas()

Unnamed: 0,device_id,heartrate,name,time
0,0,52.813907,Deborah Powell,1.577837e+09
1,0,53.907890,Deborah Powell,1.577840e+09
2,0,52.712959,Deborah Powell,1.577844e+09
3,0,52.288042,Deborah Powell,1.577848e+09
4,0,52.515610,Deborah Powell,1.577851e+09
...,...,...,...,...
3715,4,99.192641,James Hou,1.580497e+09
3716,4,100.414030,James Hou,1.580501e+09
3717,4,60.458992,James Hou,1.580504e+09
3718,4,59.998505,James Hou,1.580508e+09


In [0]:
# Step 1: Display the data
# Strictly speaking, this is not part of the ETL process, but displaying the data gives us a look at the data that we are working with. 
# display() renders the DataFrame with the notebook's built-in table/visualization output.
display(health_tracker_data_2020_1_df)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


---

# CREATE A PARQUET TABLE

In [0]:
# Step 1: Remove any existing files in the processed directory
# (<health_tracker>processed, i.e. /dbacademy/<username>/DLRS/healthtracker/processed).
processed_dir = health_tracker + "processed"
dbutils.fs.rm(processed_dir, recurse=True)

In [0]:
# Step 2: Transform the data
# We will perform data engineering on the data with the following transformations:
#     Use the from_unixtime Spark SQL function to transform the unix timestamp into a time string
#     Cast the time column to type timestamp to replace the column time
#     Cast the time column to type date to create the column dte
#     Select the columns in the order in which we would like them to be written
from pyspark.sql.functions import col, from_unixtime

def process_health_tracker_data(dataframe):
  """Shape raw health-tracker JSON records for the processed table.

  The unix-seconds ``time`` column becomes a timestamp, ``dte`` is the date part of
  that timestamp, and ``device_id`` is cast to integer and renamed ``p_device_id``
  (the partition column). Columns are returned in write order:
  dte, time, heartrate, name, p_device_id.
  """
  event_time = from_unixtime(col("time")).cast("timestamp")
  return dataframe.select(
    event_time.cast("date").alias("dte"),
    event_time.alias("time"),
    col("heartrate"),
    col("name"),
    col("device_id").cast("integer").alias("p_device_id"),
  )

processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

In [0]:
# Preview the transformed January records (dte, time, heartrate, name, p_device_id) in pandas.
processedDF.toPandas()

Unnamed: 0,dte,time,heartrate,name,p_device_id
0,2020-01-01,2020-01-01 00:00:00,52.813907,Deborah Powell,0
1,2020-01-01,2020-01-01 01:00:00,53.907890,Deborah Powell,0
2,2020-01-01,2020-01-01 02:00:00,52.712959,Deborah Powell,0
3,2020-01-01,2020-01-01 03:00:00,52.288042,Deborah Powell,0
4,2020-01-01,2020-01-01 04:00:00,52.515610,Deborah Powell,0
...,...,...,...,...,...
3715,2020-01-31,2020-01-31 19:00:00,99.192641,James Hou,4
3716,2020-01-31,2020-01-31 20:00:00,100.414030,James Hou,4
3717,2020-01-31,2020-01-31 21:00:00,60.458992,James Hou,4
3718,2020-01-31,2020-01-31 22:00:00,59.998505,James Hou,4


In [0]:
# Step 3: Write the files to the processed directory
# The output is partitioned by device id (one sub-directory per p_device_id value).
parquet_writer = (
  processedDF.write
  .format("parquet")
  .partitionBy("p_device_id")
  .mode("overwrite")
)
parquet_writer.save(health_tracker + "processed")

In [0]:
# Step 4: Register the table in the metastore
# Next, use Spark SQL to register the table in the metastore. We specify the table format as parquet and we refer to the location where we wrote the parquet files.

In [0]:
%sql 

-- Replace any previous registration of the table; the table uses an explicit LOCATION, so its files stay in place.
DROP TABLE IF EXISTS health_tracker_processed;

-- Register the Parquet files written above as a table. Databricks substitutes $username
-- with the value of the "username" notebook widget created in the setup cell.
CREATE TABLE health_tracker_processed                        
USING PARQUET                
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [0]:
# Step 5: Verify and repair the Parquet-based Data Lake table
# Step 5a: Count the records in the health_tracker_processed table
# The table is partitioned, but when a partitioned table is created over existing data its
# partitions are not registered in the Metastore automatically, so this count is expected to be 0.
health_tracker_processed = spark.table("health_tracker_processed")
health_tracker_processed.count()

In [0]:
# Step 5b: Register the partitions
# To register the partitions, run the following to generate the partitions.

In [0]:
%sql

-- Add the partition directories already present under the table's location to the metastore.
MSCK REPAIR TABLE health_tracker_processed

In [0]:
# Step 5c: Count the records in the health_tracker_processed table
# Count the records in the health_tracker_processed table. With the table repaired and the partitions registered, we now have results. We expect there to be 3720 records: five device measurements, 24 hours a day for 31 days.
# (3720 = 5 devices x 24 readings per day x 31 days.)
health_tracker_processed.count()

---

# DELTA TABLE FUNDAMENTALS

In [0]:
# Step 1: Describe the health_tracker_processed table
# Before we convert the health_tracker_processed table, let's use the DESCRIBE DETAIL Spark SQL command to display the attributes of the table.

In [0]:
%sql

-- Table-level metadata (format, location, partition columns, ...) of the Parquet table before conversion.
DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
PARQUET,,dbacademy_martoso.health_tracker_processed,,dbfs:/dbacademy/martoso/DLRS/healthtracker/processed,2022-02-22T18:33:42.000+0000,,List(p_device_id),,,Map(),,


---

# CONVERT A PARQUET TABLE TO A DELTA TABLE

In [0]:
# Step 1: Convert the files to Delta files
# Convert the Parquet files in place: the conversion writes a Delta Lake transaction log that
# tracks the existing files, turning the directory into a Delta table directory.
from delta.tables import DeltaTable

# Identifier of the form parquet.`<path>` naming the existing Parquet directory.
parquet_table = f"parquet.`{health_tracker}processed`"
# The partition column must be declared because it lives in directory names, not in the files.
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, identifier=parquet_table, partitionSchema=partitioning_scheme)

In [0]:
# Step 2: Register the Delta table
# At this point, the files containing our records have been converted to Delta files. The Metastore, however, has not been updated to reflect the change. To change this we re-register the table in the Metastore. The Spark SQL command will automatically pick up the table's schema and partitioning from the Delta transaction log stored alongside the files.

In [0]:
%sql

-- Drop the old Parquet registration; only the metastore entry is removed, the files stay at LOCATION.
DROP TABLE IF EXISTS health_tracker_processed;

-- Re-register the same directory as a Delta table ($username comes from the notebook widget).
CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [0]:
# Step 3: Describe the health_tracker_processed table
# We can verify the conversion of the Parquet-based data lake table to a Delta table using the DESCRIBE Spark SQL command. Note that the format of the table is Delta.

In [0]:
%sql

-- The format column should now read "delta".
DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,dc2f31ce-a137-4dbd-9820-76a2742658c5,dbacademy_martoso.health_tracker_processed,,dbfs:/dbacademy/martoso/DLRS/healthtracker/processed,2022-02-22T18:40:18.955+0000,2022-02-22T18:40:21.000+0000,List(p_device_id),5,57278,Map(),1,2


In [0]:
# Step 4: Count the records in the health_tracker_processed table
# Re-read the table now that it is registered as Delta. No partition repair is needed:
# the transaction log stored with the Delta files carries the metadata for an immediate query.
health_tracker_processed = spark.table("health_tracker_processed")
health_tracker_processed.count()

---

# CREATE A NEW DELTA TABLE

In [0]:
# Step 1: Remove any existing files in the gold/health_tracker_user_analytics directory
gold_user_analytics_path = health_tracker + "gold/health_tracker_user_analytics"
dbutils.fs.rm(gold_user_analytics_path, recurse=True)

In [0]:
# Step 2: Create an aggregate DataFrame
# The subquery used to define the table aggregates the health_tracker_processed Delta table by device, and computes summary statistics.

In [0]:
# Import Spark's max under an alias: a plain `import max` would shadow Python's built-in
# max() for every later cell of this notebook.
from pyspark.sql.functions import avg, col, stddev, max as spark_max

# One row per device with summary statistics of its heart-rate readings.
health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       spark_max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
)

In [0]:
# Step 3: Write the Delta files
gold_user_analytics_path = health_tracker + "gold/health_tracker_user_analytics"
gold_writer = health_tracker_gold_user_analytics.write.format("delta").mode("overwrite")
gold_writer.save(gold_user_analytics_path)

In [0]:
# Step 4: Register the Delta table in the Metastore
# Finally, register this table in the Metastore. 

In [0]:
%sql

-- Register the gold aggregate Delta files written above as a metastore table.
DROP TABLE IF EXISTS health_tracker_gold_user_analytics;

-- $username is substituted from the "username" notebook widget.
CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/gold/health_tracker_user_analytics"

In [0]:
# Prepare a dashboard using the health_tracker_gold_user_analytics table
# The aggregate table could back a dashboard; its defining query could be re-run nightly
# against health_tracker_processed to refresh the dashboard for the next business day.
# Here we use Databricks’ built-in visualizations to create a basic dashboard.
gold_user_analytics_table = spark.table("health_tracker_gold_user_analytics")
display(gold_user_analytics_table)

p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
0,81.21484441523789,186.4790827731,31.343789198032887
2,79.99574196662837,184.7433209566,31.408007741222
3,82.65419819635204,171.8435388833,30.92932874000444
1,78.5776567337699,168.114687819,31.61967903784856
4,83.08377376550952,173.5770785921,34.16032267669617


---

# BATCH WRITE TO DELTA TABLES

In [0]:
# Step 1: Load the next month of data
# Next we append February: read health_tracker_data_2020_2.json with the JSON reader as before.
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"

health_tracker_data_2020_2_df = spark.read.json(file_path)

In [0]:
# Step 2: Transform the data
# Apply the same data engineering as for January, via process_health_tracker_data:
#     from_unixtime turns the unix time into a time string,
#     which is cast to a timestamp (replacing time) and to a date (new column dte).
processedDF = process_health_tracker_data(dataframe=health_tracker_data_2020_2_df)

In [0]:
# Step 3: Append the data to the health_tracker_processed Delta table
# Appending through the Delta path is enough; the metastore entry already points at this
# location, so no Metastore action is needed.
processed_path = health_tracker + "processed"
processedDF.write.format("delta").mode("append").save(processed_path)

In [0]:
# View the commit using Time Travel
# Delta Lake can query an earlier version of a table (Time Travel). Version 0 is the
# initial conversion of the table from Parquet.
# Step 1: View the table as of version 0
# Reading with versionAsOf=0 shows only the first month of data
# (five device measurements, 24 hours a day, for 31 days).
spark.read.load(health_tracker + "processed", format="delta", versionAsOf=0).count()

In [0]:
# Step 2: Count the most recent version
# When we query the table without specifying a version, it shows the latest version of the table and includes the new records added.
# The current version holds two months of data: 5 devices x 24 hours x (31 + 29) days = 7200 readings,
# minus the 72 device-4 readings from late February that have not arrived yet, i.e. 7128 records.
# (The data was recorded in a leap year, which is why February has 29 days.)
health_tracker_processed.count()

---

# LATE-ARRIVING DATA

In [0]:
# Step 1: Count the number of records per device
# A device with fewer records than the others is missing readings.
from pyspark.sql.functions import count

processed_snapshot = spark.read.format("delta").load(health_tracker + "processed")
records_per_device = processed_snapshot.groupBy("p_device_id").agg(count("*"))
display(records_per_device)

p_device_id,count(1)
1,1440
3,1440
2,1440
4,1368
0,1440


In [0]:
# Step 2: Plot the missing records
# Show devices 3 and 4 side by side; a Databricks visualization of records per day reveals
# that device 4 has no records for the last few days of the month.
devices_3_and_4 = (
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .filter(col("p_device_id").isin(3, 4))
)
display(devices_3_and_4)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,55.2272036665,Minh Nguyen,3
2020-01-01,2020-01-01T01:00:00.000+0000,56.035689123,Minh Nguyen,3
2020-01-01,2020-01-01T02:00:00.000+0000,55.6403282219,Minh Nguyen,3
2020-01-01,2020-01-01T03:00:00.000+0000,56.3692513843,Minh Nguyen,3
2020-01-01,2020-01-01T04:00:00.000+0000,56.5412281859,Minh Nguyen,3
2020-01-01,2020-01-01T05:00:00.000+0000,55.8311481148,Minh Nguyen,3
2020-01-01,2020-01-01T06:00:00.000+0000,54.9402513831,Minh Nguyen,3
2020-01-01,2020-01-01T07:00:00.000+0000,92.2205431894,Minh Nguyen,3
2020-01-01,2020-01-01T08:00:00.000+0000,93.8159033652,Minh Nguyen,3
2020-01-01,2020-01-01T09:00:00.000+0000,92.0210547557,Minh Nguyen,3


---

# BROKEN READINGS IN THE TABLE

In [0]:
# Step 1: Create temporary view for broken readings
# A reading is "broken" when its heart rate is negative; count them per day.
negative_heartrate = col("heartrate") < 0

broken_readings = (
  health_tracker_processed
  .select(col("heartrate"), col("dte"))
  .filter(negative_heartrate)
  .groupBy("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)

broken_readings.createOrReplaceTempView("broken_readings")

In [0]:
# Step 2: Display broken_readings
# Display the records in the broken_readings view, again using a Databricks visualization. 
# Note that most days have at least one broken reading and that some have more than one. 

In [0]:
%sql

-- Per-day counts of negative heart-rate readings.
SELECT * FROM broken_readings

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [0]:
# Step 3: Sum the broken readings
# Next, we sum the records in the view. 

In [0]:
%sql
 
-- Total number of broken readings across all days.
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
60


---

# REPAIR RECORDS WITH AN UPSERT

In [0]:
# Step 1: Create a DataFrame interpolating broken values
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead

# Order each device's readings by `time`, not `dte`. All 24 readings of a day share the
# same `dte`, so ordering by date leaves the order within a day undefined, and lag/lead
# could return readings from arbitrary hours instead of the adjacent ones.
# (The name dteWindow is kept for compatibility with the rest of the notebook.)
dteWindow = Window.partitionBy("p_device_id").orderBy("time")

# Attach each reading's previous and next reading from the same device, so broken
# (negative) values can be replaced by the mean of their neighbours.
# At the first/last reading of a device, prev_amt/next_amt are NULL.
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(
    col("dte"),
    col("time"),
    col("heartrate"),
    lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
    lead(col("heartrate")).over(dteWindow).alias("next_amt"),
    col("name"),
    col("p_device_id"))
)

In [0]:
# Step 2: Create a DataFrame of updates
# Replace each broken (negative) reading with the mean of its neighbouring readings.
neighbour_mean = (col("prev_amt") + col("next_amt")) / 2

updatesDF = (
  interpolatedDF
  .filter(col("heartrate") < 0)
  .select("dte", "time", neighbour_mean.alias("heartrate"), "name", "p_device_id")
)

In [0]:
# Step 3: View the schemas of the updatesDF and health_tracker_processed table
# Print the schema of the health_tracker_processed table and then of updatesDF; both should list
# dte, time, heartrate, name and p_device_id so the updates can be merged into the table.
health_tracker_processed.printSchema()
updatesDF.printSchema()

In [0]:
# Step 4: Verify UpdatesDF
# Perform a .count() on the updatesDF DataFrame. It should have the same number of records as the SUM performed on the broken_readings view.
updatesDF.count()

---

# PREPARE INSERTS DATAFRAME

In [0]:
# Step 1: Load the late-arriving data
# These are the February readings that were missing from the earlier append.
file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"

health_tracker_data_2020_2_late_df = spark.read.json(file_path)

In [0]:
# Step 2: Transform the data
# In addition to updating the broken records, we wish to add this late-arriving data. We begin by preparing another temporary view with the appropriate transformations:
#     Use the from_unixtime Spark SQL function to transform the unixtime into a time string
#     Cast the time column to type timestamp to replace the column time
#     Cast the time column to type date to create the column dte
# The result has the same column layout as updatesDF, so the two can be combined for one upsert.
insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)

In [0]:
# Step 3: View the schema of the inserts DataFrame
# It should match the schema of updatesDF printed earlier.
insertsDF.printSchema()

---

# PREPARE UPSERTS DATAFRAME

In [0]:
# Step 1: Create the union DataFrame
# Finally, we prepare the upsertsDF that consists of all the records in both the updatesDF and the insertsDF.
# unionByName matches columns by name instead of position, so the union stays correct even if the
# two DataFrames ever list their columns in a different order (positional union would silently
# put values into the wrong columns).
upsertsDF = updatesDF.unionByName(insertsDF)

In [0]:
# Step 2: View the schema
# The combined DataFrame keeps the column layout of updatesDF and insertsDF.
upsertsDF.printSchema()

---

# PERFORM UPSERT INTO health_tracker_processed TABLE

In [0]:
# Step 1: Perform the Upsert
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

# Source and target rows match when they describe the same device at the same instant.
update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

# Matched rows only get their heart rate repaired.
update = {"heartrate": "upserts.heartrate"}

# Unmatched (late-arriving) rows are inserted with every column taken from the source.
insert = {
  column: f"upserts.{column}"
  for column in ("p_device_id", "heartrate", "name", "time", "dte")
}

merge_builder = (
  processedDeltaTable.alias("health_tracker")
  .merge(upsertsDF.alias("upserts"), update_match)
  .whenMatchedUpdate(set=update)
  .whenNotMatchedInsert(values=insert)
)
merge_builder.execute()

---

# VIEW THE COMMIT USING TIME TRAVEL

In [0]:
# Step 1: View the table as of version 1
# Version 0 holds only the first month. Version 1 (the February append) holds the first two
# months of data minus the 72 late-arriving records that had not been loaded yet.
version_1 = (
  spark.read
  .format("delta")
  .option("versionAsOf", 1)
  .load(health_tracker + "processed")
)
version_1.count()

In [0]:
# Step 2: Count the most recent version
# With the late records merged in, expect 7200 = 5 devices x 24 hours x (31 + 29) days.
health_tracker_processed.count()

In [0]:
# Step 3: Describe the history of the health_tracker_processed table
# The .history() Delta table command provides provenance information, including the operation, user, and so on, for each action performed on a table. 
# Note that each operation performed on the table is given a version number. These are the numbers we have been using when performing a time travel query on the table, e.g., SELECT COUNT(*) FROM health_tracker_processed VERSION AS OF 1.
# One row per version, including operationParameters and operationMetrics of each commit.
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
2,2022-02-22T19:04:03.000+0000,2848635821716678,ext.oracy.martos@farfetch.com,MERGE,"Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`)), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3070889731298303),1019-111805-zings40,1,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 5, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numSourceRows -> 132, numTargetFilesRemoved -> 10)",
1,2022-02-22T18:49:29.000+0000,2848635821716678,ext.oracy.martos@farfetch.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3070889731298303),1019-111805-zings40,0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 53057, numOutputRows -> 3408)",
0,2022-02-22T18:40:21.000+0000,2848635821716678,ext.oracy.martos@farfetch.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(3070889731298303),1019-111805-zings40,-1,Serializable,False,Map(numConvertedFiles -> 5),


---

# PERFORM A SECOND UPSERT

In [0]:
# Step 1: Sum the broken readings
# Let’s sum the records in the broken_readings view once more. Note that there are still broken readings in the table. This is because many of the records inserted as part of the upsert also contained broken readings. 

In [0]:
%sql 

-- broken_readings is a temp view over the table, so it reflects the table after the upsert.
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
1


In [0]:
# Step 2: Verify that these are new broken readings
# Let’s query the broken_readings with a WHERE clause to verify that these are indeed new broken readings introduced by inserting the late-arriving data. 
# Note that there are no broken readings before ‘2020-02-25’.

In [0]:
%sql 

-- Only days before the late-arriving records; a NULL sum means no broken readings there.
SELECT SUM(`count(heartrate)`) FROM broken_readings WHERE dte < '2020-02-25'

sum(count(heartrate))
""


In [0]:
# Step 3: Verify updates
# Perform a .count() on the updatesDF DataFrame. 
# Note that it is not necessary to redefine the DataFrame. Recall that a Spark DataFrame is lazily defined, pulling the correct number of updates when an action is triggered.
# It should have the same number of records as the SUM performed on the broken_readings view.
updatesDF.count()

In [0]:
# Step 4: Perform Upsert into the health_tracker_processed table
# Once more, we upsert into the health_tracker_processed table using the DeltaTable command .merge().
# Only repairs remain this time, so the updates DataFrame is the whole merge source.
upsertsDF = updatesDF

second_merge = processedDeltaTable.alias("health_tracker").merge(
  upsertsDF.alias("upserts"), update_match
)
second_merge = second_merge.whenMatchedUpdate(set=update).whenNotMatchedInsert(values=insert)
second_merge.execute()

In [0]:
# Step 5: Sum the broken readings
# Let’s sum the records in the broken_readings view one last time. Finally, there are no more broken readings in the table.
# The query runs in the following %sql cell (a bare SQL statement in this Python cell would be a SyntaxError):
#   SELECT SUM(`count(heartrate)`) FROM broken_readings

In [0]:
%sql

-- Expected NULL (no broken readings left) after the second upsert.
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
""


---

# APPENDING FILES TO AN EXISTING DELTA TABLE

In [0]:
# Step 1: Load the next month of data
# Read the March file, health_tracker_data_2020_3.json, with the JSON reader as before.

file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = spark.read.load(file_path, format="json")

In [0]:
# Step 2: Transform the data
# We perform the same data engineering on the data:
#     Use the from_unixtime Spark SQL function to transform the unix timestamp into a time string
#     Cast the time column to type timestamp to replace the column time
#     Cast the time column to type date to create the column dte
# Note that we redefine the function process_health_tracker_data to accommodate the new schema.
def process_health_tracker_data(dataframe):
  """Shape raw health-tracker JSON records that carry the new ``device_type`` column.

  The unix-seconds ``time`` column becomes a timestamp, ``dte`` is its date part and
  ``device_id`` is cast to integer and renamed ``p_device_id``. Columns are returned in
  write order: dte, time, device_type, heartrate, name, p_device_id.
  """
  event_time = from_unixtime(col("time")).cast("timestamp")
  return dataframe.select(
    event_time.cast("date").alias("dte"),
    event_time.alias("time"),
    col("device_type"),
    col("heartrate"),
    col("name"),
    col("device_id").cast("integer").alias("p_device_id"),
  )

In [0]:
# Transform the March data with the redefined function; the result includes device_type.
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

In [0]:
# Preview health_tracker_processed as a pandas DataFrame (toPandas collects every row to the driver).
health_tracker_processed.toPandas()

Unnamed: 0,dte,time,heartrate,name,p_device_id
0,2020-01-01,2020-01-01 00:00:00,47.537856,Kristin Vasser,1
1,2020-01-01,2020-01-01 01:00:00,48.349697,Kristin Vasser,1
2,2020-01-01,2020-01-01 02:00:00,49.121203,Kristin Vasser,1
3,2020-01-01,2020-01-01 03:00:00,47.998280,Kristin Vasser,1
4,2020-01-01,2020-01-01 04:00:00,47.841083,Kristin Vasser,1
...,...,...,...,...,...
7195,2020-02-29,2020-02-29 19:00:00,92.008182,Sam Knopp,2
7196,2020-02-29,2020-02-29 20:00:00,91.025895,Sam Knopp,2
7197,2020-02-29,2020-02-29 21:00:00,94.141270,Sam Knopp,2
7198,2020-02-29,2020-02-29 22:00:00,90.246392,Sam Knopp,2


In [0]:
# Show the path of the processed Delta table that the following cells write to.
health_tracker + "processed"

In [0]:
# Step 3: Append the data to the health_tracker_processed Delta table
# We do this using .mode("append").
# This append is expected to FAIL: the March data carries the new device_type column, and
# Delta Lake schema enforcement rejects writes whose schema does not match the table's.
# The error is caught and printed so "Run All" continues on to the schema-evolution step.
from pyspark.sql.utils import AnalysisException

try:
  (
    processedDF.write
    .mode("append")
    .format("delta")
    .save(health_tracker + "processed")
  )
except AnalysisException as schema_error:
  print(f"Append rejected by Delta Lake schema enforcement:\n{schema_error}")

---

# APPENDING FILES TO AN EXISTING DELTA TABLE WITH SCHEMA EVOLUTION

In [0]:
# Step 1: Append the data with schema evolution to the health_tracker_processed Delta table
# mergeSchema lets Delta add the new device_type column to the table schema during this append.
evolving_writer = (
  processedDF.write
  .format("delta")
  .mode("append")
  .option("mergeSchema", True)
)
evolving_writer.save(health_tracker + "processed")

---

# VERIFY THE COMMIT

In [0]:
# Step 1: Count the records in the health_tracker_processed table
# The count now includes the March records appended with schema evolution.
health_tracker_processed.count()

---

# DELETE DATA AND RECOVER LOST DATA

In [0]:
# Step 1: Delete all records for device 4
# Here, we use the DeltaTable .delete() method to remove all records from the health_tracker_processed table that match the given SQL predicate.
processedDeltaTable.delete("p_device_id = 4")

---

# RECOVER LOST DATA

In [0]:
# Step 1: Prepare new Upserts view
# We prepare a view for upserting using Time Travel to recover the missing records. 
# Note that we have replaced the entire name column with the value NULL.
from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "heartrate", lit(None).alias("name"), "p_device_id")
)

In [0]:
# Step 2: Perform Upsert into the health_tracker_processed table
# Once more, we upsert into the health_tracker_processed table using the  Delta table command .merge().
# Note that it is necessary to define 1) the reference to the Delta table and 2) the insert logic because the schema has changed.

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

insert = {
  "dte": "upserts.dte",
  "time": "upserts.time",
#   "device_type": "upserts.device_type",
  "heartrate": "upserts.heartrate",
  "name": "upserts.name",
  "p_device_id": "upserts.p_device_id",
}

(
  processedDeltaTable.alias("health_tracker")
  .merge(upsertsDF.alias("upserts"), update_match)
  .whenMatchedUpdate(set=update)
  .whenNotMatchedInsert(values=insert)
  .execute()
)

In [0]:
# Step 3: Count the most recent version
# When we look at the current version, we expect to see three months of data, five device measurements, 24 hours a day for (31 + 29 + 31) days, or 10920 records.
# (10920 = 5 x 24 x 91; device 4's rows are back, now with NULL names.)
health_tracker_processed.count()

In [0]:
# Step 4: Query device 4 to demonstrate compliance
# We query the health_tracker_processed table to demonstrate that the name associated with device 4 has indeed been removed.
# The name column should be empty (NULL) for every device 4 record.
display(health_tracker_processed.where("p_device_id = 4"))

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,60.7236962271,,4
2020-01-01,2020-01-01T01:00:00.000+0000,59.7518357438,,4
2020-01-01,2020-01-01T02:00:00.000+0000,59.7552762926,,4
2020-01-01,2020-01-01T03:00:00.000+0000,61.8018342845,,4
2020-01-01,2020-01-01T04:00:00.000+0000,60.3112488045,,4
2020-01-01,2020-01-01T05:00:00.000+0000,60.0099058887,,4
2020-01-01,2020-01-01T06:00:00.000+0000,59.8323375338,,4
2020-01-01,2020-01-01T07:00:00.000+0000,59.9795666159,,4
2020-01-01,2020-01-01T08:00:00.000+0000,100.6013295271,,4
2020-01-01,2020-01-01T09:00:00.000+0000,100.1857471896,,4


---

# MAINTAIN COMPLIANCE WITH A VACUUM OPERATION

In [0]:
# Step 1: Query an earlier table version
# Time-travel to version 4: the name associated with device 4 can still be retrieved from
# the older files, which is the compliance problem VACUUM addresses below.
version_4_device_4 = (
  spark.read
  .format("delta")
  .option("versionAsOf", 4)
  .load(health_tracker + "processed")
  .filter("p_device_id = 4")
)
display(version_4_device_4)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,60.7236962271,James Hou,4
2020-01-01,2020-01-01T01:00:00.000+0000,59.7518357438,James Hou,4
2020-01-01,2020-01-01T02:00:00.000+0000,59.7552762926,James Hou,4
2020-01-01,2020-01-01T03:00:00.000+0000,61.8018342845,James Hou,4
2020-01-01,2020-01-01T04:00:00.000+0000,60.3112488045,James Hou,4
2020-01-01,2020-01-01T05:00:00.000+0000,60.0099058887,James Hou,4
2020-01-01,2020-01-01T06:00:00.000+0000,59.8323375338,James Hou,4
2020-01-01,2020-01-01T07:00:00.000+0000,59.9795666159,James Hou,4
2020-01-01,2020-01-01T08:00:00.000+0000,100.6013295271,James Hou,4
2020-01-01,2020-01-01T09:00:00.000+0000,100.1857471896,James Hou,4


In [0]:
# Step 2: Vacuum table to remove old files
# The VACUUM Spark SQL command can be used to solve this problem. The VACUUM command recursively vacuums directories associated with the Delta table and removes files that are no longer in the latest state of the transaction log for that table and that are older than a retention threshold. The default threshold is 7 days. 
# While the retention-duration safety check is still enabled, a 0-hour retention is expected
# to be refused. The error is caught and printed so "Run All" continues; Step 3 disables the check.
from pyspark.sql.utils import IllegalArgumentException

try:
  processedDeltaTable.vacuum(0)
except IllegalArgumentException as retention_error:
  print(f"VACUUM refused by the retention-duration check:\n{retention_error}")

In [0]:
# Step 3: Set Delta to allow the operation
# To demonstrate the VACUUM command, we set our retention period to 0 hours to be able to remove the questionable files now. This is typically not a best practice and in fact, there are safeguards in place to prevent this operation from being performed.
# For demonstration purposes, we will set Delta to allow this operation.
# NOTE: this is a session-wide setting; it stays disabled until it is set back to True.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

In [0]:
# Step 4: Vacuum table to remove old files
# Re-enable the retention-duration safety check afterwards, even if VACUUM fails, so the
# safeguard is not left disabled for the rest of this Spark session.
try:
  processedDeltaTable.vacuum(0)
finally:
  spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", True)

In [0]:
# Step 5: Attempt to query an earlier version
# Now when we attempt to query an earlier version, an error is thrown.
# This error indicates that we are not able to query data from this earlier version because the files have been expunged from the system.
# The format is set explicitly like every other Delta read in this notebook, rather than relying
# on the cluster's default data source. The expected failure is caught and printed so "Run All" completes.
try:
  display(
    spark.read
    .format("delta")
    .option("versionAsOf", 4)
    .load(health_tracker + "processed")
    .where("p_device_id = 4")
  )
except Exception as vacuumed_error:  # deliberate: the failure itself is the demonstration
  print(f"Reading version 4 failed as expected ({type(vacuumed_error).__name__}):\n{vacuumed_error}")

Congratulations!

You have completed Delta Lake Rapid Start with Python.

At this point, we invite you to think about the work we have done and how it relates to the full IoT data ingestion pipeline we have been designing. 

In this course, we used Spark SQL and Delta Lake to create a single source of truth in our EDSS: the health_tracker_processed Delta table. 
1_OLAP_EDSS.png

We did this through the following steps:

    We converted an existing Parquet-based data lake table to a Delta table, health_tracker_processed.

    We performed a batch upload of new data to this table.

    We used Apache Spark to identify broken and missing records in this table.

    We used Delta Lake’s upsert functionality, where we updated broken records and inserted missing records.

    We evolved the schema of the Delta table.

    We used Delta Lake’s Time Travel feature to scrub the personal data of a user intelligently.

Additionally, we used Delta Lake to create an aggregate table, health_tracker_gold_user_analytics, downstream from the health_tracker_processed table.