
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Raw to Bronze Pattern

## Notebook Objective

In this notebook we:
1. Ingest Raw Data
2. Augment the data with Ingestion Metadata
3. Batch write the augmented data to a Bronze Table

## Step Configuration

In [0]:
%run ./includes/configuration

### Display the Files in the Raw Path

In [0]:
display(dbutils.fs.ls(rawPath))

path,name,size,modificationTime
dbfs:/dacharya/dacharya/dataengineering/classic/raw/2022-08-21-04-12-31.txt,2022-08-21-04-12-31.txt,1071,1661055153000
dbfs:/dacharya/dacharya/dataengineering/classic/raw/2022-08-21-04-12-56.txt,2022-08-21-04-12-56.txt,5389,1661055179000


## Make Notebook Idempotent

In [0]:
dbutils.fs.rm(bronzePath, recurse=True)
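The Bronze write later in this notebook uses append mode. Without this reset, rerunning the notebook would add a second copy of every row. The following plain-Python sketch (not Spark) illustrates the idea with a local directory. `reset_dir`, `append_batch`, `count_rows`, and the `part-0.txt` file name are hypothetical stand-ins for `dbutils.fs.rm` and a Delta write.

```python
import os
import shutil
import tempfile


def reset_dir(path: str) -> None:
    """Delete `path` and its contents; a missing path is not an error."""
    shutil.rmtree(path, ignore_errors=True)


def append_batch(path: str, rows: list) -> None:
    """Append rows to a single file under `path`, like an append-mode write."""
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, "part-0.txt"), "a", encoding="utf-8") as f:
        for row in rows:
            f.write(row + "\n")


def count_rows(path: str) -> int:
    """Count the lines currently stored under `path`."""
    with open(os.path.join(path, "part-0.txt"), encoding="utf-8") as f:
        return sum(1 for _ in f)


target = os.path.join(tempfile.mkdtemp(), "bronze")

# Idempotent run: reset, then append. Running it twice gives the same result.
for _ in range(2):
    reset_dir(target)
    append_batch(target, ["row-1", "row-2"])
with_reset = count_rows(target)

# Non-idempotent rerun: appending again without the reset duplicates rows.
append_batch(target, ["row-1", "row-2"])
without_reset = count_rows(target)

print(with_reset, without_reset)  # 2 4
```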

## Ingest raw data

Next, we will read files from the source directory and write each line as a string to the Bronze table.

🤠 You should do this as a batch load using `spark.read`.

Read the files using the `"text"` format and the provided schema.

In [0]:
# TODO
# Each line of each raw file becomes one row with a single string column, "value"
kafka_schema = "value STRING"

raw_health_tracker_data_df = (
  spark.read.format("text").schema(kafka_schema).load(rawPath)
)
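With the `"text"` format, each line of each input file becomes one row, and that row's only column is the string `value`. The sketch below is a rough plain-Python equivalent. It is not Spark: it has none of Spark's parallelism, and it sorts file names only to make its own output deterministic. `read_text_lines` and the sample file are hypothetical.

```python
import os
import tempfile


def read_text_lines(directory: str) -> list:
    """Return one {"value": line} row per line of every file in `directory`."""
    rows = []
    for name in sorted(os.listdir(directory)):  # sorted only for determinism
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        with open(path, encoding="utf-8") as f:
            for line in f:
                # Drop the line terminator, keep the raw JSON text untouched
                rows.append({"value": line.rstrip("\r\n")})
    return rows


raw_dir = tempfile.mkdtemp()
with open(os.path.join(raw_dir, "sample.txt"), "w", encoding="utf-8") as f:
    f.write('{"name":"Lai Hui","device_id":"1"}\n')
    f.write('{"name":"Ae Yujin","device_id":"6"}\n')

rows = read_text_lines(raw_dir)
print(len(rows), rows[0])
```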

## Display the Raw Data

🤓 Each row here is a raw string in JSON format, as a streaming message broker such as Kafka would deliver it.

In [0]:
display(raw_health_tracker_data_df)

value
"{""time"":""2020-01-01 01:00:00"",""name"":""Armando Clemente"",""device_id"":""2"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Meallan O'Conarain"",""device_id"":""3"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Lai Hui"",""device_id"":""1"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Lakesia Brown"",""device_id"":""4"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Anu Achaval"",""device_id"":""5"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Ae Yujin"",""device_id"":""6"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Pardeep Kapoor"",""device_id"":""7"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Julian Andersen"",""device_id"":""8"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Simone Graber"",""device_id"":""9"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
"{""time"":""2020-01-01 01:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":1}"
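The doubled quotes (`""`) in the output above are CSV escaping added by the display. The stored `value` is ordinary JSON. This plain-Python sketch parses one of the sample rows with the standard `json` module. It also shows that `device_id` arrives as a string, while `steps` is a number.

```python
import json

# One raw value from the sample above, with the CSV escaping removed
raw_value = (
    '{"time":"2020-01-01 01:00:00","name":"Armando Clemente",'
    '"device_id":"2","steps":0,"day":1,"month":1,"hour":1}'
)

record = json.loads(raw_value)
print(record["name"], repr(record["device_id"]), record["steps"])
```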


## Ingestion Metadata

As part of the ingestion process, we record metadata describing where each record came from and when it was ingested.

**EXERCISE:** Add metadata to the incoming raw data. You should add the following columns:

- data source (`datasource`), use `"files.training.databricks.com"`
- ingestion time (`ingesttime`)
- status (`status`), use `"new"`
- ingestion date (`ingestdate`)

In [0]:
# TODO
from pyspark.sql.functions import current_timestamp, lit

raw_health_tracker_data_df = (
  raw_health_tracker_data_df.select(
    "value",
    lit("files.training.databricks.com").alias("datasource"),
    current_timestamp().alias("ingesttime"),
    lit("new").alias("status"),
    current_timestamp().cast("date").alias("ingestdate"),
  )
)
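The same augmentation can be sketched in plain Python over a list of rows. One detail is worth mirroring: Spark evaluates `current_timestamp()` once per query. That is why every row in the Bronze output further down shows the same `ingesttime`, and why the sketch takes the timestamp once per batch. `add_ingestion_metadata` is a hypothetical helper. It derives the date in UTC, whereas Spark uses the session time zone.

```python
from datetime import datetime, timezone


def add_ingestion_metadata(rows: list, datasource: str) -> list:
    """Return new rows with datasource, ingesttime, status and ingestdate added."""
    ingest_time = datetime.now(timezone.utc)  # one timestamp for the whole batch
    return [
        {
            "value": row["value"],
            "datasource": datasource,
            "ingesttime": ingest_time,
            "status": "new",
            "ingestdate": ingest_time.date(),  # timestamp truncated to a date
        }
        for row in rows
    ]


batch = add_ingestion_metadata(
    [{"value": '{"name":"Lai Hui"}'}, {"value": '{"name":"Ae Yujin"}'}],
    "files.training.databricks.com",
)
print(batch[0]["datasource"], batch[0]["status"], batch[0]["ingestdate"])
```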

## WRITE Batch to a Bronze Table

Finally, we write to the Bronze Table.

Make sure to write the columns in this order: `"datasource"`, `"ingesttime"`, `"value"`, `"status"`, `"p_ingestdate"` (the `ingestdate` column, renamed).

Make sure to use the following options:

- the format `"delta"`
- the `"append"` mode
- partition by `p_ingestdate`

In [0]:
# TODO
from pyspark.sql.functions import col

(
  raw_health_tracker_data_df.select(
    "datasource",
    "ingesttime",
    "value",
    "status",
    col("ingestdate").alias("p_ingestdate"),
  )
  .write.format("delta")
  .mode("append")
  .partitionBy("p_ingestdate")
  .save(bronzePath)
)
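`partitionBy("p_ingestdate")` stores rows under Hive-style directories named `column=value`. That is where the `p_ingestdate=2022-08-21/` folder in the listing below comes from. The plain-Python sketch below is not the Delta writer itself. It reproduces the column reordering and rename from the `select`, then groups rows by that directory name. `to_bronze_row` and `partition_layout` are hypothetical helpers.

```python
from collections import defaultdict
from datetime import date

BRONZE_COLUMNS = ["datasource", "ingesttime", "value", "status", "p_ingestdate"]


def to_bronze_row(row: dict) -> dict:
    """Rename ingestdate to p_ingestdate and emit columns in Bronze order."""
    renamed = {**row, "p_ingestdate": row["ingestdate"]}
    return {column: renamed[column] for column in BRONZE_COLUMNS}


def partition_layout(rows: list) -> dict:
    """Group rows by the partition directory name partitionBy would use."""
    layout = defaultdict(list)
    for row in rows:
        layout[f"p_ingestdate={row['p_ingestdate'].isoformat()}"].append(row)
    return dict(layout)


augmented = [
    {
        "value": '{"name":"Lai Hui"}',
        "datasource": "files.training.databricks.com",
        "ingesttime": "2022-08-21T18:50:56.519+0000",
        "status": "new",
        "ingestdate": date(2022, 8, 21),
    }
]
bronze_rows = [to_bronze_row(r) for r in augmented]
print(list(bronze_rows[0]))
print(list(partition_layout(bronze_rows)))
```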

In [0]:
display(dbutils.fs.ls(bronzePath))

path,name,size,modificationTime
dbfs:/dacharya/dacharya/dataengineering/classic/bronze/_delta_log/,_delta_log/,0,1661107867000
dbfs:/dacharya/dacharya/dataengineering/classic/bronze/p_ingestdate=2022-08-21/,p_ingestdate=2022-08-21/,0,1661107858000


## Register the Bronze Table in the Metastore

The table should be named `health_tracker_classic_bronze`.

In [0]:
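# TODO
# Drop any existing definition so this cell can be rerun
spark.sql("""
DROP TABLE IF EXISTS health_tracker_classic_bronze
""")

# Register a table over the Delta files already written to bronzePath
spark.sql(f"""
CREATE TABLE health_tracker_classic_bronze
USING DELTA
LOCATION "{bronzePath}"
""")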
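Because the table is created with an explicit `LOCATION`, it is an unmanaged (external) table. `DROP TABLE` therefore removes only the metastore entry and leaves the Delta files at `bronzePath` in place, so dropping and re-creating the table is safe. `spark.sql` expects a single statement per call, which is why the two statements are issued separately. The helper below is a hypothetical plain-Python sketch that builds those statements, and the path in the example is illustrative.

```python
def register_table_statements(table: str, location: str) -> list:
    """Build DROP and CREATE statements for a Delta table at `location`.

    Each returned string is one statement, suitable for a single spark.sql call.
    This sketch does not escape quotes, so it rejects paths that contain them.
    """
    if "'" in location or '"' in location:
        raise ValueError("location must not contain quote characters")
    return [
        f"DROP TABLE IF EXISTS {table}",
        f"CREATE TABLE {table} USING DELTA LOCATION '{location}'",
    ]


statements = register_table_statements(
    "health_tracker_classic_bronze", "dbfs:/tmp/example/bronze"
)
for statement in statements:
    print(statement)  # in a notebook: spark.sql(statement)
```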

## Display Classic Bronze Table

Run this query to display the contents of the Classic Bronze table.

In [0]:
%sql

SELECT * FROM health_tracker_classic_bronze

datasource,ingesttime,value,status,p_ingestdate
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Armando Clemente"",""device_id"":""2"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Meallan O'Conarain"",""device_id"":""3"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Lai Hui"",""device_id"":""1"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Lakesia Brown"",""device_id"":""4"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Anu Achaval"",""device_id"":""5"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Ae Yujin"",""device_id"":""6"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Pardeep Kapoor"",""device_id"":""7"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Julian Andersen"",""device_id"":""8"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Simone Graber"",""device_id"":""9"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21


### Query Broken Records


Run a SQL query to display just the incoming records for "Gonzalo Valdés".

🧠 You can use the SQL operator `RLIKE`, which is short for regex `LIKE`,
to create your matching predicate.

[`RLIKE` documentation](https://docs.databricks.com/spark/latest/spark-sql/language-manual/functions.html#rlike)
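`RLIKE` is true when the pattern matches anywhere in the string. That is why `'Gonzalo Valdés'` matches even though the name sits in the middle of the JSON text. In Python terms, this corresponds to `re.search` rather than `re.fullmatch`. The sketch below approximates `RLIKE`. Spark uses Java regular expressions, so advanced syntax may behave differently from Python's `re`.

```python
import re


def rlike(value: str, pattern: str) -> bool:
    """Approximate `value RLIKE pattern`: true if the regex matches anywhere."""
    return re.search(pattern, value) is not None


row = '{"time":"2020-01-01 01:00:00","name":"Gonzalo Valdés","device_id":"10"}'

print(rlike(row, "Gonzalo Valdés"))       # True: substring match
print(rlike(row, "^Gonzalo"))             # False: the string starts with '{'
print(rlike(row, r'"device_id":"\d+"'))   # True: numeric device_id
```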

In [0]:
%sql

SELECT * FROM health_tracker_classic_bronze WHERE value RLIKE 'Gonzalo Valdés'

datasource,ingesttime,value,status,p_ingestdate
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 01:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":1}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 02:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":2}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 03:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":3}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 04:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""16b830c4-d9da-11ea-8534-0242ac110002"",""steps"":0,""day"":1,""month"":1,""hour"":4}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 05:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":5}",new,2022-08-21
files.training.databricks.com,2022-08-21T18:50:56.519+0000,"{""time"":""2020-01-01 00:00:00"",""name"":""Gonzalo Valdés"",""device_id"":""10"",""steps"":0,""day"":1,""month"":1,""hour"":0}",new,2022-08-21


### What do you notice?
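In the output above, the hour-4 record has a `device_id` of `16b830c4-d9da-11ea-8534-0242ac110002` instead of `10`. A minimal plain-Python sketch for flagging such records follows. It assumes, based only on the sample rows, that a valid `device_id` consists of digits. `find_broken_records` is a hypothetical helper.

```python
import json


def find_broken_records(values: list) -> list:
    """Return parsed records whose device_id is not a plain digit string."""
    broken = []
    for value in values:
        record = json.loads(value)
        # Assumption: valid device_ids look like "1", "10", ... (digits only)
        if not record["device_id"].isdigit():
            broken.append(record)
    return broken


values = [
    '{"time":"2020-01-01 03:00:00","name":"Gonzalo Valdés",'
    '"device_id":"10","steps":0,"day":1,"month":1,"hour":3}',
    '{"time":"2020-01-01 04:00:00","name":"Gonzalo Valdés",'
    '"device_id":"16b830c4-d9da-11ea-8534-0242ac110002",'
    '"steps":0,"day":1,"month":1,"hour":4}',
]

broken = find_broken_records(values)
for record in broken:
    print(record["hour"], record["device_id"])
```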

### Display the User Dimension Table


Run a SQL query to display the records in `health_tracker_user`.

In [0]:
%sql

SELECT * FROM health_tracker_user

name,address,phone_number,user_id,device_id
Meallan O'Conarain,"3048 Guerrero Alley Jerryhaven, PA 56888",(580)703-9076x32254,16b79f9c-d9da-11ea-8534-0242ac110002,3
Lakesia Brown,"549 Palmer Village Lake Joseph, IN 44981",001-624-908-5142x446,16b7b946-d9da-11ea-8534-0242ac110002,4
Anu Achaval,"8334 Kevin Fork Suite 531 South Kennethton, WI 42697",468-733-3330x598,16b7cec2-d9da-11ea-8534-0242ac110002,5
Julian Andersen,"321 Jackson Forest Apt. 689 Garciafort, UT 91205",001-796-472-0831x3399,16b807ac-d9da-11ea-8534-0242ac110002,8
Simone Graber,"55863 Brown Cliff Port Amybury, ND 99197",4548070215,16b81c2e-d9da-11ea-8534-0242ac110002,9
Gonzalo Valdés,"2456 Rachael Manors Apt. 758 South Curtisfort, WV 27129",(408)059-4700x9591,16b830c4-d9da-11ea-8534-0242ac110002,10
Lai Hui,"805 John Oval Apt. 470 Lake Amanda, NE 09043",3087607759,16b74cfe-d9da-11ea-8534-0242ac110002,1
Armando Clemente,"293 Keith Drive East David, NY 05983",8497224309,16b78264-d9da-11ea-8534-0242ac110002,2
Ae Yujin,USCGC Davis FPO AP 67548,001-072-063-6894x746,16b7dd72-d9da-11ea-8534-0242ac110002,6
Pardeep Kapoor,"653 Monica Knoll Hicksfort, KS 41378",001-834-698-3839x6306,16b7f0d2-d9da-11ea-8534-0242ac110002,7
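Compare this table with the earlier query on Gonzalo Valdés's records. The unexpected `device_id` value `16b830c4-d9da-11ea-8534-0242ac110002` is exactly his `user_id` here, and his actual `device_id` is `10`. A small lookup keyed by `user_id` makes that relationship explicit. The helper and the two-row mapping (copied from the output above) are illustrative only.

```python
def resolve_device_id(raw_device_id: str, user_to_device: dict) -> str:
    """Return a device_id, translating a user_id into its device_id if needed.

    Values that are already known device_ids pass through unchanged;
    anything else must be a known user_id, otherwise KeyError is raised.
    """
    if raw_device_id in user_to_device.values():
        return raw_device_id
    return user_to_device[raw_device_id]


# user_id -> device_id, two rows copied from health_tracker_user
user_to_device = {
    "16b830c4-d9da-11ea-8534-0242ac110002": "10",  # Gonzalo Valdés
    "16b74cfe-d9da-11ea-8534-0242ac110002": "1",   # Lai Hui
}

print(resolve_device_id("16b830c4-d9da-11ea-8534-0242ac110002", user_to_device))
print(resolve_device_id("10", user_to_device))
```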


## Purge Raw File Path

We loaded the raw files with a batch read, whereas the Plus pipeline used streaming.

A batch read does not use checkpointing, so it does not track which files have already been ingested. A later batch read of the same directory would load those files again.

We need to manually purge the raw files that have been loaded.

In [0]:
dbutils.fs.rm(rawPath, recurse=True)
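To see what checkpointing provides, consider a simplified plain-Python tracker. It remembers which files it has already returned, so a second pass over the same directory yields nothing new. A streaming checkpoint keeps comparable bookkeeping durably. This in-memory `IngestedFileTracker` is hypothetical and only illustrative. Without anything like it, the batch pipeline relies on purging the raw path.

```python
import os
import tempfile


class IngestedFileTracker:
    """Remember which files in a directory have already been ingested."""

    def __init__(self) -> None:
        self._seen = set()

    def new_files(self, directory: str) -> list:
        """Return files not returned before and mark them as ingested."""
        current = sorted(
            name
            for name in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, name))
        )
        fresh = [name for name in current if name not in self._seen]
        self._seen.update(fresh)
        return fresh


raw_dir = tempfile.mkdtemp()
with open(os.path.join(raw_dir, "2022-08-21-04-12-31.txt"), "w") as f:
    f.write("first batch\n")

tracker = IngestedFileTracker()
first = tracker.new_files(raw_dir)   # the file is new
second = tracker.new_files(raw_dir)  # nothing new on a second pass

with open(os.path.join(raw_dir, "2022-08-21-04-12-56.txt"), "w") as f:
    f.write("second batch\n")
third = tracker.new_files(raw_dir)   # only the newly arrived file

print(first, second, third)
```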


&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>