# DLT pipeline

This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/aht_fitbit_pipeline.yml.

In [0]:
# Import DLT and src/aht_fitbit
import dlt
import sys
# Extend the import path with the directory named by the "bundle.sourcePath"
# Spark conf (falling back to "." when the key is unset) so that the
# aht_fitbit import below can be resolved.
sys.path.append(spark.conf.get("bundle.sourcePath", "."))
from aht_fitbit import main

In [0]:

from pyspark.sql.functions import explode, struct, from_json,col,first,arrays_zip,col,map_from_entries,expr,to_timestamp,to_date,array

In [None]:
# Fetch the settings returned by aht_fitbit.main.get_vars() and derive the
# base directory ("<vol>/raw_fitbitapi") that the table readers load from.
# NOTE(review): `vars` shadows the Python builtin of the same name; it is kept
# as-is because other cells may reference it - consider renaming.
vars = main.get_vars()
vol = vars['vol']+'/raw_fitbitapi'

In [None]:
@dlt.table(comment="Parsing fitbit sleep data from JSON delivery - using volumes")
def bronze_sleep():
  """Stream the sleep JSON files and emit one row per sleep entry.

  Files under ``{vol}/sleep`` are read incrementally through the
  ``cloudFiles`` source with column-type inference enabled. Records whose
  ``sleep`` array is empty are dropped, and the remaining arrays are
  exploded so each output row carries the record's ``summary`` plus a
  single ``sleep`` element.
  """
  sleep_dir = f"{vol}/sleep"

  json_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .load(sleep_dir)
  )

  # Keep only deliveries that actually contain sleep entries.
  has_sleep_entries = col("sleep") != array()
  non_empty = json_stream.filter(has_sleep_entries)

  return non_empty.select('summary', explode('sleep').alias('sleep'))

In [None]:
@dlt.table(comment="Parsing bronze to silver, exploding structs ")
def silver_sleep():
  """Flatten ``bronze_sleep`` into typed, top-level columns.

  Date and timestamp strings from the ``sleep`` struct are parsed, the
  numeric fields from ``sleep`` and ``summary`` are cast to ``int``, and
  ``summary.stages`` is passed through unchanged. The result is sorted by
  ``dateOfSleep`` in descending order.
  """
  def as_int(field_path):
    # Cast a nested field to int and name the column after its leaf field.
    leaf_name = field_path.rsplit('.', 1)[-1]
    return col(field_path).cast('int').alias(leaf_name)

  def as_timestamp(field_path):
    # Parse a nested timestamp string and name the column after its leaf field.
    leaf_name = field_path.rsplit('.', 1)[-1]
    return to_timestamp(col(field_path)).alias(leaf_name)

  # Output column order is part of the table schema; keep it stable.
  projection = [
    to_date(col('sleep.dateOfSleep'), 'yyyy-MM-dd').alias('dateOfSleep'),
    as_timestamp('sleep.startTime'),
    as_timestamp('sleep.endTime'),
    as_int('summary.totalMinutesAsleep'),
    as_int('summary.totalSleepRecords'),
    as_int('summary.totalTimeInBed'),
    as_int('sleep.efficiency'),
    col('sleep.isMainSleep').alias('isMainSleep'),
    'summary.stages',
  ]

  bronze = dlt.read('bronze_sleep')
  flattened = bronze.select(*projection)
  return flattened.sort("dateOfSleep", ascending=False)

### Fitbit Activity Parsing

In [None]:
@dlt.table(comment="Raw Delivery of json data to Delta Lake")
def raw_activities():
  """Stream the activity-log JSON files as delivered, without reshaping.

  Files under ``{vol}/activitylog`` are read incrementally through the
  ``cloudFiles`` source with column-type inference enabled.
  """
  activity_log_dir = f"{vol}/activitylog"

  reader = spark.readStream.format("cloudFiles")
  reader = reader.option("cloudFiles.format", "json")
  reader = reader.option("cloudFiles.inferColumnTypes", "true")

  return reader.load(activity_log_dir)


@dlt.table(comment="Load and flatten activities into Bronze")
def bronze_activities():
  """Explode the ``activities`` array of ``raw_activities`` into flat rows.

  Each array element becomes its own row, the element's fields are promoted
  to top-level columns, and the ``source`` column is dropped.
  """
  raw_stream = dlt.read_stream("raw_activities")

  # One row per activity, still wrapped in a single struct column.
  per_activity = raw_stream.select(explode("activities").alias("activities_exploded"))

  # Promote the struct's fields to top-level columns.
  flattened = per_activity.select('activities_exploded.*')

  return flattened.drop('source')

@dlt.table(comment="Parse Data - load distincts and convert maps to structs in silver")
def silver_activities():
  """De-duplicate ``bronze_activities`` and reshape its nested arrays.

  After removing duplicate rows, the ``activityLevel`` and
  ``heartRateZones`` arrays are turned into maps keyed by each element's
  ``name`` (minutes for activity levels; a struct of caloriesOut, max, min
  and minutes for heart-rate zones). The result keeps a fixed set of scalar
  columns and exposes the maps as ``activityLevels`` and ``heartRateZones``.
  """
  # Arrays of (key, value) structs, ready for map_from_entries.
  level_entries = expr("transform(activityLevel, x -> struct(x.name, x.minutes))")
  zone_entries = expr("transform(heartRateZones, x -> struct(x.name, struct(x.caloriesOut, x.max, x.min, x.minutes)))")

  scalar_columns = [
    "logId",
    "startTime",
    "activityName",
    "steps",
    "speed",
    "pace",
    "duration",
    "logType",
    "averageHeartRate",
    "calories",
    "distance",
    "distanceUnit",
    "hasActiveZoneMinutes",
    "hasGps",
  ]

  # De-duplicate before adding the map columns.
  unique_rows = dlt.read("bronze_activities").distinct()

  with_level_map = unique_rows.withColumn("activityLevelMap", map_from_entries(level_entries))
  with_both_maps = with_level_map.withColumn("heartRateZonesMap", map_from_entries(zone_entries))

  return with_both_maps.selectExpr(
    *scalar_columns,
    "activityLevelMap as activityLevels",
    "heartRateZonesMap as heartRateZones",
  )

### Time to Parse XML Data from Fitbit

In [None]:
@dlt.table(
  comment="Parsing fitbit activity data from XML - TCX Files"
)
@dlt.expect_or_drop("valid_xml", "Activity.Lap IS NOT NULL")
def raw_tcx():
  """Stream the TCX (XML) files and keep rows that contain lap data.

  Files under ``{vol}/TCX`` are read incrementally through the
  ``cloudFiles`` source as XML with column-type inference enabled. Every
  ``Activities`` element becomes one row, and the file ``_metadata`` column
  is kept alongside the parsed columns as ``source_metadata``. Rows where
  ``Activity.Lap`` is null are dropped by the ``valid_xml`` expectation.
  """
  return (
    spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "xml")
          .option("cloudFiles.inferColumnTypes", "true")
          .option("mergeSchema", "true") 
          .option("rowTag", "Activities")  # each <Activities> element is parsed as one row
          .load(f'{vol}/TCX')
          .selectExpr("*", "_metadata as source_metadata")
          )