In [1]:
username = "csjusr1"
dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS dbacademy_{username}")
spark.sql(f"USE dbacademy_{username}")
health_tracker = f"/dbacademy/{username}/DLRS/healthtracker/"

In [2]:
spark.conf.set("spark.sql.shuffle.partitions",8)

In [3]:
%sh

wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_1.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_2_late.json
wget https://hadoop-and-big-data.s3-us-west-2.amazonaws.com/fitness-tracker/health_tracker_data_2020_3.json

In [4]:
%sh ls

In [5]:
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_1.json",health_tracker + "raw/health_tracker_data_2020_1.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2.json",health_tracker + "raw/health_tracker_data_2020_2.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_2_late.json",health_tracker + "raw/health_tracker_data_2020_2_late.json")
dbutils.fs.mv("file:/databricks/driver/health_tracker_data_2020_3.json",health_tracker + "raw/health_tracker_data_2020_3.json")

In [6]:
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
health_tracker_data_2020_1_df = (spark.read.format("json").load(file_path))

In [7]:
display(health_tracker_data_2020_1_df)

device_id,heartrate,name,time
0,52.8139067501,Deborah Powell,1577836800.0
0,53.9078900098,Deborah Powell,1577840400.0
0,52.7129593616,Deborah Powell,1577844000.0
0,52.2880422685,Deborah Powell,1577847600.0
0,52.5156095386,Deborah Powell,1577851200.0
0,53.6280743846,Deborah Powell,1577854800.0
0,52.1760037066,Deborah Powell,1577858400.0
0,90.0456721836,Deborah Powell,1577862000.0
0,89.4695644522,Deborah Powell,1577865600.0
0,88.1490304138,Deborah Powell,1577869200.0


In [8]:
dbutils.fs.rm(health_tracker + "processed", recurse=True)

In [9]:
from pyspark.sql.functions import col, from_unixtime
 
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)
  

In [10]:
(processedDF.write
.mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .save(health_tracker + "processed"))

In [11]:
display(processedDF)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,52.8139067501,Deborah Powell,0
2020-01-01,2020-01-01T01:00:00.000+0000,53.9078900098,Deborah Powell,0
2020-01-01,2020-01-01T02:00:00.000+0000,52.7129593616,Deborah Powell,0
2020-01-01,2020-01-01T03:00:00.000+0000,52.2880422685,Deborah Powell,0
2020-01-01,2020-01-01T04:00:00.000+0000,52.5156095386,Deborah Powell,0
2020-01-01,2020-01-01T05:00:00.000+0000,53.6280743846,Deborah Powell,0
2020-01-01,2020-01-01T06:00:00.000+0000,52.1760037066,Deborah Powell,0
2020-01-01,2020-01-01T07:00:00.000+0000,90.0456721836,Deborah Powell,0
2020-01-01,2020-01-01T08:00:00.000+0000,89.4695644522,Deborah Powell,0
2020-01-01,2020-01-01T09:00:00.000+0000,88.1490304138,Deborah Powell,0


In [12]:
%sql
DROP TABLE IF EXISTS health_tracker_processed;
CREATE TABLE health_tracker_processed
USING PARQUET
LOCATION "/dbacademy/csjusr1/DLRS/healthtracker/processed"


In [13]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [14]:
%sql
MSCK REPAIR TABLE  health_tracker_processed

In [15]:
health_tracker_processed.count()

In [16]:
%sql
DESCRIBE DETAIL health_tracker_processed

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
PARQUET,,dbacademy_csjusr1.health_tracker_processed,,dbfs:/dbacademy/csjusr1/DLRS/healthtracker/processed,2020-10-04T19:58:46.000+0000,,List(p_device_id),,,Map(),,


In [17]:
from delta.tables import DeltaTable

parquet_table = f"parquet.`{health_tracker}processed`"
partitioning_scheme = "p_device_id int"

DeltaTable.convertToDelta(spark, parquet_table, partitioning_scheme)

In [18]:
%sql
DROP TABLE IF EXISTS health_tracker_processed;
CREATE TABLE health_tracker_processed
USING DELTA
LOCATION "/dbacademy/csjusr1/DLRS/healthtracker/processed"

In [19]:
%sql
DESCRIBE DETAIL health_tracker_processed


format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,4be413d0-ff8d-4f10-89e2-7107e8069dd6,dbacademy_csjusr1.health_tracker_processed,,dbfs:/dbacademy/csjusr1/DLRS/healthtracker/processed,2020-10-04T19:58:59.723+0000,2020-10-04T19:59:02.000+0000,List(p_device_id),5,57108,Map(),1,2


In [20]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

In [21]:
dbutils.fs.rm(health_tracker + "gold/health_tracker_user_analytics", recurse=True)

In [22]:
from pyspark.sql.functions import col,avg,max,stddev

health_tracker_gold_user_analytics = (
health_tracker_processed
  .groupby("p_device_id")
  .agg(avg(col("heartrate")).alias("avg_heartrate"),
       max(col("heartrate")).alias("max_heartrate"),
       stddev(col("heartrate")).alias("stddev_heartrate"))
       
)

In [23]:
(health_tracker_gold_user_analytics.write
.format("delta")
 .mode("overwrite")
 .save(health_tracker + "gold/health_tracker_user_analytics"))


In [24]:
%sql
DROP TABLE  IF EXISTS health_tracker_gold_user_analytics;
CREATE TABLE  health_tracker_gold_user_analytics
USING DELTA
LOCATION "dbacademy/csjusr1/DLRS/healthtracker/gold/health_tracker_user_analytics"


In [25]:
display(spark.read.table("health_tracker_gold_user_analytics"))

p_device_id,avg_heartrate,max_heartrate,stddev_heartrate
1,78.5776567337699,168.114687819,31.61967903784856
3,82.65419819635204,171.8435388833,30.92932874000444
4,83.08377376550952,173.5770785921,34.16032267669617
2,79.99574196662837,184.7433209566,31.408007741222
0,81.21484441523789,186.4790827731,31.343789198032887


In [26]:
file_path = health_tracker + "raw/health_tracker_data_2020_2.json"
 
health_tracker_data_2020_2_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [27]:
processedDF = process_health_tracker_data(health_tracker_data_2020_2_df)


In [28]:
(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))

In [29]:
(spark.read
 .option("versionAsOf", 0)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

In [30]:
health_tracker_processed.count()


In [31]:
from pyspark.sql.functions import count

display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .groupby("p_device_id")
  .agg(count("*"))
)

p_device_id,count(1)
1,1440
3,1440
2,1440
4,1368
0,1440


In [32]:
display(
  spark.read
  .format("delta")
  .load(health_tracker + "processed")
  .where(col("p_device_id").isin([3,4]))
)

dte,time,heartrate,name,p_device_id
2020-01-01,2020-01-01T00:00:00.000+0000,55.2272036665,Minh Nguyen,3
2020-01-01,2020-01-01T01:00:00.000+0000,56.035689123,Minh Nguyen,3
2020-01-01,2020-01-01T02:00:00.000+0000,55.6403282219,Minh Nguyen,3
2020-01-01,2020-01-01T03:00:00.000+0000,56.3692513843,Minh Nguyen,3
2020-01-01,2020-01-01T04:00:00.000+0000,56.5412281859,Minh Nguyen,3
2020-01-01,2020-01-01T05:00:00.000+0000,55.8311481148,Minh Nguyen,3
2020-01-01,2020-01-01T06:00:00.000+0000,54.9402513831,Minh Nguyen,3
2020-01-01,2020-01-01T07:00:00.000+0000,92.2205431894,Minh Nguyen,3
2020-01-01,2020-01-01T08:00:00.000+0000,93.8159033652,Minh Nguyen,3
2020-01-01,2020-01-01T09:00:00.000+0000,92.0210547557,Minh Nguyen,3


In [33]:
broken_readings = (
  health_tracker_processed
  .select(col("heartrate"), col("dte"))
  .where(col("heartrate") < 0)
  .groupby("dte")
  .agg(count("heartrate"))
  .orderBy("dte")
)
 
broken_readings.createOrReplaceTempView("broken_readings")

In [34]:
%sql
select * from broken_readings

dte,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [35]:
%sql
SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
34


In [36]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
 
dteWindow = Window.partitionBy("p_device_id").orderBy("dte")
 
interpolatedDF = (
  spark.read
  .table("health_tracker_processed")
  .select(col("dte"),
          col("time"),
          col("heartrate"),
          lag(col("heartrate")).over(dteWindow).alias("prev_amt"),
          lead(col("heartrate")).over(dteWindow).alias("next_amt"),
          col("name"),
          col("p_device_id"))
)

In [37]:
updatesDF = (
  interpolatedDF
  .where(col("heartrate") < 0)
  .select(col("dte"),
          col("time"),
          ((col("prev_amt") + col("next_amt"))/2).alias("heartrate"),
          col("name"),
          col("p_device_id"))
)

In [38]:
health_tracker_processed.printSchema()
updatesDF.printSchema()

In [39]:
updatesDF.count()


In [40]:
file_path = health_tracker + "raw/health_tracker_data_2020_2_late.json"
 
health_tracker_data_2020_2_late_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [41]:
insertsDF = process_health_tracker_data(health_tracker_data_2020_2_late_df)


In [42]:
insertsDF.printSchema()

In [43]:
upsertsDF = updatesDF.union(insertsDF)


In [44]:
upsertsDF.printSchema()


In [45]:
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

update_match = """
  health_tracker.time = upserts.time 
  AND 
  health_tracker.p_device_id = upserts.p_device_id
"""

update = { "heartrate" : "upserts.heartrate" }

insert = {
  "p_device_id" : "upserts.p_device_id",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "time" : "upserts.time",
  "dte" : "upserts.dte"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [46]:
(spark.read
 .option("versionAsOf", 1)
 .format("delta")
 .load(health_tracker + "processed")
 .count())

In [47]:
health_tracker_processed.count()


In [48]:
display(processedDeltaTable.history())


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
2,2020-10-04T20:50:36.000+0000,5423717937249933,chahuljain@microsoft.com,MERGE,Map(predicate -> ((health_tracker.`time` = upserts.`time`) AND (health_tracker.`p_device_id` = upserts.`p_device_id`))),,List(4180960103771869),1004-115430-ladle449,1,WriteSerializable,False,"Map(numTargetRowsCopied -> 7068, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 40, numTargetRowsInserted -> 72, numTargetRowsUpdated -> 60, numOutputRows -> 7200, numSourceRows -> 132, numTargetFilesRemoved -> 10)",
1,2020-10-04T20:35:32.000+0000,5423717937249933,chahuljain@microsoft.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4180960103771869),1004-115430-ladle449,0,WriteSerializable,True,"Map(numFiles -> 5, numOutputBytes -> 52887, numOutputRows -> 3408)",
0,2020-10-04T19:59:02.000+0000,5423717937249933,chahuljain@microsoft.com,CONVERT,"Map(numFiles -> 5, partitionedBy -> [""p_device_id""], collectStats -> true)",,List(4180960103771869),1004-115430-ladle449,-1,Serializable,False,Map(numConvertedFiles -> 5),


In [49]:
%sql 

SELECT SUM(`count(heartrate)`) FROM broken_readings

sum(count(heartrate))
34


In [50]:
{"device_id":0,"heartrate":57.6447293596,"name":"Deborah Powell","time":1.5830208E9,"device_type":"version 2"}
{"device_id":0,"heartrate":57.6175546013,"name":"Deborah Powell","time":1.5830244E9,"device_type":"version 2"}
{"device_id":0,"heartrate":57.8486376876,"name":"Deborah Powell","time":1.583028E9,"device_type":"version 2"}
{"device_id":0,"heartrate":57.8821378637,"name":"Deborah Powell","time":1.5830316E9,"device_type":"version 2"}
{"device_id":0,"heartrate":59.0531490807,"name":"Deborah Powell","time":1.5830352E9,"device_type":"version 2"}

In [51]:
file_path = health_tracker + "raw/health_tracker_data_2020_3.json"

health_tracker_data_2020_3_df = (
  spark.read
  .format("json")
  .load(file_path)
)

In [52]:
def process_health_tracker_data(dataframe):
  return (
    dataframe
    .withColumn("time", from_unixtime("time"))
    .withColumnRenamed("device_id", "p_device_id")
    .withColumn("time", col("time").cast("timestamp"))
    .withColumn("dte", col("time").cast("date"))
    .withColumn("p_device_id", col("p_device_id").cast("integer"))
    .select("dte", "time", "device_type", "heartrate", "name", "p_device_id")
    )
  
processedDF = process_health_tracker_data(health_tracker_data_2020_3_df)

In [53]:
(processedDF.write
 .mode("append")
 .format("delta")
 .save(health_tracker + "processed"))


In [54]:
(processedDF.write
 .mode("append")
 .option("mergeSchema", True)
 .format("delta")
 .save(health_tracker + "processed"))

In [55]:
health_tracker_processed.count()


In [56]:
processedDeltaTable.delete("p_device_id = 4")


In [57]:
from pyspark.sql.functions import lit

upsertsDF = (
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
  .select("dte", "time", "device_type", "heartrate", lit(None).alias("name"), "p_device_id")
)

In [58]:
processedDeltaTable = DeltaTable.forPath(spark, health_tracker + "processed")

insert = {
  "dte" : "upserts.dte",
  "time" : "upserts.time",
  "device_type" : "upserts.device_type",
  "heartrate" : "upserts.heartrate",
  "name" : "upserts.name",
  "p_device_id" : "upserts.p_device_id"
}

(processedDeltaTable.alias("health_tracker")
 .merge(upsertsDF.alias("upserts"), update_match)
 .whenMatchedUpdate(set=update)
 .whenNotMatchedInsert(values=insert)
 .execute())

In [59]:
health_tracker_processed.count()


In [60]:
display(health_tracker_processed.where("p_device_id = 4"))


dte,time,heartrate,name,p_device_id


In [61]:
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type


In [62]:
processedDeltaTable.vacuum(0)

In [63]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)

In [64]:
processedDeltaTable.vacuum(0)


In [65]:
display(
  spark.read
  .option("versionAsOf", 4)
  .format("delta")
  .load(health_tracker + "processed")
  .where("p_device_id = 4")
)

dte,time,heartrate,name,p_device_id,device_type
