In [1]:
## Specify the database
# Default user name; the "username" widget may override it. The %sql cells in
# this notebook interpolate $username from the widget, so the Python side reads
# the widget value back to keep both languages pointing at the same locations.
DEFAULT_USERNAME = "Bruno_M_Cervantes"
dbutils.widgets.text("username", DEFAULT_USERNAME)
username = dbutils.widgets.get("username")

database_name = f"dbacademy_{username}"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.sql(f"USE {database_name}")

# Root directory under which the raw/, processed/ and gold/ paths are built.
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [2]:
## Configure the Number of Shuffle Partitions
# Session-level setting: Spark SQL uses 8 partitions when it shuffles data
# for the joins and aggregations in the cells that follow.
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [3]:
### Download the data to the driver

In [4]:
%sh
# Download the four health-tracker JSON files into the driver's working directory.
#   set -e  : stop at the first failed download instead of continuing with a partial set
#   wget -O : write to a fixed file name so a re-run overwrites the file rather than
#             leaving *.json.1 duplicates next to it
set -e

base_url="https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker"

for name in \
    health_tracker_data_2020_1.json \
    health_tracker_data_2020_2.json \
    health_tracker_data_2020_2_late.json \
    health_tracker_data_2020_3.json
do
  wget -O "$name" "$base_url/$name"
done

In [5]:
%sh
# List the driver's working directory to confirm the downloaded JSON files are present.
ls

In [6]:
## Move the data to the raw directory
# Each downloaded file is moved from the driver's local disk into
# <health_tracker>/raw/ under the same file name, in download order.
for raw_file_name in (
    "health_tracker_data_2020_1.json",
    "health_tracker_data_2020_2.json",
    "health_tracker_data_2020_2_late.json",
    "health_tracker_data_2020_3.json",
):
  dbutils.fs.mv("file:/databricks/driver/" + raw_file_name,
                health_tracker + "raw/" + raw_file_name)

In [7]:
## Load the data
# Read the January 2020 raw JSON file into a DataFrame; no schema is supplied,
# so the JSON reader derives one from the data.
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"

json_reader = spark.read.format("json")
health_tracker_data_2020_1_df = json_reader.load(file_path)

In [8]:
## Visualize data
# Render the January DataFrame with the notebook's display(); the settings
# listed below are the plot options to choose in the rendered output.
display(health_tracker_data_2020_1_df)

#Keys: time
#Series groupings: device_id
#Values: heartrate
#Aggregation: SUM
#Display Type: Bar Chart

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [9]:
### Create a Parquet table

In [10]:
## Step 1: Remove files in the <health_tracker>/processed directory
# recurse=True removes the directory together with everything below it, so the
# Parquet write in Step 3 starts from an empty location.
dbutils.fs.rm(health_tracker + "processed", recurse=True)

In [11]:
## Step 2: Transform the data 

from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
  """Reshape raw health-tracker records into the processed-table layout.

  - `device_id` is renamed to `p_device_id` and cast to integer.
  - `time` (unix time) is turned into a time string with from_unixtime and
    then cast to a timestamp, replacing the original column.
  - `dte` is the converted `time` cast to a date.

  Returns a DataFrame with the columns dte, time, heartrate, name, p_device_id
  (in that order).
  """
  renamed = dataframe.withColumnRenamed("device_id", "p_device_id")
  typed = (
    renamed
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .withColumn("time", from_unixtime("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
  )
  return typed.select("dte", "time", "heartrate", "name", "p_device_id")

# Processed form of the January 2020 data loaded above.
processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)

In [12]:
## Step 3: Write the Files to the processed directory
# Replace whatever is at <health_tracker>/processed with Parquet files,
# partitioned on disk by p_device_id.
(processedDF.write
 .format("parquet")
 .partitionBy("p_device_id")
 .mode("overwrite")
 .save(health_tracker + "processed"))

In [13]:
## Step 4: Register the table in the metastore

In [14]:
%sql

-- Register the Parquet files written in Step 3 as a metastore table.
-- The location is built from the $username widget (as the later Delta
-- CREATE TABLE cells already do) instead of a hard-coded user directory,
-- so the table always points at the directory the Python cells wrote to.
DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING PARQUET
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"



In [15]:
# Step 5: Verify and repair the Parquet-based Data Lake table
# Count rows through the metastore table before its partitions are registered
# in Step 5b; compare this number with the count taken after MSCK REPAIR.

health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()


In [16]:
# Step 5b: Register the partitions

In [17]:
%sql

-- Register the partition directories found under the table's LOCATION in the metastore.
MSCK REPAIR TABLE health_tracker_processed

In [18]:
# Re-count after MSCK REPAIR to confirm the partitioned data is now visible.
health_tracker_processed.count()

In [19]:
### Delta table fundamentals
## Step 1: Describe the health_tracker_processed table

In [20]:
%sql

-- Show table metadata (format, location, partition columns, ...) for the Parquet table.
DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
PARQUET,,dbacademy_bruno_m_cervantes.health_tracker_processed,,dbfs:/dbacademy/Bruno_M_Cervantes/DLRS/healthtracker/processed,2020-09-09T09:32:56.000+0000,,List(p_device_id),,,Map(),,


In [21]:
### Convert a Parquet table to a Delta table
## Step 1: Convert the files to Delta files

In [22]:
from delta.tables import DeltaTable

# Identifier of the Parquet directory to convert, in the form parquet.`<path>`.
parquet_table = "parquet.`" + health_tracker + "processed`"
# Partition columns and types of the existing Parquet layout.
partitioning_scheme = "p_device_id int"

# Convert the Parquet files at that location into a Delta table in place.
DeltaTable.convertToDelta(
  spark,
  parquet_table,
  partitioning_scheme,
)

In [23]:
## Step 2: Register the Delta table

In [24]:
%sql

-- Drop the Parquet registration and re-register the same directory as a Delta table.
-- $username is substituted from the notebook's "username" widget.
DROP TABLE IF EXISTS health_tracker_processed;

CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/processed"


In [25]:
## Step 3: Describe the health_tracker_processed table

In [26]:
%sql
-- Same metadata query as before the conversion; the format column should now report delta.
DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,84ff9d22-8b0a-4deb-99c4-d3dadb188dc6,dbacademy_bruno_m_cervantes.health_tracker_processed,,dbfs:/dbacademy/Bruno_M_Cervantes/DLRS/healthtracker/processed,2020-09-09T09:39:02.899+0000,2020-09-09T09:39:07.000+0000,List(p_device_id),5,57108,Map(),1,2


In [27]:
## Step 4: Count the records in the health_tracker_processed table
# Re-create the DataFrame so that it reads the table now registered as Delta;
# later cells reuse this health_tracker_processed variable.
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [28]:
### Create a new Delta table


In [29]:
## Step 1: Remove files in the health_tracker_user_analytics directory
# recurse=True removes the directory and its contents so the Delta write
# below starts from an empty location.
dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics",
              recurse=True)

In [30]:
## Step 2: Create an aggregate DataFrame

# Import the functions module under an alias instead of importing `max` by
# name: a bare `max` import would replace Python's built-in max() in the
# notebook namespace for every cell that runs afterwards.
from pyspark.sql import functions as F

# One row per device with the mean, maximum and standard deviation of heartrate.
health_tracker_gold_user_analytics = (
  health_tracker_processed
  .groupby("p_device_id")
  .agg(F.avg(F.col("heartrate")).alias("avg_heartrate"),
       F.max(F.col("heartrate")).alias("max_heartrate"),
       F.stddev(F.col("heartrate")).alias("stddev_heartrate"))
)

In [31]:
## Step 3: Write the Delta files
# Overwrite the gold directory with the per-device aggregates in Delta format.
gold_path = health_tracker + "gold/health_tracker_user_analytics"

(health_tracker_gold_user_analytics.write
 .mode("overwrite")
 .format("delta")
 .save(gold_path))

In [32]:
## Step 4: Register the Delta table in the Metastore

In [33]:
%sql

-- Register the gold aggregate directory as a Delta table in the metastore.
-- $username is substituted from the notebook's "username" widget.
DROP TABLE IF EXISTS health_tracker_gold_user_analytics;

CREATE TABLE health_tracker_gold_user_analytics
USING DELTA
LOCATION "/dbacademy/$username/DLRS/healthtracker/gold/health_tracker_user_analytics"

In [34]:
## Prepare a dashboard using the health_tracker_user_analytics table
# Read the registered gold table back from the metastore and render it; the
# settings below are the plot options to choose in the rendered output.
display(spark.read.table("health_tracker_gold_user_analytics"))
#Keys: p_device_id
#Series groupings: None
#Values: max_heartrate, avg_heartrate, stddev_heartrate
#Aggregation: SUM
#Display Type: Bar Chart


p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
1,78.5776567337699,168.114687819,31.61967903784856
3,82.65419819635204,171.8435388833,30.92932874000444
4,83.08377376550952,173.5770785921,34.16032267669617
2,79.99574196662837,184.7433209566,31.408007741222
0,81.21484441523789,186.4790827731,31.343789198032887


In [35]:
### Batch write to Delta tables via Appending files to an existing directory of Delta files


In [36]:
## Step 1: Load the next month of data
# Read the February 2020 raw JSON file into its own DataFrame.
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"

json_reader = spark.read.format("json")
health_tracker_data_2020_2_df = json_reader.load(file_path)

In [37]:
## Step 2: Transform the data
# Apply the same transformation used for January; processedDF is rebound to
# the February result and is what the next cell appends.

processedDF = process_health_tracker_data(health_tracker_data_2020_2_df)


In [38]:
## Step 3: Append the data to the health_tracker_processed Delta table
# Add the February rows to the existing Delta directory as a new table version.
(processedDF.write
 .format("delta")
 .mode("append")
 .save(health_tracker + "processed"))

In [39]:
### View the commit using Time Travel

In [40]:
## Step 1: View the table as of version 0
# Time travel: read the Delta directory pinned to version 0 and count its rows.
version_0_reader = spark.read.format("delta").option("versionAsOf", 0)

version_0_reader.load(health_tracker + "processed").count()

In [41]:
## Step 2: Count the most recent version
# Uses the DataFrame created earlier with spark.read.table; compare with the
# version-0 count above.
# NOTE(review): assumes this DataFrame reflects the table's latest version when the action runs — confirm.
health_tracker_processed.count()

In [42]:
### Late Arriving Data

In [43]:
### Step 1: Count the number of records per device

from pyspark.sql.functions import count

# Count rows per device directly from the Delta files at <health_tracker>/processed.
display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id")
  .agg(count("*"))
)

# It looks like device 4 is missing 72 records (1368 rows versus 1440 for every other device).

p_device_id,count(1)
1,1440
3,1440
2,1440
4,1368
0,1440


In [44]:
## Step 2: Plot the missing records
# Restrict the table to devices 3 and 4 so that, with the plot options below,
# the per-day record counts of a complete device and of device 4 can be compared.

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

#Keys: dte
#Series groupings: p_device_id
#Values: heartrate
#Aggregation: COUNT
#Display Type: Bar Chart

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,55.2272036665,Minh Nguyen,3
2020-01-01,2020-01-01T01:00:00.000+0000,56.035689123,Minh Nguyen,3
2020-01-01,2020-01-01T02:00:00.000+0000,55.6403282219,Minh Nguyen,3
2020-01-01,2020-01-01T03:00:00.000+0000,56.3692513843,Minh Nguyen,3
2020-01-01,2020-01-01T04:00:00.000+0000,56.5412281859,Minh Nguyen,3
2020-01-01,2020-01-01T05:00:00.000+0000,55.8311481148,Minh Nguyen,3
2020-01-01,2020-01-01T06:00:00.000+0000,54.9402513831,Minh Nguyen,3
2020-01-01,2020-01-01T07:00:00.000+0000,92.2205431894,Minh Nguyen,3
2020-01-01,2020-01-01T08:00:00.000+0000,93.8159033652,Minh Nguyen,3
2020-01-01,2020-01-01T09:00:00.000+0000,92.0210547557,Minh Nguyen,3


In [45]:
### Broken readings in the table

In [46]:
## Step 1: Create temporary view for broken readings
# A reading is treated as broken when heartrate is negative. The view holds
# the number of broken readings per day, sorted by day.
negative_readings = health_tracker_processed.where(col("heartrate") < 0)

broken_readings = (
  negative_readings
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)

# Expose the result to the %sql cells under the name broken_readings.
broken_readings.createOrReplaceTempView("broken_readings")

In [47]:
## Step 2: Display broken_readings

In [48]:
%sql

-- Per-day counts of negative heartrate readings from the temporary view.
SELECT * FROM broken_readings

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [49]:
## Step 3: Sum the broken readings

In [50]:
%sql
 
-- Total number of broken readings; the backticks quote the generated column name count(heartrate).
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
60


In [51]:
###  Repair records with an Upsert - merging a set of updates and insertions 

"""In the previous lesson, we identified two issues with the health_tracker_processed table:
There were 72 missing records
There were 60 records with broken readings

The word "upsert" is a portmanteau of the words "update" and "insert," and this is what it does. An upsert will update records where some criteria are met and otherwise will insert the record. 

When upserting into an existing Delta table, use Spark SQL to perform the merge from another registered table or view. The Transaction Log records the transaction, and the Metastore immediately reflects the changes.

The merge appends both the new/inserted files and the files containing the updates to the Delta file directory. The transaction log tells the Delta reader which file to use for each record.

To repair the broken sensor readings (less than zero), we'll interpolate using the value recorded before and after for each device. The Spark SQL functions LAG and LEAD will make this a trivial calculation. 

We'll write these values to a temporary view called updates. This view will be used later to upsert values into our health_tracker_processed Delta table.

"""

In [52]:
## Step 1: Create a DataFrame interpolating broken values

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
 
# Per-device window ordered by the full timestamp. Readings are hourly, so
# ordering by the date column alone leaves every row of a day tied and lets
# lag/lead return an arbitrary neighbour from that day; ordering by `time`
# makes prev_amt / next_amt the readings immediately before and after.
dteWindow = Window.partitionBy("p_device_id").orderBy("time")

# Every reading together with its neighbouring heartrates on the same device.
# prev_amt is null for a device's first reading and next_amt for its last;
# a neighbour may itself be a broken (negative) reading.
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
          lead(col("heartrate")).over(dteWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)

In [53]:
## Step 2: Create a DataFrame of updates
# Replacement value for a broken reading: the mean of the neighbouring readings.
repaired_heartrate = ((col("prev_amt") + col("next_amt"))/2).alias("heartrate")

# Keep only the broken (negative) readings and swap in the repaired value,
# preserving the column order of the processed table.
updatesDF = (
  interpolatedDF
  .filter(col("heartrate") < 0)
  .select(col("dte"), col("time"), repaired_heartrate, col("name"), col("p_device_id"))
)

In [54]:
## Step 3: View the schemas of the updatesDF and health_tracker_processed table
# The two schemas are printed one after the other so they can be compared
# before updatesDF is merged into the table.

health_tracker_processed.printSchema()
updatesDF.printSchema()

In [55]:
## Step 4: Verify UpdatesDF 
# Number of repaired rows; compare with the SUM over the broken_readings view.
updatesDF.count()

In [56]:
## Prepare inserts DataFrame

""" It turns out that our expectation of receiving the missing records late was correct. These records have subsequently been made available to us as the file health_tracker_data_2020_2_late.json."""

In [57]:
## Step 1: Load the late-arriving data
# Read the late-arriving February JSON file into its own DataFrame.
file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"

json_reader = spark.read.format("json")
health_tracker_data_2020_2_late_df = json_reader.load(file_path)

In [58]:
## Step 2: Transform the data
# The late records go through the same transformation as the regular monthly
# data, which gives insertsDF the processed-table column layout.

insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)


In [59]:
## Step 3: View the schema of the inserts DataFrame
# Printed so it can be checked against the updatesDF schema before the two are unioned.
insertsDF.printSchema()

In [60]:
### Prepare Upserts DataFrame

In [61]:
## Step 1: Create the union DataFrame
# Combine repaired rows (updates) and late-arriving rows (inserts) into one
# source for the merge. unionByName lines columns up by name, whereas union()
# matches them by position and would silently mix columns if either
# DataFrame's select order ever changed.

upsertsDF = updatesDF.unionByName(insertsDF)


In [62]:
## Step 2: View the schema
# Schema of the combined updates + inserts DataFrame that is merged into the table.
upsertsDF.printSchema()

In [63]:
### Perform Upsert into the health_tracker_processed table

"""You can upsert data into a Delta table using the merge operation. This operation is similar to the SQL MERGE command but has added support for deletes and other conditions in updates, inserts, and deletes. In other words, using the DeltaTable command .merge() provides full support for an upsert operation."""

In [64]:
## Step 1: Perform the Upsert

# Handle on the Delta table stored at <health_tracker>/processed.
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

# A source row matches a target row when timestamp and device id both agree.
update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

# Matched rows: only the heartrate is replaced.
update = dict(heartrate="upserts.heartrate")

# Unmatched rows: every target column is filled from the same-named source column.
insert = {
  column: f"upserts.{column}"
  for column in ("p_device_id", "heartrate", "name", "time", "dte")
}

upsert_merge = (
  processedDeltaTable.alias("health_tracker")
  .merge(upsertsDF.alias("upserts"), update_match)
  .whenMatchedUpdate(set=update)
  .whenNotMatchedInsert(values=insert)
)
upsert_merge.execute()


In [65]:
### View the commit using time travel

In [66]:
## Step 1: View the table as of version 1
# Time travel: read the Delta directory pinned to version 1 (the state before
# the merge) and count its rows.
version_1_reader = spark.read.format("delta").option("versionAsOf", 1)

version_1_reader.load(health_tracker + "processed").count()

In [67]:
## Step 2: Count the most recent version
# Compare with the version-1 count above to see the effect of the merge.
# NOTE(review): assumes this DataFrame reflects the table's latest version when the action runs — confirm.
health_tracker_processed.count()

In [68]:
## Step 3: Describe the history of the health_tracker_processed table

"""The .history() Delta table command provides provenance information, including the operation, user, and so on, for each action performed on a table. 

Note that each operation performed on the table is given a version number. These are the numbers we have been using when performing a time travel query on the table, e.g., SELECT COUNT(*) FROM health_tracker_processed VERSION AS OF 1."""

# Render the table's commit history: one row per version with its operation and metrics.
display(processedDeltaTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
2,2020-09-09T10:11:46.000+0000,3112411829984944,bruno.m.cervantes.q@gmail.com,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(806653998034433),0909-084717-fluff799,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numSourceRows -> 132, numTargetFilesRemoved -> 10)"
1,2020-09-09T09:51:06.000+0000,3112411829984944,bruno.m.cervantes.q@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(806653998034433),0909-084717-fluff799,0.0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52887, numOutputRows -> 3408)"
0,2020-09-09T09:39:07.000+0000,3112411829984944,bruno.m.cervantes.q@gmail.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(806653998034433),0909-084717-fluff799,,,,Map(numConvertedFiles -> 5)


In [69]:
### Perform a second Upsert

In [70]:
## Step 1: Sum the broken readings

In [71]:
%sql 

-- Re-run the total over the broken_readings view now that the first upsert has been applied.
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
1


In [72]:
## Step 2: Verify that these are new broken readings

In [73]:
%sql 

-- Broken readings dated before 2020-02-25 only; an empty result means the
-- remaining broken reading is not one of the rows already repaired.
SELECT SUM(`count(heartrate)`) FROM broken_readings WHERE dte < '2020-02-25'

sum(count(heartrate))
""


In [74]:
## Step 3: Verify updates
"""Note that it is not necessary to redefine the DataFrame. Recall that a Spark DataFrame is lazily defined, pulling the correct number of updates when an action is triggered.

It should have the same number of records as the SUM performed on the broken_readings view."""

# Re-evaluates the existing updatesDF definition against the table's current contents.
updatesDF.count()

In [75]:
## Step 4: Perform Upsert into the health_tracker_processed table

# This round has no late-arriving inserts, so the merge source is just the updates.
upsertsDF = updatesDF

# Same match condition and update/insert mappings as the first upsert.
second_merge = (
  processedDeltaTable.alias("health_tracker")
  .merge(upsertsDF.alias("upserts"), update_match)
  .whenMatchedUpdate(set=update)
  .whenNotMatchedInsert(values=insert)
)
second_merge.execute()

In [76]:
## Step 5: Sum the broken readings

In [77]:
%sql

-- Final check after the second upsert: an empty result means no negative readings remain.
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
""


In [78]:
### Evolution of data being ingested
"""It is not uncommon that schema of the data being ingested into the EDSS will evolve over time. In this case, the simulated health tracker device has a new version available and the data being transmitted now contains an additional field indicating which type of device is being used."""
### Appending files to an existing Delta table


In [79]:
## Step 1: Load the next month of data
# Read the March 2020 raw JSON file into its own DataFrame.
file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

json_reader = spark.read.format("json")
health_tracker_data_2020_3_df = json_reader.load(file_path)

In [80]:
## Step 2: Transform the data: Note that we redefine the function process_health_tracker_data to accommodate the new schema.

In [81]:
def process_health_tracker_data(dataframe):
  """Reshape raw health-tracker records into the processed-table layout.

  - `time` (unix timestamp) is turned into a time string with from_unixtime
    and then cast to a timestamp, replacing the original column.
  - `dte` is the converted `time` cast to a date.
  - `device_id` is renamed to `p_device_id` and cast to integer.

  This definition replaces the earlier one of the same name. `device_type`
  is included in the output only when the input has that column, so the
  function keeps working for the earlier months' data if one of the
  previous cells is re-run after this cell.

  Returns a DataFrame with the columns dte, time, [device_type,] heartrate,
  name, p_device_id (in that order).
  """
  transformed = (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
  )
  output_columns = ["dte", "time", "heartrate", "name", "p_device_id"]
  if "device_type" in transformed.columns:
    # Same position as in the new-schema layout: right after `time`.
    output_columns.insert(2, "device_type")
  return transformed.select(*output_columns)

# Processed form of the March 2020 data, which carries device_type.
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

In [82]:
# Attempt a plain append of the March data, which has the extra device_type
# column. The write is expected to be rejected; the error is caught and printed
# so the demonstration is visible without halting a "Run All" of the notebook.
from pyspark.sql.utils import AnalysisException

try:
  (processedDF.write
   .mode("append")
   .format("delta")
   .save(health_tracker + "processed"))
except AnalysisException as schema_error:
  print(f"Append rejected by schema enforcement:\n{schema_error}")

# This fails due to schema enforcement
"""Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table’s schema. Like the front desk manager at a busy restaurant that only accepts reservations, it checks to see whether each column in data inserted into the table is on its list of expected columns (in other words, whether each one has a “reservation”), and rejects any writes with columns that aren’t on the list or with data type mismatches."""

In [83]:
### Appending files to an existing Delta table with schema evolution

"""Schema evolution is a feature that allows users to easily change a table’s current schema to accommodate data that is changing over time. Most commonly, it’s used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns."""

In [85]:
## Step 1: Append the data with schema evolution to the health_tracker_processed Delta table.
# Same append as before, now with the mergeSchema option switched on so the
# new device_type column is accepted.
(processedDF.write
 .format("delta")
 .option("mergeSchema", True)
 .mode("append")
 .save(health_tracker + "processed"))

In [86]:
## Count the most recent version
# Row count after the March data has been appended.
# NOTE(review): assumes this DataFrame reflects the table's latest version when the action runs — confirm.
health_tracker_processed.count()

In [87]:
### Delete data and recover lost data

In [88]:
## Step 1: Delete all records for device 4
# The argument is a SQL predicate; every row of the Delta table for which it holds is deleted.
processedDeltaTable.delete("p_device_id = 4")


In [89]:
### Recover lost data
"""Suppose that the user did not wish to remove all of their data, but merely to have their name scrubbed from the system. In this lesson, we use the Time Travel capability of Delta Lake to recover everything but the user’s name."""

In [90]:
## Step 1: Prepare new Upserts view

from pyspark.sql.functions import lit

# Column of nulls that takes the place of the user's name in the recovered rows.
scrubbed_name = lit(None).alias("name")

# Device 4's rows as they were at table version 4, with the name replaced.
# The version number is fixed and depends on the order in which the cells
# above were committed.
version_4_table = (
  spark.read
  .format("delta")
  .option("versionAsOf", 4)
  .load(health_tracker + "processed")
)
upsertsDF = (
  version_4_table
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", scrubbed_name, "p_device_id")
)

In [91]:
## Step 2: Perform Upsert into the health_tracker_processed table
"""Note that it is necessary to define 
1) the reference to the Delta table and 
2) the insert logic because the schema has changed."""

# Fresh handle on the Delta table stored at <health_tracker>/processed.
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

# Insert mapping for the evolved schema: every target column, now including
# device_type, is filled from the same-named source column.
insert = {
  column: f"upserts.{column}"
  for column in ("dte", "time", "device_type", "heartrate", "name", "p_device_id")
}

# The match condition and update mapping defined for the first upsert are reused.
recovery_merge = (
  processedDeltaTable.alias("health_tracker")
  .merge(upsertsDF.alias("upserts"), update_match)
  .whenMatchedUpdate(set=update)
  .whenNotMatchedInsert(values=insert)
)
recovery_merge.execute()

In [92]:
## Step 3: Count the most recent version
# Row count after device 4's rows have been merged back in without the name.
# NOTE(review): assumes this DataFrame reflects the table's latest version when the action runs — confirm.
health_tracker_processed.count()

In [93]:
## Step 4: Query device 4 to demonstrate compliance
# The name column should be empty for every row of device 4.
# NOTE(review): health_tracker_processed was defined before device_type was added to the table,
# and the recorded output below has no device_type column — confirm that is intended.
display(health_tracker_processed.where("p_device_id = 4"))

dte,time,heartrate,name,p_device_id
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,,4
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,,4
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,,4
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,,4
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,,4
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,,4
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,,4
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,,4
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,,4
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,,4


In [94]:
### Maintain compliance with a vacuum operation
## Step 1: Query an earlier table version 
"""We query the health_tracker_processed table against an earlier version to demonstrate that it is still possible to retrieve the name associated with device 4."""

# Time travel to version 4 and show device 4's rows as they were stored at that version.
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type
2020-03-01,2020-03-01T00:00:00.000+0000,97.8678768636,James Hou,4,version 2
2020-03-01,2020-03-01T01:00:00.000+0000,97.586595396,James Hou,4,version 2
2020-03-01,2020-03-01T02:00:00.000+0000,97.188151848,James Hou,4,version 2
2020-03-01,2020-03-01T03:00:00.000+0000,97.4361573672,James Hou,4,version 2
2020-03-01,2020-03-01T04:00:00.000+0000,95.8997954454,James Hou,4,version 2
2020-03-01,2020-03-01T05:00:00.000+0000,96.5277339825,James Hou,4,version 2
2020-03-01,2020-03-01T06:00:00.000+0000,98.1774838993,James Hou,4,version 2
2020-03-01,2020-03-01T07:00:00.000+0000,95.8929343311,James Hou,4,version 2
2020-03-01,2020-03-01T08:00:00.000+0000,95.545442375,James Hou,4,version 2
2020-03-01,2020-03-01T09:00:00.000+0000,95.3578614286,James Hou,4,version 2


In [95]:
## Step 2: Vacuum table to remove old files
"""The VACUUM Spark SQL command can be used to solve this problem. The VACUUM command recursively vacuums directories associated with the Delta table and removes files that are no longer in the latest state of the transaction log for that table and that are older than a retention threshold. The default threshold is 7 days. """
# A retention of 0 hours is expected to be refused while the retention-duration
# check is enabled. The error is caught and printed so the demonstration is
# visible without halting a "Run All" of the notebook.
from pyspark.sql.utils import IllegalArgumentException

try:
  processedDeltaTable.vacuum(0)
except IllegalArgumentException as retention_error:
  print(f"Vacuum refused by the retention-duration check:\n{retention_error}")

# When we run this command, we receive the below error. The default threshold is in place to prevent corruption of the Delta table.

In [96]:
## Step 3: Set Delta to allow the operation

"""To demonstrate the VACUUM command, we set our retention period to 0 hours to be able to remove the questionable files now. This is typically not a best practice and in fact, there are safeguards in place to prevent this operation from being performed."""
# Session-level switch: turns off the safety check that refused vacuum(0) in the previous step.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)


In [97]:
## Step 4: Vacuum table to remove old files
# Same call as in Step 2, issued again now that the retention-duration check
# has been disabled; the argument is the retention period in hours.
processedDeltaTable.vacuum(0)

In [98]:
## Step 5: Attempt to query an earlier version
# Same time-travel query as before the vacuum; it is expected to fail here.
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

# This error indicates that we are not able to query data from this earlier version because the files have been expunged from the system.

In [99]:
"""
In this course, we used Spark SQL and Delta Lake to create a single source of truth in our EDSS: the health_tracker_processed Delta table. 


We did this through the following steps:

We converted an existing Parquet-based data lake table to a Delta table, health_tracker_processed.

We performed a batch upload of new data to this table.

We used Apache Spark to identify broken and missing records in this table.

We used Delta Lake’s upsert functionality, where we updated broken records and inserted missing records.

We evolved the schema of the Delta table.

We used Delta Lake’s Time Travel feature to scrub the personal data of a user intelligently.

Additionally, we used Delta Lake to create an aggregate table, health_tracker_user_analytics, downstream from the health_tracker_processed table."""