# Bronze to Silver - ETL into a Silver table

We need to perform some transformations on the data to move it from bronze to silver tables.

😎 We're reading _from_ the Delta table now because a Delta table can be both a source AND a sink.

## Notebook Objective

In this notebook we:
1. Harden the Raw to Bronze Step we wrote in a previous notebook
2. Develop the Bronze to Silver Step
   - Extract and Transform the Raw string to columns
   - Load this Data into the Silver Table

## Step Configuration

In [0]:
%run ./includes/configuration

Out[3]: DataFrame[]

No running streams.


## Import Operation Functions

In [0]:
%run ./includes/main/python/operations

### Display the Files in the Raw and Bronze Paths

In [0]:
display(dbutils.fs.ls(rawPath))

path,name,size
dbfs:/dbacademy/gaurav_chattree/dataengineering/plus/raw/health_tracker_data_2020_1.json,health_tracker_data_2020_1.json,310628


In [0]:
display(dbutils.fs.ls(bronzePath))

path,name,size
dbfs:/dbacademy/gaurav_chattree/dataengineering/plus/bronze/_delta_log/,_delta_log/,0
dbfs:/dbacademy/gaurav_chattree/dataengineering/plus/bronze/p_ingestdate=2022-03-25/,p_ingestdate=2022-03-25/,0


## Start Streams

Before we add new streams, let's start the streams we have previously engineered.

We will start two named streams:

- `write_raw_to_bronze`
- `display_bronze`

🤠 In a typical production setting, you would not interact with your streams as we are doing here. We stop and restart our streams in each new notebook for demonstration purposes. *It is easier to track everything that is happening if our streams are only running in our current notebook.*

### Current Delta Architecture
Next, we demonstrate everything we have built up to this point in our
Delta Architecture.

#### Harden the Raw to Bronze Step

Rather than repeating the ad hoc queries we wrote before, we now use
composable functions defined in the file `includes/main/python/operations`.
This process is known as **hardening** the step. When data engineering
code is written as composable functions, it can be unit tested to ensure
stability.

🛠 In our composable functions we will be making use of
[Python Type Hints](https://docs.python.org/3/library/typing.html).

#### Python Type Hints

For example, the function below takes and returns a string and is annotated as follows:

```
def greeting(name: str) -> str:
    return 'Hello ' + name
```
In the function `greeting`, the argument `name` is expected to be of type `str`
and the return value is also of type `str`.
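
As a minimal sketch of what type hints do and do not do: the annotations are stored on the function and can be inspected, but Python does not check them at runtime. The function `shout` is a hypothetical example added for illustration; it is not part of `includes/main/python/operations`.

```python
from typing import get_type_hints


def greeting(name: str) -> str:
    return 'Hello ' + name


# The annotations are attached to the function object and can be inspected.
hints = get_type_hints(greeting)
print(hints)


# Hypothetical helper: annotated as str -> str, but nothing enforces that.
def shout(text: str) -> str:
    return f"{text}!"


# Passing an int raises no error, because hints are not checked at runtime.
result = shout(42)
print(result)
```

Static checkers and IDEs use the hints to flag a call such as `shout(42)` before the code runs, which is where most of their value for hardened pipeline code comes from.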

### Step 1: Create the `rawDF` Streaming DataFrame

In the previous notebook, we wrote:

```
rawDF = (
  spark.readStream
  .format("text")
  .schema(kafka_schema)
  .load(rawPath)
)
```

Now, we use the following function in `includes/main/python/operations`:

```
def read_stream_raw(spark: SparkSession, rawPath: str) -> DataFrame:
  kafka_schema = "value STRING"
  return (
    spark.readStream
    .format("text")
    .schema(kafka_schema)
    .load(rawPath)
  )
```

🤩 Note that we have injected the current Spark Session into the function as the variable `spark`.

In [0]:
rawDF = read_stream_raw(spark, rawPath)

### Step 2: Transform the Raw Data

Next, we transform the raw data, `rawDF`. Previously, we wrote:

```
rawDF = (
  rawDF.select(
    lit("files.training.databricks.com").alias("datasource"),
    current_timestamp().alias("ingesttime"),
    "value",
    current_timestamp().cast("date").alias("ingestdate")
  )
)
```

Now, we use the following function in `includes/main/python/operations`:

```
def transform_raw(df: DataFrame) -> DataFrame:
  return (
    df.select(
      lit("files.training.databricks.com").alias("datasource"),
      current_timestamp().alias("ingesttime"),
      "value",
      current_timestamp().cast("date").alias("p_ingestdate")
    )
  )
```

In [0]:
transformedRawDF = transform_raw(rawDF)

### Step 3: Write Stream to a Bronze Table

Finally, we write to the Bronze Table using Structured Streaming.
Previously, we wrote:

```
(
  raw_health_tracker_data_df
  .select("datasource", "ingesttime", "value", col("ingestdate").alias("p_ingestdate"))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", bronzeCheckpoint)
  .partitionBy("p_ingestdate")
  .queryName("write_raw_to_bronze")
  .start(bronzePath)
)
```

Now, we use the following function in `includes/main/python/operations`:

```
def create_stream_writer(dataframe: DataFrame, checkpoint: str,
                         name: str, partition_column: str=None,
                         mode: str="append") -> DataStreamWriter:

    stream_writer = (
        dataframe.writeStream
        .format("delta")
        .outputMode(mode)
        .option("checkpointLocation", checkpoint)
        .queryName(name)
    )
    if partition_column is not None:
        return stream_writer.partitionBy(partition_column)
    return stream_writer
```

🤯 **Note**: This function will be used repeatedly, every time we create
a `DataStreamWriter`.

☝🏿 This function returns a `DataStreamWriter`, not a `DataFrame`. This means
that we must call the writer's `.start()` method to start the stream.
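
Because `create_stream_writer` only builds a writer and starts nothing, it can be exercised without a cluster. The sketch below is one way to do that, assuming hand-written test doubles: `FakeStreamWriter` and `FakeDataFrame` are hypothetical classes invented for this example, and the type hints are loosened to `Any` so the code runs without `pyspark` installed.

```python
from typing import Any, List, Optional, Tuple


class FakeStreamWriter:
    """Hypothetical test double for DataStreamWriter: records each builder call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Tuple[Any, ...]]] = []

    def _record(self, method: str, *args: Any) -> "FakeStreamWriter":
        self.calls.append((method, args))
        return self  # builder methods return the writer so calls can be chained

    def format(self, source: str) -> "FakeStreamWriter":
        return self._record("format", source)

    def outputMode(self, mode: str) -> "FakeStreamWriter":
        return self._record("outputMode", mode)

    def option(self, key: str, value: str) -> "FakeStreamWriter":
        return self._record("option", key, value)

    def queryName(self, name: str) -> "FakeStreamWriter":
        return self._record("queryName", name)

    def partitionBy(self, *columns: str) -> "FakeStreamWriter":
        return self._record("partitionBy", *columns)


class FakeDataFrame:
    """Hypothetical test double exposing only the `writeStream` attribute."""

    def __init__(self) -> None:
        self.writeStream = FakeStreamWriter()


def create_stream_writer(dataframe: Any, checkpoint: str, name: str,
                         partition_column: Optional[str] = None,
                         mode: str = "append") -> Any:
    # Same logic as the notebook's function; types loosened to Any so the
    # sketch runs without pyspark.
    stream_writer = (
        dataframe.writeStream
        .format("delta")
        .outputMode(mode)
        .option("checkpointLocation", checkpoint)
        .queryName(name)
    )
    if partition_column is not None:
        return stream_writer.partitionBy(partition_column)
    return stream_writer


writer = create_stream_writer(
    FakeDataFrame(), "/tmp/checkpoint", "write_raw_to_bronze",
    partition_column="p_ingestdate",
)
called = [method for method, _ in writer.calls]
print(called)
```

The recorded calls contain no `start`, which mirrors the point above: the function configures a stream, and the caller decides when and where to start it.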

In [0]:
rawToBronzeWriter = create_stream_writer(
    dataframe=transformedRawDF,
    checkpoint=bronzeCheckpoint,
    name="write_raw_to_bronze",
    partition_column="p_ingestdate",
)

rawToBronzeWriter.start(bronzePath)

Out[19]: <pyspark.sql.streaming.StreamingQuery at 0x7faf50eeca30>

## Display the Bronze Table

In [0]:
bronzeDF = read_stream_delta(spark, bronzePath)
display(bronzeDF, streamName="display_bronze")

datasource,ingesttime,value,p_ingestdate
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577836800.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.8139067501}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577840400.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":53.9078900098}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577844000.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.7129593616}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577847600.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.2880422685}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577851200.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.5156095386}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577854800.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":53.6280743846}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577858400.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":52.1760037066}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577862000.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":90.0456721836}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577865600.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":89.4695644522}",2022-03-25
files.training.databricks.com,2022-03-25T05:44:13.701+0000,"{""time"":1577869200.0,""name"":""Deborah Powell"",""device_id"":0,""heartrate"":88.1490304138}",2022-03-25


## Show Running Streams

In [0]:
for stream in spark.streams.active:
    print(stream.name)

display_bronze
write_raw_to_bronze


## Make Notebook Idempotent

In [0]:
dbutils.fs.rm(silverPath, recurse=True)
dbutils.fs.rm(silverCheckpoint, recurse=True)

Out[22]: False

**Exercise:** Count Records in the Bronze Table

Display how many records are in our table so we can watch it grow as the data streams in. As we ingest more files, you will be able to return to this streaming display and watch the count increase.

- Use the DataFrame, `bronzeDF`, which is a reference to the Bronze Delta table
- Write Spark code to count the number of records in the Bronze Delta table

💡 **Hint:** While a standard DataFrame has a simple `.count()` method, when performing operations such as `count` on a stream, you must use `.groupby()` before the aggregate operation.

In [0]:
# TODO
display(
  bronzeDF
  .groupby()
  .count(),
  streamName="display_bronze_count"
)

count
7128


## Retrieve Second Month of Data

Next, we use the utility function `retrieve_data` to retrieve another file.

After you ingest the file by running the following cell, view the streams above; you should be able to watch the data being ingested.

In [0]:
retrieve_data(2020, 2, rawPath)

Out[24]: True

**Exercise:** Write an Assertion Statement to Verify File Ingestion

The expected file has the following name:

In [0]:
file_2020_2 = "health_tracker_data_2020_2.json"

In [0]:

assert file_2020_2 in [item.name for item in dbutils.fs.ls(rawPath)], "File not present in Raw Path"
print("Assertion passed.")

Assertion passed.


## Extracting Nested JSON

We now begin the work of creating the Silver Table. First, we extract the JSON data from the `value` column in the Bronze Delta table. Because we landed the ingested data in a Bronze table before parsing it, a record that fails to parse cannot break the ingestion process.

This extraction consists of two steps:

1. We extract the nested JSON from `bronzeDF` using the `pyspark.sql` function `from_json`.

   📒 The `from_json` function requires that a schema be passed as an argument. Here we pass the schema `json_schema = "device_id INTEGER, heartrate DOUBLE, name STRING, time FLOAT"`.

2. We flatten the nested JSON into a new DataFrame by selecting all nested values of the `nested_json` column.

In [0]:
from pyspark.sql.functions import col, from_json

json_schema = "device_id INTEGER, heartrate DOUBLE, name STRING, time FLOAT"

silver_health_tracker = bronzeDF.select(
    from_json(col("value"), json_schema).alias("nested_json")
).select("nested_json.*")
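
For intuition, the two steps above can be sketched on a single record in plain Python. This is an analogue of the extract-and-flatten step, not Spark's implementation: `extract_record` and `JSON_FIELDS` are hypothetical names, and returning `None` for unparseable input is a choice made for this sketch (Spark's handling of malformed input depends on the version and parse mode).

```python
import json
from typing import Any, Dict, Optional

# Field names and Python types mirroring `json_schema` from the notebook.
JSON_FIELDS = {"device_id": int, "heartrate": float, "name": str, "time": float}


def extract_record(value: str) -> Optional[Dict[str, Any]]:
    """Parse one Bronze `value` string and keep only the schema fields."""
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        return None  # sketch-level choice: unparseable input yields None
    if not isinstance(parsed, dict):
        return None
    # Cast each field to its schema type; missing fields become None.
    return {
        field: cast(parsed[field]) if parsed.get(field) is not None else None
        for field, cast in JSON_FIELDS.items()
    }


# First `value` string shown in the Bronze table display.
sample = '{"time":1577836800.0,"name":"Deborah Powell","device_id":0,"heartrate":52.8139067501}'
record = extract_record(sample)
print(record)
```

The raw string stays untouched in the Bronze table, so a record that fails to parse can be inspected and reprocessed later.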

## Transform the Data

The `time` column is stored as Unix time, which is not human-readable.
We need to convert it to a timestamp to make it useful, and we also extract
just the date from that timestamp. We apply the following transformations
to `silver_health_tracker`:

- convert the `time` column to a timestamp with the name `eventtime`
- convert the `time` column to a date with the name `p_eventdate`

Note that we name the new column `p_eventdate` to indicate that we are
partitioning on this column.

In [0]:
from pyspark.sql.functions import col, from_unixtime

silver_health_tracker = silver_health_tracker.select(
    "device_id",
    "heartrate",
    from_unixtime("time").cast("timestamp").alias("eventtime"),
    "name",
    from_unixtime("time").cast("date").alias("p_eventdate"),
)
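
As a rough single-value analogue of these two conversions, the sketch below turns the first `time` value from the Bronze display into a timestamp and a date using only the standard library. The helper name `to_event_columns` is hypothetical, and the sketch assumes UTC, which matches the `+0000` offsets shown in the notebook's output.

```python
from datetime import date, datetime, timezone
from typing import Tuple


def to_event_columns(unix_seconds: float) -> Tuple[datetime, date]:
    """Return (eventtime, p_eventdate) for one Unix-time value, in UTC."""
    eventtime = datetime.fromtimestamp(unix_seconds, tz=timezone.utc)
    return eventtime, eventtime.date()


# First `time` value in the Bronze table display.
eventtime, p_eventdate = to_event_columns(1577836800.0)
print(eventtime.isoformat())  # 2020-01-01T00:00:00+00:00
print(p_eventdate)            # 2020-01-01
```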

**Exercise:** Write an Assertion To Verify the Schema

The DataFrame `silver_health_tracker` should now have the following schema:

```
device_id: integer
heartrate: double
eventtime: timestamp
name: string
p_eventdate: date
```

Write a schema in DDL format to complete the assertion statement below.

💪🏼 Remember, the function `_parse_datatype_string` converts a DDL format schema string into a Spark schema.

In [0]:
from pyspark.sql.types import _parse_datatype_string
 
assert silver_health_tracker.schema == _parse_datatype_string("""
  device_id INTEGER,
  heartrate DOUBLE,
  eventtime TIMESTAMP,
  name STRING,
  p_eventdate DATE"""), "Schema does not match the expected Silver schema"
print("Assertion passed.")

Assertion passed.


## Write Stream to a Silver Table

Next, we stream write to the Silver table.

We partition this table on event date (`p_eventdate`).

In [0]:
(
    silver_health_tracker.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", silverCheckpoint)
    .partitionBy("p_eventdate")
    .queryName("write_bronze_to_silver")
    .start(silverPath)
)

Out[32]: <pyspark.sql.streaming.StreamingQuery at 0x7faf51167250>

In [0]:
spark.sql(
    """
DROP TABLE IF EXISTS health_tracker_plus_silver
"""
)

spark.sql(
    f"""
CREATE TABLE health_tracker_plus_silver
USING DELTA
LOCATION "{silverPath}"
"""
)

Out[33]: DataFrame[]

### Troubleshooting

😫 If you run the `CREATE TABLE` command before the `writeStream` above has been created, you may see the following error:

`AnalysisException: Table schema is not set.  Write data into it or use CREATE TABLE to set the schema.;`

If this happens, wait a moment for the `writeStream` to instantiate and run the command again.

## Explore and Visualize the Data

After running the following cell, click on "Plot Options..." and set the plot options as shown below:

![Plot Options](https://files.training.databricks.com/images/pipelines_plot_options.png)

In [0]:
display(
    spark.readStream.table("health_tracker_plus_silver"), streamName="display_silver"
)

device_id,heartrate,eventtime,name,p_eventdate
0,56.8004920211,2020-01-13T00:00:00.000+0000,Deborah Powell,2020-01-13
0,57.3427304091,2020-01-13T00:59:44.000+0000,Deborah Powell,2020-01-13
0,57.3753078662,2020-01-13T01:59:28.000+0000,Deborah Powell,2020-01-13
0,57.5585488599,2020-01-13T02:59:12.000+0000,Deborah Powell,2020-01-13
0,56.0829660354,2020-01-13T03:58:56.000+0000,Deborah Powell,2020-01-13
0,56.5231267033,2020-01-13T05:00:48.000+0000,Deborah Powell,2020-01-13
0,94.2689405255,2020-01-13T06:00:32.000+0000,Deborah Powell,2020-01-13
0,94.9195777222,2020-01-13T07:00:16.000+0000,Deborah Powell,2020-01-13
0,95.2584820986,2020-01-13T08:00:00.000+0000,Deborah Powell,2020-01-13
0,94.1239514075,2020-01-13T08:59:44.000+0000,Deborah Powell,2020-01-13
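
The event times displayed above sit a few seconds off the hour (`00:59:44`, `01:59:28`, ...), although the Bronze `time` values are whole hours. One plausible explanation, offered here as an assumption rather than something the notebook states, is that `time` is parsed as `FLOAT`: a 32-bit float can only represent values near 1.58 billion in steps of 128 seconds. The sketch below reproduces that rounding in plain Python; `as_float32` is a hypothetical helper.

```python
import struct
from datetime import datetime, timezone


def as_float32(value: float) -> float:
    """Round-trip a Python float through a 32-bit float representation."""
    return struct.unpack("f", struct.pack("f", value))[0]


# Second `time` value in the Bronze table display: 2020-01-01T01:00:00Z.
exact = 1577840400.0
stored = as_float32(exact)

print(stored)  # 1577840384.0
print(datetime.fromtimestamp(stored, tz=timezone.utc).isoformat())
# 2020-01-01T00:59:44+00:00
```

If that assumption holds, parsing `time` with a wider type such as `DOUBLE` would preserve the original seconds.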


### What patterns do you notice in the data? Anomalies?

### Missing Records

When we look at the Silver table, we expect to see two months of data: five devices, each reporting 24 measurements a day for (31 + 29) days, or 5 × 24 × 60 = 7,200 records. (February 2020 falls in a leap year, which is why there are 29 days in the month.)

❗️We do not have a correct count. The table holds only 7,128 records, and the per-device counts below show that `device_id` 4 is missing 72 records.

In [0]:
from pyspark.sql.functions import count

display(
    spark.read.table("health_tracker_plus_silver").groupby("device_id").agg(count("*"))
)

device_id,count(1)
1,1440
3,1440
4,1368
2,1440
0,1440
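
The arithmetic behind the expected and missing counts can be checked in a few lines. This is a sketch only: the observed counts are copied by hand from the query output above, and the variable names are chosen for this example.

```python
import calendar

devices = 5
readings_per_day = 24

# Days in January and February 2020 (a leap year): 31 + 29.
days = calendar.monthrange(2020, 1)[1] + calendar.monthrange(2020, 2)[1]

expected_per_device = readings_per_day * days
expected_total = devices * expected_per_device

# Per-device counts copied from the query output above.
observed = {0: 1440, 1: 1440, 2: 1440, 3: 1440, 4: 1368}

# Devices whose count falls short, with the size of the shortfall.
missing = {
    device: expected_per_device - count
    for device, count in observed.items()
    if count != expected_per_device
}

print(expected_total)          # 7200
print(sum(observed.values()))  # 7128
print(missing)                 # {4: 72}
```

A shortfall of 72 records at 24 readings per day corresponds to three days of data for device 4.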


## Table Histories

Recall that the Delta transaction log allows us to view all of the commits that have taken place in a Delta table's history.

In [0]:
from delta.tables import DeltaTable

bronzeTable = DeltaTable.forPath(spark, bronzePath)
silverTable = DeltaTable.forPath(spark, silverPath)

In [0]:
display(bronzeTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
1,2022-03-25T06:02:37.000+0000,8047228571528786,gchattre@ur.rochester.edu,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 733fdb5a-68da-4b0c-9198-0ed8b0802f77, epochId -> 1)",,List(3719025789001831),0325-052223-ncmvp8x,0.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 3408, numOutputBytes -> 71505, numAddedFiles -> 1)",
0,2022-03-25T05:44:17.000+0000,8047228571528786,gchattre@ur.rochester.edu,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 733fdb5a-68da-4b0c-9198-0ed8b0802f77, epochId -> 0)",,List(3719025789001790),0325-052223-ncmvp8x,,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 3720, numOutputBytes -> 77601, numAddedFiles -> 1)",


In [0]:
display(silverTable.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,2022-03-25T06:05:18.000+0000,8047228571528786,gchattre@ur.rochester.edu,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 9949bb4c-54af-4293-ba39-319b3e5e0f8d, epochId -> 0)",,List(3719025789001831),0325-052223-ncmvp8x,,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 7128, numOutputBytes -> 152784, numAddedFiles -> 60)",


## Time Travel

We can query an earlier version of the Delta table using the time travel feature. By running the following cells, we can see that the current count of the Bronze table is larger than it was at version 0, before we ingested the new data file into the stream.

In [0]:
%sql

SELECT COUNT(*) FROM health_tracker_plus_bronze VERSION AS OF 0

count(1)
3720


In [0]:
%sql

SELECT COUNT(*) FROM health_tracker_plus_bronze VERSION AS OF 1

count(1)
7128


In [0]:
%sql

SELECT COUNT(*) FROM health_tracker_plus_bronze

count(1)
7128
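
These counts line up with the Bronze table history shown earlier. Both commits are blind appends that remove no files, so the row count at each version should equal the running total of `numOutputRows`. The sketch below checks that with values copied by hand from the history output; the variable names are chosen for this example.

```python
from itertools import accumulate

# numOutputRows per Bronze table version, copied from the history output.
rows_written = {0: 3720, 1: 3408}

# For append-only commits, the count at version N is the running total.
versions = sorted(rows_written)
count_at_version = dict(
    zip(versions, accumulate(rows_written[v] for v in versions))
)

print(count_at_version)  # {0: 3720, 1: 7128}
```

The running total only matches the time travel counts because nothing was deleted or rewritten between versions; a table with updates or deletes would need the full operation metrics.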


## Stop All Streams

In the next notebook, we will analyze data in the Silver Delta table, and perform some update operations on the data.

Before we do so, let's shut down all streams in this notebook.

In [0]:
stop_all_streams()


Out[42]: True

&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>