
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Bronze to Silver - ETL into a Silver table

We need to perform some transformations on the data to move it from bronze to silver tables.

## Notebook Objective

In this notebook we:
1. Ingest raw data using composable functions
1. Use composable functions to write to the Bronze table
1. Develop the Bronze to Silver Step
   - Extract and transform the raw string to columns
   - Quarantine the bad data
   - Load clean data into the Silver table
1. Update the status of records in the Bronze table

## Step Configuration

In [0]:
%run ./includes/configuration

## Import Operation Functions

In [0]:
%run ./includes/main/python/operations

### Display the Files in the Bronze Paths

In [0]:
display(dbutils.fs.ls(bronzePath))

## Land More Raw Data

Before we get started with this lab, let's land some more raw data.

In a production setting, we might have data coming in every
hour. Here we are simulating this with the function
`ingest_classic_data`.

😎 Recall that we did this in the notebook `00_ingest_raw`.

**EXERCISE:** Land ten hours using the utility function, `ingest_classic_data`.

In [0]:
# ANSWER
ingest_classic_data(hours=10)

## Current Delta Architecture
Next, we demonstrate everything we have built up to this point in our
Delta Architecture.

We do so not with the ad hoc queries as written before, but now with
composable functions included in the file `classic/includes/main/python/operations`.
You should check this file for the correct arguments to use in the next
three steps.

🤔 You can refer to `plus/02_bronze_to_silver` if you are stuck.

### Step 1: Create the `rawDF` DataFrame

**Exercise:** Use the function `read_batch_raw` to ingest the newly arrived
data.

In [0]:
# ANSWER
rawDF = read_batch_raw(rawPath)

### Step 2: Transform the Raw Data

**Exercise:** Use the function `transform_raw` to transform the newly ingested
data.

In [0]:
# ANSWER
transformedRawDF = transform_raw(rawDF)

## Verify the Schema with an Assertion

The DataFrame `transformedRawDF` should now have the following schema:

```
datasource: string
ingesttime: timestamp
status: string
value: string
p_ingestdate: date
```

In [0]:
from pyspark.sql.types import (
    DateType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

assert transformedRawDF.schema == StructType(
    [
        StructField("datasource", StringType(), False),
        StructField("ingesttime", TimestampType(), False),
        StructField("status", StringType(), False),
        StructField("value", StringType(), True),
        StructField("p_ingestdate", DateType(), False),
    ]
)
print("Assertion passed.")

### Step 3: Write Batch to a Bronze Table

**Exercise:** Use the function `batch_writer` to write the transformed
data to the Bronze table.

**Note**: you will need to begin the write with the `.save()` method on
your writer.

🤖 **Be sure to partition on `p_ingestdate`**.
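
As a rough picture of what partitioning on `p_ingestdate` does, the sketch below groups a few made-up rows by ingest date in plain Python. It is an analogy only, not the Delta writer itself: the sample rows are hypothetical, and the `column=value` directory naming is assumed to follow the usual layout for partitioned tables.

```python
from collections import defaultdict
from datetime import date

# Hypothetical rows; only the partition column matters for this sketch.
rows = [
    {"value": "r1", "p_ingestdate": date(2020, 1, 1)},
    {"value": "r2", "p_ingestdate": date(2020, 1, 1)},
    {"value": "r3", "p_ingestdate": date(2020, 1, 2)},
]

partitions = defaultdict(list)
for row in rows:
    # One group per distinct date, named in the `column=value` style.
    key = f"p_ingestdate={row['p_ingestdate'].isoformat()}"
    partitions[key].append(row["value"])

print(dict(partitions))
```

Rows that share an ingest date end up in the same group, which is why a query that filters on `p_ingestdate` can skip the other groups entirely.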

In [0]:
# ANSWER
rawToBronzeWriter = batch_writer(
    dataframe=transformedRawDF, partition_column="p_ingestdate"
)

rawToBronzeWriter.save(bronzePath)

## Purge Raw File Path

Manually purge the raw files that have already been loaded.

In [0]:
dbutils.fs.rm(rawPath, recurse=True)

## Display the Bronze Table

If you have ingested 16 hours you should see 160 records.

In [0]:
%sql
SELECT * FROM health_tracker_classic_bronze

## Bronze to Silver Step

Let's start the Bronze to Silver step.

## Make Notebook Idempotent

In [0]:
dbutils.fs.rm(silverPath, recurse=True)

## Load New Records from the Bronze Table

**EXERCISE**

Load all records from the Bronze table with a status of `"new"`.

In [0]:
# ANSWER

bronzeDF = spark.read.table("health_tracker_classic_bronze").filter("status = 'new'")

## Extract the Nested JSON from the Bronze Records

### Step 1: Extract the Nested JSON from the `value` column
**EXERCISE**

Use `pyspark.sql` functions to parse the JSON string in the `"value"` column
into a new column `"nested_json"`.

In [0]:
# ANSWER
from pyspark.sql.functions import col, from_json

json_schema = """
    time TIMESTAMP,
    name STRING,
    device_id STRING,
    steps INTEGER,
    day INTEGER,
    month INTEGER,
    hour INTEGER
"""

bronzeAugmentedDF = bronzeDF.withColumn(
    "nested_json", from_json(col("value"), json_schema)
)

### Step 2: Create the Silver DataFrame by Unpacking the `nested_json` Column

Unpacking a JSON column means flattening the JSON so that each top-level attribute
becomes its own column.

🚨 **IMPORTANT** Be sure to include the `"value"` column in the Silver DataFrame
because we will later use it as a unique reference to each record in the
Bronze table.
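
As a plain-Python picture of what unpacking does, the sketch below uses the standard `json` module rather than the Spark API. The sample record and the helper name `unpack` are made up for illustration; the field names follow the `json_schema` defined earlier in this notebook.

```python
import json

# Hypothetical sample record with the fields named in json_schema.
value = (
    '{"time": "2020-01-01 00:00:00", "name": "Sample User", '
    '"device_id": "0", "steps": 0, "day": 1, "month": 1, "hour": 0}'
)


def unpack(value: str) -> dict:
    """Keep the raw string and promote each top-level JSON key to a field."""
    nested_json = json.loads(value)
    return {"value": value, **nested_json}


row = unpack(value)
print(list(row))
```

The raw `value` string travels alongside the flattened fields, which is what makes it usable later as a key back into the Bronze table.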

In [0]:
# ANSWER
silver_health_tracker = bronzeAugmentedDF.select("value", "nested_json.*")

## Verify the Schema with an Assertion

The DataFrame `silver_health_tracker` should now have the following schema:

```
value: string
time: timestamp
name: string
device_id: string
steps: integer
day: integer
month: integer
hour: integer
```

💪🏼 Remember, the function `_parse_datatype_string` converts a DDL format schema string into a Spark schema.

In [0]:
from pyspark.sql.types import _parse_datatype_string

assert silver_health_tracker.schema == _parse_datatype_string(
    """
  value STRING,
  time TIMESTAMP,
  name STRING,
  device_id STRING,
  steps INTEGER,
  day INTEGER,
  month INTEGER,
  hour INTEGER
"""
)
print("Assertion passed.")

## Transform the Data

1. Create a column `p_eventdate DATE` from the column `time`.
1. Rename the column `time` to `eventtime`.
1. Cast the `device_id` as an integer.
1. Include only the following columns in this order:
   1. `value`
   1. `device_id`
   1. `steps`
   1. `eventtime`
   1. `name`
   1. `p_eventdate`

💪🏼 Remember that we name the new column `p_eventdate` to indicate
that we are partitioning on this column.

🕵🏽‍♀️ Remember that we are keeping the `value` as a unique reference to values
in the Bronze table.
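
The four steps listed above can be pictured on a single record in plain Python. This is an illustrative analogue, not the Spark code for the exercise; the sample record and the helper name `transform` are hypothetical.

```python
from datetime import datetime

# Hypothetical unpacked record; device_id arrives as a string-encoded integer.
record = {
    "value": "<raw JSON string>",
    "time": datetime(2020, 1, 1, 13, 0, 0),
    "name": "Sample User",
    "device_id": "3",
    "steps": 120,
    "day": 1,
    "month": 1,
    "hour": 13,
}


def transform(record: dict) -> dict:
    """Cast, rename, derive the partition column, and keep six fields in order."""
    return {
        "value": record["value"],
        "device_id": int(record["device_id"]),  # cast to integer
        "steps": record["steps"],
        "eventtime": record["time"],  # renamed from `time`
        "name": record["name"],
        "p_eventdate": record["time"].date(),  # date part of the timestamp
    }


transformed = transform(record)
print(list(transformed))
```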

In [0]:
# ANSWER
from pyspark.sql.functions import col

silver_health_tracker = silver_health_tracker.select(
    "value",
    col("device_id").cast("integer").alias("device_id"),
    "steps",
    col("time").alias("eventtime"),
    "name",
    col("time").cast("date").alias("p_eventdate"),
)

## Verify the Schema with an Assertion

The DataFrame `silver_health_tracker` should now have the following schema:

```
value: string
device_id: integer
steps: integer
eventtime: timestamp
name: string
p_eventdate: date
```

💪🏼 Remember, the function `_parse_datatype_string` converts a DDL format schema string into a Spark schema.

In [0]:
from pyspark.sql.types import _parse_datatype_string

assert silver_health_tracker.schema == _parse_datatype_string(
    """
  value STRING,
  device_id INTEGER,
  steps INTEGER,
  eventtime TIMESTAMP,
  name STRING,
  p_eventdate DATE
"""
), "Schemas do not match"
print("Assertion passed.")

## Quarantine the Bad Data

Recall that in the notebook `00_ingest_raw` we identified that some records arrive
with `device_id` values passed as uuid strings instead of string-encoded integers.
Our Silver table stores `device_id` values as integers, so these records cannot be
loaded as they are.

In order to properly handle this data quality issue, we will quarantine
the bad records for later processing.
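
A null check on `device_id` is enough to find these records because, assuming Spark's lenient (non-ANSI) cast behavior, a string that cannot be parsed as an integer becomes null instead of raising an error. The plain-Python sketch below mimics that behavior; the helper `cast_to_int` and the sample ids are hypothetical.

```python
from typing import Optional


def cast_to_int(raw: str) -> Optional[int]:
    """Mimic a lenient cast: return None when the string is not an integer."""
    try:
        return int(raw)
    except ValueError:
        return None


# Hypothetical device ids: two string-encoded integers and one uuid string.
raw_ids = ["0", "4", "92a3a8c6-2d1b-4b5c-9f0e-7c1a2b3c4d5e"]
records = [{"device_id": cast_to_int(raw)} for raw in raw_ids]

# Split on the null check, as the quarantine step does.
clean = [r for r in records if r["device_id"] is not None]
quarantine = [r for r in records if r["device_id"] is None]

print(len(clean), len(quarantine))  # prints: 2 1
```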

Check for records that have nulls by comparing the output of the following two cells.

In [0]:
silver_health_tracker.count()

In [0]:
silver_health_tracker.na.drop().count()

### Split the Silver DataFrame

In [0]:
silver_health_tracker_clean = silver_health_tracker.filter("device_id IS NOT NULL")
silver_health_tracker_quarantine = silver_health_tracker.filter("device_id IS NULL")

### Display the Quarantined Records

In [0]:
display(silver_health_tracker_quarantine)

## Write Clean Batch to a Silver Table

**EXERCISE:** Batch write `silver_health_tracker_clean` to the Silver table path, `silverPath`.

1. Use format `"delta"`.
1. Use mode `"append"`.
1. Do **NOT** include the `value` column.
1. Partition by `"p_eventdate"`.

In [0]:
# ANSWER
(
    silver_health_tracker_clean.select(
        "device_id", "steps", "eventtime", "name", "p_eventdate"
    )
    .write.format("delta")
    .mode("append")
    .partitionBy("p_eventdate")
    .save(silverPath)
)

In [0]:
spark.sql(
    """
DROP TABLE IF EXISTS health_tracker_classic_silver
"""
)

spark.sql(
    f"""
CREATE TABLE health_tracker_classic_silver
USING DELTA
LOCATION "{silverPath}"
"""
)

## Verify the Schema with an Assertion

In [0]:
silverTable = spark.read.table("health_tracker_classic_silver")
expected_schema = """
  device_id INTEGER,
  steps INTEGER,
  eventtime TIMESTAMP,
  name STRING,
  p_eventdate DATE
"""

assert silverTable.schema == _parse_datatype_string(
    expected_schema
), "Schemas do not match"
print("Assertion passed.")

In [0]:
%sql

SELECT * FROM health_tracker_classic_silver

## Update Bronze table to Reflect the Loads

**EXERCISE:** Update the records in the Bronze table to reflect updates.

### Step 1: Update Clean records
Clean records that have been loaded into the Silver table should have
their Bronze table `status` updated to `"loaded"`.

💃🏽 **Hint** You are matching the `value` column in your clean Silver DataFrame
to the `value` column in the Bronze table.
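
Conceptually, this kind of merge acts like a keyed update: every Bronze row whose `value` also appears in the source has its `status` overwritten, and every other row is left alone. The sketch below is a plain-Python analogue with lists of dicts standing in for tables. The helper `merge_status` and the sample rows are hypothetical and are not part of the Delta Lake API.

```python
def merge_status(bronze: list, source_values: set, new_status: str) -> None:
    """Update status in place for bronze rows whose value is in source_values."""
    for row in bronze:
        if row["value"] in source_values:
            row["status"] = new_status


# Hypothetical Bronze rows, all still marked as new.
bronze = [
    {"value": "a", "status": "new"},
    {"value": "b", "status": "new"},
    {"value": "c", "status": "new"},
]

# Only the rows that made it into the clean set are marked as loaded.
merge_status(bronze, {"a", "b"}, "loaded")

print([row["status"] for row in bronze])  # prints: ['loaded', 'loaded', 'new']
```

The analogy only holds if `value` identifies each Bronze record uniquely, which is why the notebook carries that column through the Silver DataFrame.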

In [0]:
# ANSWER
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

bronzeTable = DeltaTable.forPath(spark, bronzePath)
silverAugmented = silver_health_tracker_clean.withColumn("status", lit("loaded"))

update_match = "bronze.value = clean.value"
update = {"status": "clean.status"}

(
    bronzeTable.alias("bronze")
    .merge(silverAugmented.alias("clean"), update_match)
    .whenMatchedUpdate(set=update)
    .execute()
)


### Step 2: Update Quarantined records
Quarantined records should have their Bronze table `status` updated to `"quarantined"`.

🕺🏻 **Hint** You are matching the `value` column in your quarantine Silver
DataFrame to the `value` column in the Bronze table.

In [0]:
# ANSWER
silverAugmented = silver_health_tracker_quarantine.withColumn(
    "status", lit("quarantined")
)

update_match = "bronze.value = quarantine.value"
update = {"status": "quarantine.status"}

(
    bronzeTable.alias("bronze")
    .merge(silverAugmented.alias("quarantine"), update_match)
    .whenMatchedUpdate(set=update)
    .execute()
)


&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>