In [0]:
%fs ls /healthtracker/raw/

path,name,size
dbfs:/healthtracker/raw/health_tracker_data_2020_1.json,health_tracker_data_2020_1.json,310628
dbfs:/healthtracker/raw/health_tracker_data_2020_2.json,health_tracker_data_2020_2.json,284670
dbfs:/healthtracker/raw/late/,late/,0


In [0]:
dbutils.fs.rm(bronzePath, recurse=True)
dbutils.fs.rm(silverPath, recurse=True)
dbutils.fs.rm(goldPath, recurse=True)
dbutils.fs.rm(checkpointPath, recurse=True)

Out[97]: True

In [0]:
basePath = "/healthtracker/"

rawPath = basePath + "raw/"
bronzePath = basePath + "bronze/"
silverPath = basePath + "silver/"
goldPath = basePath + "gold/"

checkpointPath = basePath + "checkpoints/"
bronzeCheckpoint = checkpointPath + "bronze/"
silverCheckpoint = checkpointPath + "silver/"
goldCheckpoint = checkpointPath + "gold/"

In [0]:
raw_schema = "value STRING"  #raw string in JSON format

raw_health_tracker_df = (
    spark.readStream.format("text").schema(raw_schema).load(rawPath)
)

In [0]:
display(raw_health_tracker_df, streamName="raw_df")

value
"{""time"":1577836800.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.8139067501}"
"{""time"":1577840400.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":53.9078900098}"
"{""time"":1577844000.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.7129593616}"
"{""time"":1577847600.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.2880422685}"
"{""time"":1577851200.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.5156095386}"
"{""time"":1577854800.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":53.6280743846}"
"{""time"":1577858400.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.1760037066}"
"{""time"":1577862000.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":90.0456721836}"
"{""time"":1577865600.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":89.4695644522}"
"{""time"":1577869200.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":88.1490304138}"


In [0]:
from pyspark.sql.functions import *

In [0]:
# recording ingestion metadata

raw_health_tracker_df = raw_health_tracker_df.select(
  lit("local source").alias("datasource"),
  current_timestamp().alias("ingesttime"),
  "value",
  current_timestamp().cast("date").alias("ingestdate")
)

In [0]:
(raw_health_tracker_df.select(
  "datasource", "ingesttime", "value", col("ingestdate").alias("p_ingestdate")
)
.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", bronzeCheckpoint)
.partitionBy("p_ingestdate")
.queryName("writing_raw_to_bronze")
.start(bronzePath))

Out[103]: <pyspark.sql.streaming.StreamingQuery at 0x7f1bb51041c0>

In [0]:
bronze_health_tracker_df = spark.readStream.format("delta").load(bronzePath)
display(bronze_health_tracker_df, streamName="display_bronze")

datasource,ingesttime,value,p_ingestdate
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577836800.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.8139067501}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577840400.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":53.9078900098}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577844000.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.7129593616}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577847600.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.2880422685}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577851200.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.5156095386}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577854800.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":53.6280743846}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577858400.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.1760037066}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577862000.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":90.0456721836}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577865600.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":89.4695644522}",2021-12-21
local source,2021-12-21T10:17:34.241+0000,"{""time"":1577869200.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":88.1490304138}",2021-12-21


In [0]:
display(dbutils.fs.ls(bronzePath))

path,name,size
dbfs:/healthtracker/bronze/_delta_log/,_delta_log/,0
dbfs:/healthtracker/bronze/p_ingestdate=2021-12-21/,p_ingestdate=2021-12-21/,0


In [0]:
for s in spark.streams.active:
    print(s.name)

display_bronze
raw_df
writing_raw_to_bronze


In [0]:
spark.sql("""
CREATE DATABASE IF NOT EXISTS healthtracker
""")

spark.sql("USE healthtracker")

Out[107]: DataFrame[]

In [0]:
spark.sql("DROP TABLE IF EXISTS healthtracker_bronze")

spark.sql(f"""
CREATE TABLE healthtracker_bronze
USING DELTA
LOCATION "{bronzePath}"
""")

Out[108]: DataFrame[]

In [0]:
%sql
describe detail healthtracker_bronze

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,5defff30-d9a4-49af-b12a-e0c60148731a,healthtracker.healthtracker_bronze,,dbfs:/healthtracker/bronze,2021-12-21T10:17:34.836+0000,2021-12-21T10:17:37.000+0000,List(p_ingestdate),2,148800,Map(),1,2


In [0]:
%sql
describe extended healthtracker_bronze

col_name,data_type,comment
datasource,string,
ingesttime,timestamp,
value,string,
p_ingestdate,date,
,,
# Partitioning,,
Part 0,p_ingestdate,
,,
# Detailed Table Information,,
Name,healthtracker.healthtracker_bronze,


In [0]:
log_df = spark.read.json(bronzePath + "/_delta_log/00000000000000000000.json")
display(log_df)

add,commitInfo,metaData,protocol,txn
,,,"List(1, 2)",
,,"List(1640081854836, List(parquet), 5defff30-d9a4-49af-b12a-e0c60148731a, List(p_ingestdate), {""type"":""struct"",""fields"":[{""name"":""datasource"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""ingesttime"",""type"":""timestamp"",""nullable"":true,""metadata"":{}},{""name"":""value"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""p_ingestdate"",""type"":""date"",""nullable"":true,""metadata"":{}}]})",,
,,,,"List(134de757-1bb1-4463-b992-ad28de5b0022, 1640081855922, 0)"
"List(true, 1640081856000, List(2021-12-21), p_ingestdate=2021-12-21/part-00000-9243f2a3-7474-4cc1-9f0e-fadaa37886c0.c000.snappy.parquet, 77448, {""numRecords"":3720,""minValues"":{""datasource"":""local source"",""ingesttime"":""2021-12-21T10:17:34.241Z"",""value"":""{\""time\"":1577836800.0,\""name\"":\""Deb""},""maxValues"":{""datasource"":""local source"",""ingesttime"":""2021-12-21T10:17:34.241Z"",""value"":""{\""time\"":1580511600.0,\""name\"":\""Sam�""},""nullCount"":{""datasource"":0,""ingesttime"":0,""value"":0}}, List(1640081856000000, 268435456))",,,,
"List(true, 1640081856000, List(2021-12-21), p_ingestdate=2021-12-21/part-00001-a28fc2dc-d90c-4131-b7c3-d4fd62a1aeac.c000.snappy.parquet, 71352, {""numRecords"":3408,""minValues"":{""datasource"":""local source"",""ingesttime"":""2021-12-21T10:17:34.241Z"",""value"":""{\""time\"":1580515200.0,\""name\"":\""Deb""},""maxValues"":{""datasource"":""local source"",""ingesttime"":""2021-12-21T10:17:34.241Z"",""value"":""{\""time\"":1583017200.0,\""name\"":\""Sam�""},""nullCount"":{""datasource"":0,""ingesttime"":0,""value"":0}}, List(1640081856000001, 268435456))",,,,
,"List(1221-082311-eeqqlyuw, true, WriteSerializable, List(3049203889599516), STREAMING UPDATE, List(2, 148800, 7128, 0), List(0, Append, 134de757-1bb1-4463-b992-ad28de5b0022), 1640081856095, 7708969006792739, aamirsohel96@gmail.com)",,,


In [0]:
display(bronze_health_tracker_df.groupby().count(), "bronze_count")

count
7200


In [0]:
# performing transformation
json_schema = "device_id INTEGER, heartrate DOUBLE, name STRING, time FLOAT"

silver_health_tracker_df = bronze_health_tracker_df.select(
  from_json(col("value"), json_schema).alias("nested_json")
).select("nested_json.*")

In [0]:
silver_health_tracker_df = silver_health_tracker_df.select(
  "device_id",
  "heartrate",
  "name",
  from_unixtime("time").cast("timestamp").alias("eventtime"),
  from_unixtime("time").cast("date").alias("p_eventdate")
)

In [0]:
#writing stream to silver table

(
  silver_health_tracker_df.writeStream
  .format("delta")
  .option("checkpointLocation", silverCheckpoint)
  .outputMode("append")
  .partitionBy("p_eventdate")
  .queryName("writing_to_silver")
  .start(silverPath)
)

Out[113]: <pyspark.sql.streaming.StreamingQuery at 0x7f1bb5104640>

In [0]:
spark.sql("DROP TABLE IF EXISTS healthtracker_silver")

spark.sql(
  f"""
  CREATE TABLE healthtracker_silver
  USING DELTA
  LOCATION "{silverPath}"
  """
)

Out[114]: DataFrame[]

In [0]:
display(spark.readStream.table("healthtracker_silver") , streamName="display_silver")

device_id,heartrate,name,eventtime,p_eventdate
0,56.8004920211,Deborah Powell,2020-01-13T00:00:00.000+0000,2020-01-13
0,57.3427304091,Deborah Powell,2020-01-13T00:59:44.000+0000,2020-01-13
0,57.3753078662,Deborah Powell,2020-01-13T01:59:28.000+0000,2020-01-13
0,57.5585488599,Deborah Powell,2020-01-13T02:59:12.000+0000,2020-01-13
0,56.0829660354,Deborah Powell,2020-01-13T03:58:56.000+0000,2020-01-13
0,56.5231267033,Deborah Powell,2020-01-13T05:00:48.000+0000,2020-01-13
0,94.2689405255,Deborah Powell,2020-01-13T06:00:32.000+0000,2020-01-13
0,94.9195777222,Deborah Powell,2020-01-13T07:00:16.000+0000,2020-01-13
0,95.2584820986,Deborah Powell,2020-01-13T08:00:00.000+0000,2020-01-13
0,94.1239514075,Deborah Powell,2020-01-13T08:59:44.000+0000,2020-01-13


In [0]:
# expected total data for month Jan and Feb for 5 devices
(31+29)*24*5

Out[116]: 7200

In [0]:
display(spark.read.table("healthtracker_silver").groupby("device_id").agg(count("*")))

device_id,count(1)
1,1440
3,1440
4,1368
2,1440
0,1440


In [0]:
# as we can see from above viz some heartrates are having negative value
broken_readings = (
  spark.read.format("delta")
  .load(silverPath)
  .select(col("heartrate"), col("p_eventdate"))
  .where(col("heartrate") < 0)
  .groupby("p_eventdate")
  .agg(count("heartrate"))
  .orderBy("p_eventdate")
)
broken_readings.createOrReplaceTempView("broken_readings")

In [0]:
%sql
select * from broken_readings

p_eventdate,count(heartrate)
2020-01-01,1
2020-01-02,1
2020-01-04,1
2020-01-06,1
2020-01-07,1
2020-01-09,2
2020-01-12,3
2020-01-13,2
2020-01-14,1
2020-01-16,2


In [0]:
%sql
select sum(`count(heartrate)`) from broken_readings

sum(count(heartrate))
60


In [0]:
# interpolating broken values
from pyspark.sql.window import Window

dateWindow = Window.orderBy("p_eventdate")
interpolated_df =  spark.read.table("healthtracker_silver").select(
  "*",
  lag(col("heartrate")).over(dateWindow).alias("prev_value"),
  lead(col("heartrate")).over(dateWindow).alias("next_value"),
)

In [0]:
updates_df = interpolated_df.where(col("heartrate") < 0).select(
  "device_id",
  ((col("prev_value") + col("next_value")) / 2).alias("heartrate"),
    "eventtime",
    "name",
    "p_eventdate"
)

display(updates_df)

device_id,heartrate,eventtime,name,p_eventdate
4,101.2125469924,2020-01-01T18:59:12.000+0000,James Hou,2020-01-01
1,79.4468974614,2020-01-02T10:59:12.000+0000,Kristin Vasser,2020-01-02
1,92.10732860255,2020-01-04T20:01:04.000+0000,Kristin Vasser,2020-01-04
1,54.18606982715,2020-01-06T23:00:16.000+0000,Kristin Vasser,2020-01-06
1,78.55314577125,2020-01-07T22:00:32.000+0000,Kristin Vasser,2020-01-07
2,86.8898895492,2020-01-09T15:00:16.000+0000,Sam Knopp,2020-01-09
4,104.7714541545,2020-01-09T03:58:56.000+0000,James Hou,2020-01-09
2,97.2776487597,2020-01-12T18:59:12.000+0000,Sam Knopp,2020-01-12
4,63.68804357805,2020-01-12T01:59:28.000+0000,James Hou,2020-01-12
4,105.83667578195,2020-01-12T08:00:00.000+0000,James Hou,2020-01-12


In [0]:
from delta.tables import DeltaTable

silverTable = DeltaTable.forPath(spark, silverPath)

update_match = """
  health_tracker.eventtime = updates.eventtime
  and
  health_tracker.device_id = updates.device_id
"""

update = {"heartrate":"updates.heartrate"}

(
 silverTable.alias("health_tracker")
.merge(updates_df.alias("updates"), update_match)
.whenMatchedUpdate(set=update)
.execute()
)

In [0]:
#handle late arriving data
spark.read.json(rawPath + '/late').count()

Out[126]: 3480

In [0]:
def transform_raw(df: DataFrame) -> DataFrame:
    return df.select(
        lit("local source").alias("datasource"),
        current_timestamp().alias("ingesttime"),
        "value",
        current_timestamp().cast("date").alias("p_ingestdate"),
    )
raw_schema = "value STRING"

lateRawDF = spark.read.format("text").schema(raw_schema).load(rawPath + "/late")

transformedLateRawDF = transform_raw(lateRawDF)

In [0]:
# merge late-arriving data with bronze table
bronzeTable = DeltaTable.forPath(spark, bronzePath)

existing_record_match = "bronze.value = latearrivals.value"

(
  bronzeTable.alias("bronze")
  .merge(transformedLateRawDF.alias("latearrivals"), existing_record_match)
  .whenNotMatchedInsertAll()
  .execute()
)

In [0]:
display(
  spark.read.table("healthtracker_silver").groupby("device_id").agg(count("*"))
)

device_id,count(1)
1,1440
3,1440
4,1440
2,1440
0,1440


In [0]:
%sql
SELECT COUNT(*) FROM healthtracker_silver VERSION AS OF 0

count(1)
7128


In [0]:
%sql
SELECT COUNT(*) FROM healthtracker_silver VERSION AS OF 1

count(1)
7128


In [0]:
%sql
SELECT COUNT(*) FROM healthtracker_silver VERSION AS OF 2

count(1)
7200


In [0]:
#building aggregate data marts

gold_health_tracker_df = silver_health_tracker_df.groupBy("device_id").agg(
  mean(col("heartrate")).alias("mean_heartrate"),
  stddev(col("heartrate")).alias("stddev_heartrate"),
  max(col("heartrate")).alias("max_heartrate")
)

In [0]:
tableName = "/aggregate_heartrate"
tableCheckpoint = goldCheckpoint + tableName
tablePath = goldPath + tableName

(
    gold_health_tracker_df.writeStream.format("delta")
    .outputMode("complete")
    .option("checkpointLocation", tableCheckpoint)
    .queryName("write_silver_to_gold")
    .start(tablePath)
)

Out[141]: <pyspark.sql.streaming.StreamingQuery at 0x7f1bb54d8640>

In [0]:
spark.sql("DROP TABLE IF EXISTS health_tracker_gold_aggregate_heartrate")

spark.sql(
f"""
CREATE TABLE health_tracker_gold_aggregate_heartrate
USING DELTA
LOCATION "{tablePath}"
"""
)

Out[142]: DataFrame[]

In [0]:
%sql
select * from health_tracker_gold_aggregate_heartrate

device_id,mean_heartrate,stddev_heartrate,max_heartrate
3,81.98698705713325,30.11076269005531,175.0032148522
4,80.24473380424911,34.22455418767537,199.092971234
2,80.00336378220479,31.17861138868274,189.2113455089
1,78.44184525561052,32.827357416972134,168.114687819
0,81.72446103322305,31.368367448699964,186.4790827731


In [0]:
for stream in spark.streams.active:
  stream.stop()