
## Overview:

In this notebook, we run an ETL (Extract, Transform, Load) process that refines raw data in two stages, bronze to silver and silver to gold, so that it is ready for consumption in Power BI dashboards.

Our primary focus lies on two distinct datasets: `userdata` and `devicedata`.

1. **`userdata` Dataset:**
   This dataset provides essential information about users' health and lifestyle. It includes fields such as `userid`, `gender`, `age`, `height`, `weight`, `smoker`, `familyhistory`, `cholestlevs`, `bp`, and `risk`. These attributes capture vital health indicators and demographics necessary for comprehensive analysis.

2. **`devicedata` Dataset:**
   The `devicedata` dataset complements the user-centric data with insights derived from IoT devices. It includes metrics like `calories_burnt`, `miles_walked`, `num_steps`, and `timestamp`. Additionally, it contains device-specific information such as `device_id` and `user_id`, facilitating correlation between user activity and device usage.

## ETL Process Overview:
- **Extract:** Retrieve raw data from the provided Databricks sample datasets, preparing it for transformation.
- **Transform:**
  - **Cleansing:** Handle missing or inconsistent data.
  - **Aggregating:** Aggregate data at the hourly level, breaking down timestamp into date and time.
  - **Enriching:** Enhance data quality and utility.
- **Load:** Load the transformed data into Delta Lake tables optimized for efficient querying and visualization.

## Aggregation at Hourly Level:
We will aggregate the data to an hourly granularity, allowing for more efficient analysis and visualization. This aggregation will enable us to observe trends and patterns in user activity and health metrics over time, providing valuable insights for decision-making.
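To make the idea of hourly aggregation concrete, here is a minimal plain-Python sketch that sums readings per device, user, date, and hour. It is an illustration only, not the Spark SQL used in this notebook: the helper name `aggregate_hourly` and the second reading are hypothetical, while the other two readings are taken from the sample device data.

```python
from collections import defaultdict
from datetime import datetime

# Two readings copied from the sample device data in this notebook, plus one
# hypothetical reading (marked below) so that two rows fall in the same hour.
readings = [
    {"device_id": 5, "user_id": 24, "timestamp": "2018-07-22 06:44:25.732267",
     "miles_walked": 2.507, "num_steps": 5014, "calories_burnt": 250.7},
    {"device_id": 5, "user_id": 24, "timestamp": "2018-07-22 06:10:00.000000",  # hypothetical
     "miles_walked": 1.0, "num_steps": 2000, "calories_burnt": 100.0},
    {"device_id": 13, "user_id": 24, "timestamp": "2018-07-21 01:18:10.732306",
     "miles_walked": 1.268, "num_steps": 2536, "calories_burnt": 126.8},
]


def aggregate_hourly(rows):
    """Sum the metrics per (device_id, user_id, date, hour)."""
    totals = defaultdict(
        lambda: {"miles_walked": 0.0, "step_count": 0, "calories_count": 0.0}
    )
    for row in rows:
        ts = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S.%f")
        key = (row["device_id"], row["user_id"], ts.date().isoformat(), ts.hour)
        bucket = totals[key]
        bucket["miles_walked"] += row["miles_walked"]
        bucket["step_count"] += row["num_steps"]
        bucket["calories_count"] += row["calories_burnt"]
    # Round the floating-point sums to two decimal places.
    return {
        key: {
            "miles_walked": round(b["miles_walked"], 2),
            "step_count": b["step_count"],
            "calories_count": round(b["calories_count"], 2),
        }
        for key, b in totals.items()
    }


hourly = aggregate_hourly(readings)
for key, metrics in sorted(hourly.items()):
    print(key, metrics)
```

The two readings that share device 5, user 24, and the 06:00 hour collapse into a single row, which is why the aggregated table ends up much smaller than the raw one.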

## Goals:
- Achieve data consistency and integrity across both datasets.
- Enhance data quality through cleaning and normalization procedures.
- Aggregate data at the hourly level for improved analysis granularity.
- Optimize data structures for efficient querying and visualization in Power BI.

By following this journey, we aim to unlock the full potential of Databricks' IoT datasets, empowering data-driven decision-making and actionable insights in health and wellness analysis. Let's dive in and refine this raw data into a valuable asset for Power BI dashboards!

# Data Extraction from Databricks Datasets

## Extraction from `dbfs:/databricks-datasets/iot-stream/data-device/`

### Brief Info about `%fs ls dbfs:/databricks-datasets/iot-stream/data-device/`:

- The `%fs ls` command is used to list files and directories in Databricks File System (DBFS).
- `dbfs:/databricks-datasets/iot-stream/data-device/` is the directory path from which we are extracting data.
- This directory contains the raw IoT device data as gzip-compressed JSON part files, which we will extract for further processing.
- The data files include device IDs, user IDs, timestamps, and activity metrics such as steps, miles walked, and calories burnt.

## Extraction Process:

1. Use the `%fs ls` command to list files and directories in `dbfs:/databricks-datasets/iot-stream/data-device/`.
2. Identify the relevant data files needed for our analysis.
3. Extract the selected data files for further processing and analysis.

The extraction step is crucial as it forms the foundation for subsequent data transformation and loading processes. By accurately extracting the required data, we ensure that our analysis is based on reliable and relevant information.
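When reading a `%fs ls` listing, it can help to convert the raw `size` and `modificationTime` values into something readable. The sketch below does this in plain Python for two entries from the device directory. It assumes that `size` is in bytes and that `modificationTime` is a Unix timestamp in milliseconds; the helper name `describe` is hypothetical.

```python
from datetime import datetime, timezone

# (name, size, modificationTime) for two files in the device data directory.
listing = [
    ("part-00000.json.gz", 2610922, 1532465738000),
    ("part-00001.json.gz", 2612478, 1532465738000),
]


def describe(name, size_bytes, modified_ms):
    """Convert a listing entry to MiB and a UTC timestamp.

    Assumes size is in bytes and modificationTime is epoch milliseconds.
    """
    modified = datetime.fromtimestamp(modified_ms / 1000, tz=timezone.utc)
    return {
        "name": name,
        "size_mib": round(size_bytes / 2**20, 2),
        "modified_utc": modified.isoformat(),
    }


described = [describe(*entry) for entry in listing]
for item in described:
    print(item)
```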


In [None]:
%fs ls dbfs:/databricks-datasets/iot-stream/data-device/

path,name,size,modificationTime
dbfs:/databricks-datasets/iot-stream/data-device/part-00000.json.gz,part-00000.json.gz,2610922,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00001.json.gz,part-00001.json.gz,2612478,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00002.json.gz,part-00002.json.gz,2619023,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00003.json.gz,part-00003.json.gz,2620016,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00004.json.gz,part-00004.json.gz,2618699,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00005.json.gz,part-00005.json.gz,2619772,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00006.json.gz,part-00006.json.gz,2619027,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00007.json.gz,part-00007.json.gz,2619832,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00008.json.gz,part-00008.json.gz,2617893,1532465738000
dbfs:/databricks-datasets/iot-stream/data-device/part-00009.json.gz,part-00009.json.gz,2619764,1532465738000


In [None]:
deviceData_raw = spark.read.json("dbfs:/databricks-datasets/iot-stream/data-device")

In [None]:
deviceData_raw.printSchema()

root
 |-- calories_burnt: double (nullable = true)
 |-- device_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- miles_walked: double (nullable = true)
 |-- num_steps: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- value: string (nullable = true)



In [None]:
deviceData_raw.show()

+--------------+---------+------+------------+---------+--------------------+-------+--------------------+
|calories_burnt|device_id|    id|miles_walked|num_steps|           timestamp|user_id|               value|
+--------------+---------+------+------------+---------+--------------------+-------+--------------------+
|         250.7|        5|950000|       2.507|     5014|2018-07-22 06:44:...|     24|{"user_id": 24, "...|
|         126.8|       13|950001|       1.268|     2536|2018-07-21 01:18:...|     24|{"user_id": 24, "...|
|         365.7|        5|950002|       3.657|     7314|2018-07-24 12:42:...|      4|{"user_id": 4, "c...|
|         489.8|       10|950003|       4.898|     9796|2018-07-23 22:56:...|     22|{"user_id": 22, "...|
|        280.15|       13|950004|      2.8015|     5603|2018-07-21 13:50:...|     34|{"user_id": 34, "...|
|         591.6|        1|950005|       5.916|    11832|2018-07-23 11:05:...|     21|{"user_id": 21, "...|
|         548.1|       12|950006|    

In [None]:
deviceData_raw.select('value').show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"user_id": 24, "calories_burnt": 250.6999969482422, "num_steps": 5014, "miles_walked": 2.506999969482422, "time_stamp": "2018-07-22 06:44:25.732267", "device_id": 5}   |
|{"user_id": 24, "calories_burnt": 126.80000305175781, "num_steps": 2536, "miles_walked": 1.2680000066757202, "time_stamp": "2018-07-21 01:18:10.732306", "device_id": 13}|
|{"user_id": 4, "calories_burnt": 365.70001220703125, "num_steps": 7314, "miles_walked": 3.6570000648498535, "time_stamp": "2018-07-24 12:42


The following code snippet parses the JSON strings in the `value` column and derives separate date and time columns from the timestamp.

- We first define a schema for the JSON data using `StructType` and `StructField`.
- Then, we use the `from_json` function to parse the `value` column into separate columns based on that schema, and display the result to compare it with the top-level columns.
- We derive two new columns from the top-level `timestamp` column: the date with `to_date` and the time of day (`HH:mm:ss`) with `date_format`.
- Finally, because the parsed JSON repeats the values already present in the top-level columns, we select the desired top-level columns and leave out the `value` column.

This code prepares the data for further analysis or visualization by putting it in a structured format suitable for downstream processing.
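For readers who want to see the same two steps without Spark, the following plain-Python sketch parses one `value` string from the sample output and splits its `time_stamp` into date and time parts. It is only an illustration under that assumption; the helper name `parse_device_record` is hypothetical and is not part of the notebook's pipeline.

```python
import json
from datetime import datetime

# One JSON string copied from the `value` column of the sample output.
raw_value = (
    '{"user_id": 24, "calories_burnt": 250.6999969482422, "num_steps": 5014, '
    '"miles_walked": 2.506999969482422, '
    '"time_stamp": "2018-07-22 06:44:25.732267", "device_id": 5}'
)


def parse_device_record(value):
    """Parse a JSON record and add separate `date` and `time` fields."""
    record = json.loads(value)
    ts = datetime.strptime(record["time_stamp"], "%Y-%m-%d %H:%M:%S.%f")
    record["date"] = ts.strftime("%Y-%m-%d")  # same role as to_date
    record["time"] = ts.strftime("%H:%M:%S")  # same role as date_format "HH:mm:ss"
    return record


parsed = parse_device_record(raw_value)
print(parsed["user_id"], parsed["device_id"], parsed["date"], parsed["time"])
```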


In [None]:
from pyspark.sql.functions import col, date_format, from_json, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType

# Define schema for JSON data
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("calories_burnt", FloatType()),
    StructField("num_steps", IntegerType()),
    StructField("miles_walked", FloatType()),
    StructField("time_stamp", TimestampType()),
    StructField("device_id", IntegerType())
])

# Extract JSON values into separate columns and select other columns
deviceData_json = deviceData_raw.select(
    from_json(col("value"), schema).alias("json_data")
).selectExpr(
    "json_data.*"
)

# Show DataFrame
deviceData_json.show(truncate=False)

deviceData_raw = deviceData_raw.withColumn("date", to_date(col("timestamp")))  # Extract date
deviceData_raw = deviceData_raw.withColumn("time", date_format(col("timestamp"), "HH:mm:ss"))  # Extract time

# The JSON object duplicates the data in the top-level columns, so we can drop it

deviceData = deviceData_raw.select(
    'device_id',
    'user_id',
    'id',
    'miles_walked',
    'num_steps',
    'calories_burnt',
    'date',
    'time')


+-------+--------------+---------+------------+--------------------------+---------+
|user_id|calories_burnt|num_steps|miles_walked|time_stamp                |device_id|
+-------+--------------+---------+------------+--------------------------+---------+
|24     |250.7         |5014     |2.507       |2018-07-22 06:44:25.732267|5        |
|24     |126.8         |2536     |1.268       |2018-07-21 01:18:10.732306|13       |
|4      |365.7         |7314     |3.657       |2018-07-24 12:42:53.732332|5        |
|22     |489.8         |9796     |4.898       |2018-07-23 22:56:23.732358|10       |
|34     |280.15        |5603     |2.8015      |2018-07-21 13:50:39.732385|13       |
|21     |591.6         |11832    |5.916       |2018-07-23 11:05:48.732412|1        |
|7      |548.1         |10962    |5.481       |2018-07-23 02:10:39.732438|12       |
|33     |272.4         |5448     |2.724       |2018-07-22 17:42:18.732465|4        |
|16     |381.85        |7637     |3.8185      |2018-07-21 12:21:2

The following code creates a temporary view named `deviceData` in Spark SQL for further analysis:


In [None]:
deviceData.createOrReplaceTempView('deviceData')

In [None]:
%sql
SELECT * FROM deviceData

device_id,user_id,id,miles_walked,num_steps,calories_burnt,date,time
5,24,950000,2.507,5014,250.7,2018-07-22,06:44:25
13,24,950001,1.268,2536,126.8,2018-07-21,01:18:10
5,4,950002,3.657,7314,365.7,2018-07-24,12:42:53
10,22,950003,4.898,9796,489.8,2018-07-23,22:56:23
13,34,950004,2.8015,5603,280.15,2018-07-21,13:50:39
1,21,950005,5.916,11832,591.6,2018-07-23,11:05:48
12,7,950006,5.481,10962,548.1,2018-07-23,02:10:39
4,33,950007,2.724,5448,272.4,2018-07-22,17:42:18
5,16,950008,3.8185,7637,381.85,2018-07-21,12:21:21
8,27,950009,5.8545,11709,585.44995,2018-07-22,12:40:55


CREATING BRONZE DELTA TABLE FOR deviceData

In [None]:
%sql
CREATE TABLE bronze_device 
USING DELTA
COMMENT 'Bronze table in the architecture'
TBLPROPERTIES ('type'='bronze')
AS 
SELECT * FROM deviceData;


num_affected_rows,num_inserted_rows


The following code uses the `%fs ls` magic command to list files and directories in the specified directory within Databricks File System (DBFS):



In [None]:
%fs ls dbfs:/databricks-datasets/iot-stream/data-user

path,name,size,modificationTime
dbfs:/databricks-datasets/iot-stream/data-user/userData.csv,userData.csv,1374,1532465793000



The following code snippet reads data from a CSV file into a DataFrame named `userData` using Apache Spark:



In [None]:
userData = spark.read.format('csv')\
    .option('header' , True)\
    .option('inferSchema' , True) \
    .load('dbfs:/databricks-datasets/iot-stream/data-user')

In [None]:
userData.show()

+------+------+---+------+------+------+-------------+-----------+------+----+
|userid|gender|age|height|weight|smoker|familyhistory|cholestlevs|    bp|risk|
+------+------+---+------+------+------+-------------+-----------+------+----+
|     1|     F| 40|    63|   100|     N|            Y|       High|  High|   5|
|     2|     M| 34|    69|   150|     N|            Y|     Normal|Normal| -10|
|     3|     F| 35|    60|   134|     N|            N|     Normal|Normal| -10|
|     4|     F| 38|    59|   131|     Y|            Y|       High|  High|  20|
|     5|     F| 29|    59|   102|     N|            N|       High|Normal|  10|
|     6|     M| 37|    73|   174|     Y|            N|       High|  High|  20|
|     7|     M| 25|    72|   171|     Y|            N|       High|  High|  20|
|     8|     M| 26|    75|   166|     N|            Y|       High|Normal|  10|
|     9|     F| 36|    64|   115|     N|            N|     Normal|Normal| -10|
|    10|     F| 39|    65|   123|     Y|            

In [None]:
userData.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- familyhistory: string (nullable = true)
 |-- cholestlevs: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- risk: integer (nullable = true)




 
The following code creates a temporary view named `userData` in Spark SQL for further analysis:



In [None]:
userData.createOrReplaceTempView('userData')

In [None]:
%sql
SELECT * 
FROM userData 

userid,gender,age,height,weight,smoker,familyhistory,cholestlevs,bp,risk
1,F,40,63,100,N,Y,High,High,5
2,M,34,69,150,N,Y,Normal,Normal,-10
3,F,35,60,134,N,N,Normal,Normal,-10
4,F,38,59,131,Y,Y,High,High,20
5,F,29,59,102,N,N,High,Normal,10
6,M,37,73,174,Y,N,High,High,20
7,M,25,72,171,Y,N,High,High,20
8,M,26,75,166,N,Y,High,Normal,10
9,F,36,64,115,N,N,Normal,Normal,-10
10,F,39,65,123,Y,N,Normal,Normal,20


CREATING BRONZE DELTA TABLE FOR userData

In [None]:
%sql
CREATE TABLE bronze_user 
USING DELTA
COMMENT 'Bronze table in the architecture'
TBLPROPERTIES ('type'='bronze')
AS 
SELECT * FROM userData;


num_affected_rows,num_inserted_rows


**Data Ingestion and Transformation**

Data has been successfully ingested into the notebook. We've performed initial data quality checks and proceeded with the transformation process. Here's a summary of the steps taken so far:

1. **Ingestion:** Data has been ingested from the source, and initial exploration has been conducted to ensure successful ingestion.

2. **Data Quality Checks:** We've inspected the schema and sample rows of each dataset (`printSchema()` and `show()`) as a preliminary check for anomalies or inconsistencies.

3. **Timestamp Column Transformation:** The timestamp column has been broken down into separate columns for date and time using PySpark DataFrames, facilitating easier analysis and manipulation.

4. **View Creation:** Temporary views have been created from the selected columns so that the DataFrames can be queried with SQL. The bronze Delta tables were created from these views, which marks the transition from the raw to the bronze layer of the data processing pipeline.

Next, we will aggregate the device data at the hourly level as part of the bronze-to-silver step, producing a more compact dataset for analysis and visualization.




<h2> Bronze to Silver </h2>

In [None]:
%sql
SHOW tables

database,tableName,isTemporary
default,bronze_device,False
default,bronze_user,False
,devicedata,True
,userdata,True


In [None]:
%sql
SELECT count(*) from bronze_device

count(1)
1000000


This SQL snippet first drops the view `agg_deviceData` if it exists, then creates a new temporary view with that name by aggregating the `bronze_device` table. For each combination of device ID, user ID, date, and hour (extracted from the `time` column), it sums miles walked, step count, and calories burnt, rounding each sum to two decimal places. Aggregating to the hourly level reduces the 1,000,000 raw readings to a much smaller set of rows that is easier to analyze and visualize. This view is the basis for the silver layer of the data processing pipeline.

In [None]:
%sql
DROP view IF EXISTS agg_deviceData;

CREATE temp view agg_deviceData AS (
SELECT
    device_id,
    user_id,
    ROUND(SUM(miles_walked), 2) AS miles_walked,
    ROUND(SUM(num_steps), 2) AS step_count,
    ROUND(SUM(calories_burnt), 2) AS calories_count,
    date,
    HOUR(time) AS hour
FROM 
    bronze_device
GROUP BY 
    device_id, user_id, date, HOUR(time)
)

In [None]:
%sql
SELECT count(*) as after_aggregation FROM agg_deviceData

after_aggregation
92342


In [None]:
%sql
SELECT * from agg_deviceData

device_id,user_id,miles_walked,step_count,calories_count,date,hour
9,32,18.3,36606,1830.3,2018-07-20,10
6,26,79.01,158024,7901.2,2018-07-21,18
14,23,32.39,64783,3239.15,2018-07-23,22
2,26,43.97,87940,4397.0,2018-07-23,2
10,29,54.0,108006,5400.3,2018-07-22,19
14,8,12.92,25831,1291.55,2018-07-20,5
11,13,53.6,107209,5360.45,2018-07-24,13
9,7,49.48,98961,4948.05,2018-07-20,11
1,4,25.06,50120,2506.0,2018-07-20,14
2,31,29.12,58247,2912.35,2018-07-20,16


CREATING SILVER DELTA TABLE FOR deviceData

In [None]:
%sql
CREATE TABLE silver_device
USING DELTA
COMMENT 'Silver table in the architecture'
TBLPROPERTIES ('type'='silver')
AS 
SELECT * FROM agg_deviceData;

num_affected_rows,num_inserted_rows


The following SQL command performs these processing steps:
- It calculates the BMI for each record in the `bronze_user` table using the formula (weight / height^2) * 703. The factor 703 is the conversion used when weight is in pounds and height is in inches.
- It assigns each BMI value to a category using the cut-offs in the query: 'Underweight' (below 18.5), 'Normal Weight' (18.5 to below 24.9), 'Overweight' (24.9 to below 29.9), and 'Obese' (29.9 and above).
- The resulting dataset is stored in a temporary view named `calc_user`.

The BMI category gives a quick indication of each user's weight status, which can be combined with the other health indicators in later analysis. Because `calc_user` is a temporary view, it exists only for the current session and takes up no storage; the results are persisted when the silver table is created from it.
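The BMI formula and the category cut-offs can be checked by hand with a few lines of plain Python. This is a minimal sketch that mirrors the arithmetic and thresholds of the SQL query; it assumes weight in pounds and height in inches (which the factor 703 implies but the dataset does not state), and the function names are hypothetical.

```python
def bmi_imperial(weight_lb, height_in):
    """BMI from weight in pounds and height in inches (assumed units)."""
    return weight_lb / height_in / height_in * 703


def bmi_category(bmi):
    """Categorize a BMI value using the same cut-offs as the SQL CASE expression."""
    if bmi < 18.5:
        return "Underweight"
    if bmi < 24.9:
        return "Normal Weight"
    if bmi < 29.9:
        return "Overweight"
    return "Obese"


# Users 1 and 2 from the sample user data: (weight, height).
for user_id, weight, height in [(1, 100, 63), (2, 150, 69)]:
    bmi = bmi_imperial(weight, height)
    print(user_id, round(bmi, 2), bmi_category(bmi))
```

As in the SQL, the category is derived from the unrounded BMI, so a value that rounds to a boundary is still classified by its exact value.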


In [None]:


%sql
DROP view if exists calc_user;

CREATE TEMP VIEW calc_user AS
(
-- Calculate BMI and categorize
SELECT *,
ROUND(((weight / height / height) * 703), 2) AS BMI,
CASE 
    WHEN ((weight / height / height) * 703) < 18.5 THEN 'Underweight'
    WHEN ((weight / height / height) * 703) >= 18.5 AND ((weight / height / height) * 703) < 24.9 THEN 'Normal Weight'
    WHEN ((weight / height / height) * 703) >= 24.9 AND ((weight / height / height) * 703) < 29.9 THEN 'Overweight'
    ELSE 'Obese'
END AS BMI_Category
FROM bronze_user
)


CREATING SILVER DELTA TABLE FOR userData

In [None]:
%sql
CREATE TABLE silver_user
USING DELTA
COMMENT 'Silver table in the architecture'
TBLPROPERTIES ('type'='silver')
AS 
SELECT * FROM calc_user;

num_affected_rows,num_inserted_rows


In [None]:
%sql
SELECT date, count(date) from silver_device
GROUP BY date;

date,count(date)
2018-07-20,18193
2018-07-24,15132
2018-07-22,18167
2018-07-23,18182
2018-07-21,18186
2018-07-19,4482


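We're removing the data from July 19, 2018, because that day is incomplete: it has only 4,482 aggregated rows, compared with more than 18,000 for each day from July 20 to July 23. The remaining device data and all of the user data are then made available for consumption.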
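One way to make this kind of decision repeatable is to flag dates whose row count falls well below the typical day. The sketch below applies that idea to the counts shown above. The 50% threshold and the helper name `incomplete_dates` are assumptions chosen for illustration, not a rule used by this notebook.

```python
from statistics import median

# Rows per date in silver_device, copied from the query output above.
rows_per_date = {
    "2018-07-19": 4482,
    "2018-07-20": 18193,
    "2018-07-21": 18186,
    "2018-07-22": 18167,
    "2018-07-23": 18182,
    "2018-07-24": 15132,
}


def incomplete_dates(counts, min_fraction=0.5):
    """Return dates with fewer than `min_fraction` of the median row count.

    The default threshold of 0.5 is an illustrative assumption.
    """
    typical = median(counts.values())
    return sorted(d for d, n in counts.items() if n < min_fraction * typical)


flagged = incomplete_dates(rows_per_date)
print(flagged)
```

With this threshold only July 19 is flagged; July 24 has fewer rows than the full days but stays above the cut-off, which matches the filter used for the gold table.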


CREATING GOLD DELTA TABLE FOR deviceData for consumption and visualization


In [None]:
%sql
CREATE TABLE gold_device
USING DELTA
COMMENT 'Gold table in the architecture'
TBLPROPERTIES ('type'='gold')
AS 
SELECT * FROM silver_device
where date != '2018-07-19';

num_affected_rows,num_inserted_rows


CREATING GOLD DELTA TABLE FOR userData for consumption and visualization

In [None]:
%sql
CREATE TABLE gold_user
USING DELTA
COMMENT 'Gold table in the architecture'
TBLPROPERTIES ('type'='gold')
AS 
SELECT * FROM silver_user;

num_affected_rows,num_inserted_rows


In [None]:
%sql
SHOW tables;

database,tableName,isTemporary
default,bronze_device,False
default,bronze_user,False
default,gold_device,False
default,gold_user,False
default,silver_device,False
default,silver_user,False
,agg_devicedata,True
,calc_user,True
,devicedata,True
,userdata,True


In [None]:
#%fs rm -r dbfs:/user/hive/warehouse 