## 02_Transforming

The objective of this notebook is to apply business rules to extract meaningful information from the multiplex `Bronze` table and auxiliary tables (e.g., date_lookup from 01_Ingestion).<br>
By the end of this process, the environment will consist of nine new tables: seven on the Silver layer and two views on the Gold layer.<br>
Similar to previous steps, we will begin by setting up the environment for this notebook.

## Environment Settings

The objective here is to prepare the notebook by following these steps::<br>
<br>
- Process a new Bronze table with ten billion rows instead of eight hundred thousand rows
- Import necessary libraries
- Create base objects to use throughout the pipeline

In [0]:
# Import libraries
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.window import Window

# Setting path objects
main_path = "dbfs:/mnt/borges_portifolio"
files_path = f"{main_path}/auto_loader_files/full" #Path with all the samples

# Setting parallelism
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

# Setting the bronze schema as default for this notebook
spark.catalog.setCurrentDatabase('BRONZE')

# Estabilishing the folder where Auto Loader and Structured Streaming will maintain identifying information of the stream
checkpoint_path = f'{main_path}/_checkpoints'

# Defining data type of each column in the JSON files for the Bronze table
bronze_schema = "key BINARY, value BINARY, topic STRING, partition LONG, offset LONG, timestamp LONG"

# Time traveling to obtain the date_lookup in a DataFrame
date_lookup_df = spark.read.option("versionAsOf", 0).load(f"{main_path}/date_lookup").select("date", "week_part")

# Creating the function that will ingest the full load of samples into the Bronze table
def process_full_bronze():
    (spark.readStream
                  .format("cloudFiles")
                  .schema(bronze_schema)
                  .option("cloudFiles.format", "json")
                  .option("cloudFiles.schemaLocation", f"{checkpoint_path}/bronze_schema") # Location to store schema information
                  .load(files_path) # Source Folder
                  .join(f.broadcast(date_lookup_df), f.to_date((f.col("timestamp")/1000).cast("timestamp")) == f.col("date"), "left")
                  .writeStream
                        .option("checkpointLocation", f"{checkpoint_path}/bronze") # Location to store checkpoint information
                        .partitionBy("topic", "week_part") # Partitioning by topic and week_part to increase performance
                        .trigger(availableNow=True)
                        .table("bronze.bronze")
                        .awaitTermination()
    )

# Creating a function for reprocessing the Bronze table
def reprocess_bronze_full():

    spark.sql("drop table if exists bronze.bronze")

    dbutils.fs.rm(f"{checkpoint_path}/bronze", True)
    
    process_full_bronze()

In [0]:
# Since a Bronze table already exists, it's necessary to reprocess it
reprocess_bronze_full()

## Initial tests
<br>
These initial tests are conducted to determine how to clean the data. <br>
Given that the raw data contains two binary columns, decoding them is the first step before proceeding to subsequent tasks.

In [0]:
# Acquire data from the Bronze table to a DataFrame
bronze_df = spark.table("bronze")

# Check a sample of rows
display(bronze_df.limit(10))

# Verify the partitioning, location, and other details of the Bronze table
display(spark.sql("DESCRIBE bronze.bronze"))

# Obtain different values from the topic column
display(bronze_df.select(f.col("topic")).distinct())

# Count of ingested rows, should have more then 10 millions
print("")
print(f"The Bronze table has {bronze_df.count()} rows")
print("")

key,value,topic,partition,offset,timestamp,date,week_part
MTI0ODgw,eyJkZXZpY2VfaWQiOiAxMjQ4ODAsICJ0aW1lIjogMTU3NTI0NDc5MCwgImhlYXJ0cmF0ZSI6IDQ0LjAwOTMzMjQzNTA4MzczfQ==,bpm,0,65197029,1575244801224,2019-12-02,2019-49
MTMzMDIy,eyJkZXZpY2VfaWQiOiAxMzMwMjIsICJ0aW1lIjogMTU3NTI0NDgwOCwgImhlYXJ0cmF0ZSI6IDQ3LjQ1MTAxMjM0OTU5NjA1Nn0=,bpm,2,65215178,1575244817439,2019-12-02,2019-49
MTgwMDI3,eyJkZXZpY2VfaWQiOiAxODAwMjcsICJ0aW1lIjogMTU3NTI0NDgwNCwgImhlYXJ0cmF0ZSI6IDQ2LjMwMDgxMzE3Njk5MDQ3fQ==,bpm,2,65215177,1575244817093,2019-12-02,2019-49
MTE1NjQz,eyJkZXZpY2VfaWQiOiAxMTU2NDMsICJ0aW1lIjogMTU3NTI0NDgxMCwgImhlYXJ0cmF0ZSI6IDQyLjE5NDYwMTIzMjk3NTkzNH0=,bpm,3,64982092,1575244817280,2019-12-02,2019-49
MTAyNTU4,eyJkZXZpY2VfaWQiOiAxMDI1NTgsICJ0aW1lIjogMTU3NTI0NDgxMCwgImhlYXJ0cmF0ZSI6IDUxLjY2ODMxMTIwMzY1Mzg0fQ==,bpm,3,64982107,1575244823081,2019-12-02,2019-49
MTAyNTU4,eyJkZXZpY2VfaWQiOiAxMDI1NTgsICJ0aW1lIjogMTU3NTI0NDgxMCwgImhlYXJ0cmF0ZSI6IDUxLjY2ODMxMTIwMzY1Mzg0fQ==,bpm,3,64982110,1575244823081,2019-12-02,2019-49
MTQ2NjQ1,eyJkZXZpY2VfaWQiOiAxNDY2NDUsICJ0aW1lIjogMTU3NTI0NDgyNiwgImhlYXJ0cmF0ZSI6IDM1LjUxMzM0NDc1Nzg2MjkyNH0=,bpm,0,65197068,1575244833149,2019-12-02,2019-49
MTA3OTUy,eyJkZXZpY2VfaWQiOiAxMDc5NTIsICJ0aW1lIjogMTU3NTI0NDg0MCwgImhlYXJ0cmF0ZSI6IDYxLjQwMzM2NTExMjUxOTE3fQ==,bpm,2,65215240,1575244850048,2019-12-02,2019-49
MTMzMDIy,eyJkZXZpY2VfaWQiOiAxMzMwMjIsICJ0aW1lIjogMTU3NTI0NDgzNywgImhlYXJ0cmF0ZSI6IDQyLjA4MDE2MTM5MjA3MDExfQ==,bpm,2,65215226,1575244849427,2019-12-02,2019-49
MTYxMzQz,eyJkZXZpY2VfaWQiOiAxNjEzNDMsICJ0aW1lIjogMTU3NTI0NDg1MCwgImhlYXJ0cmF0ZSI6IDMyLjgxMzIyNjYwNjIyNDI1NX0=,bpm,3,64982140,1575244856309,2019-12-02,2019-49


col_name,data_type,comment
key,binary,
value,binary,
topic,string,
partition,bigint,
offset,bigint,
timestamp,bigint,
date,date,
week_part,string,
# Partition Information,,
# col_name,data_type,comment


topic
workout
user_info
bpm



The Bronze table has 10841978 rows



### Binary decoding

As shown previously, the columns `key` and `value` are encoded.  The method I use for decoding involves transforming them from binary back to string, though other methods may exist. (*Specific decoding methods or decryption should be provided by the customer*)<br>
Let's now take a look at what we'll be working with.

In [0]:
display(
    bronze_df.withColumn("key_decoded", bronze_df.key.cast("string"))
            .withColumn("value_decoded", bronze_df.value.cast("string"))
            .select("key_decoded", "value_decoded", "key", "value")
            .limit(10)
)

key_decoded,value_decoded,key,value
124880,"{""device_id"": 124880, ""time"": 1575244790, ""heartrate"": 44.00933243508373}",MTI0ODgw,eyJkZXZpY2VfaWQiOiAxMjQ4ODAsICJ0aW1lIjogMTU3NTI0NDc5MCwgImhlYXJ0cmF0ZSI6IDQ0LjAwOTMzMjQzNTA4MzczfQ==
133022,"{""device_id"": 133022, ""time"": 1575244808, ""heartrate"": 47.451012349596056}",MTMzMDIy,eyJkZXZpY2VfaWQiOiAxMzMwMjIsICJ0aW1lIjogMTU3NTI0NDgwOCwgImhlYXJ0cmF0ZSI6IDQ3LjQ1MTAxMjM0OTU5NjA1Nn0=
180027,"{""device_id"": 180027, ""time"": 1575244804, ""heartrate"": 46.30081317699047}",MTgwMDI3,eyJkZXZpY2VfaWQiOiAxODAwMjcsICJ0aW1lIjogMTU3NTI0NDgwNCwgImhlYXJ0cmF0ZSI6IDQ2LjMwMDgxMzE3Njk5MDQ3fQ==
115643,"{""device_id"": 115643, ""time"": 1575244810, ""heartrate"": 42.194601232975934}",MTE1NjQz,eyJkZXZpY2VfaWQiOiAxMTU2NDMsICJ0aW1lIjogMTU3NTI0NDgxMCwgImhlYXJ0cmF0ZSI6IDQyLjE5NDYwMTIzMjk3NTkzNH0=
102558,"{""device_id"": 102558, ""time"": 1575244810, ""heartrate"": 51.66831120365384}",MTAyNTU4,eyJkZXZpY2VfaWQiOiAxMDI1NTgsICJ0aW1lIjogMTU3NTI0NDgxMCwgImhlYXJ0cmF0ZSI6IDUxLjY2ODMxMTIwMzY1Mzg0fQ==
102558,"{""device_id"": 102558, ""time"": 1575244810, ""heartrate"": 51.66831120365384}",MTAyNTU4,eyJkZXZpY2VfaWQiOiAxMDI1NTgsICJ0aW1lIjogMTU3NTI0NDgxMCwgImhlYXJ0cmF0ZSI6IDUxLjY2ODMxMTIwMzY1Mzg0fQ==
146645,"{""device_id"": 146645, ""time"": 1575244826, ""heartrate"": 35.513344757862924}",MTQ2NjQ1,eyJkZXZpY2VfaWQiOiAxNDY2NDUsICJ0aW1lIjogMTU3NTI0NDgyNiwgImhlYXJ0cmF0ZSI6IDM1LjUxMzM0NDc1Nzg2MjkyNH0=
107952,"{""device_id"": 107952, ""time"": 1575244840, ""heartrate"": 61.40336511251917}",MTA3OTUy,eyJkZXZpY2VfaWQiOiAxMDc5NTIsICJ0aW1lIjogMTU3NTI0NDg0MCwgImhlYXJ0cmF0ZSI6IDYxLjQwMzM2NTExMjUxOTE3fQ==
133022,"{""device_id"": 133022, ""time"": 1575244837, ""heartrate"": 42.08016139207011}",MTMzMDIy,eyJkZXZpY2VfaWQiOiAxMzMwMjIsICJ0aW1lIjogMTU3NTI0NDgzNywgImhlYXJ0cmF0ZSI6IDQyLjA4MDE2MTM5MjA3MDExfQ==
161343,"{""device_id"": 161343, ""time"": 1575244850, ""heartrate"": 32.813226606224255}",MTYxMzQz,eyJkZXZpY2VfaWQiOiAxNjEzNDMsICJ0aW1lIjogMTU3NTI0NDg1MCwgImhlYXJ0cmF0ZSI6IDMyLjgxMzIyNjYwNjIyNDI1NX0=


From the sample, it appears that the `key` column contains the same data as the first column nested within the `value` column.<br>
However, a ten-row sample is not sufficient for a dataset with ten million rows, especially when the Bronze table will be used to create three different tables.<br> 
Let's dig deeper.

### Reviewing decoded data

Now that we've decoded the binary columns, it's important to closely examine each `topic`. This will help understand the structure and content of the data, identify obvious issues, and ensure that the data is correctly decoded and ready for the next steps in our pipeline.

In [0]:
topics = ('workout','bpm','user_info')

for i in topics:
        display(bronze_df.filter(bronze_df.topic == i)
                        .withColumn("key_decoded",bronze_df.key.cast('string'))
                        .withColumn("value_decoded",bronze_df.value.cast('string'))
                        .select('topic','key_decoded','value_decoded')
                        .limit(5)
              )
        
        print("")
        print(f"number of {i} rows = {bronze_df.filter(bronze_df.topic == i).count()}") 
        print("")


topic,key_decoded,value_decoded
workout,40872,"{""user_id"": 40872, ""workout_id"": 8, ""timestamp"": 1575229675.0, ""action"": ""start"", ""session_id"": 76}"
workout,29213,"{""user_id"": 29213, ""workout_id"": 13, ""timestamp"": 1575196562.0, ""action"": ""start"", ""session_id"": 295}"
workout,29213,"{""user_id"": 29213, ""workout_id"": 13, ""timestamp"": 1575200016.0, ""action"": ""stop"", ""session_id"": 295}"
workout,27703,"{""user_id"": 27703, ""workout_id"": 47, ""timestamp"": 1575203433.0, ""action"": ""start"", ""session_id"": 456}"
workout,14508,"{""user_id"": 14508, ""workout_id"": 31, ""timestamp"": 1575224965.0, ""action"": ""start"", ""session_id"": 392}"



number of workout rows = 1514



topic,key_decoded,value_decoded
bpm,124880,"{""device_id"": 124880, ""time"": 1575244790, ""heartrate"": 44.00933243508373}"
bpm,133022,"{""device_id"": 133022, ""time"": 1575244808, ""heartrate"": 47.451012349596056}"
bpm,180027,"{""device_id"": 180027, ""time"": 1575244804, ""heartrate"": 46.30081317699047}"
bpm,115643,"{""device_id"": 115643, ""time"": 1575244810, ""heartrate"": 42.194601232975934}"
bpm,102558,"{""device_id"": 102558, ""time"": 1575244810, ""heartrate"": 51.66831120365384}"



number of bpm rows = 10840345



topic,key_decoded,value_decoded
user_info,27306,"{""user_id"": 27306, ""update_type"": ""new"", ""timestamp"": 1557184117, ""dob"": ""07/16/1958"", ""sex"": ""F"", ""gender"": ""F"", ""first_name"": ""Stephanie"", ""last_name"": ""Martinez"", ""address"": {""street_address"": ""1767 Katherine Expressway Suite 062"", ""city"": ""Edwards"", ""state"": ""CA"", ""zip"": 93523}}"
user_info,31362,"{""user_id"": 31362, ""update_type"": ""new"", ""timestamp"": 1555539598, ""dob"": ""02/01/2001"", ""sex"": ""F"", ""gender"": ""F"", ""first_name"": ""Shelley"", ""last_name"": ""Andrews"", ""address"": {""street_address"": ""4791 Nathan Turnpike Suite 540"", ""city"": ""Los Angeles"", ""state"": ""CA"", ""zip"": 90024}}"
user_info,27703,"{""user_id"": 27703, ""update_type"": ""new"", ""timestamp"": 1557989967, ""dob"": ""11/19/1944"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""Joshua"", ""last_name"": ""Mitchell"", ""address"": {""street_address"": ""843 Hannah Corners"", ""city"": ""Woodland Hills"", ""state"": ""CA"", ""zip"": 91367}}"
user_info,42794,"{""user_id"": 42794, ""update_type"": ""new"", ""timestamp"": 1558904984, ""dob"": ""05/11/1988"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""Brian"", ""last_name"": ""Williams"", ""address"": {""street_address"": ""27880 Dawn Lodge"", ""city"": ""Paramount"", ""state"": ""CA"", ""zip"": 90723}}"
user_info,12140,"{""user_id"": 12140, ""update_type"": ""new"", ""timestamp"": 1558694443, ""dob"": ""02/02/1999"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""Robert"", ""last_name"": ""Clark"", ""address"": {""street_address"": ""68994 Steven Vista"", ""city"": ""Pearblossom"", ""state"": ""CA"", ""zip"": 93553}}"



number of user_info rows = 119



After inspecting the sample, I can conclude that:
<br>
- The data don't need further cleaning at this stage
- The `key` column represents the topic's ID
- All key information is on the value column
<br>
<br>
With this understanding, I'm ready to write the decoded data into the main DataFrame and begin processing each topic.

In [0]:
bronze_df = bronze_df.withColumn("key",bronze_df.key.cast('string')) \
                     .withColumn("value",bronze_df.value.cast('string'))\
                     .select('key', 'value', 'topic', 'partition', 'offset', 'timestamp', 'date', 'week_part')

## Processing the Heart Rate table

The steps to achieve the main goal of this module are:
- Extract a DF containing only `bpm` data
- Unnest the `value` column
- Perform basic cleaning on the unnested data
- Implement measures to extend the pipeline's lifespan
- Create the `Silver.Heartrate` table

### Extract and Parse

In [0]:
# Filter DF to include only BPM data
bronze_df_bpm = bronze_df.where(bronze_df.topic == 'bpm')

#Display a sample of the filtered data to verify the data types of each column
display(bronze_df_bpm.limit(10))

key,value,topic,partition,offset,timestamp,date,week_part
124880,"{""device_id"": 124880, ""time"": 1575244790, ""heartrate"": 44.00933243508373}",bpm,0,65197029,1575244801224,2019-12-02,2019-49
133022,"{""device_id"": 133022, ""time"": 1575244808, ""heartrate"": 47.451012349596056}",bpm,2,65215178,1575244817439,2019-12-02,2019-49
180027,"{""device_id"": 180027, ""time"": 1575244804, ""heartrate"": 46.30081317699047}",bpm,2,65215177,1575244817093,2019-12-02,2019-49
115643,"{""device_id"": 115643, ""time"": 1575244810, ""heartrate"": 42.194601232975934}",bpm,3,64982092,1575244817280,2019-12-02,2019-49
102558,"{""device_id"": 102558, ""time"": 1575244810, ""heartrate"": 51.66831120365384}",bpm,3,64982107,1575244823081,2019-12-02,2019-49
102558,"{""device_id"": 102558, ""time"": 1575244810, ""heartrate"": 51.66831120365384}",bpm,3,64982110,1575244823081,2019-12-02,2019-49
146645,"{""device_id"": 146645, ""time"": 1575244826, ""heartrate"": 35.513344757862924}",bpm,0,65197068,1575244833149,2019-12-02,2019-49
107952,"{""device_id"": 107952, ""time"": 1575244840, ""heartrate"": 61.40336511251917}",bpm,2,65215240,1575244850048,2019-12-02,2019-49
133022,"{""device_id"": 133022, ""time"": 1575244837, ""heartrate"": 42.08016139207011}",bpm,2,65215226,1575244849427,2019-12-02,2019-49
161343,"{""device_id"": 161343, ""time"": 1575244850, ""heartrate"": 32.813226606224255}",bpm,3,64982140,1575244856309,2019-12-02,2019-49


In [0]:
# Define the schema for the JSON data in the value column
schema = t.StructType(
    [
        t.StructField('device_id', t.IntegerType(), True),
        t.StructField('time', t.TimestampType(), True),
        t.StructField('heartrate', t.DoubleType(), True)
    ]
)

# Parse the JSON data in the value column using the defined schema
bpm_df = bronze_df_bpm.withColumn('value', f.from_json('value', schema))\
                     .select(f.col('value.*')) # Select all columns from the parsed JSON data

# Display a sample of the parsed and selected data to verify the structure and contents
display(bpm_df.limit(10))

device_id,time,heartrate
150485,2019-12-15T23:59:49.000+0000,41.75157724210267
154423,2019-12-16T00:00:01.000+0000,41.32117600995303
172965,2019-12-16T00:00:04.000+0000,50.09502110360903
177966,2019-12-16T00:00:01.000+0000,40.58552319685103
146645,2019-12-16T00:00:14.000+0000,31.85149386683724
190340,2019-12-16T00:00:07.000+0000,33.92652658156432
122561,2019-12-16T00:00:12.000+0000,32.52707245486624
135279,2019-12-16T00:00:12.000+0000,48.5547842378845
150485,2019-12-16T00:00:17.000+0000,40.12255104063188
153087,2019-12-16T00:00:18.000+0000,51.71000903882607


### Basic Cleaning

#### Duplicate Rows

In [0]:
# Count the number of rows after removing fully duplicate records
bpm_df.dropDuplicates().count()

Out[9]: 9034893

In [0]:
# Count the number of rows after removing duplicates based on device_id and time columns
bpm_df.dropDuplicates(["device_id", "time"]).count()

Out[10]: 9034893

**Analysis Conclusion**

After this analysis, I can confirm that all duplicate rows are fully identical. Therefore, using `dropDuplicates()` without conditions will suffice. 

In [0]:
bpm_df = bpm_df.dropDuplicates()

#### Negative Values
<br>
Since negative heart rates are impossible, it's essential to check for them as part of our analysis.

In [0]:
# Display a sample of rows where the heart rate is negative
display(bpm_df.where(f.col('heartrate') < 0).limit(10))

# Count the total number of negative heart rate values in the DataFrame
print("")
print(f"The DataFrame has a total of {bpm_df.where(f.col('heartrate') < 0).count()} negative values")
print("")

device_id,time,heartrate
197930,2019-12-16T01:04:01.000+0000,-49.53762936995333
113029,2019-12-16T07:58:59.000+0000,-99.77584777390956
172633,2019-12-16T07:38:43.000+0000,-71.64322670099934
156698,2019-12-16T08:20:47.000+0000,-79.98018540957118
193806,2019-12-16T04:41:42.000+0000,-48.96808899634718
180111,2019-12-16T10:42:53.000+0000,-63.011166360590806
197930,2019-12-16T00:52:41.000+0000,-51.25140767750573
134009,2019-12-16T00:52:00.000+0000,-37.574396004523216
196248,2019-12-16T16:40:01.000+0000,-78.53167975800474
122235,2019-12-16T10:48:05.000+0000,-98.6965991917425



The DataFrame has a total of 2520 negative values



**Handling Negative Values**

While the data does contain some negative values, their quantity doesn't justify specific treatment. However, to ensure data integrity, I'll add a table constraint and a filter to prevent them from being included in the final table query.

#### Ensuring Pipeline Health

To ensure the integrity and reliability of the pipeline, I'll implement measures to verify the temporal consistency of the data. This includes:<br>

- Checking for a clear time span of the data.
- Adding a constraint to ensure that records cannot precede the start date of data collection.

In [0]:
display(bpm_df.withColumn('time', f.to_date("time", "yyyyMMdd")).select(f.min('time'), f.max('time')))

min(time),max(time)
2019-12-01,2019-12-16


Based on the analysis, it appears that data collection began on 2019-12-01. To ensure data integrity, I can implement a constraint related to the timestamp column in the table. In a development scenario, it's essential for developers to verify this information with the client before adding any constraints to a table.



### Create Heartrate table

Handling duplicates within a batch is straightforward, but it doesn't guarantee that identical records haven't been ingested previously.<br> To prevent this scenario, I'll implement a `merge` condition using the `foreachBatch()` function.

In [0]:
# Define merge and update logic for the stream
def heartrate_merge_data(df, batch_id): 
    df.createOrReplaceTempView("stream_updates")    
    sql_query = """
    MERGE INTO silver.heart_rate a
    USING stream_updates b
    ON a.device_id=b.device_id AND a.time=b.time
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED THEN
    INSERT * 
    """    
    df.sparkSession.sql(sql_query)

heartrate_schema = "device_id INTEGER, time TIMESTAMP, heartrate DOUBLE"

def process_heart_rate():
    # Create heart_rate table
    spark.sql("CREATE TABLE IF NOT EXISTS silver.heart_rate (device_id INTEGER, time TIMESTAMP, heartrate DOUBLE) USING DELTA")

    # Define constraints based on previous analysis
    spark.sql("ALTER TABLE silver.heart_rate ADD CONSTRAINT validbpm CHECK (heartrate > 0)")
    spark.sql("ALTER TABLE silver.heart_rate ADD CONSTRAINT check_date CHECK (time > '2019-11-30')")

    # Structured Streaming query
    (spark.readStream
            .table("bronze")
            .filter("topic = 'bpm'")
            .select(f.from_json(f.col("value").cast("string"), heartrate_schema).alias("bpm"))
            .filter("bpm.heartrate > 0")
            .select("bpm.*")
            .withWatermark("time", "30 seconds")
            .dropDuplicates()
            .writeStream
                .foreachBatch(heartrate_merge_data) # Apply merge logic to each batch
                .option("checkpointLocation", f"{checkpoint_path}/heart_rate") # Set checkpoint location
                .outputMode("update") # Allow changing existing data in the table 
                .trigger(availableNow=True) # Process all available data, multiple batches if needed
                .queryName("silver_heart_rate") # Name the query for easy reference
                .start() # Start the query
                .awaitTermination()) # Wait for query to finish

def reprocess_heart_rate():

    spark.sql("DROP TABLE IF EXISTS silver.heart_rate")

    dbutils.fs.rm(f"{checkpoint_path}/heart_rate", True)
    
    process_heart_rate()

In [0]:
process_heart_rate()

#### Verifying Transformation

In [0]:
# Display extended description of the Silver Heart Rate table
display(spark.sql("DESCRIBE EXTENDED silver.heart_rate"))

# Display a sample of the data from the Silver Heart Rate table
display(spark.read.table("silver.heart_rate").limit(10))

# Print total count of rows in the Silver Heart Rate table
print("")
print(f"The table has a total of {spark.table('silver.heart_rate').count()} rows")
print("")

col_name,data_type,comment
device_id,int,
time,timestamp,
heartrate,double,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,silver,
Table,heart_rate,
Created Time,Wed May 29 11:13:09 UTC 2024,
Last Access,UNKNOWN,


device_id,time,heartrate
175427,2019-12-16T00:00:18.000+0000,57.6386450240972
133254,2019-12-16T00:00:25.000+0000,60.80141310198724
113570,2019-12-16T00:00:43.000+0000,51.033907602339305
112008,2019-12-16T00:01:02.000+0000,46.75455039600096
130133,2019-12-16T00:01:17.000+0000,45.70402122685689
154423,2019-12-16T00:02:09.000+0000,47.03057581407536
168702,2019-12-16T00:02:25.000+0000,55.376533761596754
147565,2019-12-16T00:02:39.000+0000,46.14926529134262
194717,2019-12-16T00:03:01.000+0000,60.711029846767445
183816,2019-12-16T00:03:13.000+0000,53.44937116151269



The table has a total of 9032373 rows



## Processing the Workout table

The processing steps for the Workout table mirror those of the `Heartrate` table. Therefore, I'll proceed with a streamlined approach for this process.

### Extract and Parse

In [0]:
bronze_df_workout = bronze_df.where(bronze_df.topic == 'workout')

display(bronze_df_workout.limit(10))

key,value,topic,partition,offset,timestamp,date,week_part
40872,"{""user_id"": 40872, ""workout_id"": 8, ""timestamp"": 1575229675.0, ""action"": ""start"", ""session_id"": 76}",workout,0,189240,1575229677709,2019-12-01,2019-48
29213,"{""user_id"": 29213, ""workout_id"": 13, ""timestamp"": 1575196562.0, ""action"": ""start"", ""session_id"": 295}",workout,0,189198,1575196569155,2019-12-01,2019-48
29213,"{""user_id"": 29213, ""workout_id"": 13, ""timestamp"": 1575200016.0, ""action"": ""stop"", ""session_id"": 295}",workout,0,189199,1575200018703,2019-12-01,2019-48
27703,"{""user_id"": 27703, ""workout_id"": 47, ""timestamp"": 1575203433.0, ""action"": ""start"", ""session_id"": 456}",workout,0,189202,1575203437440,2019-12-01,2019-48
14508,"{""user_id"": 14508, ""workout_id"": 31, ""timestamp"": 1575224965.0, ""action"": ""start"", ""session_id"": 392}",workout,0,189227,1575224969124,2019-12-01,2019-48
12474,"{""user_id"": 12474, ""workout_id"": 35, ""timestamp"": 1575225840.0, ""action"": ""stop"", ""session_id"": 2}",workout,0,189232,1575225843611,2019-12-01,2019-48
28521,"{""user_id"": 28521, ""workout_id"": 20, ""timestamp"": 1575227037.0, ""action"": ""start"", ""session_id"": 328}",workout,0,189234,1575227043638,2019-12-01,2019-48
35226,"{""user_id"": 35226, ""workout_id"": 43, ""timestamp"": 1575193979.0, ""action"": ""start"", ""session_id"": 402}",workout,0,189192,1575193983608,2019-12-01,2019-48
26847,"{""user_id"": 26847, ""workout_id"": 14, ""timestamp"": 1575205629.0, ""action"": ""start"", ""session_id"": 2}",workout,0,189207,1575205633152,2019-12-01,2019-48
28588,"{""user_id"": 28588, ""workout_id"": 34, ""timestamp"": 1575207666.0, ""action"": ""start"", ""session_id"": 1}",workout,0,189209,1575207670803,2019-12-01,2019-48


In [0]:
workout_schema = t.StructType(
    [
        t.StructField('user_id', t.IntegerType(), True),
        t.StructField('workout_id', t.IntegerType(), True),
        t.StructField('timestamp', t.DoubleType(), True),
        t.StructField('action', t.StringType(), True),
        t.StructField('session_id', t.IntegerType(), True)
    ]
)

workout_df = bronze_df_workout.withColumn('value', f.from_json('value', workout_schema)) \
                             .select(f.col('value.*')) \
                             .select("user_id", "workout_id", "session_id", f.col("timestamp").cast("timestamp").alias("time"), "action")

display(workout_df.limit(10))

print("")
print(f"The DataFrame has a total of {workout_df.count()} rows")
print("")

user_id,workout_id,session_id,time,action
24863,25,3,2019-12-09T13:37:14.000+0000,start
16093,26,502,2019-12-09T13:43:36.000+0000,start
43104,29,5,2019-12-09T13:47:32.000+0000,start
45875,46,220,2019-12-09T15:24:55.000+0000,stop
29213,25,310,2019-12-09T18:33:36.000+0000,stop
13937,47,5,2019-12-09T19:38:50.000+0000,start
41367,25,314,2019-12-09T19:44:40.000+0000,start
30514,20,1,2019-12-10T10:18:52.000+0000,stop
16093,8,504,2019-12-10T10:28:24.000+0000,start
24018,12,158,2019-12-10T10:32:03.000+0000,stop



The DataFrame has a total of 1514 rows



### Basic Cleaning

In [0]:
workout_df.dropDuplicates(["user_id", "time"]).count()

Out[19]: 1514

In [0]:
workout_df.dropDuplicates().count()

Out[20]: 1514

While it seems duplicate rows aren't currently an issue, I'll still address it in the Structured Streaming query.

In [0]:
display(workout_df.withColumn('timestamp', f.to_date("time", "yyyyMMdd")).select(f.min('time'), f.max('time')))

min(time),max(time)
2019-12-01T06:57:06.000+0000,2019-12-16T16:06:00.000+0000


I'll apply the same constraint to the `Workout` table as with the `Heartrate` table

### Exploring

While `session_id` and `workout_id` may contain similar information, in the context of the workouts table, the column representing a unique start and end session is deemed crucial for future transformations.

In [0]:
# Display the count of workouts by user_id and workout_id
display(
    workout_df.groupBy("user_id", "workout_id").count().orderBy(f.col("count").desc())
)

# Display the count of workouts by user_id and session_id
display(
    workout_df.groupBy("user_id", "session_id").count().orderBy(f.col("count").desc())
)

user_id,workout_id,count
29213,0,8
35728,31,6
36117,19,6
14232,30,6
27306,35,6
26285,26,6
35728,42,6
33987,15,6
29213,11,6
14508,47,6


user_id,session_id,count
16093,502,2
43104,6,2
45875,226,2
12140,479,2
28588,13,2
31362,303,2
35428,1,2
23916,9,2
28814,8,2
34740,325,2


### Create Workouts Table

In [0]:
# Define merge and update logic for the stream
def workout_merge_data(df, batch_id):
    df.createOrReplaceTempView("stream_updates")

    sql_query = """
    MERGE INTO silver.workouts a
    USING stream_updates b
    ON a.user_id = b.user_id AND a.time = b.time
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED THEN
    INSERT *
    """

    df.sparkSession.sql(sql_query)


workout_json_schema = "user_id INTEGER, workout_id INTEGER, timestamp DOUBLE, action STRING, session_id INTEGER"

def process_workouts():
    # Create the workouts table if it doesn't exist
    spark.sql(f"""
                CREATE TABLE IF NOT EXISTS 
                    silver.workouts ( 
                        user_id     INTEGER,   
                        workout_id  INTEGER,   
                        time        TIMESTAMP, 
                        action      STRING,    
                        session_id  INTEGER    
                                    ) 
                USING DELTA
                """)

    # Add constraint to ensure timestamp is after a specific date
    spark.sql(f"ALTER TABLE silver.workouts ADD CONSTRAINT check_date CHECK (time > '2019-11-30')")

    # Read stream from the bronze table, apply transformations, and write to the workouts table
    (spark.readStream
            .table("bronze.bronze")
            .filter("topic = 'workout'")
            .select(f.from_json(f.col("value").cast("string"), workout_json_schema).alias("wkrt"))
            .select("wkrt.*")
            .select("user_id", "workout_id", f.col("timestamp").cast("timestamp").alias("time"), "action", "session_id")
            .withWatermark("time", "30 seconds")
            .dropDuplicates(["user_id", "time"])
                .writeStream
                .foreachBatch(workout_merge_data)
                .outputMode("update")
                .option("checkpointLocation", f"{checkpoint_path}/workouts")
                .queryName("silver.workouts")
                .trigger(availableNow=True)
                .start()
                .awaitTermination())

def reprocess_workouts():
    # Drop the workouts table if it exists
    spark.sql(f"drop table if exists silver.workouts")

    # Remove checkpoint location for workouts
    dbutils.fs.rm(f"{checkpoint_path}/workouts", True)

    # Reprocess workouts
    process_workouts()

In [0]:
process_workouts()

#### Verifying Transformation

In [0]:
# Display extended description of the Workouts table
display(spark.sql("DESCRIBE EXTENDED silver.workouts"))

# Display a sample of the data from the Workouts table
display(spark.read.table("silver.workouts").limit(10))

# Print total count of rows in the Workouts table
print("")
print(f"The table has a total of {spark.table('silver.workouts').count()} rows")
print("")

col_name,data_type,comment
user_id,int,
workout_id,int,
time,timestamp,
action,string,
session_id,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,silver,
Table,workouts,


user_id,workout_id,time,action,session_id
43104,29,2019-12-09T13:47:32.000+0000,start,5
45875,46,2019-12-09T15:24:55.000+0000,stop,220
43104,25,2019-12-10T15:06:10.000+0000,stop,6
28588,33,2019-12-15T14:40:13.000+0000,stop,13
29213,38,2019-12-12T15:58:08.000+0000,stop,314
24968,44,2019-12-14T14:04:06.000+0000,start,3
26847,22,2019-12-14T17:08:16.000+0000,start,5
29130,8,2019-12-15T18:35:42.000+0000,stop,14
40872,47,2019-12-15T19:43:55.000+0000,stop,98
40559,6,2019-12-13T09:28:41.000+0000,start,1



The table has a total of 1514 rows



## Creating Completed Workouts table 
<br>
It may seem unconventional, but we'll perform an aggregation on the silver layer. As observed, the workouts table contains separate entries for each action (start and stop), which is not ideal. Therefore, aggregation is necessary here.
<br>
The goal is to consolidate both actions into a single row to represent a complete workout, with only completed workouts being added to the table.

### Transformation Strategy

My approach involves a step-by-step exploration to uncover the desired outcome. Initially, I focus on discovering a method to achieve the result I seek. Once identified, I then prioritize optimizing the process for efficiency and effectiveness.

In [0]:
display(workout_df.filter(f.col("action") == "start") \
                          .select("user_id", 
                                  "workout_id", 
                                  "session_id", 
                                  "time", 
                                  f.lit(None).alias("end_time"), 
                                  f.lit(True).alias("in_progress")
                                 )
                          .limit(10)
       )

user_id,workout_id,session_id,time,end_time,in_progress
24863,25,3,2019-12-09T13:37:14.000+0000,,True
16093,26,502,2019-12-09T13:43:36.000+0000,,True
43104,29,5,2019-12-09T13:47:32.000+0000,,True
13937,47,5,2019-12-09T19:38:50.000+0000,,True
41367,25,314,2019-12-09T19:44:40.000+0000,,True
16093,8,504,2019-12-10T10:28:24.000+0000,,True
15149,25,297,2019-12-10T17:17:06.000+0000,,True
27306,3,173,2019-12-10T17:35:29.000+0000,,True
28814,38,1,2019-12-11T07:36:01.000+0000,,True
45875,37,226,2019-12-11T08:13:50.000+0000,,True


In [0]:
display(workout_df.filter(f.col("action") == "stop") \
                          .select("user_id",
                                  "workout_id", 
                                  "session_id", 
                                  f.lit(None).alias("start_time"), 
                                  "time", f.lit(False).alias("in_progress")
                                 )
                          .limit(10)
       )

user_id,workout_id,session_id,start_time,time,in_progress
45875,46,220,,2019-12-09T15:24:55.000+0000,False
29213,25,310,,2019-12-09T18:33:36.000+0000,False
30514,20,1,,2019-12-10T10:18:52.000+0000,False
24018,12,158,,2019-12-10T10:32:03.000+0000,False
43104,25,6,,2019-12-10T15:06:10.000+0000,False
29213,33,312,,2019-12-11T18:46:47.000+0000,False
29745,2,6,,2019-12-12T12:48:29.000+0000,False
41192,7,9,,2019-12-14T11:00:33.000+0000,False
27306,8,183,,2019-12-14T11:54:43.000+0000,False
41796,31,8,,2019-12-14T12:54:01.000+0000,False


In [0]:
start_actions = workout_df.filter(f.col("action") == "start") \
                          .select("user_id", 
                                  "workout_id", 
                                  "session_id", 
                                  "time", 
                                  f.lit(None).alias("end_time"), 
                                  f.lit(True).alias("in_progress"))

stop_actions = workout_df.filter(f.col("action") == "stop") \
                         .select("user_id",
                                 "workout_id", 
                                 "session_id", 
                                 f.lit(None).alias("start_time"), 
                                 "time", f.lit(False).alias("in_progress"))

completed_workouts = start_actions.alias("a").join(stop_actions.alias("b"), ["user_id", "session_id"], "left") \
                                             .withColumn("workout_progress", f.when(f.col("a.in_progress") & f.col("b.in_progress").isNull(), f.lit("in_progress")).otherwise(f.lit("done"))) \
                                             .select(f.col("a.user_id"),
                                                     f.col("a.workout_id"), 
                                                     f.col("a.session_id"),
                                                     f.col("a.time").alias("start_time"),
                                                     f.col("b.time").alias("end_time"),
                                                     "workout_progress") \
                                             .where(f.col("workout_progress") == "done")

n_completed_workouts = completed_workouts.count()

display(completed_workouts.limit(10))

print("")
print(f"Number of completed workouts {n_completed_workouts}")
print("")

user_id,workout_id,session_id,start_time,end_time,workout_progress
24863,25,3,2019-12-09T13:37:14.000+0000,2019-12-09T14:48:25.000+0000,done
16093,26,502,2019-12-09T13:43:36.000+0000,2019-12-09T14:41:36.000+0000,done
43104,29,5,2019-12-09T13:47:32.000+0000,2019-12-09T14:23:37.000+0000,done
13937,47,5,2019-12-09T19:38:50.000+0000,2019-12-09T20:11:41.000+0000,done
41367,25,314,2019-12-09T19:44:40.000+0000,2019-12-09T20:56:57.000+0000,done
16093,8,504,2019-12-10T10:28:24.000+0000,2019-12-10T11:49:54.000+0000,done
15149,25,297,2019-12-10T17:17:06.000+0000,2019-12-10T18:26:57.000+0000,done
27306,3,173,2019-12-10T17:35:29.000+0000,2019-12-10T18:07:16.000+0000,done
28814,38,1,2019-12-11T07:36:01.000+0000,2019-12-11T08:27:42.000+0000,done
45875,37,226,2019-12-11T08:13:50.000+0000,2019-12-11T08:55:35.000+0000,done



Number of completed workouts 757



I've visually achieved the desired layout for the completed workouts table. And learnt that all workouts in this sample data are completed, out of 1514 initial records, 757 completed workouts were compiled

### Testing In-Progress Workouts

With the logic for extracting completed workouts in place, it's time to stress-test the system with flawed data.<br>
I'll intentionally create some in-progress workouts and records without the start action to assess the resilience of the logic.

In [0]:
%sql
DELETE FROM silver.workouts a WHERE a.user_id between 20000 and 27500 and a.action = "stop";

num_affected_rows
134


In [0]:
%sql
DELETE FROM silver.workouts a WHERE a.user_id between 27501 and 30000 and a.action = "start";

num_affected_rows
103


After removing incomplete workout entries, a total of 237 records were deleted. When applying the same logic to flawed data, only 520 records are expected to be added to the dataframe.

In [0]:
missing_workouts_df = spark.table("silver.workouts")

In [0]:
start_actions = missing_workouts_df.filter(f.col("action") == "start") \
                                   .select("user_id", 
                                           "workout_id", 
                                           "session_id", 
                                           "time", 
                                           f.lit(None).alias("end_time"), 
                                           f.lit(True).alias("in_progress"))

stop_actions = missing_workouts_df.filter(f.col("action") == "stop") \
                                  .select("user_id",
                                          "workout_id", 
                                          "session_id", 
                                          f.lit(None).alias("start_time"), 
                                          "time", f.lit(False).alias("in_progress"))

test_completed_workouts = start_actions.alias("a").join(stop_actions.alias("b"), ["user_id", "session_id"], "left") \
                                                  .withColumn("workout_progress", f.when(f.col("a.in_progress") & f.col("b.in_progress").isNull(), f.lit("in_progress")).otherwise(f.lit("done"))) \
                                                  .select(f.col("a.user_id"),
                                                          f.col("a.workout_id"), 
                                                          f.col("a.session_id"),
                                                          f.col("a.time").alias("start_time"),
                                                          f.col("b.time").alias("end_time")
                                                         ) \
                                                  .where(f.col("workout_progress") == "done")

n_test_completed_workouts = test_completed_workouts.count()

display(test_completed_workouts.limit(10))

print("")
print(f"Number of completed workouts {n_test_completed_workouts}")
print("")

user_id,workout_id,session_id,start_time,end_time
33733,27,4,2019-12-14T08:40:59.000+0000,2019-12-14T09:25:33.000+0000
17807,10,4,2019-12-12T15:09:12.000+0000,2019-12-12T15:45:39.000+0000
30088,0,449,2019-12-09T17:27:09.000+0000,2019-12-09T17:43:00.000+0000
43052,26,16,2019-12-11T18:18:33.000+0000,2019-12-11T19:20:06.000+0000
36117,19,322,2019-12-12T17:25:04.000+0000,2019-12-12T19:08:10.000+0000
40177,38,2,2019-12-14T12:36:53.000+0000,2019-12-14T13:20:46.000+0000
41796,35,7,2019-12-13T07:51:28.000+0000,2019-12-13T08:50:39.000+0000
35728,41,420,2019-12-13T09:00:15.000+0000,2019-12-13T09:21:56.000+0000
47356,4,9,2019-12-10T07:49:40.000+0000,2019-12-10T08:11:54.000+0000
12474,32,17,2019-12-15T18:32:22.000+0000,2019-12-15T19:37:21.000+0000



Number of completed workouts 520



With the solid logic in place, it's time to integrate all this code into the streaming logic and create the completed_workouts table.

### Create Completed Workouts

In [0]:
def completed_wrkt_merge_data(df, batch_id):
    df.createOrReplaceTempView("stream_updates")

    sql_query = """
    MERGE INTO silver.completed_workouts a
    USING stream_updates b
    ON a.user_id=b.user_id AND a.session_id=b.session_id
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED THEN
    INSERT * 
    """    
    df.sparkSession.sql(sql_query)


def process_completed_workouts():

    spark.sql("""
      CREATE TABLE IF NOT EXISTS  
      silver.completed_workouts   
        (user_id      INTEGER,    
        session_id   INTEGER,    
        start_time   TIMESTAMP,  
        end_time     TIMESTAMP)  
      USING DELTA \
    """)

    (spark.readStream
          .table("silver.workouts")
          .groupBy("user_id", "session_id")
          .agg(
              f.max(f.when(f.col("action") == "start", f.col("time"))).alias("start_time"),
              f.max(f.when(f.col("action") == "stop", f.col("time"))).alias("end_time")
              )
          .filter(f.col("start_time").isNotNull() & f.col("end_time").isNotNull())
          .select("user_id", "session_id", "start_time", "end_time")
          .withWatermark("start_time", "30 seconds")
          .writeStream
              .foreachBatch(completed_wrkt_merge_data)
              .option("checkpointLocation", f"{checkpoint_path}/completed_workouts")
              .outputMode("update")
              .trigger(availableNow=True)
              .queryName("completed_workouts")
              .start()
              .awaitTermination()
)

def reprocess_completed_workouts():
    dbutils.fs.rm(f"{checkpoint_path}/completed_workouts", True)

    spark.sql("drop table if exists silver.completed_workouts")

    process_completed_workouts()    


In [0]:
process_completed_workouts()


#### Verify Results

In [0]:
# Display a sample of the data from the Completed Workouts table
display(spark.read.table("silver.completed_workouts").limit(10))

# Print total count of rows in the Completed Workouts table
print("")
print(f"The table has a total of {spark.table('silver.completed_workouts').count()} rows")
print("")

user_id,session_id,start_time,end_time
36117,322,2019-12-12T17:25:04.000+0000,2019-12-12T19:08:10.000+0000
45875,233,2019-12-15T12:22:47.000+0000,2019-12-15T13:10:26.000+0000
41192,5,2019-12-10T17:39:59.000+0000,2019-12-10T18:14:34.000+0000
30088,450,2019-12-09T17:50:29.000+0000,2019-12-09T18:19:49.000+0000
33987,499,2019-12-11T14:05:23.000+0000,2019-12-11T14:26:21.000+0000
41796,2,2019-12-10T17:41:14.000+0000,2019-12-10T18:03:39.000+0000
45875,219,2019-12-09T13:56:40.000+0000,2019-12-09T14:03:49.000+0000
43104,6,2019-12-10T14:08:28.000+0000,2019-12-10T15:06:10.000+0000
34740,326,2019-12-11T19:05:52.000+0000,2019-12-11T20:04:29.000+0000
35226,419,2019-12-12T17:57:02.000+0000,2019-12-12T18:54:38.000+0000



The table has a total of 520 rows



### Reverting Flawed Data
<br>
With the streaming process yielding consistent results, I'll reset the workout table and proceed with the full sample.

In [0]:
reprocess_workouts()

reprocess_completed_workouts()

# Print total count of rows in the Completed Workouts table
print("")
print(f"The table has a total of {spark.table('silver.completed_workouts').count()} rows")
print("")


The table has a total of 757 rows



## Create Gym_report
In this section, I'll create the first object in the `Gold` layer: the `gym_report` view. <br>
This view will utilize the completed_workouts table along with two other production tables, `gym_logs` and `user_lookup`.<br>
The goal is to obtain the time span and number of workouts per user in a given gym.<br>
First, let's examine the structure of the other two tables.<br>

### Evaluate Tables

It's crucial to have a clear understanding of the tables to strategize how to apply the given business rules effectively to the data.

In [0]:
# Clone gym_logs table to Silver schema 
spark.sql(f"""
CREATE TABLE silver.gym_logs
SHALLOW CLONE delta.`{main_path}/gym_logs`
LOCATION 'dbfs:/user/hive/warehouse/silver.db/gym_logs'
""")

# Obtain the table in a DF and transform the timestamp to YYYY-MM-DD
gym_df = (spark.table("silver.gym_logs") 
               .select(f.col("first_timestamp").cast("timestamp").cast("date").alias("date"),
                       f.col("first_timestamp").cast("timestamp"), 
                       f.col("gym").cast("integer"), 
                       f.col("last_timestamp").cast("timestamp"), 
                       f.col("mac").cast("string").alias("mac_address") 
                       )
         )

# Display a sample of the DF
display(gym_df.limit(10))

# Print total count of rows in the gym_df table
print("")
print(f"The table has a total of {gym_df.count()} rows")
print("")

date,first_timestamp,gym,last_timestamp,mac_address
2019-12-01,2019-12-01T12:15:19.000+0000,1,2019-12-01T13:56:19.000+0000,3c:97:06:9e:63:9c
2019-12-01,2019-12-01T09:59:02.000+0000,2,2019-12-01T11:36:35.000+0000,e5:a8:d5:73:cf:33
2019-12-01,2019-12-01T17:23:12.000+0000,2,2019-12-01T18:41:55.000+0000,e4:2c:d4:ea:e4:b4
2019-12-01,2019-12-01T07:11:36.000+0000,2,2019-12-01T09:13:56.000+0000,54:bf:e4:40:52:b9
2019-12-01,2019-12-01T17:27:36.000+0000,3,2019-12-01T19:37:45.000+0000,c3:3e:16:08:f9:47
2019-12-01,2019-12-01T17:33:04.000+0000,3,2019-12-01T18:42:50.000+0000,4c:53:51:88:59:ab
2019-12-01,2019-12-01T07:14:05.000+0000,4,2019-12-01T08:18:10.000+0000,dd:45:d2:37:a8:0e
2019-12-01,2019-12-01T17:27:06.000+0000,4,2019-12-01T19:20:54.000+0000,df:f9:dc:5e:e2:a8
2019-12-01,2019-12-01T12:07:11.000+0000,5,2019-12-01T13:23:50.000+0000,a4:eb:49:d9:9b:1d
2019-12-01,2019-12-01T09:49:10.000+0000,5,2019-12-01T10:38:55.000+0000,00:6c:6c:53:51:ef



The table has a total of 314 rows



In [0]:
# Clone user_lookup table to Silver schema 
spark.sql(f"""
CREATE TABLE IF NOT EXISTS silver.user_lookup
SHALLOW CLONE delta.`{main_path}/user-lookup`
LOCATION 'dbfs:/user/hive/warehouse/silver.db/user_lookup'
""")

# Obtain the table in a DF and encrypted the user_id renaming to alt_id
user_lookup_df = (spark.table("silver.user_lookup") \
                       .withColumn("alt_id", f.sha2(f.concat(f.col("user_id"),f.lit('BEANS')), 256))\
                       .select(f.col("alt_id").cast("string"), \
                               f.col("device_id").cast("integer"), \
                               f.col("mac_address").cast("string"), \
                               f.col("user_id").cast("integer") \
                              )
                 )

# Dispay a sample of the DF
display(user_lookup_df.limit(10))

# Print total count of rows in the user_lookup_df table
print("")
print(f"The table has a total of {gym_df.count()} rows")
print("")

alt_id,device_id,mac_address,user_id
745c35775cfdf69e9a190c26a173285712c1929c5faa5087ce2855e9043b33f4,190960,14:cd:d6:db:70:f6,11745
a610459add89c3c6a1de87bf8beaa4aee317564837e8d0247852004197e7d8cc,141687,ae:ec:f6:48:ca:f7,12140
c3274e338a1da85e0748f6c9c92247e176044770f4e39637a4c4d4b348191fcf,114131,57:24:ac:8c:75:ea,12227
e6a6387ffbb75e14eb39b138be1e7e9b8a6687e66c36865bf6e234d6688dd517,118440,4c:c5:9f:cb:13:bd,12474
898fa1b7f7057b1b17bca1342b4341b7d84e228d6d9bb83a7a770c4d487ebc4b,109290,36:1f:d9:d3:e8:0d,13559
e83442f287db35be0374a66e2253ab5c119417b0e559554291ac2af8f7f5809f,143442,dd:96:be:e9:1e:f4,13937
71324d786c7f10bd56edf127946a934016c65ff515c47f4d6ea5d233d5cae66a,172965,dd:45:d2:37:a8:0e,14232
a6f9678bccb4d7e7bc0ba5afbe9ba83c154889506173f51c1d55bc19c115bec6,102558,df:f9:dc:5e:e2:a8,14508
8bdbabab0376e9bc29748fd75ba8b669e9947dbcb0ee492a32a4d5c1b604321f,175406,1d:69:69:75:d0:aa,14633
54c9a8be55ad6b77cc376ef3638fc4af1ae1961685edd4cd986a5376c46bc67f,177966,de:c0:cd:a7:71:f4,15149



The table has a total of 314 rows



The `user_lookup` table links all possible IDs a user may have, with `alt_id` being a topic I'll explore later.<br>
The `gym_logs` table records every workout done in a given gym, splited by `mac_address`.<br>
By using the `user_lookup` table to link the `user_id` from the `completed_workouts` table with the `mac_address` from `gym_logs`, the gym_report table will be created.<br>

### Begin Trasformation

To account for individuals working out multiple times a day, I need to transform the `completed_workouts` table from "every workout per person" to "all workouts per day per person." Iâ€™ll achieve this by:

- Grouping by the `user_id` and the `start_time` column, transformed to YYYY-MM-DD.
- Collecting the workouts (`session_id`) performed (using collect_list for possible duplicated records).
- Identifying when the user first went to the gym that day.
- Determining the last time the user was at the gym that day.


In [0]:
completed_workouts = spark.table("silver.completed_workouts").select(f.col("start_time").cast("date").alias("date"), 'user_id', 'session_id', 'start_time', 'end_time')

grouped_comp_wrkt = completed_workouts.groupBy("user_id", "date") \
                                      .agg( \
                                            f.collect_set("session_id").alias("workouts"),\
                                            f.min("start_time").alias("start_workout"),\
                                            f.max("end_time").alias("end_workout") \
                                          )

display(grouped_comp_wrkt)


user_id,date,workouts,start_workout,end_workout
11745,2019-12-10,List(1),2019-12-10T07:25:12.000+0000,2019-12-10T08:20:39.000+0000
11745,2019-12-13,List(2),2019-12-13T13:51:49.000+0000,2019-12-13T15:00:10.000+0000
12140,2019-12-05,"List(477, 478)",2019-12-05T07:52:09.000+0000,2019-12-05T09:17:19.000+0000
12140,2019-12-12,List(479),2019-12-12T07:15:44.000+0000,2019-12-12T07:48:50.000+0000
12227,2019-12-11,"List(1, 2)",2019-12-11T17:16:57.000+0000,2019-12-11T18:48:10.000+0000
12227,2019-12-13,List(3),2019-12-13T12:28:41.000+0000,2019-12-13T13:23:31.000+0000
12227,2019-12-15,"List(5, 4)",2019-12-15T07:30:29.000+0000,2019-12-15T08:10:38.000+0000
12474,2019-12-01,"List(1, 2)",2019-12-01T17:28:22.000+0000,2019-12-01T18:44:00.000+0000
12474,2019-12-02,List(3),2019-12-02T07:39:36.000+0000,2019-12-02T08:16:03.000+0000
12474,2019-12-03,"List(5, 4)",2019-12-03T07:20:53.000+0000,2019-12-03T08:14:24.000+0000


In [0]:
display(completed_workouts.join(user_lookup_df, ["user_id"])
                          .groupBy("mac_address", "date")
                          .agg(
                               f.collect_set("completed_workouts.session_id").alias("workouts"),
                               f.min("completed_workouts.start_time").alias("start_workout"),
                               f.max("completed_workouts.end_time").alias("end_workout") 
                               )
                          .join(gym_df, ["mac_address", "date"], how= "inner")
                          .withColumn("minutes_in_gym", 
                                         (f.unix_timestamp(gym_df.last_timestamp) - f.unix_timestamp(gym_df. 
                                          first_timestamp)) / 60)
                          .withColumn("minutes_exercising", 
                                         (f.unix_timestamp("end_workout") - f.unix_timestamp("start_workout")) / 60)
                          .select(
                                    f.col("gym"), 
                                    f.col("mac_address"), 
                                    f.col("date"), 
                                    f.col("workouts"),  
                                    f.col("minutes_in_gym"), 
                                    f.col("minutes_exercising")
                                    )
                          .limit(10)
)

gym,mac_address,date,workouts,minutes_in_gym,minutes_exercising
5,00:6c:6c:53:51:ef,2019-12-01,"List(402, 403)",49.75,40.96666666666667
5,00:6c:6c:53:51:ef,2019-12-02,"List(404, 405)",97.66666666666669,92.6
5,00:6c:6c:53:51:ef,2019-12-03,"List(407, 406)",47.1,37.766666666666666
5,00:6c:6c:53:51:ef,2019-12-04,List(408),57.46666666666667,46.05
5,00:6c:6c:53:51:ef,2019-12-05,"List(409, 410)",84.91666666666667,68.56666666666666
5,00:6c:6c:53:51:ef,2019-12-07,"List(411, 412)",94.45,78.3
5,00:6c:6c:53:51:ef,2019-12-08,"List(413, 414)",56.63333333333333,45.583333333333336
5,00:6c:6c:53:51:ef,2019-12-09,"List(416, 415)",125.31666666666666,111.48333333333332
5,00:6c:6c:53:51:ef,2019-12-10,List(417),69.05,64.66666666666667
5,00:6c:6c:53:51:ef,2019-12-11,List(418),60.43333333333333,43.96666666666667


### Create Gym_report

This process will differ from the previous ones because I can't create a `Permanent View` in PySpark. Instead, I will perform the process in SQL. Since it will be a view, it will be fully reprocessed periodically, eliminating the need for a merge function.

In [0]:
%sql
DROP VIEW if EXISTS gold.gym_report;

CREATE OR REPLACE VIEW gold.gym_report AS (
  SELECT gym, mac_address, date, workouts, (last_timestamp - first_timestamp)/60 minutes_in_gym, (to_unix_timestamp(end_workout) - to_unix_timestamp(start_workout))/60 minutes_exercising
  FROM silver.gym_logs c
  INNER JOIN (
    SELECT b.mac_address, to_date(start_time) date, collect_set(session_id) workouts, min(start_time) start_workout, max(end_time) end_workout
    FROM silver.completed_workouts a
    INNER JOIN silver.user_lookup b
    ON a.user_id = b.user_id
    GROUP BY mac_address, to_date(start_time)
    ) d
    ON c.mac = d.mac_address AND to_date(CAST(c.first_timestamp AS timestamp)) = d.date
);

select * from gold.gym_report limit 10;

gym,mac_address,date,workouts,minutes_in_gym,minutes_exercising
5,00:6c:6c:53:51:ef,2019-12-01,"List(402, 403)",49.75,40.96666666666667
5,00:6c:6c:53:51:ef,2019-12-02,"List(404, 405)",97.66666666666669,92.6
5,00:6c:6c:53:51:ef,2019-12-03,"List(407, 406)",47.1,37.766666666666666
5,00:6c:6c:53:51:ef,2019-12-04,List(408),57.46666666666667,46.05
5,00:6c:6c:53:51:ef,2019-12-05,"List(409, 410)",84.91666666666667,68.56666666666666
5,00:6c:6c:53:51:ef,2019-12-07,"List(411, 412)",94.45,78.3
5,00:6c:6c:53:51:ef,2019-12-08,"List(413, 414)",56.63333333333333,45.583333333333336
5,00:6c:6c:53:51:ef,2019-12-09,"List(416, 415)",125.31666666666666,111.48333333333332
5,00:6c:6c:53:51:ef,2019-12-10,List(417),69.05,64.66666666666667
5,00:6c:6c:53:51:ef,2019-12-11,List(418),60.43333333333333,43.96666666666667


In [0]:
print("")
print(f"The view has a total of {spark.table('gold.gym_report').count()} rows")
print("")


The table has a total of 304 rows



## Process Users
This table is particularly interesting because it contains both Personally Identifiable Information (PII) and functions as a Change Data Feed (CDF) table.<br>
These characteristics require additional steps for processing.<br>
The start is the same as previously done with the other base tables.

### Start Processing

In [0]:
bronze_df_users = bronze_df.where(bronze_df.topic == 'user_info')

display(bronze_df_users.limit(10))

print("")
print(f"The DF has a total of {bronze_df_users.count()} rows")
print("")

key,value,topic,partition,offset,timestamp,date,week_part
35226,"{""user_id"": 35226, ""update_type"": ""new"", ""timestamp"": 1560222288, ""dob"": ""01/17/1954"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""William"", ""last_name"": ""Allen"", ""address"": {""street_address"": ""559 Katelyn Forks Apt. 417"", ""city"": ""Glendale"", ""state"": ""CA"", ""zip"": 91208}}",user_info,0,12762,1560222288120,2019-06-11,2019-24
40872,"{""user_id"": 40872, ""update_type"": ""new"", ""timestamp"": 1560219172, ""dob"": ""03/08/1982"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""David"", ""last_name"": ""King"", ""address"": {""street_address"": ""1074 Baker Summit"", ""city"": ""Los Angeles"", ""state"": ""CA"", ""zip"": 90043}}",user_info,0,12761,1560219172223,2019-06-11,2019-24
49296,"{""user_id"": 49296, ""update_type"": ""new"", ""timestamp"": 1560475456, ""dob"": ""04/23/1989"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""William"", ""last_name"": ""Hinton"", ""address"": {""street_address"": ""441 Davis Stravenue"", ""city"": ""North Hollywood"", ""state"": ""CA"", ""zip"": 91602}}",user_info,0,12765,1560475457807,2019-06-14,2019-24
30612,"{""user_id"": 30612, ""update_type"": ""new"", ""timestamp"": 1560507319, ""dob"": ""03/27/1937"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""Darrell"", ""last_name"": ""Gonzalez"", ""address"": {""street_address"": ""1945 George Inlet Suite 490"", ""city"": ""Valyermo"", ""state"": ""CA"", ""zip"": 93563}}",user_info,0,12766,1560507323075,2019-06-14,2019-24
13559,"{""user_id"": 13559, ""update_type"": ""new"", ""timestamp"": 1560189545, ""dob"": ""03/06/1980"", ""sex"": ""F"", ""gender"": ""F"", ""first_name"": ""Victoria"", ""last_name"": ""Smith"", ""address"": {""street_address"": ""88788 Dawson Lodge"", ""city"": ""Los Angeles"", ""state"": ""CA"", ""zip"": 90065}}",user_info,0,12760,1560189553779,2019-06-10,2019-24
36164,"{""user_id"": 36164, ""update_type"": ""new"", ""timestamp"": 1560249727, ""dob"": ""10/08/1935"", ""sex"": ""F"", ""gender"": ""F"", ""first_name"": ""Caitlin"", ""last_name"": ""Miles"", ""address"": {""street_address"": ""1523 Garza Estate Apt. 852"", ""city"": ""Los Angeles"", ""state"": ""CA"", ""zip"": 90057}}",user_info,0,12763,1560249731473,2019-06-11,2019-24
34740,"{""user_id"": 34740, ""update_type"": ""new"", ""timestamp"": 1560379376, ""dob"": ""04/18/1991"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""Kevin"", ""last_name"": ""Phillips"", ""address"": {""street_address"": ""60685 Pena Crossroad"", ""city"": ""Santa Monica"", ""state"": ""CA"", ""zip"": 90403}}",user_info,0,12764,1560379383367,2019-06-12,2019-24
38766,"{""user_id"": 38766, ""update_type"": ""new"", ""timestamp"": 1560539604, ""dob"": ""08/27/1978"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""Daniel"", ""last_name"": ""Ruiz"", ""address"": {""street_address"": ""2620 Hatfield Village Suite 039"", ""city"": ""Los Angeles"", ""state"": ""CA"", ""zip"": 90073}}",user_info,0,12768,1560539608724,2019-06-14,2019-24
26285,"{""user_id"": 26285, ""update_type"": ""new"", ""timestamp"": 1560528115, ""dob"": ""11/15/1969"", ""sex"": ""M"", ""gender"": ""M"", ""first_name"": ""William"", ""last_name"": ""Gutierrez"", ""address"": {""street_address"": ""8954 Willie Club"", ""city"": ""Beverly Hills"", ""state"": ""CA"", ""zip"": 90212}}",user_info,0,12767,1560528121614,2019-06-14,2019-24
27306,"{""user_id"": 27306, ""update_type"": ""new"", ""timestamp"": 1557184117, ""dob"": ""07/16/1958"", ""sex"": ""F"", ""gender"": ""F"", ""first_name"": ""Stephanie"", ""last_name"": ""Martinez"", ""address"": {""street_address"": ""1767 Katherine Expressway Suite 062"", ""city"": ""Edwards"", ""state"": ""CA"", ""zip"": 93523}}",user_info,0,12753,1557184122286,2019-05-06,2019-19



The DF has a total of 119 rows



In [0]:
users_schema = """
        user_id INTEGER, 
        update_type STRING, 
        timestamp FLOAT, 
        dob STRING, 
        sex STRING, 
        gender STRING, 
        first_name STRING, 
        last_name STRING, 
        address STRUCT<
            street_address: STRING, 
            city: STRING, 
            state: STRING, 
            zip: INT
    >"""

users_df = bronze_df_users.withColumn('value', f.from_json('value', users_schema))\
                         .select(f.col('value.*'))

display(users_df.limit(10))

print("")
print(f"The DF has a total of {bronze_df_users.dropDuplicates().count()} deduped rows")
print("")

user_id,update_type,timestamp,dob,sex,gender,first_name,last_name,address
36164,update,1576264700.0,10/08/1935,F,F,Caitlin,Miles,"List(176 Sandra Lane Suite 564, Carson, CA, 90746)"
19548,new,1576438400.0,02/12/1972,F,F,Megan,Daugherty,"List(5641 Kelly Tunnel Apt. 584, Los Angeles, CA, 90026)"
42643,delete,1575853820.0,,,,,,
47261,new,1575884540.0,11/05/1946,M,M,Chris,Wright,"List(5700 Catherine Fort Suite 808, Culver City, CA, 90232)"
49835,new,1575944320.0,09/14/1930,M,M,Brian,Wood,"List(42172 Ross Forge, Alhambra, CA, 91803)"
19787,new,1575984640.0,01/02/1992,M,M,Christopher,Frey,"List(76662 Thompson Ville, Northridge, CA, 91327)"
40872,update,1575906560.0,03/08/1982,M,M,David,King,"List(959 Snyder Extensions, Los Angeles, CA, 90038)"
14633,new,1576097660.0,09/04/1997,F,F,Hannah,Fuller,"List(81346 Obrien Streets, Gardena, CA, 90249)"
47250,new,1576451580.0,01/05/1981,M,M,Patrick,Hunt,"List(7768 Dawn Skyway, South Gate, CA, 90280)"
27671,new,1575883010.0,03/01/1930,M,M,Andrew,Monroe,"List(5605 Walsh Orchard, Glendale, CA, 91205)"



The DF has a total of 119 deduped rows



The basic DataFrame is processed, and confirmed that it doesn't contain any duplicate rows.

### Dealing with PII data
In this table, I'll handle Personal Identifiable Information (PII), which requires additional steps to ensure GDPR compliance.<br>
The first step is:

#### Encrypting
As observed in the `user_lookup` table, the `user_id` column is pseudonymized as `alt_id`.<br> 
Below, I'll demonstrate a more secure method of achieving the same structure in the users table.

In [0]:
salt = 'BEANS'
spark.conf.set("salt", salt)

In [0]:
users_df = (users_df.selectExpr(f"sha2(concat(user_id,'${salt}'), 256) AS alt_id",
                                "user_id",
                                "update_type",
                                "timestamp",
                                "dob",
                                "sex",
                                "gender",
                                "first_name",
                                "last_name",
                                "address",
                                )
                        .select(f.col("alt_id"),
                                f.col("user_id"),
                                f.col("timestamp").cast("timestamp").alias("updated"),
                                f.to_date("dob", "MM/dd/yyyy").alias("dob"),
                                f.col("sex"),
                                f.col("gender"),
                                f.col("first_name"),
                                f.col("last_name"),
                                f.col("address"),
                                f.col("update_type")
                        )
                )

display(users_df.limit(10))

alt_id,user_id,updated,dob,sex,gender,first_name,last_name,address,update_type
0cdbef3db20c806ddd473f3608ff69eaba2b99b63a967105b4b22165543f734f,36164,2019-12-13T19:18:24.000+0000,1935-10-08,F,F,Caitlin,Miles,"List(176 Sandra Lane Suite 564, Carson, CA, 90746)",update
24249166bfcf35d3d6bbdb41f04fe6f672ef3ed5b3d7540ff4ba0fd57d763033,19548,2019-12-15T19:33:20.000+0000,1972-02-12,F,F,Megan,Daugherty,"List(5641 Kelly Tunnel Apt. 584, Los Angeles, CA, 90026)",new
6af71c730bf30207f0f5e2ad628ec8d177579f83eb79bfd2b1915b752b47613b,42643,2019-12-09T01:10:24.000+0000,,,,,,,delete
425c2efbb88dc3c7d11caaad88948727c8c09035739e8a4587058aa10e74d668,47261,2019-12-09T09:42:24.000+0000,1946-11-05,M,M,Chris,Wright,"List(5700 Catherine Fort Suite 808, Culver City, CA, 90232)",new
ed8e093eb4988734e124a4b9da92398cf8eefd10dec69fc588c41070df2c15af,49835,2019-12-10T02:18:40.000+0000,1930-09-14,M,M,Brian,Wood,"List(42172 Ross Forge, Alhambra, CA, 91803)",new
62e23537acd9e96b8936f131edbebf53e6671a2517bb88bb45d840d93ac4a09c,19787,2019-12-10T13:30:40.000+0000,1992-01-02,M,M,Christopher,Frey,"List(76662 Thompson Ville, Northridge, CA, 91327)",new
b510c008ba29acbfcaed1b214f7ca9d1afcae0a10b8c69419f798bd77a021b4e,40872,2019-12-09T15:49:20.000+0000,1982-03-08,M,M,David,King,"List(959 Snyder Extensions, Los Angeles, CA, 90038)",update
cf42cb73e4b3a8153ef153ad3f831a156d5b964c9f9ff44c27ddd66a85bdc10d,14633,2019-12-11T20:54:24.000+0000,1997-09-04,F,F,Hannah,Fuller,"List(81346 Obrien Streets, Gardena, CA, 90249)",new
1ed76b6174e0f7646cc9d409e971bad78116ed0e1555ecfb594b7cdc6266ecf3,47250,2019-12-15T23:13:04.000+0000,1981-01-05,M,M,Patrick,Hunt,"List(7768 Dawn Skyway, South Gate, CA, 90280)",new
9c0459d8d3a3d03e3fecddeab1ba6bb89cfcd24aea6392886dd7cabc085d8a4f,27671,2019-12-09T09:16:48.000+0000,1930-03-01,M,M,Andrew,Monroe,"List(5605 Walsh Orchard, Glendale, CA, 91205)",new


With the method described above, I could conceal the salt within a notebook and limit its access. Alternatively, for enhanced security, it's possible to upload the salt as a secret using the Databricks Secrets utility with the following command:

`databricks secrets put-secret --json '{
  "scope": "portfolio_secrets",
  "key": "salt",
  "string_value": "BEANS"
}'` <br> <br>
Then, the salt can be retrieved using the following code:<br>
<br>
`salt = dbutils.secrets.get(scope="portfolio_secrets", key="salt")`

### Chage Data Feed 
Rr CDF for short, is a type of table that retains previous states of the data or the entire history of the data within itself.<br>
In development, this entails using a different method to process this information.<br> 
I'll begin with the insert and update operations.<br>

> ***CDF vs CDC*** <br>
> *After reading more about the topic and studying with the examples that I have, I'm still not 100% confident to categorize this table as CDC or CDF*<br>
> *As far as I'm concerned this would be a CDF table, because the Databricks staff named it as one and I can se row level changes instead of only batch level changes*<br>
> *On the other hand, this table was not created with the CDF config enabled. So, maybe, a half CDF?*<br>
> *In conclusion, I'll call the `users` table CDF*

##### Inserts and Updates

This transformation will follow these steps: 
- Use the Window function with the updated column to get the latest update for each user.
- Utilize the update_type column to determine the required action.
- Implement the logic to have the dataframe with only the latest data.

In [0]:
window = Window.partitionBy("user_id").orderBy(f.col("updated").desc())

ranked_df = (users_df.withColumn("rank", f.rank().over(window))
                     .select("user_id", "update_type", "updated", "rank")
                     )

display(ranked_df.limit(10))

user_id,update_type,updated,rank
11745,new,2019-12-08T00:06:24.000+0000,1
12140,update,2019-12-12T08:04:16.000+0000,1
12140,new,2019-05-24T10:40:00.000+0000,2
12227,new,2019-12-08T14:56:00.000+0000,1
12474,new,2019-12-01T05:28:32.000+0000,1
13559,update,2019-12-10T21:30:40.000+0000,1
13559,new,2019-06-10T17:59:28.000+0000,2
13937,new,2019-12-03T19:03:28.000+0000,1
14232,update,2019-12-09T09:31:44.000+0000,1
14232,new,2019-07-03T00:32:00.000+0000,2


In [0]:
ranked_df = (users_df.withColumn("rank", f.rank().over(window))
                     .filter("rank == 1")
                     .drop("rank")
                     )
                     
display(ranked_df.limit(10))

alt_id,user_id,updated,dob,sex,gender,first_name,last_name,address,update_type
77cc97b2e457eebeb379ebc401dda92c167a3b75a5b182222ddd30a7ca59f0b1,11745,2019-12-08T00:06:24.000+0000,1955-06-29,F,F,Shannon,Reyes,"List(3105 Bowers Expressway, Long Beach, CA, 90808)",new
21252a4a1a3c3055965abcdf07d150252526dd08d448530aff3f9a0b1ba3ea15,12140,2019-12-12T08:04:16.000+0000,1999-02-02,M,M,Robert,Castillo,"List(68994 Steven Vista, Pearblossom, CA, 93553)",update
439af0d33aac875f72c02c406fe2400f44ad41ca51818250835cdd5441d6f64f,12227,2019-12-08T14:56:00.000+0000,1949-12-11,F,F,Courtney,Sheppard,"List(47754 Angela Plaza Apt. 135, Los Angeles, CA, 90010)",new
50dbd84f23da852932469306aeea94b554b4e06da456ade438c1d00eb46fafaa,12474,2019-12-01T05:28:32.000+0000,1939-07-25,M,M,Matthew,Phillips,"List(02648 Wilkins Cliffs Suite 998, San Fernando, CA, 91340)",new
3ea4621e0fd13ae874fd99932b95c985adf5825704c62bcaecde311a57d4a185,13559,2019-12-10T21:30:40.000+0000,1980-03-06,F,F,Victoria,Smith,"List(634 Acevedo Mountain, Santa Monica, CA, 90405)",update
ed758349b7130134cc56dba2153fc3687352b4a059efbfedcd89d471c2864a1d,13937,2019-12-03T19:03:28.000+0000,1982-04-26,M,M,Matthew,Johnson,"List(9231 Edward Throughway Suite 072, Toluca Lake, CA, 91610)",new
ca08d684e3d88b99593bc121043f76ac7d40b3575645f3f9f5afbe6c6b44dc5f,14232,2019-12-09T09:31:44.000+0000,1979-01-04,M,M,Edward,Smith,"List(41444 Noble Cape Suite 390, North Hollywood, CA, 91606)",update
4dd112a1f4d2b9647f144e58693e9d6fec15c20e4b26c72b21c86e9c4fa42d02,14508,2019-07-27T21:26:24.000+0000,1936-01-28,M,M,Justin,Eaton,"List(04952 Lori Plain, Sierra Madre, CA, 91024)",new
cf42cb73e4b3a8153ef153ad3f831a156d5b964c9f9ff44c27ddd66a85bdc10d,14633,2019-12-11T20:54:24.000+0000,1997-09-04,F,F,Hannah,Fuller,"List(81346 Obrien Streets, Gardena, CA, 90249)",new
082446604d60261afdc0491d2a8c3c700fac55890dff01d72bb6eb73b409d66f,15149,2019-07-04T00:57:36.000+0000,1972-03-30,M,M,Cameron,Vasquez,"List(95932 Gary Ridges, Los Angeles, CA, 90018)",new


Before moving on to compiling this logic in the upsert function (Structured Streaming doesn't support non-time-based windows), let's delve into the logic for parsing deletes.

#### Handling deletes

This section concerns CDF and PII, as compliance with GDPR and CCPA mandates the right for customers to request deletion of their data from the Data Lake. To ensure comprehensive coverage, I'll adhere to the following requirements:

- Delete all PII in the users table.
- Erase links between pseudonymized keys and natural keys.

Given that there will be only one delete request, a straightforward filter will suffice. I'll impose a deadline of 30 days after the request is inserted.

In [0]:
delete_requests = (users_df.filter("update_type = 'delete'")
                           .select(f.col("alt_id"),
                                   f.col("user_id"),
                                   f.col('updated').alias("requested"),
                                   f.date_add("requested", 30).alias("deadline"), 
                                   f.lit("requested").alias("status"))
                   )

display(delete_requests.limit(10))

alt_id,user_id,requested,deadline,status
6af71c730bf30207f0f5e2ad628ec8d177579f83eb79bfd2b1915b752b47613b,42643,2019-12-09T01:10:24.000+0000,2020-01-08,requested
70744608949541b904257a726fbcad36eaba54acfdf31bd9d8d298d6f909def3,28776,2019-12-15T10:46:24.000+0000,2020-01-14,requested
5178bebcc29bcceaae206fb6dc48df5a153181ba8650136c6956e51a3b07cc97,19198,2019-12-13T09:36:00.000+0000,2020-01-12,requested
98f50739ae3a94ab454a3bbe72a4bcce6efdde04cd99de0ae12579e8e76dbbad,39676,2019-12-08T23:21:36.000+0000,2020-01-07,requested
c84576c8b9421dc3cee344ddc3f78d05c81b330ea1f61ecdab2b25bc2d590e7b,28521,2019-12-16T00:12:48.000+0000,2020-01-15,requested
918e542e24afecf3a2f65183a6c61f5a0e89a19c9cb7154d8969d9de040bf28e,28359,2019-12-16T16:38:24.000+0000,2020-01-15,requested


With this dataframe, I have all the necessary information to sever any link between data and the user while retaining a record of the process. Given the 30-day deadline, this process can be executed periodically or on-demand, separate from the main pipeline.

### Processing Users

In [0]:
def users_merge_crt_dlt(microBatchDF, batchId):

    window = Window.partitionBy("alt_id").orderBy(f.col("updated").desc())

    (microBatchDF
        .filter(f.col("update_type").isin(["new", "update"]))
        .withColumn("rank", f.rank().over(window))
        .filter("rank == 1")
        .drop("rank")
        .createOrReplaceTempView("ranked_updates"))

    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.users u
        USING ranked_updates r
        ON u.alt_id=r.alt_id
        WHEN MATCHED AND u.updated < r.updated
          THEN UPDATE SET *
        WHEN NOT MATCHED
          THEN INSERT *
    """)

    (microBatchDF
        .filter("update_type = 'delete'")
        .select(f.col("alt_id"), 
                f.col("updated").alias("requested"), 
                f.date_add("updated", 30).alias("deadline"), 
                f.lit("requested").alias("status"))
        .write
        .format("delta")
        .mode("append")
        .option("txnVersion", batchId)
        .option("txnAppId", "batch_rank_upsert")
        .saveAsTable("silver.delete_requests"))
    
def process_users():

    spark.sql(f"CREATE TABLE IF NOT EXISTS silver.users (alt_id STRING, user_id INTEGER, dob DATE, sex STRING, gender STRING, first_name STRING, last_name STRING, street_address STRING, city STRING, state STRING, zip INT, updated TIMESTAMP) USING DELTA")

    users_schema = """
        user_id INTEGER, 
        update_type STRING, 
        timestamp FLOAT, 
        dob STRING, 
        sex STRING, 
        gender STRING, 
        first_name STRING, 
        last_name STRING, 
        address STRUCT<
            street_address: STRING, 
            city: STRING, 
            state: STRING, 
            zip: INT
    >"""
    
    (spark.readStream
        .table("bronze.bronze")
        .filter("topic = 'user_info'")
        .dropDuplicates()
        .select(f.from_json(f.col("value").cast("string"), users_schema).alias("users"))
        .select("users.*")
        .select(f.sha2(f.concat(f.col("user_id"), f.lit(f"{salt}")), 256).alias("alt_id"),
                f.col("user_id"),
                f.col('timestamp').cast("timestamp").alias("updated"),
                f.to_date('dob','MM/dd/yyyy').alias('dob'),
                'sex', 
                'gender',
                'first_name',
                'last_name',
                'address.*', 
                'update_type')
        .writeStream
        .foreachBatch(users_merge_crt_dlt)
        .outputMode("update")
        .option("checkpointLocation", f"{checkpoint_path}/users")
        .trigger(availableNow=True)
        .start()
        .awaitTermination())
        
def reprocess_users():
    dbutils.fs.rm(f"{checkpoint_path}/users", True)
    dbutils.fs.rm(f"{main_path}/delete_requests", True)

    spark.sql("drop table if exists silver.users")
    spark.sql("drop table if exists silver.delete_requests")

    process_users()    


In [0]:
process_users()

### Verifying Results

In [0]:
# Display a sample of the data from the Users table
display(spark.read.table("silver.users").limit(10))

# Print total count of rows in the Users table
print("")
print(f"The User table has a total of {spark.table('silver.users').count()} rows")
print("")

# Display a sample of the data from the delete_requests table
display(spark.read.table("silver.delete_requests").limit(10))

# Print total count of rows in the delete_requests table
print("")
print(f"The Delete_requests table has a total of {spark.table('silver.delete_requests').count()} rows")
print("")


alt_id,user_id,dob,sex,gender,first_name,last_name,street_address,city,state,zip,updated
0cac08c9ce7ea9534718f6f1047ed792015f9dc5aecc2e121e4c80c68ae7bb64,19103,1984-01-18,F,F,Leah,Walker,6973 Conner Viaduct,Burbank,CA,91521,2019-12-15T08:34:08.000+0000
2af970fb2c6d514d11c4738f07306469b05681eb57817b1ad1566ce85a7676e1,19888,1936-09-06,M,M,Craig,Nunez,882 Pamela Stravenue Suite 687,Los Alamitos,CA,90720,2019-12-15T04:22:24.000+0000
3de9b9bd48d773210bb42f9b566665356e73e1b35ae5584eec5824d57042fc89,48913,1949-03-28,M,M,Joseph,Aguilar,80176 Jeremy Harbor Apt. 565,Los Angeles,CA,90033,2019-12-06T18:33:36.000+0000
43a9a48524fca3e47fd42d2669013c49c567cedeeb506d1fd7a751f75f1c8687,40559,1952-07-04,M,M,Joseph,Kane,56860 Fernando Lock Apt. 520,Glendale,CA,91204,2019-12-12T04:05:20.000+0000
4a63161d94e3318c38193842d17054d239dc9b9261ea1cbac66de86f30285214,33733,1965-10-30,M,M,Gary,Perry,72915 Ross Throughway,Rowland Heights,CA,91748,2019-12-02T12:35:12.000+0000
5ce6620b7c357cd69446a87f463a4c40b100364e706a661dcec12390b9852d53,19198,1950-04-18,M,M,Benjamin,Poole,58054 Cynthia Rest,La Crescenta,CA,91214,2019-07-29T13:52:00.000+0000
6ae2eb91031e5980c271eff64aeea2bf6c0127f380c0f93dd5077228c562bd61,25477,1967-11-13,M,M,Richard,Douglas,993 Smith Mountain Apt. 056,Pomona,CA,91768,2019-12-03T21:22:08.000+0000
745c35775cfdf69e9a190c26a173285712c1929c5faa5087ce2855e9043b33f4,11745,1955-06-29,F,F,Shannon,Reyes,3105 Bowers Expressway,Long Beach,CA,90808,2019-12-08T00:06:24.000+0000
7e0504eb0e145b999b39081c43ac2cc7f26ec9ba8e4b06d5337be256c28a72ba,25143,1944-08-13,M,M,Lee,Brown,81022 Gibson Fords,Palmdale,CA,93552,2019-12-12T20:58:40.000+0000
7fde9776f2be9726369921eb480568b3e726110af056b36fe7a3ffdf4f9f6b79,37012,1925-02-26,M,M,Joshua,Perry,3456 Snyder Coves,Torrance,CA,90501,2019-08-10T22:06:56.000+0000



The User table has a total of 100 rows



alt_id,requested,deadline,status
cba874842f4dd76d361c730e6c6080f130d86f149c1f6faeaf3d0dc605c85d17,2019-12-16T00:12:48.000+0000,2020-01-15,requested
8bb09f205d023251e392804a20a7294f5a424c9c295054c8f8f293c76dd205a5,2019-12-15T10:46:24.000+0000,2020-01-14,requested
39e5d7cc7a7354cab35d1ae9b4395e2303337d4a37b26f56071462c3a39b9b95,2019-12-09T01:10:24.000+0000,2020-01-08,requested
22cd18aa025b6e0b792b992b8ce7573c1a29a651cc72ed158c80daabbe660101,2019-12-16T16:38:24.000+0000,2020-01-15,requested
5d78cff0ca0f659e86ef1118bbf5b322731890ac20e0079fa799c44c80f1feff,2019-12-08T23:21:36.000+0000,2020-01-07,requested
5ce6620b7c357cd69446a87f463a4c40b100364e706a661dcec12390b9852d53,2019-12-13T09:36:00.000+0000,2020-01-12,requested



The Delete_requests table has a total of 6 rows



In a production environment, access to the Users table would be restricted, and only a processed view with classified PII information would be available for access.

## Create User_bins 

Now that deletion is sorted, let's revisit measures for handling PII data.<br>
As mentioned in the comment after processing the users table, it's imperative to restrict access to the table or its PII information.<br>
The best solution for this problem would be a dynamic view, where I could just obscure some columns with the code below: <br>
```sql
SELECT CASE WHEN is_member('grup_a') THEN column ELSE 'REDACTED' END AS column
```
Or rows with this:<br>
```sql
WHERE CASE WHEN is_member('group_b') THEN TRUE ELSE column = "that_filter" AND date_column > "YYYY-MM-DD" END
```
<br>
However, I don't have access to these features in community mode. Therefore, the solution is to create a users table with minimal PII while maintaining its utility.

The plan here is:
- Delete the columns: first name, last name, date of birth, and street address (zip code might still be an issue, but the next step should help mitigate this)
- Generalize age

In [0]:
# Function to Generalize Age
def age_bins(dob_col):
    age_col = f.floor(f.months_between(f.current_date(), dob_col)/12).alias("age")
    
    return (f.when((age_col < 18), "under 18")
             .when((age_col >= 18) & (age_col < 25), "18-25")
             .when((age_col >= 25) & (age_col < 35), "25-35")
             .when((age_col >= 35) & (age_col < 45), "35-45")
             .when((age_col >= 45) & (age_col < 55), "45-55")
             .when((age_col >= 55) & (age_col < 65), "55-65")
             .when((age_col >= 65) & (age_col < 75), "65-75")
             .when((age_col >= 75) & (age_col < 85), "75-85")
             .when((age_col >= 85) & (age_col < 95), "85-95")
             .when((age_col >= 95), "95+")
             .otherwise("invalid age").alias("age"))

# Simple Merge function
def user_bins_merge_data(df, batch_id):
    (df.drop("updated")
     .createOrReplaceTempView("stream_updates"))

    df.sparkSession.sql("""
          MERGE INTO silver.user_bins a
          USING stream_updates b
          ON a.alt_id=b.alt_id
          WHEN MATCHED THEN
            UPDATE SET *
          WHEN NOT MATCHED THEN
            INSERT * 
          """)


def process_user_bins():

    (spark.sql("""
      CREATE TABLE IF NOT EXISTS
      silver.user_bins
       (alt_id   STRING,
        user_id  INTEGER,
        age      STRING,
        sex      STRING,
        gender   STRING,
        city     STRING,
        state    STRING,
        zip      INTEGER) 
      USING DELTA
    """))

    (spark.readStream
          .table("silver.users")
          .select("alt_id",
                  "user_id", 
                  age_bins(f.col("dob")),
                  "sex",
                  "gender", 
                  "city", 
                  "state",
                  "zip",
                  "updated")
          .withWatermark("updated", "30 seconds")
          .writeStream
              .foreachBatch(user_bins_merge_data)
              .option("checkpointLocation", f"{checkpoint_path}/user_bins")
              .outputMode("update")
              .trigger(availableNow=True)
              .queryName("user_bins")
              .start()
              .awaitTermination())

def reprocess_user_bins():
    dbutils.fs.rm(f"{checkpoint_path}/user_bins", True)

    spark.sql("drop table if exists silver.user_bins")

    process_user_bins()


In [0]:
process_user_bins()

display(spark.table("silver.user_bins").limit(10))

print("")
print(f"The user_bins table has {spark.table('silver.user_bins').count()} rows")
print("")

alt_id,user_id,age,sex,gender,city,state,zip
0cac08c9ce7ea9534718f6f1047ed792015f9dc5aecc2e121e4c80c68ae7bb64,19103,35-45,F,F,Burbank,CA,91521
2af970fb2c6d514d11c4738f07306469b05681eb57817b1ad1566ce85a7676e1,19888,85-95,M,M,Los Alamitos,CA,90720
3de9b9bd48d773210bb42f9b566665356e73e1b35ae5584eec5824d57042fc89,48913,75-85,M,M,Los Angeles,CA,90033
43a9a48524fca3e47fd42d2669013c49c567cedeeb506d1fd7a751f75f1c8687,40559,65-75,M,M,Glendale,CA,91204
4a63161d94e3318c38193842d17054d239dc9b9261ea1cbac66de86f30285214,33733,55-65,M,M,Rowland Heights,CA,91748
5ce6620b7c357cd69446a87f463a4c40b100364e706a661dcec12390b9852d53,19198,65-75,M,M,La Crescenta,CA,91214
6ae2eb91031e5980c271eff64aeea2bf6c0127f380c0f93dd5077228c562bd61,25477,55-65,M,M,Pomona,CA,91768
745c35775cfdf69e9a190c26a173285712c1929c5faa5087ce2855e9043b33f4,11745,65-75,F,F,Long Beach,CA,90808
7e0504eb0e145b999b39081c43ac2cc7f26ec9ba8e4b06d5337be256c28a72ba,25143,75-85,M,M,Palmdale,CA,93552
7fde9776f2be9726369921eb480568b3e726110af056b36fe7a3ffdf4f9f6b79,37012,95+,M,M,Torrance,CA,90501



The user_bins table has 100 rows



## Create Workout_user_summary

This is the last object of this pipeline, this view will use all the data ingested to offer a wide range of information of every user.

In [0]:
display(
    spark.table("silver.completed_workouts")
    .join(spark.table("silver.user_lookup")
          .select("device_id", "user_id"),
          ["user_id"])
    .select("user_id", "device_id", "session_id", "start_time", "end_time")
    .join(spark.table("silver.heart_rate")
          .select("device_id", "heartrate"),
          ["device_id"])
    .groupBy("user_id", "device_id", "session_id")
        .agg(f.count(f.col("heartrate")).alias("avg_heartrate"),
             f.min(f.col("start_time")).alias("start_time"),
             f.max(f.col("end_time")).alias("end_time"))
    .where((f.col("device_id") == 183816))
    .orderBy(f.col("start_time").desc())
    .limit(10)
)

user_id,device_id,session_id,avg_heartrate,start_time,end_time
29213,183816,320,135718,2019-12-15T10:56:29.000+0000,2019-12-15T11:28:51.000+0000
29213,183816,319,135718,2019-12-15T10:20:14.000+0000,2019-12-15T10:45:26.000+0000
29213,183816,318,135718,2019-12-14T08:06:32.000+0000,2019-12-14T08:37:31.000+0000
29213,183816,317,135718,2019-12-14T07:17:47.000+0000,2019-12-14T08:03:30.000+0000
29213,183816,316,135718,2019-12-13T18:41:53.000+0000,2019-12-13T19:01:42.000+0000
29213,183816,315,135718,2019-12-13T17:42:12.000+0000,2019-12-13T18:31:46.000+0000
29213,183816,314,135718,2019-12-12T15:10:22.000+0000,2019-12-12T15:58:08.000+0000
29213,183816,313,135718,2019-12-12T14:12:02.000+0000,2019-12-12T15:02:39.000+0000
29213,183816,312,135718,2019-12-11T18:00:47.000+0000,2019-12-11T18:46:47.000+0000
29213,183816,311,135718,2019-12-10T07:33:27.000+0000,2019-12-10T08:09:09.000+0000


In [0]:
%sql

DROP VIEW IF EXISTS gold.workout_user_summary;

CREATE VIEW gold.workout_user_summary AS (
SELECT
    wh.user_id,
    wh.min_heartrate,
    wh.avg_heartrate,
    wh.max_heartrate,
    wh.start_time,
    wh.end_time,
    ub.age,
    ub.sex,
    ub.gender,
    ub.city,
    ub.state,
    ub.zip
FROM (SELECT
          cw.user_id,
          ul.device_id,
          cw.session_id,
          MIN(hr.heartrate) AS min_heartrate,
          AVG(hr.heartrate) AS avg_heartrate,
          MAX(hr.heartrate) AS max_heartrate,
          MIN(cw.start_time) AS start_time,
          MAX(cw.end_time) AS end_time
      FROM silver.completed_workouts cw
      JOIN silver.user_lookup ul ON cw.user_id = ul.user_id
      JOIN silver.heart_rate hr ON ul.device_id = hr.device_id
      GROUP BY cw.user_id, ul.device_id, cw.session_id
) wh
JOIN silver.user_bins ub ON wh.user_id = ub.user_id
);

In [0]:
display(spark.table("gold.workout_user_summary").limit(10))

print("")
print(f"The workout_user_summary table has {spark.table('gold.workout_user_summary').count()} rows")
print("")

user_id,min_heartrate,avg_heartrate,max_heartrate,start_time,end_time,age,sex,gender,city,state,zip
28588,45.24741098794139,87.02458361971546,168.72168469775733,2019-12-12T12:31:59.000+0000,2019-12-12T13:09:23.000+0000,35-45,M,M,Arcadia,CA,91006
30088,43.03108584578808,86.08719253950291,177.9334884047032,2019-12-07T17:39:56.000+0000,2019-12-07T18:15:44.000+0000,25-35,M,M,Los Angeles,CA,90009
30088,43.03108584578808,86.08719253950291,177.9334884047032,2019-12-02T17:27:12.000+0000,2019-12-02T18:19:53.000+0000,25-35,M,M,Los Angeles,CA,90009
30088,43.03108584578808,86.08719253950291,177.9334884047032,2019-12-03T07:26:47.000+0000,2019-12-03T08:30:21.000+0000,25-35,M,M,Los Angeles,CA,90009
45875,28.752025520276057,64.2687462790618,127.85422558490134,2019-12-02T14:28:03.000+0000,2019-12-02T15:00:47.000+0000,85-95,M,M,El Monte,CA,91731
45875,28.752025520276057,64.2687462790618,127.85422558490134,2019-12-16T14:16:50.000+0000,2019-12-16T15:07:13.000+0000,85-95,M,M,El Monte,CA,91731
45875,28.752025520276057,64.2687462790618,127.85422558490134,2019-12-08T12:42:47.000+0000,2019-12-08T13:50:37.000+0000,85-95,M,M,El Monte,CA,91731
45875,28.752025520276057,64.2687462790618,127.85422558490134,2019-12-05T17:54:06.000+0000,2019-12-05T19:13:31.000+0000,85-95,M,M,El Monte,CA,91731
45875,28.752025520276057,64.2687462790618,127.85422558490134,2019-12-14T13:16:32.000+0000,2019-12-14T13:37:23.000+0000,85-95,M,M,El Monte,CA,91731
45875,28.752025520276057,64.2687462790618,127.85422558490134,2019-12-10T10:19:19.000+0000,2019-12-10T10:32:08.000+0000,85-95,M,M,El Monte,CA,91731



The workout_user_summary table has 757 rows



## Propagating deletes

Now to propagate these deletes throughout the pipeline, I'll tip my toes into CDF configuration. <br>
I'll use the `user_lookup` table to be the source of the deletes since it has every key used across the pipeline, the process will be easier. <br>

---

> *I could add comments to the table to make it easier to understand, but the only method I found toggles comments for the entire notebook*<br>
> *Since this isn't crucial, I'll just record it here and research further in the future* <br>
> `%sql` <br>
> `SET spark.databricks.delta.commitInfo.userMetadata=Deletes committed` <br>
> `.option("userMetadata", "Write your message here")` <br>

Keeping in mind that, tables that were not created with CDF enabled will not have it turned on by default. Although it can be altered to capture changes it'll version the table. Also, Cloning a CDF table don't carry the change feed.<br>
<br> 
The strategy will be:
- Turn the `user_lookup` table into CDF
- Use the `delete_requests` to delete the user from the `user_lookup`
- Use the change feed of the `user_lookup` to get which users where deleted
- Propagate the deletes through the pipeline
- Update the `status` column from the `delete_requests`

In [0]:
# Enable CDF
spark.sql("ALTER TABLE silver.user_lookup SET TBLPROPERTIES (delta.enableChangeDataFeed = true);")

# Get the table version which CDF was enabled
start_version = spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

# Delete the data from the User_lookup table using the Delete_requests table
spark.sql("DELETE FROM silver.user_lookup WHERE alt_id IN (SELECT alt_id FROM silver.delete_requests WHERE status = 'requested')")

Out[72]: DataFrame[num_affected_rows: bigint]

To verify the accuracy of these changes, we should find a reference to enabling CDF in the `describe`, while the change feed should display the removal of 6 users.

In [0]:
spark.sql("DESCRIBE HISTORY silver.user_lookup")

display(
    spark.readStream
    .option("readChangeFeed", "true")
    .option("startingVersion", start_version)
    .table("silver.user_lookup")
    .filter("_change_type = 'delete'")
)

alt_id,device_id,mac_address,user_id,_change_type,_commit_version,_commit_timestamp
5ce6620b7c357cd69446a87f463a4c40b100364e706a661dcec12390b9852d53,181512,de:34:11:f4:90:2d,19198,delete,2,2024-05-29T14:48:50.000+0000
22cd18aa025b6e0b792b992b8ce7573c1a29a651cc72ed158c80daabbe660101,198332,f2:e1:1c:84:56:52,28359,delete,2,2024-05-29T14:48:50.000+0000
cba874842f4dd76d361c730e6c6080f130d86f149c1f6faeaf3d0dc605c85d17,148312,34:12:9e:9c:72:28,28521,delete,2,2024-05-29T14:48:50.000+0000
8bb09f205d023251e392804a20a7294f5a424c9c295054c8f8f293c76dd205a5,161343,e6:de:fe:e2:2f:4d,28776,delete,2,2024-05-29T14:48:50.000+0000
5d78cff0ca0f659e86ef1118bbf5b322731890ac20e0079fa799c44c80f1feff,169221,3a:95:59:f8:db:29,39676,delete,2,2024-05-29T14:48:50.000+0000
39e5d7cc7a7354cab35d1ae9b4395e2303337d4a37b26f56071462c3a39b9b95,155761,d2:1f:ef:4e:c1:9b,42643,delete,2,2024-05-29T14:48:50.000+0000


In [0]:
# Stop the display of the stream table
for stream in spark.streams.active:
    stream.stop()
    stream.awaitTermination()

In [0]:
def broadcast_deletes(microBatchDF, batchId):
    
    (microBatchDF
        .filter("_change_type = 'delete'")
        .createOrReplaceTempView("deletes"))
    
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.workouts w
        USING deletes d
        ON w.user_id = d.user_id
        WHEN MATCHED
            THEN DELETE
    """)
    
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.completed_workouts cw
        USING deletes d
        ON cw.user_id = d.user_id
        WHEN MATCHED
            THEN DELETE
    """)
    
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.heart_rate hr
        USING deletes d
        ON hr.device_id = d.device_id
        WHEN MATCHED
            THEN DELETE
    """)
    
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.gym_logs gl
        USING deletes d
        ON gl.mac = d.mac_address
        WHEN MATCHED
            THEN DELETE
    """)
    
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.users u
        USING deletes d
        ON u.alt_id = d.alt_id
        WHEN MATCHED
            THEN DELETE
    """)

    microBatchDF.sparkSession.sql("""
        DELETE FROM silver.user_bins
        WHERE user_id IN (SELECT user_id FROM deletes)
    """)

    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.delete_requests dr
        USING deletes d
        ON d.alt_id = dr.alt_id
        WHEN MATCHED
          THEN UPDATE SET status = "deleted"
    """)


def process_deletes():
    #counting before purge
    before_count_workouts = spark.table("silver.workouts").count()
    before_count_completed_workouts = spark.table("silver.completed_workouts").count()
    before_count_gym_logs = spark.table("silver.gym_logs").count()
    before_count_heart_rate = spark.table("silver.heart_rate").count()
    before_count_users = spark.table("silver.users").count()
    before_count_user_bins = spark.table("silver.user_bins").count()

    spark.sql("DELETE FROM silver.user_lookup WHERE alt_id IN (SELECT alt_id FROM silver.delete_requests WHERE status = 'requested')")

    (spark.readStream
          .format("delta")
          .option("readChangeFeed", "true")
          .option("startingVersion", start_version)
          .table("silver.user_lookup")
          .writeStream
          .foreachBatch(broadcast_deletes)
          .outputMode("update")
          .option("checkpointLocation", f"{checkpoint_path}/deletes")
          .trigger(availableNow=True)
          .start()
          .awaitTermination()
    )

    #counting after purge
    uptodate_count_workouts = spark.table("silver.workouts").count()
    uptodate_count_completed_workouts = spark.table("silver.completed_workouts").count()
    uptodate_count_gym_logs = spark.table("silver.gym_logs").count()
    uptodate_count_heart_rate = spark.table("silver.heart_rate").count()
    uptodate_count_users = spark.table("silver.users").count()
    uptodate_count_user_bins = spark.table("silver.user_bins").count()

    #printing values obtained before and how many were purged in each table
    print(f"Before broadcasting the deletes, the table Workouts had {before_count_workouts} rows. {before_count_workouts - uptodate_count_workouts} rows were deleted, remaining now {uptodate_count_workouts}")
    print(" ")
    print(f"Before broadcasting the deletes, the table Completed_workouts had {before_count_completed_workouts} rows. {before_count_completed_workouts - before_count_completed_workouts} rows were deleted, remaining now {uptodate_count_completed_workouts}")
    print(" ")
    print(f"Before broadcasting the deletes, the table Gym_logs had {before_count_gym_logs} rows. {before_count_gym_logs - uptodate_count_gym_logs} rows were deleted, remaining now {uptodate_count_gym_logs}")
    print(" ")
    print(f"Before broadcasting the deletes, the table Heart_rate had {before_count_heart_rate} rows. {before_count_heart_rate - uptodate_count_heart_rate} rows were deleted, remaining now {uptodate_count_heart_rate}")
    print(" ")
    print(f"Before broadcasting the deletes, the table Users had {before_count_users} rows. {before_count_users - uptodate_count_users} rows were deleted, remaining now {uptodate_count_users}")
    print(" ")
    print(f"Before broadcasting the deletes, the table User_bins had {before_count_user_bins} rows. {before_count_user_bins - uptodate_count_user_bins} rows were deleted, remaining now {uptodate_count_user_bins}")
    print(" ")


In [0]:
process_deletes()

Before broadcasting the deletes, the table Workouts had 1514 rows. 54 rows were deleted, remaining now 1460
 
Before broadcasting the deletes, the table Completed_workouts had 757 rows. 0 rows were deleted, remaining now 730
 
Before broadcasting the deletes, the table Gym_logs had 314 rows. 4 rows were deleted, remaining now 310
 
Before broadcasting the deletes, the table Heart_rate had 9032373 rows. 445848 rows were deleted, remaining now 8586525
 
Before broadcasting the deletes, the table Users had 100 rows. 6 rows were deleted, remaining now 94
 
Before broadcasting the deletes, the table User_bins had 100 rows. 6 rows were deleted, remaining now 94
 


**Deletes Closing Comments** <br>
<br>
All the delete requests were peformed on the Silver layer. <br>
Note that in Delta Lake, data isn't deleted immediately; a new file marks the deletion. Using `RESTORE`, deleted data can still be accessed, so using the `VACUUM` command is crucial. By default, all historical versions older then 7 days are deleted.<br>
<br>
For the Gold layer, enabling CDF on all tables (more about that below) would propagate the deletes downstream, but its impact on performance is unclear for me. <br>
In production, the delete broadcast and views update should be linked to prevent unwanted data from appearing. <br>
<br>
The Bronze layer still has the data, but without the user_lookup table, it can't be linked, and since the checkpoint files are intact, the deleted data won't be reingested. <br>
<br>
<br>
**About CDF**<br>
<br>
From what I understand, if all tables had CDF enabled, the delete broadcast would only need to happen at the first step of each stream, and this change would propagate downstream. Unfortunately, I learned this too late, and redesigning the entire pipeline is out of scope now. <br>
The command to enable CDF is `spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", True).`



## Portfolio Closing Comments

Well, that wraps up what I had in mind for these notebooks.<br>
I trust I've been able to showcase some of my skills. If you have any questions or want to connect further, don't hesitate to reach out to me on LinkedIn www.linkedin.com/in/luiz-felipe-borges-37256216b or via email at felipe.borges_11@hotmail.com.<br>
Looking forward to connecting!