
## Lab: Multi-Hop Architecture


<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Associate/main/Labs/Includes/images/school_schema.png" alt="School Schema" style="width: 600">
</div>

Run the following cell to set up the lab environment.

In [0]:
%run ../Includes/Setup-Lab

#### Q1 - Declaring Bronze Table

Use Auto Loader to incrementally load the enrollment JSON files from the directory **{dataset_school}/enrollments-json-raw** into a Delta table called **`bronze`**.
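For orientation, here is a minimal sketch of what an Auto Loader read typically looks like. The helper name `read_json_with_auto_loader` and its parameters are hypothetical and not part of the lab setup. The `cloudFiles` format and the `cloudFiles.format` and `cloudFiles.schemaLocation` options are standard Auto Loader settings on Databricks.

```python
def read_json_with_auto_loader(spark, source_dir, schema_location):
    """Return a streaming DataFrame that picks up new JSON files incrementally.

    `spark` is an active SparkSession on Databricks. The function name and
    its parameters are illustrative assumptions, not part of the lab.
    """
    return (
        spark.readStream
        .format("cloudFiles")                                  # Auto Loader source
        .option("cloudFiles.format", "json")                   # format of the raw files
        .option("cloudFiles.schemaLocation", schema_location)  # where the inferred schema is stored
        .load(source_dir)
    )
```

In this lab, such a helper could be called with `dataset_source` and `schema_location` from the cell below. The result is a streaming DataFrame that still has to be written out with `writeStream`.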


In [0]:
dataset_source = f"{dataset_school}/enrollments-json-raw"
bronze_checkpoint_path = "dbfs:/mnt/DE-Associate/checkpoints/school/bronze"
schema_location = bronze_checkpoint_path

(spark.readStream
        .___________________
        .___________________
        .___________________
        .load(dataset_source)
      .writeStream
        .___________________
        .___________________
        .table("bronze")
)

Create a streaming temporary view **bronze_tmp** from the bronze table in order to perform transformations using SQL.

In [0]:
(spark
  .readStream
  .table("bronze")
  .___________________("bronze_tmp"))

#### Q2 - Data Cleansing & Enrichment

Using a `CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT` statement, define a new streaming view **`bronze_cleaned_tmp`** against **`bronze_tmp`** that does the following:
* Remove records where **quantity** is 0
* Add a column called **`processing_time`** containing the current timestamp using the **current_timestamp()** function
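The two steps above can also be expressed with the DataFrame API instead of SQL. The sketch below is one possible equivalent, not the lab's expected SQL answer. The function name `clean_enrollments` is hypothetical, and the filter `quantity > 0` assumes quantities are never negative.

```python
def clean_enrollments(df):
    """Drop zero-quantity records and stamp each row with a processing time.

    `df` is a (streaming or batch) DataFrame with a `quantity` column.
    The function name is an illustrative assumption.
    """
    return (
        df.where("quantity > 0")  # assumes quantity is never negative
          .selectExpr("*", "current_timestamp() AS processing_time")
    )
```

Both `where` and `selectExpr` accept SQL expression strings, so the sketch needs no extra imports. Applied to a streaming DataFrame, the result is still a streaming DataFrame.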

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW bronze_cleaned_tmp AS
SELECT ___________________


#### Q3 - Declaring Silver Table

Stream the data from **`bronze_cleaned_tmp`** to a table called **`silver`**.
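As a reference point, a streaming write to a table generally needs a checkpoint location and an output mode. The following is a minimal sketch with a hypothetical helper name, `write_stream_to_table`. It uses `toTable`, which is available in PySpark 3.1 and later, whereas the lab cell below uses the `.table(...)` method provided on Databricks.

```python
def write_stream_to_table(df, table_name, checkpoint_path, output_mode="append"):
    """Start a streaming write of `df` into `table_name`.

    `df` must be a streaming DataFrame. The function name and parameters
    are illustrative assumptions, not part of the lab.
    """
    return (
        df.writeStream
        .option("checkpointLocation", checkpoint_path)  # tracks progress across restarts
        .outputMode(output_mode)                        # "append" suits non-aggregated rows
        .toTable(table_name)                            # starts the query and returns its handle
    )
```

The returned object is a `StreamingQuery` handle, which can later be stopped with `stop()`.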

In [0]:
silver_checkpoint_path = "dbfs:/mnt/DE-Associate/checkpoints/school/silver"

(spark.table("bronze_cleaned_tmp")
        .___________________
        .___________________
        .___________________
        .table("silver")
)

Let's create a streaming temporary view from the silver table in order to perform business-level aggregations using SQL.

In [0]:
(spark
  .readStream
  .table("silver")
  .createOrReplaceTempView("silver_tmp"))

#### Q4 - Declaring Gold Table

Using a `CREATE OR REPLACE TEMPORARY VIEW ... AS SELECT` statement, define a new streaming view **`enrollments_per_student_tmp_vw`** against **`silver_tmp`** that counts the number of enrollments per **`student`**. Name the aggregated field **`enrollments_count`**.
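For comparison, the same kind of per-key count can be written with the DataFrame API. This is a sketch only. The helper name `count_per_key` is hypothetical, and the default key column `student` is taken from the wording above, so it may differ from the actual column name in the dataset.

```python
def count_per_key(df, key="student", alias="enrollments_count"):
    """Count rows per value of `key` and name the result column `alias`.

    The function name and default column names are illustrative assumptions.
    """
    return (
        df.groupBy(key)
          .count()                             # yields a column literally named "count"
          .withColumnRenamed("count", alias)   # rename it to the requested field name
    )
```

On a streaming DataFrame this produces a streaming aggregation, which constrains the output mode that can be used when the result is written to a table.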

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW enrollments_per_student_tmp_vw AS
SELECT ___________________



Stream the aggregated data from the **`enrollments_per_student_tmp_vw`** view to a Delta table called **`gold_enrollments_stats`**.
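One point worth keeping in mind: a streaming aggregation without a watermark cannot be written in `append` mode, so `complete` (or `update`) is typically used instead. A minimal sketch, assuming a hypothetical helper name `write_aggregates_to_table` and using PySpark's `toTable` rather than the Databricks `.table(...)` method shown in the lab cell:

```python
def write_aggregates_to_table(agg_df, table_name, checkpoint_path):
    """Start a streaming write that rewrites the full aggregate on each trigger.

    `agg_df` is a streaming DataFrame produced by an aggregation.
    The function name and parameters are illustrative assumptions.
    """
    return (
        agg_df.writeStream
        .outputMode("complete")                         # emit the whole result table each time
        .option("checkpointLocation", checkpoint_path)  # one checkpoint directory per query
        .toTable(table_name)
    )
```

With `complete` mode, the target table is overwritten with the latest counts on every micro-batch, which is why the statistics change after new data arrives.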

In [0]:
gold_checkpoint_path = "dbfs:/mnt/DE-Associate/checkpoints/school/gold_enrollments_stats"

query = (spark.table("enrollments_per_student_tmp_vw")
              .writeStream
              .___________________
              .___________________
              .table("gold_enrollments_stats"))

Query the data in the **`gold_enrollments_stats`** table to ensure data was written as expected.

In [0]:
%sql
SELECT * FROM gold_enrollments_stats

Run the cell below to land a new JSON file of enrollment data.

In [0]:
load_new_json_data()

Wait for the new data to propagate through the pipeline, and then run the query below to verify that the statistics have been updated in the table **`gold_enrollments_stats`**.
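Instead of waiting by hand, one option is to block until every active stream has processed the data currently available. The sketch below uses `StreamingQuery.processAllAvailable()`, which PySpark documents as intended for testing. The helper name `wait_for_active_streams` is hypothetical.

```python
def wait_for_active_streams(spark):
    """Block until each active streaming query has processed all available data.

    Returns the number of queries that were waited on. The function name is
    an illustrative assumption. Note that processAllAvailable() may block
    indefinitely if new data keeps arriving at the source.
    """
    queries = list(spark.streams.active)
    for query in queries:
        query.processAllAvailable()
    return len(queries)
```

Calling `wait_for_active_streams(spark)` after `load_new_json_data()` would be one way to make sure the verification query sees the updated counts.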

In [0]:
%sql
SELECT * FROM gold_enrollments_stats

Finally, run the cell below to stop the active streaming queries.

In [0]:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()