In [1]:
###########################################           Lesson 1 of 15           ######################################################
###########################################    Build an EDSS with Delta Lake   ###################################################### 


# In the Databricks Academy course Fundamentals of Delta Lake, we discussed Enterprise Decision Support Systems (EDSS) and their use in Online Analytical Processing (OLAP). In particular, we reviewed how Delta Lake can help build robust cloud data platforms and single sources of truth that let organizations make intelligent, data-driven business decisions.

# In this course, we go from theory to practice.

# An Internet of Things Data Ingestion Pipeline

# In this course, we will use Apache Spark and Delta Lake to:

#               1. Ingest data
#               2. Create a table that will serve as a single source of truth
#               3. Build a downstream aggregate table on this single source of truth
#               4. Use Delta Lake to perform operations on the table to make our data more robust

# The domain of this course is the Internet of Things (IoT). In particular, we will be using simulated health tracker data passing measurements of a user’s heart rate once an hour. In the real world, such a pipeline might appear as follows:

In [2]:
###########################################           Lesson 2 of 15           ######################################################
###########################################            Course Setup            ###################################################### 

# During this course, we'll write a series of commands using Apache Spark. In this lesson, we will guide you through working with a notebook, including:

#             1. Configuring your notebook environment
#             2. Loading and exploring data
#             3. Visualizing data

In [3]:
# Configure Apache Spark
# First, we perform a few configuration operations on the Apache Spark session to get optimal performance. These include:
#             1. Specifying a database in which to work
#             2. Configuring the number of shuffle partitions to use
# We specify a database to keep the default database clean and to provide more organization in a shared workspace.

# Step 1: Specify the database

# For this course, we use a per-user database named dbacademy_<username>. This cell:
#             1. Creates this database if it does not exist
#             2. Sets the database for use in this Spark session
#             3. Defines a path variable for the location of the Delta files to be used throughout the course


username = "mankarm"
dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [4]:
# Step 2: Configure the Number of Shuffle Partitions
# For this dataset, the most appropriate number of partitions is eight.
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [5]:
#############################     Reviewing and loading data    #############################
#############################    Review health tracker data     #############################

# One common use case for working with Delta Lake is to collect and process Internet of Things (IoT) Data. Here, we provide a mock IoT sensor dataset for demonstration purposes. The data simulates heart rate data measured by a health tracker device.

# In a typical system, high-flux event data is delivered via a stream processing platform such as Apache Kafka. For educational purposes, we have made this data available for download as static files. The next few commands download the data and place it in the raw directory, which simulates the arrival of high-flux event data.

# These files are line-delimited JSON (JSON Lines) files and resemble the strings passed by Kafka: each line is a complete JSON object, but the file as a whole is not a single valid JSON document. This is the layout that Spark's JSON reader expects by default.

# Each file contains data for five users whose heart rate is measured each hour, 24 hours a day, every day.

# Here is a sample of the data we will be using. Each line is a string representing a valid JSON object and is similar to the kind of string that would be passed by a Kafka stream processing server.



# Health tracker data sample
# {"device_id":0,"heartrate":52.8139067501,"name":"Deborah Powell","time":1.5778368E9}
# {"device_id":0,"heartrate":53.9078900098,"name":"Deborah Powell","time":1.5778404E9}
# {"device_id":0,"heartrate":52.7129593616,"name":"Deborah Powell","time":1.577844E9}
# {"device_id":0,"heartrate":52.2880422685,"name":"Deborah Powell","time":1.5778476E9}
# {"device_id":0,"heartrate":52.5156095386,"name":"Deborah Powell","time":1.5778512E9}
# {"device_id":0,"heartrate":53.6280743846,"name":"Deborah Powell","time":1.5778548E9}
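# As a hedged illustration in plain Python (not part of the course pipeline), the sketch below parses two of the sample lines above one line at a time, and shows that the same text is rejected when parsed as a single JSON document. The variable names are assumptions introduced for this example.

```python
import json

# Two lines copied from the sample above; each line is a complete JSON object.
sample = (
    '{"device_id":0,"heartrate":52.8139067501,"name":"Deborah Powell","time":1.5778368E9}\n'
    '{"device_id":0,"heartrate":53.9078900098,"name":"Deborah Powell","time":1.5778404E9}\n'
)

# Parse line by line: every non-empty line becomes one record.
records = [json.loads(line) for line in sample.splitlines() if line.strip()]

# Parsing the whole text as one JSON document fails, because two
# top-level objects in a row do not form a valid JSON document.
try:
    json.loads(sample)
    whole_file_is_valid_json = True
except json.JSONDecodeError:
    whole_file_is_valid_json = False

print(len(records), records[0]["name"], whole_file_is_valid_json)
```

# Note that the time field is parsed as a floating-point number, because the file stores it in exponent notation.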

In [6]:
# Health tracker data schema

# name: string
# heartrate: double
# device_id: long
# time: double (Unix time in seconds, stored in the JSON in exponent notation, e.g. 1.5778368E9)

In [7]:
# Step 1: Download the data to the driver using the following shell script.

In [8]:
%sh

wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2_late.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_3.json

In [9]:
# Step 2: Verify the downloads
# Use an ls command to view the files that have been downloaded. You should see four JSON files.

In [10]:
%sh ls

In [11]:
# Step 3: Move the data to the raw directory
# Move the data you have downloaded into the raw directory.

dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json", 
              health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2.json", 
              health_tracker + "raw/health_tracker_data_2020_2.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2_late.json", 
              health_tracker + "raw/health_tracker_data_2020_2_late.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_3.json", 
              health_tracker + "raw/health_tracker_data_2020_3.json")

In [12]:
# Step 4: Load the data
# Load the data as a Spark DataFrame from the raw directory. This is done using the .format("json") option.  

file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
 
health_tracker_data_2020_1_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [13]:
################################################              Visualize data      ################################################

# Step 1: Display the data

# Strictly speaking, this is not part of the ETL process, but displaying the data gives us a look at the data that we are working with. 

# We note a few phenomena in the data:

          # Sensor anomalies - Sensors cannot record negative heart rates, so any negative values in the data are anomalies.
          # Wake/Sleep cycle - We notice that users have a consistent wake/sleep cycle alternating between steady high and low heart rates.
          # Elevated activity - Some users have irregular periods of high activity.
  
display(health_tracker_data_2020_1_df)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [14]:
# Step 2: Configure the visualization

# Note that we have used a Databricks visualization to visualize the sensor data over time. We have used the following plot options to configure the visualization: 
      # Keys: time
      # Series groupings: device_id
      # Values: heartrate
      # Aggregation: SUM
      # Display Type: Bar Chart

In [15]:
###########################################           Lesson 3 of 15           ######################################################
###########################################         Create a Parquet table     ###################################################### 

# Now that we have used Databricks to preview the data, we'll work through the process of creating a Parquet table. This table will be used in the next lesson to show the ease of converting existing Parquet tables to Delta tables.

# The development pattern used to create a Parquet table is similar to that used in creating a Delta table. A few issues arise along the way, however. In particular, Parquet-based tables often need to be repaired before they can be queried.

# In subsequent lessons, we'll see that creating a Delta table does not have the same issues.

In [16]:
# Step 1: Remove files in the /dbacademy/{username}/DLRS/healthtracker/processed directory

dbutils.fs.rm(health_tracker + "processed", recurse=True)

In [17]:
# Step 2: Transform the data
# We will perform data engineering on the data with the following transformations:
#             1. Use the from_unixtime Spark SQL function to transform the Unix timestamp into a time string
#             2. Cast the time column to type timestamp to replace the column time
#             3. Cast the time column to type date to create the column dte
#             4. Rename device_id to p_device_id and cast it to type integer
#             5. Select the columns in the order in which we would like them to be written
# Because we will perform this process on each dataset as it is loaded, we compose a function to perform the necessary transformations.

# This function, process_health_tracker_data, can be reused each time.

from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)
processedDF.head(10)
processedDF.columns
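# To make the time handling concrete, here is a minimal plain-Python sketch of the same idea: turn a Unix time in seconds into a timestamp and a date. It assumes UTC, whereas Spark's from_unixtime uses the session time zone. The helper name to_time_and_date is hypothetical and is not part of the course code.

```python
from datetime import datetime, timezone

def to_time_and_date(unix_seconds):
    """Return (timestamp, date) for a Unix time in seconds, interpreted as UTC."""
    ts = datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
    return ts, ts.date()

# First "time" value from the data sample, written there as 1.5778368E9.
ts, dte = to_time_and_date(1.5778368E9)
print(ts.isoformat(), dte.isoformat())
```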

In [18]:
## Step 3: Write the Files to the processed directory
(processedDF.write
 .mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .save(health_tracker + "processed"))
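# Writing with partitionBy("p_device_id") places the files in one subdirectory per partition value, named p_device_id=<value>. The sketch below is an assumption-laden illustration in plain Python of how a partition value can be read back from such a path. The helper partition_value and the file name are hypothetical.

```python
def partition_value(path, column="p_device_id"):
    """Extract the integer partition value from a `column=value` path segment."""
    for part in path.split("/"):
        name, sep, value = part.partition("=")
        if sep and name == column:
            return int(value)
    return None  # the path carries no partition directory for this column

# Hypothetical file name; only the p_device_id=3 directory naming matters here.
example_path = "processed/p_device_id=3/part-00000.snappy.parquet"
print(partition_value(example_path))
```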

In [19]:
# Step 4: Register the table in the metastore
# Next, use Spark SQL to register the table in the metastore. We specify the table format as parquet and we refer to the location where we wrote the parquet files.

In [20]:
%sql 

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed                        
USING PARQUET                
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [21]:
############### Step 5: Verify and repair the Parquet-based Data Lake table

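# Step 5a: Count the records in the health_tracker_processed table
# The table is partitioned, and Spark SQL does not automatically discover partitions in existing data, so this count is expected to return 0 until the partitions are registered in the Metastore.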
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [22]:
# Step 5b: Register the partitions

In [23]:
%sql

MSCK REPAIR TABLE health_tracker_processed

In [24]:
# Step 5c: Count the records in the health_tracker_processed table again, now that the partitions are registered
health_tracker_processed.count()

In [25]:
###########################################           Lesson 4 of 15           ######################################################
###########################################       DELTA TABLE FUNDAMENTALS     ######################################################



# Recall that a Delta table consists of three things:

          # 1. The data files kept in object storage (AWS S3, Azure Data Lake Storage)

          # 2. The Delta Transaction Log saved with the data files in object storage 

          # 3. A table registered in the Metastore. This step is optional

# You can create a Delta table by either of the following methods:

          # 1. Convert Parquet files using the Delta Lake API

          # 2. Write new files using the Spark DataFrame writer with .format("delta")

# Either of these will automatically create the Transaction Log in the same top-level directory as the data files. Optionally, you can also register the table in the Metastore.

#############################################       Creating a table                 ###########################################################

# Creating a table is one of the most fundamental actions performed when working with Delta Lake. With Delta Lake, you create tables:

          # 1. When ingesting new files into a Delta Table for the first time

          # 2. By transforming an existing Parquet table to a Delta table 

In [26]:
# Step 1: Describe the health_tracker_processed table

# Before we convert the health_tracker_processed table, let's use the DESCRIBE DETAIL Spark SQL command to display the attributes of the table.

# Note that the table has format PARQUET.

In [27]:
%sql

DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
PARQUET,,dbacademy_mankarm.health_tracker_processed,,dbfs:/dbacademy/mankarm/DLRS/healthtracker/processed,2020-08-08T12:02:02.000+0000,,List(p_device_id),,,Map(transient_lastDdlTime -> 1596888124),,


In [28]:
###########################################           Lesson 5 of 15           ######################################################
#####################################    Convert a Parquet table to a Delta table     ###############################################


# When working with Delta Lake on Databricks, Parquet files can be converted in-place to Delta files. Next, we will convert the Parquet-based data lake table we created previously into a Delta table. In doing so, we are defining the single source of truth at the heart of our EDSS.

In [29]:
# Step 1: Convert the files to Delta files

# First, we'll convert the Parquet files in place to Delta files. The conversion creates a Delta Lake transaction log that tracks the files. Afterward, the directory is a directory of Delta files.

from delta.tables import DeltaTable

parquet_table = f"parquet.`{health_tracker}processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

In [30]:
# Step 2: Register the Delta table

# At this point, the files containing our records have been converted to Delta files. The Metastore, however, has not been updated to reflect the change. To fix this, we re-register the table in the Metastore. The Spark SQL command automatically picks up the data schema from the Delta transaction log.

In [31]:
%sql

DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"

In [32]:
# Step 3: Describe the health_tracker_processed table

# We can verify the conversion of the Parquet-based data lake table to a Delta table using the DESCRIBE DETAIL Spark SQL command. Note that the format of the table is now delta.

In [33]:
%sql
DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,43226958-0043-4ce9-99f2-4d581614d24d,dbacademy_mankarm.health_tracker_processed,,dbfs:/dbacademy/mankarm/DLRS/healthtracker/processed,2020-08-08T12:02:07.711+0000,2020-08-08T12:02:10.000+0000,List(p_device_id),5,56938,Map(),1,2


In [34]:
# Step 4: Count the records in the health_tracker_processed table

# We count the records in the health_tracker_processed table with Apache Spark. 

# With Delta Lake, the Delta table is immediately ready for use. The transaction log stored with the Delta files contains all metadata needed for an immediate query.

health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [35]:
###########################################           Lesson 6 of 15           ######################################################
###########################################       Create a new Delta table     #####################################################

  
#  Now, we'll create a new Delta table. We'll do this by creating an aggregate table from the data in the health_tracker_processed Delta table we just created. Within the context of our EDSS, this is a downstream aggregate table or data mart.

In [36]:
# Step 1: Remove files in the health_tracker_user_analytics directory

# This step makes the notebook idempotent.

dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics",recurse=True)

In [37]:
# Step 2: Create an aggregate DataFrame

# The query used to define the table aggregates the health_tracker_processed Delta table by device and computes summary statistics: the average, maximum, and standard deviation of the heart rate.

from pyspark.sql.functions import col, avg, max, stddev

health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
)
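# For readers who want to check the three summary statistics by hand, here is a small plain-Python sketch of the same per-device aggregation. The readings are hypothetical values chosen for easy arithmetic, and the sketch uses the sample standard deviation, which is what Spark's stddev computes.

```python
import builtins
from statistics import mean, stdev

# Hypothetical heart-rate readings keyed by device id.
readings = {0: [50.0, 60.0, 70.0], 1: [80.0, 80.0, 80.0]}

summary = {
    device: {
        "avg_heartrate": mean(values),
        # builtins.max is used explicitly because the notebook imports a
        # Spark function named max that shadows the Python built-in.
        "max_heartrate": builtins.max(values),
        "stddev_heartrate": stdev(values),  # sample standard deviation
    }
    for device, values in readings.items()
}

print(summary[0])
```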

In [38]:
# Step 3: Write the Delta files

(health_tracker_gold_user_analytics.write
 .format("delta")
 .mode("overwrite")
 .save(health_tracker + "gold/health_tracker_user_analytics"))

In [39]:
# Step 4: Register the Delta table in the Metastore

In [40]:
%sql

DROP TABLE IF EXISTS health_tracker_gold_user_analytics;

CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/gold/health_tracker_user_analytics"

In [41]:
# Prepare a dashboard using the health_tracker_gold_user_analytics table
# The health_tracker_gold_user_analytics table could be used to define a dashboard. The query used to create the table could be issued against the health_tracker_processed table nightly to prepare the dashboard for the following business day.

          # Keys: p_device_id

          # Series groupings: None

          # Values: max_heartrate, avg_heartrate, stddev_heartrate

          # Aggregation: SUM

          # Display Type: Bar Chart

In [42]:
display(spark.read.table("health_tracker_gold_user_analytics"))

p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
4,83.08377376550952,173.5770785921,34.16032267669617
2,79.99574196662837,184.7433209566,31.408007741222
1,78.5776567337699,168.114687819,31.61967903784856
3,82.65419819635204,171.8435388833,30.92932874000444
0,81.21484441523789,186.4790827731,31.343789198032887


In [43]:
###########################################           Lesson 7 of 15           ######################################################
###########################################       Batch write to Delta tables  ######################################################

# Appending files to an existing Delta table

# Next, we look at two patterns for modifying existing Delta tables:

# 1. Appending files to an existing directory of Delta files
# 2. Merging a set of updates and insertions

# In this lesson, we explore the first.

# Within the context of our data ingestion pipeline, this is the addition of new raw files to our single source of truth.

In [44]:
# Step 1: Load the next month of data

# Here, we append the next month of records. We begin by loading the data from the file health_tracker_data_2020_2.json, using the .format("json") option as before.

file_path = health_tracker + "raw/health_tracker_data_2020_2.json"
 
health_tracker_data_2020_2_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [45]:
# Step 2: Transform the data
# We perform the same data engineering on the data:
##         use the from_unixtime Spark SQL function to transform the unixtime into a time string 
##         cast the time column to type timestamp to replace the column time
##         cast the time column to type date to create the column dte

# This is done using the process_health_tracker_data function we defined previously.

processedDF = process_health_tracker_data(health_tracker_data_2020_2_df)

In [46]:
# Step 3: Append the data to the health_tracker_processed Delta table

# We do this using .mode("append"). Note that it is not necessary to perform any action on the Metastore.

(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

In [47]:
#######################               View the commit using Time Travel                ##############################

#Delta Lake can query an earlier version of a Delta table using a feature known as Time Travel. Here, we query the data as of version 0, that is, the initial conversion of the table from Parquet.
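# The idea behind a versioned read can be sketched with a toy commit log in plain Python. This is a deliberately simplified model under stated assumptions, not Delta Lake's actual log format: each commit only lists files added and removed, and the file names and the helper files_as_of are hypothetical.

```python
# Toy commit log: the list index is the table version.
log = [
    {"add": ["part-jan.parquet"], "remove": []},  # version 0: initial conversion
    {"add": ["part-feb.parquet"], "remove": []},  # version 1: appended month
]

def files_as_of(log, version):
    """Replay commits 0..version and return the set of files visible at that version."""
    live = set()
    for commit in log[: version + 1]:
        live |= set(commit["add"])
        live -= set(commit["remove"])
    return live

print(sorted(files_as_of(log, 0)))
print(sorted(files_as_of(log, 1)))
```

# Reading "as of" an earlier version means replaying fewer commits, so the files added later are simply not visible.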

In [48]:
#####################################         Step 1: View the table as of version 0              ##################################### 

# This is done by setting the option "versionAsOf" to 0. When we time travel to version 0, we see only the first month of data: five devices measured 24 hours a day for 31 days, or 3720 records.

In [49]:
(spark.read
 .option("versionAsOf", 0)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

In [50]:
#####################              Step 2: Count the most recent version        #########################

# When we query the table without specifying a version, it shows the latest version of the table and includes the new records added.

# When we look at the current version, we expect to see two months of data, five device measurements, 24 hours a day for (31 + 29) days, or 7200 records. (The data was recorded during the month of February in a leap year, which is why there are 29 days in the month.)
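# The expected record counts follow from simple arithmetic, sketched here in plain Python. The variable names are assumptions introduced for this example.

```python
import calendar

devices = 5
readings_per_day = 24

days_jan = calendar.monthrange(2020, 1)[1]  # days in January 2020
days_feb = calendar.monthrange(2020, 2)[1]  # days in February 2020 (leap year)

# Version 0 holds January only; the latest version should hold both months.
expected_version_0 = devices * readings_per_day * days_jan
expected_latest = devices * readings_per_day * (days_jan + days_feb)

print(expected_version_0, expected_latest)
```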

In [51]:
# Note that the count falls short of the expected 7200: we are missing 72 records.
health_tracker_processed.count()

In [52]:
###########################################           Lesson 8 of 15           ######################################################
###########################################       Late-arriving data           ######################################################

# In the previous lesson, we used Apache Spark to perform a batch update of the health_tracker_processed table. When the process was complete, we counted the number of records in the table. We discovered that some records were missing.

# The absence of records from the last few days of the month shows a phenomenon that may often occur in a production data pipeline: late-arriving data. Delta Lake allows us to process data as it arrives and is prepared to handle the occurrence of late-arriving data. 

In [53]:
## Step 1: Count the number of records per device
# It looks like device 4 is missing 72 records. 

from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id")
  .agg(count("*"))
)

p_device_id,count(1)
3,1440
1,1440
4,1368
2,1440
0,1440
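# As a quick check of the output above, the following plain-Python sketch compares each device's count with the expected number of hourly readings for January and February 2020. The counts are copied from the output; the variable names are assumptions introduced for this example.

```python
# Counts copied from the per-device output above.
counts = {3: 1440, 1: 1440, 4: 1368, 2: 1440, 0: 1440}

readings_per_day = 24
expected_per_device = readings_per_day * (31 + 29)  # January + February 2020

# Keep only the devices that fall short, with the size of the gap.
missing = {
    device: expected_per_device - n
    for device, n in counts.items()
    if n < expected_per_device
}
# Express each gap in days of hourly readings.
missing_days = {device: gap / readings_per_day for device, gap in missing.items()}

print(missing, missing_days)
```

# A gap of 72 hourly readings corresponds to three full days for a single device.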


In [54]:
##Step 2: Plot the missing records

# Let’s run a query to discover the timing of the missing records. We use a Databricks visualization to display the number of records per day. It appears that we have no records for device 4 for the last few days of the month.

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

dte,time,heartrate,name,p_device_id
2020-02-01,2020-02-01T00:00:00.000+0000,49.7052799735,Minh Nguyen,3
2020-02-01,2020-02-01T01:00:00.000+0000,49.3746419604,Minh Nguyen,3
2020-02-01,2020-02-01T02:00:00.000+0000,49.6839001467,Minh Nguyen,3
2020-02-01,2020-02-01T03:00:00.000+0000,48.7108091292,Minh Nguyen,3
2020-02-01,2020-02-01T04:00:00.000+0000,48.8903628109,Minh Nguyen,3
2020-02-01,2020-02-01T05:00:00.000+0000,49.1219650233,Minh Nguyen,3
2020-02-01,2020-02-01T06:00:00.000+0000,50.8105042748,Minh Nguyen,3
2020-02-01,2020-02-01T07:00:00.000+0000,50.0776314468,Minh Nguyen,3
2020-02-01,2020-02-01T08:00:00.000+0000,82.3788322489,Minh Nguyen,3
2020-02-01,2020-02-01T09:00:00.000+0000,83.9594005885,Minh Nguyen,3


In [55]:
# Keys: dte
# Series groupings: p_device_id
# Values: heartrate
# Aggregation: COUNT
# Display Type: Bar Chart

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

dte,time,heartrate,name,p_device_id
2020-02-01,2020-02-01T00:00:00.000+0000,49.7052799735,Minh Nguyen,3
2020-02-01,2020-02-01T01:00:00.000+0000,49.3746419604,Minh Nguyen,3
2020-02-01,2020-02-01T02:00:00.000+0000,49.6839001467,Minh Nguyen,3
2020-02-01,2020-02-01T03:00:00.000+0000,48.7108091292,Minh Nguyen,3
2020-02-01,2020-02-01T04:00:00.000+0000,48.8903628109,Minh Nguyen,3
2020-02-01,2020-02-01T05:00:00.000+0000,49.1219650233,Minh Nguyen,3
2020-02-01,2020-02-01T06:00:00.000+0000,50.8105042748,Minh Nguyen,3
2020-02-01,2020-02-01T07:00:00.000+0000,50.0776314468,Minh Nguyen,3
2020-02-01,2020-02-01T08:00:00.000+0000,82.3788322489,Minh Nguyen,3
2020-02-01,2020-02-01T09:00:00.000+0000,83.9594005885,Minh Nguyen,3


In [56]:
###########################################            Lesson 9 of 15                            ######################################### 
###########################################      Broken readings in the table                    ######################################### 

# Upon our initial load of data into the health_tracker_processed table, we noted that there are broken records in the data. In particular, we made a note of the fact that several negative readings were present even though it is impossible to record a negative heart rate. 

# Let’s assess the extent of these broken readings in our table.

# Step 1: Create temporary view for broken readings

# First, we create a temporary view for the Broken Readings in the health_tracker_processed table.

broken_readings = (
  health_tracker_processed
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)

broken_readings.createOrReplaceTempView("broken_readings")
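# The same filter, group, and count logic can be sketched in plain Python. The rows below are hypothetical (dte, heartrate) pairs, not values from the dataset.

```python
from collections import Counter

# Hypothetical (dte, heartrate) pairs; negative values stand in for broken readings.
rows = [
    ("2020-01-01", 52.8), ("2020-01-01", -3.2),
    ("2020-01-02", 90.0),
    ("2020-01-03", -1.0), ("2020-01-03", -7.5),
]

# Keep only negative readings, then count them per day.
broken_per_day = Counter(dte for dte, heartrate in rows if heartrate < 0)
total_broken = sum(broken_per_day.values())

for dte in sorted(broken_per_day):
    print(dte, broken_per_day[dte])
print(total_broken)
```

# Days without a negative reading do not appear in the result at all, which is also true of the broken_readings view.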

In [57]:
# Step 2: Display broken_readings

# Display the records in the broken_readings view, again using a Databricks visualization. 

# Note that most days have at least one broken reading and that some have more than one. 


In [58]:
%sql

SELECT * FROM broken_readings

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [59]:
# Step 3: Sum the broken readings

In [60]:
%sql
 
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
60


In [61]:
# At this point, we've finished exploring the data in our single source of truth. Next, we will upsert into a Delta table. 

In [62]:
###########################################                 Lesson 10 of 15        ###########################################
###########################################        Repair records with an Upsert   ###########################################

# In the previous lessons, we identified two issues with the health_tracker_processed table:

#                      1. There were 72 missing records

#                      2. There were 60 records with broken readings

# In this lesson, we will repair the table by modifying the health_tracker_processed table.

# There are two patterns for modifying existing Delta tables. 

#                      appending files to an existing directory of Delta files
#                      merging a set of updates and insertions 

# In this lesson, we explore the second.

# The word "upsert" is a portmanteau of the words "update" and "insert," and this is what it does. An upsert updates a record where some criteria are met and otherwise inserts the record.

# When upserting into an existing Delta table, you merge from another table, view, or DataFrame, using either Spark SQL or, as in this lesson, the DeltaTable API. The Transaction Log records the transaction, and the Metastore immediately reflects the changes.

#The merge appends both the new/inserted files and the files containing the updates to the Delta file directory. The transaction log tells the Delta reader which file to use for each record.
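# The update-or-insert rule can be sketched in plain Python with dictionaries keyed by (time, p_device_id), the same pair of columns used for matching later in this lesson. This is only an illustration of the semantics, not of how Delta Lake performs a merge. The helper upsert, the readings, and the name "Hypothetical User" are assumptions.

```python
def upsert(table, upserts):
    """Merge `upserts` into `table`; both are dicts keyed by (time, p_device_id)."""
    merged = dict(table)
    for key, row in upserts.items():
        if key in merged:
            # Matched: update only the heartrate, keep the other columns.
            merged[key] = {**merged[key], "heartrate": row["heartrate"]}
        else:
            # Not matched: insert the whole row.
            merged[key] = dict(row)
    return merged

table = {
    ("2020-02-01 00:00:00", 3): {"name": "Minh Nguyen", "heartrate": -1.0},
}
upserts = {
    # Repairs a broken (negative) reading that already exists in the table.
    ("2020-02-01 00:00:00", 3): {"name": "Minh Nguyen", "heartrate": 49.7},
    # Late-arriving record that is not in the table yet.
    ("2020-02-27 00:00:00", 4): {"name": "Hypothetical User", "heartrate": 60.0},
}

result = upsert(table, upserts)
print(len(result))
```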

In [63]:
################################ Prepare updates DataFrame ################################################

# To repair the broken sensor readings (less than zero), we'll interpolate using the value recorded before and after for each device. The Spark SQL functions LAG and LEAD will make this a trivial calculation. 

# We'll write these values to a temporary view called updates. This view will be used later to upsert values into our health_tracker_processed Delta table.
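# The interpolation rule itself is small enough to sketch in plain Python for a single device whose readings are already sorted by time. The function interpolate_negatives and the sample readings are hypothetical. Like LAG and LEAD, the sketch reads the neighbours from the original values and yields None (NULL in Spark) when a neighbour is missing.

```python
def interpolate_negatives(readings):
    """Replace each negative reading with the mean of its two neighbours."""
    repaired = list(readings)
    for i, value in enumerate(readings):
        if value < 0:
            prev_amt = readings[i - 1] if i > 0 else None                  # LAG
            next_amt = readings[i + 1] if i + 1 < len(readings) else None  # LEAD
            if prev_amt is not None and next_amt is not None:
                repaired[i] = (prev_amt + next_amt) / 2
            else:
                repaired[i] = None  # no neighbour on one side
    return repaired

example = [52.0, -1.0, 54.0, 53.0]  # hypothetical readings for one device
print(interpolate_negatives(example))
```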

In [64]:
# Step 1: Create a DataFrame interpolating broken values

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
 
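# Order by time rather than dte: each day holds 24 readings per device, so ordering by dte alone leaves the LAG/LEAD neighbours ambiguous.
dteWindow = Window.partitionBy("p_device_id").orderBy("time")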
 
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
          lead(col("heartrate")).over(dteWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)

In [65]:
# Step 2: Create a DataFrame of updates

updatesDF = (
  interpolatedDF
  .where(col("heartrate") < 0)
  .select(col("dte"),
          col("time"),
          ((col("prev_amt") + col("next_amt"))/2).alias("heartrate"),
          col("name"),
          col("p_device_id"))
)

In [66]:
# Step 3: View the schemas of the updatesDF and health_tracker_processed table

# We use the .printSchema() function to view the schemas of the health_tracker_processed table and of updatesDF, so we can confirm that they match.
health_tracker_processed.printSchema()
updatesDF.printSchema()

In [67]:
# Step 4: Verify UpdatesDF

# Perform a .count() on the updatesDF DataFrame. It should have the same number of records as the SUM performed on the broken_readings view.

updatesDF.count()

In [68]:
##############################     Prepare inserts DataFrame           ##################################

# It turns out that our expectation of receiving the missing records late was correct. These records have subsequently been made available to us as the file health_tracker_data_2020_2_late.json.

# Step 1: Load the late-arriving data

file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"
 
health_tracker_data_2020_2_late_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [69]:
# Step 2: Transform the data

# In addition to updating the broken records, we wish to add this late-arriving data. We begin by preparing another DataFrame with the appropriate transformations:

# Use the from_unixtime Spark SQL function to transform the unixtime into a time string

# Cast the time column to type timestamp to replace the column time

# Cast the time column to type date to create the column dte

insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)

In [70]:
# Step 3: View the schema of the inserts DataFrame

insertsDF.printSchema()

In [71]:
#####################################         Prepare Upserts DataFrame        ##################################

# Step 1: Create the union DataFrame

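# Finally, we prepare the upsertsDF that consists of all the records in both the updatesDF and the insertsDF. We use the DataFrame .union() command to create it. Note that .union() matches columns by position, so both DataFrames must list their columns in the same order.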

upsertsDF = updatesDF.union(insertsDF)

In [72]:
# Step 2: View the schema

upsertsDF.printSchema()

In [73]:
#####################    Perform Upsert into the health_tracker_processed table      ########################

# You can upsert data into a Delta table using the merge operation. This operation is similar to the SQL MERGE command but has added support for deletes and other conditions in updates, inserts, and deletes. In other words, using the DeltaTable command .merge() provides full support for an upsert operation.

# Step 1: Perform the Upsert

processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

update = { "heartrate" : "upserts.heartrate" }

insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())
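
# To make the merge semantics concrete, here is a minimal plain-Python model of the upsert above. It is a sketch, not how Delta Lake implements .merge(): rows are dictionaries keyed by (time, p_device_id), matched rows get only their heartrate replaced, and unmatched rows are inserted whole. The function name and the sample values are made up for illustration.

```python
def upsert(target, source):
    """Apply source rows to target in place; return (rows_updated, rows_inserted)."""
    updated = inserted = 0
    for key, row in source.items():
        if key in target:
            # Mirrors whenMatchedUpdate(set=update): only heartrate changes.
            target[key]["heartrate"] = row["heartrate"]
            updated += 1
        else:
            # Mirrors whenNotMatchedInsert(values=insert): the whole row is added.
            target[key] = dict(row)
            inserted += 1
    return updated, inserted


# Keys are (time, p_device_id); all values are illustrative.
target = {
    ("2020-02-01 00:00:00", 0): {"heartrate": -1.0, "name": "A"},
    ("2020-02-01 01:00:00", 0): {"heartrate": 60.5, "name": "A"},
}
source = {
    ("2020-02-01 00:00:00", 0): {"heartrate": 61.0, "name": "ignored on update"},
    ("2020-02-01 00:00:00", 4): {"heartrate": 70.2, "name": "B"},
}

counts = upsert(target, source)
```

# One row is updated and one is inserted, so the target grows from two rows to three while the matched row keeps its original name.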

In [74]:
###################################         View the commit using time travel     ######################################

# Step 1: View the table as of version 1

# This is done by specifying the option "versionAsOf" as 1. When we time travel to Version 0, we see only the first month of data.

# When we time travel to Version 1, we see the first two months of data, minus the 72 missing records.


(spark.read
 .option("versionAsOf", 1)
 .format("delta")
 .load(health_tracker + "processed")
 .count())
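
# As a quick sanity check on the counts to expect from each version, the arithmetic can be sketched as follows. It assumes five devices reporting once an hour, complete data for January and February 2020, and the 72 missing records mentioned above; the variable names are illustrative.

```python
DEVICES = 5
READINGS_PER_DAY = 24
MISSING_RECORDS = 72

january = DEVICES * READINGS_PER_DAY * 31   # 3720
february = DEVICES * READINGS_PER_DAY * 29  # 3480 (2020 is a leap year)

expected_count = {
    0: january,                               # first month only
    1: january + february - MISSING_RECORDS,  # two months minus the late records
    2: january + february,                    # after the upsert inserts the late records
}
```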

In [75]:
# Step 2: Count the most recent version

health_tracker_processed.count()

In [76]:
# Step 3: Describe the history of the health_tracker_processed table

# The .history() Delta table command provides provenance information, including the operation, user, and so on, for each action performed on a table. 

# Note that each operation performed on the table is given a version number. These are the numbers we have been using when performing a time travel query on the table, e.g., SELECT COUNT(*) FROM health_tracker_processed VERSION AS OF 1.

display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
2,2020-08-08T12:03:45.000+0000,8642660593864136,mangesh.mankar04@gmail.com,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(1650038959257540),0808-095332-abed997,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numFiles -> 40, numTargetFilesAfterSkipping -> 10, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numParts -> 40, numOutputBytes -> 166790, numSourceRows -> 132, numTargetFilesRemoved -> 10, numTargetFilesBeforeSkipping -> 10)"
1,2020-08-08T12:02:45.000+0000,8642660593864136,mangesh.mankar04@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1650038959257540),0808-095332-abed997,0.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52716, numOutputRows -> 3408, numParts -> 5)"
0,2020-08-08T12:02:10.000+0000,8642660593864136,mangesh.mankar04@gmail.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(1650038959257540),0808-095332-abed997,,,,Map(numConvertedFiles -> 5)


In [77]:
##############################################              Lesson 11 of 15             ##################################################
##############################################           Perform a second Upsert        ##################################################
# In the previous lesson, we performed an upsert to the health_tracker_processed table, simultaneously:

# Updating records containing broken readings

# Inserting the late-arriving data

# In doing so, we added more broken readings!

In [78]:
# Step 1: Sum the broken readings

# Let’s sum the records in the broken_readings view once more. Note that there are still broken readings in the table. This is because some of the records inserted as part of the upsert also contained broken readings.

In [79]:
%sql 

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
1


In [80]:
# Step 2: Verify that these are new broken readings

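# Let’s query the broken_readings view with a WHERE clause to verify that these are indeed new broken readings introduced by inserting the late-arriving data. An empty (NULL) sum for dates before 2020-02-25 means that every remaining broken reading falls on or after that date.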

In [81]:
%sql 

SELECT SUM(`count(heartrate)`) FROM broken_readings WHERE dte < '2020-02-25'

sum(count(heartrate))
""


In [82]:
# Step 3: Verify updates

# Perform a .count() on the updatesDF view. 

# Note that it is not necessary to redefine the DataFrame. Recall that a Spark DataFrame is lazily defined, pulling the correct number of updates when an action is triggered.

# It should have the same number of records as the SUM performed on the broken_readings view.

updatesDF.count()
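
# The note on lazy evaluation can be illustrated with a small plain-Python analogy. This is not Spark code: the LazyFilter class is hypothetical, and treating a negative heartrate as a broken reading is an assumption made only for this example. The object stores a recipe rather than a result, so each count() sees the current state of its source.

```python
class LazyFilter:
    """Holds a source and a predicate; nothing is evaluated until count() is called."""

    def __init__(self, source, predicate):
        self.source = source
        self.predicate = predicate

    def count(self):
        # The source is scanned again on every call, like re-running a query plan.
        return sum(1 for row in self.source if self.predicate(row))


readings = [72.0, -1.0, 80.5]
broken = LazyFilter(readings, lambda heartrate: heartrate < 0)

first_count = broken.count()   # evaluated against three readings
readings.append(-3.0)          # the underlying data changes
second_count = broken.count()  # same definition, new result
```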

In [83]:
# Step 4: Perform Upsert into the health_tracker_processed table

# Once more, we upsert into the health_tracker_processed table using the DeltaTable command .merge().

upsertsDF = updatesDF

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [84]:
# Step 5: Sum the broken readings

# Let’s sum the records in the broken_readings view one last time. Finally, there are no more broken readings in the table. 

In [85]:
%sql

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
""


In [86]:
############################                      Lesson 12 of 15                    #######################################
############################                Evolution of data being ingested           #######################################

# Evolution of data being ingested

# It is not uncommon for the schema of the data being ingested into the EDSS to evolve over time. In this case, the simulated health tracker device has a new version available and the data being transmitted now contains an additional field indicating which type of device is being used.

# Here is a sample of the data we will be using. Each line is a string representing a valid JSON object and is similar to the kind of string that would be passed by a Kafka stream processing server.


#           {"device_id":0,"heartrate":57.6447293596,"name":"Deborah Powell","time":1.5830208E9,"device_type":"version 2"}
#           {"device_id":0,"heartrate":57.6175546013,"name":"Deborah Powell","time":1.5830244E9,"device_type":"version 2"}
#           {"device_id":0,"heartrate":57.8486376876,"name":"Deborah Powell","time":1.583028E9,"device_type":"version 2"}
#           {"device_id":0,"heartrate":57.8821378637,"name":"Deborah Powell","time":1.5830316E9,"device_type":"version 2"}
#           {"device_id":0,"heartrate":59.0531490807,"name":"Deborah Powell","time":1.5830352E9,"device_type":"version 2"}

# The data now has the following schema:

# name: string
# heartrate: double
# device_id: long
# time: long
# device_type: string

In [87]:
#####            Appending files to an existing Delta table           #####  

# Our goal is to append the next month of data.

# Step 1: Load the next month of data

# We begin by loading the data from the file health_tracker_data_2020_3.json, using the .format("json") option as before.

file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [88]:
# Step 2: Transform the data

# We perform the same data engineering on the data:

# 1.Use the from_unixtime Spark SQL function to transform the unix timestamp into a time string

# 2.Cast the time column to type timestamp to replace the column time

# 3.Cast the time column to type date to create the column dte

# Note that we redefine the function process_health_tracker_data to accommodate the new schema.

def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "device_type", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)
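
# To see what the three time transformations produce for a single value, here is a plain-Python sketch using the first time value from the sample data above. It assumes the session time zone is UTC; the helper name to_time_and_dte is hypothetical.

```python
from datetime import datetime, timezone


def to_time_and_dte(unix_seconds):
    """Convert Unix seconds to a (timestamp, date) pair, assuming UTC."""
    time = datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
    return time, time.date()


time, dte = to_time_and_dte(1.5830208E9)
time_string = time.strftime("%Y-%m-%d %H:%M:%S")  # "2020-03-01 00:00:00"
```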

In [89]:
# Step 3: Append the data to the health_tracker_processed Delta table

# We do this using .mode("append"). 

(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

# When we try to run this command, we receive a schema mismatch error: the incoming data contains the device_type column, which is not part of the table schema.

In [90]:
# What Is schema enforcement?

# Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table’s schema. Like the front desk manager at a busy restaurant that only accepts reservations, it checks to see whether each column in data inserted into the table is on its list of expected columns (in other words, whether each one has a “reservation”), and rejects any writes with columns that aren’t on the list or with data type mismatches.
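
# The check described above can be sketched in plain Python. This is a simplified model, not Delta Lake's implementation: schemas are dictionaries of column name to type name, and the function schema_violations is hypothetical. The types follow the casts performed in process_health_tracker_data.

```python
def schema_violations(table_schema, data_schema):
    """List the reasons a write with data_schema would be rejected."""
    problems = []
    for column, data_type in data_schema.items():
        if column not in table_schema:
            problems.append(f"unexpected column: {column}")
        elif table_schema[column] != data_type:
            problems.append(
                f"type mismatch for {column}: "
                f"table has {table_schema[column]}, data has {data_type}"
            )
    return problems


table_schema = {
    "dte": "date",
    "time": "timestamp",
    "heartrate": "double",
    "name": "string",
    "p_device_id": "integer",
}
# The new device version adds a device_type column.
data_schema = {**table_schema, "device_type": "string"}

violations = schema_violations(table_schema, data_schema)
```

# A non-empty list corresponds to a rejected write; here the only entry is the column without a "reservation".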

In [91]:
###################### Appending files to an existing Delta table with schema evolution #############################

# In this case, we would like our table to accept the new schema and add the data to the table.

# What Is schema evolution?

# Schema evolution is a feature that allows users to easily change a table’s current schema to accommodate data that is changing over time. Most commonly, it’s used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.

In [92]:
# Step 1: Append the data with schema evolution to the health_tracker_processed Delta table

(processedDF.write
 .mode("append")
 .option("mergeSchema", True)
 .format("delta")
 .save(health_tracker + "processed"))
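
# The effect of mergeSchema can be modelled with a short plain-Python sketch. It is a simplification under stated assumptions: new columns are appended to the end of the schema, and rows written before the change read as NULL (None) for the new column. The helper names merge_schema and read_row are hypothetical.

```python
def merge_schema(table_schema, data_schema):
    """Return the table schema extended with any new columns from data_schema."""
    merged = dict(table_schema)
    for column, data_type in data_schema.items():
        if column in merged and merged[column] != data_type:
            raise TypeError(f"incompatible types for column {column}")
        merged.setdefault(column, data_type)  # new columns go at the end
    return merged


def read_row(row, schema):
    """Project a stored row onto the schema; missing columns become None."""
    return {column: row.get(column) for column in schema}


table_schema = {"dte": "date", "time": "timestamp", "heartrate": "double",
                "name": "string", "p_device_id": "integer"}
data_schema = {"dte": "date", "time": "timestamp", "device_type": "string",
               "heartrate": "double", "name": "string", "p_device_id": "integer"}

evolved_schema = merge_schema(table_schema, data_schema)

# A row written before the schema change has no device_type value.
old_row = {"dte": "2020-01-01", "time": "2020-01-01 02:00:00",
           "heartrate": 59.7552762926, "name": "James Hou", "p_device_id": 4}
old_row_as_read = read_row(old_row, evolved_schema)
```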

In [93]:
######### Verify the commit 

# Step 1: Count the most recent version

# When we look at the current version, we expect to see three months of data, five device measurements, 24 hours a day for (31 + 29 + 31) days, or 10920 records. 

health_tracker_processed.count()
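
# The expected record count can be reproduced with a few lines of plain Python. The sketch assumes five devices and one reading per device per hour, as stated above.

```python
import calendar

DEVICES = 5
READINGS_PER_DAY = 24

# Days in January, February, and March 2020 (a leap year): 31 + 29 + 31.
days = sum(calendar.monthrange(2020, month)[1] for month in (1, 2, 3))

expected_records = DEVICES * READINGS_PER_DAY * days
```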

In [94]:
######################################                 Lesson 13 of 15                    ########################################
######################################           Delete data and recover lost data        ########################################

In [95]:
######################################   Delete all records associated with a user    ######################################

# Under the European Union General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA), a user of the health tracker device has the right to request that their data be expunged from the system. We might simply do this by deleting all records associated with that user's device id.

# Step 1: Delete all records for device 4

# Here, we use the DeltaTable command .delete(), the Python counterpart of the Spark SQL DELETE command, to remove all records from the health_tracker_processed table that match the given predicate.

processedDeltaTable.delete("p_device_id = 4")

In [96]:
######################################              Recover lost data         ######################################

# In the previous step, we deleted all records from the health_tracker_processed table for the health tracker device with id 4. Suppose that the user did not wish to remove all of their data, but merely to have their name scrubbed from the system. Here, we use the Time Travel capability of Delta Lake to recover everything but the user’s name.


# Step 1: Prepare new Upserts view

# We prepare a DataFrame for upserting, using Time Travel to recover the missing records.

# Note that we have replaced the entire name column with the value NULL.

from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
)
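
# The recovery logic above (read an older snapshot, keep one device, blank out the name) can be modelled in plain Python. This is only a sketch of the idea: the snapshot is a list of dictionaries with illustrative values, and recover_without_name is a hypothetical helper.

```python
def recover_without_name(snapshot, device_id):
    """Return the snapshot's rows for device_id with the name replaced by None."""
    return [
        {**row, "name": None}
        for row in snapshot
        if row["p_device_id"] == device_id
    ]


# Stand-in for the table as of the version before the delete; values are illustrative.
snapshot_before_delete = [
    {"time": "2020-01-01 02:00:00", "heartrate": 59.8, "name": "James Hou", "p_device_id": 4},
    {"time": "2020-01-01 02:00:00", "heartrate": 57.6, "name": "Deborah Powell", "p_device_id": 0},
    {"time": "2020-01-01 03:00:00", "heartrate": 61.8, "name": "James Hou", "p_device_id": 4},
]

recovered = recover_without_name(snapshot_before_delete, device_id=4)
```

# The original snapshot is left untouched, which is also why the name stays retrievable through Time Travel until the old files are removed.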

In [97]:
# Step 2: Perform Upsert into the health_tracker_processed table

# Once more, we upsert into the health_tracker_processed table using the  Delta table command .merge().

# Note that it is necessary to redefine 1) the reference to the Delta table and 2) the insert logic because the schema has changed.


processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

insert = {
  "dte" : "upserts.dte",
  "time" : "upserts.time",
  "device_type" : "upserts.device_type",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "p_device_id" : "upserts.p_device_id"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [98]:
# Step 3: Count the most recent version

# When we look at the current version, we expect to see three months of data, five device measurements, 24 hours a day for (31 + 29 + 31) days, or 10920 records.

health_tracker_processed.count()

In [99]:
# Step 4: Query device 4 to demonstrate compliance

# We query the health_tracker_processed table to demonstrate that the name associated with device 4 has indeed been removed.

display(health_tracker_processed.where("p_device_id = 4"))

dte,time,heartrate,name,p_device_id
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,,4
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,,4
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,,4
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,,4
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,,4
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,,4
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,,4
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,,4
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,,4
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,,4


In [100]:
######################################                 Lesson 14 of 15                    ########################################
######################################      Maintain compliance with a vacuum operation   ########################################


# Because of the Delta Lake Time Travel feature, we are not yet in compliance: the table can still be queried against an earlier version to identify the name of the user associated with device 4.

In [101]:
# Step 1: Query an earlier table version 

# We query the health_tracker_processed table against an earlier version to demonstrate that it is still possible to retrieve the name associated with device 4.

display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type
2020-01-01,2020-01-01T02:00:00.000+0000,59.7552762926,James Hou,4,
2020-01-01,2020-01-01T03:00:00.000+0000,61.8018342845,James Hou,4,
2020-01-01,2020-01-01T13:00:00.000+0000,100.881575671,James Hou,4,
2020-01-01,2020-01-01T17:00:00.000+0000,101.1937386946,James Hou,4,
2020-01-02,2020-01-02T09:00:00.000+0000,82.960108924,James Hou,4,
2020-01-02,2020-01-02T14:00:00.000+0000,83.1715551333,James Hou,4,
2020-01-02,2020-01-02T22:00:00.000+0000,83.3651211951,James Hou,4,
2020-01-03,2020-01-03T01:00:00.000+0000,59.3915537411,James Hou,4,
2020-01-03,2020-01-03T07:00:00.000+0000,98.4183170538,James Hou,4,
2020-01-04,2020-01-04T09:00:00.000+0000,81.2410115476,James Hou,4,


In [102]:
# Step 2: Vacuum table to remove old files

# The VACUUM Spark SQL command can be used to solve this problem. The VACUUM command recursively vacuums directories associated with the Delta table and removes files that are no longer in the latest state of the transaction log for that table and that are older than a retention threshold. The default threshold is 7 days. 

processedDeltaTable.vacuum(0)

# Delta table retention period

# When we run this command, we receive an error because the requested retention period is shorter than the default threshold. The default threshold is in place to prevent corruption of the Delta table.

In [103]:
# Step 3: Set Delta to allow the operation

# To demonstrate the VACUUM command, we set our retention period to 0 hours to be able to remove the questionable files now. This is typically not a best practice and in fact, there are safeguards in place to prevent this operation from being performed.

# For demonstration purposes, we will set Delta to allow this operation

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

In [104]:
# Step 4: Vacuum table to remove old files

processedDeltaTable.vacuum(0)
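
# The retention rule behind VACUUM can be sketched in plain Python. This is a simplified model, not Delta Lake's implementation: each file has a single timestamp, live_files stands for the files in the latest table state, and the function files_to_vacuum and the file names are hypothetical.

```python
from datetime import datetime, timedelta

DEFAULT_RETENTION_HOURS = 7 * 24  # the 7-day default threshold


def files_to_vacuum(file_timestamps, live_files, now,
                    retention_hours=DEFAULT_RETENTION_HOURS):
    """Return files that are not live and are older than the retention threshold."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        name
        for name, timestamp in file_timestamps.items()
        if name not in live_files and timestamp < cutoff
    )


now = datetime(2020, 8, 8, 12, 0)
file_timestamps = {
    "part-old.parquet": now - timedelta(days=10),
    "part-recent.parquet": now - timedelta(hours=1),
    "part-live.parquet": now - timedelta(hours=1),
}
live_files = {"part-live.parquet"}

removed_with_default = files_to_vacuum(file_timestamps, live_files, now)
removed_with_zero = files_to_vacuum(file_timestamps, live_files, now, retention_hours=0)
```

# With the default threshold only the ten-day-old file qualifies; with a retention of 0 hours, every file outside the latest table state is removed, which is what ends Time Travel to earlier versions.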

In [105]:
# Step 5: Attempt to query an earlier version

# Now when we attempt to query an earlier version, an error is thrown.

# This error indicates that we are not able to query data from this earlier version because the files have been expunged from the system.

display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

In [106]:
######################################                 Lesson 15 of 15                    ########################################
######################################                     Summary                        ########################################

# Congratulations!

# You have completed Delta Lake Rapid Start with Python.

# At this point, we invite you to think about the work we have done and how it relates to the full IoT data ingestion pipeline we have been designing. 

# In this course, we used Spark SQL and Delta Lake to create a single source of truth in our EDSS: the health_tracker_processed Delta table. 

# We did this through the following steps:

#          1.We converted an existing Parquet-based data lake table to a Delta table, health_tracker_processed.
#          2.We performed a batch upload of new data to this table.
#          3.We used Apache Spark to identify broken and missing records in this table.
#          4.We used Delta Lake’s upsert functionality, where we updated broken records and inserted missing records.
#          5.We evolved the schema of the Delta table.
#          6.We used Delta Lake’s Time Travel feature to scrub the personal data of a user intelligently.

# Additionally, we used Delta Lake to create an aggregate table, health_tracker_user_analytics, downstream from the health_tracker_processed table.