
# Demo - Multi Flow Data Pipeline with Liquid Clustering and Data Quality

## Overview

This demonstration showcases how to build a robust incremental data pipeline using Lakeflow Spark Declarative Pipelines (SDP) to consolidate data from multiple subsidiaries (data sources) into a single target streaming table. 

You'll work with three fictional company subsidiaries: Bright Home, Lumina Sports, and Northstar Outfitters, each producing transaction data in different formats (`CSV` and `JSON`). The demo illustrates how to overcome common pipeline challenges including multiple flows into a single table, schema mismatches, data quality issues, and performance optimization requirements.

Through hands-on implementation, you'll create a complete medallion architecture pipeline that incrementally ingests multiple data sources into a single bronze table using flows, applies data quality constraints and transformations in the silver layer with liquid clustering optimization, and creates business intelligence materialized views in the gold layer. 

## Learning Objectives

By the end of this demonstration, you will be able to:
- **Ingest multiple data sources into one bronze table** using Spark Declarative Pipelines, even when the sources use different file formats such as CSV and JSON.

- **Standardize schemas in the bronze layer** and map correct data types in silver layer to resolve differences across source systems.

- **Add data quality checks and enable liquid clustering** in the silver tables to enforce basic rules and improve query performance.

- **Build incremental materialized views** in the gold layer that refresh automatically and provide ready-to-use analytics.

- **Run and monitor the full pipeline** across bronze, silver, and gold, including incremental loads and data lineage tracking.

### Multi Flow Pipeline Demonstration Overview
In this demonstration, you'll build a Lakeflow Spark Declarative Pipeline that implements the full medallion flow, from incremental ingestion of multiple raw data sources to curated analytics:

1. **Use multiple flows (3)** to incrementally ingest files from three cloud storage locations and write into a single bronze table. 
    - Each volume is a daily orders drop for a specific subsidiary that the company owns. We want all this data ingested into a single table for overall analysis.
2. **Define and build the silver table** with a clean schema, apply basic data quality constraints, and **enable liquid clustering** for query performance as the data continues to grow.
3. **Create gold materialized views** that automatically refresh and provide ready-to-use analytics.

![Multi Flow Pipeline Overview](./Includes/images/multi_flow/multi_flow_demo_pipeline_overview.png)

## REQUIRED - SELECT SERVERLESS COMPUTE VERSION 4

This demonstration was developed using **Serverless v4**. 
- Selecting a serverless version: [Select an environment version](https://docs.databricks.com/aws/en/compute/serverless/dependencies#-select-an-environment-version)

**NOTE:** While all-purpose compute or other **Serverless** versions may work, they were not tested.

## A. Classroom Setup
Follow the cells below to set up your Workspace for the demonstration.

### A1. Access Marketplace Data

##### NOTE: If you are running this lab in **Vocareum** as a **Partner**, the share is already installed and available as **dbacademy_retail**. Please use this as the value for the `your_marketplace_share_catalog_name` variable below.

If you are running this in your own Workspace, complete the following steps to get your own copy of the Marketplace data:

1. Open **Databricks Marketplace** in a new tab.  

2. Search for `Simulated Retail Customer Data`.  

3. Select the tile titled **Simulated Retail Customer Data (Databricks provided)**.  

4. Click **Get instant access**.  

5. **Enter a unique catalog name** for your share to avoid receiving a duplicate catalog error in shared Workspaces. For example: `dbacademy_retail_yourname`.  

6. Review and accept the terms, then click **Get instant access** to complete the setup.

7. Update the variable `your_marketplace_share_catalog_name` in the cell below to point to your shared catalog from Marketplace.

In [0]:
## Update the variable below to reference your marketplace catalog name

## NOTE: If you are using Vocareum, use 'dbacademy_retail' as the catalog name below
your_marketplace_share_catalog_name = 'YOUR_MARKETPLACE_SHARE_CATALOG'

### A2. Configure Your Catalog and Schema



1. In this step, you must **specify a catalog that you own or have write access to**.
    - If you already have a catalog, update the code below to use its name.
    - If you do not yet have one, create a catalog first, then return and update the code.

    Replace the value for the `my_catalog` variable with your catalog name.


In [0]:
my_catalog = 'YOUR_CATALOG'   ## <---- Replace with your catalog name

2. Run the cells below to set up your demonstration environment. 

**NOTES: The setup will:**

- Check that the variables above were created
- Create three schemas in your specified catalog:  
    - **multi_flows_1_bronze**
    - **multi_flows_2_silver**
    - **multi_flows_3_gold**  
- Create three volumes in your **your-catalog.multi_flows_1_bronze** schema and add a single source file (`CSV` or `JSON`) to each volume
- Check your specified compute

This ensures that all schemas, tables, and other objects are created in your catalog.

In [0]:
%run ./Includes/Classroom-Setup-multiple-flows

In [0]:
my_vol_path = multi_flow_demo_setup(
    my_catalog = my_catalog, 
    marketplace_catalog = your_marketplace_share_catalog_name, 
    schema = 'multi_flows',
    source_volumes = ['bright_home_orders','lumina_sports_orders','northstar_outfitters_orders'],
    reset_volume = False  ## <-- Set to True to delete all files in your volumes and start fresh if you've already completed the demo
)

3. Run the cell below to view the value of the `my_vol_path` variable.

   Confirm that the value references your **your-catalog.multi_flows_1_bronze** path. This will be used to dynamically reference your source volumes throughout this demonstration.


In [0]:
print(my_vol_path)

## B. Explore the Source Volumes for Ingesting Multiple Flows into a Single Target

Before building multiple flows that write to a single target streaming table, start by exploring the raw source data stored in the three volumes.  

Each **volume** represents a **separate sales system** for a different subsidiary within our fictional company:

- **B1.** Bright Home Orders volume (`CSV` files)  
- **B2.** Lumina Sports Orders volume (`CSV` files)  
- **B3.** Northstar Outfitters Orders volume (`JSON` files)

### B1. Bright Home Orders Volume

1. View the files in the **multi_flows_1_bronze.bright_home_orders** volume.

   Notice that only one `CSV` file currently exists in this volume for the sales on **2025-11-01**.


In [0]:
spark.sql(f"LIST '{my_vol_path}/bright_home_orders'").display()

2. Explore the raw data for **bright_home_orders**. The cell below performs the following:

   a. Counts the number of records in the file within the volume.  
   b. Describes the default ingestion data types for each column of the `CSV` file.  
   c. Previews the data.  

In the output, notice the following:
- **157** rows are present in this file.  
- The schema is inferred and returns a variety of data types.  
- This is simple sales order data from the company's **Bright Home** subsidiary.


In [0]:
# a. Row count
df_count = spark.sql(f"""
    SELECT count(*) AS TotalRows
    FROM read_files('{my_vol_path}/bright_home_orders')
""")
display(df_count)

# b. Schema
df_schema = spark.sql(f"""
    DESCRIBE SELECT * 
    FROM read_files('{my_vol_path}/bright_home_orders')
""")
display(df_schema)

# c. Preview rows
df_preview = spark.sql(f"""
    SELECT *
    FROM read_files('{my_vol_path}/bright_home_orders')
    LIMIT 5
""")
display(df_preview)

### B2. Lumina Sports Orders Volume

1. View the files in the **multi_flows_1_bronze.lumina_sports_orders** volume.

   Notice that only one `CSV` file currently exists in this volume for the sales on **2025-11-01**.

In [0]:
spark.sql(f"LIST '{my_vol_path}/lumina_sports_orders'").display()

2. Explore the raw data for **Lumina Sports**. The cell below performs the following:

   a. Counts the number of records in the file within the volume.  
   b. Describes the default ingestion data types for each column of the `CSV` file.  
   c. Previews the data.  

In the output, notice the following:
- **110** rows are present in this file.  
- The schema is inferred and returns a variety of data types.  
- This is simple sales order data from the company's **Lumina Sports** subsidiary.


In [0]:
# a. Row count
df_count = spark.sql(f"""
    SELECT count(*) AS TotalRows
    FROM read_files('{my_vol_path}/lumina_sports_orders')
""")
display(df_count)

# b. Schema
df_schema = spark.sql(f"""
    DESCRIBE SELECT *
    FROM read_files('{my_vol_path}/lumina_sports_orders')
""")
display(df_schema)

# c. Preview rows
df_preview = spark.sql(f"""
    SELECT *
    FROM read_files('{my_vol_path}/lumina_sports_orders')
    LIMIT 5
""")
display(df_preview)

### B3. Northstar Outfitters Orders Volume

1. View the files in the **multi_flows_1_bronze.northstar_outfitters_orders** volume.

   Notice that only one `JSON` file currently exists in this volume for the sales on **2025-11-01**.

In [0]:
spark.sql(f"LIST '{my_vol_path}/northstar_outfitters_orders'").display()

2. Explore the raw data for **Northstar Outfitters**. The cell below performs the following:

   a. Counts the number of records in the file within the volume.  
   b. Describes the default ingestion data types for each column of the `JSON` file.  
   c. Previews the data.  

In the output, notice the following:
- **182** rows are present in this file.  
- The schema is inferred and returns a variety of data types.  
- This is simple sales order data from the company's **Northstar Outfitters** subsidiary.


In [0]:
# a. Row count
df_count = spark.sql(f"""
    SELECT count(*) AS TotalRows
    FROM read_files('{my_vol_path}/northstar_outfitters_orders')
""")
display(df_count)

# b. Schema
df_schema = spark.sql(f"""
    DESCRIBE SELECT *
    FROM read_files('{my_vol_path}/northstar_outfitters_orders')
""")
display(df_schema)

# c. Preview rows
df_preview = spark.sql(f"""
    SELECT *
    FROM read_files('{my_vol_path}/northstar_outfitters_orders')
    LIMIT 5
""")
display(df_preview)
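
Sections B1 through B3 run the same three queries against different volumes. As a minimal sketch, the query text could be built once by a small helper. The helper name `exploration_queries` and the base path below are hypothetical and only for illustration; in the notebook you would pass a path built from `my_vol_path` and run each string with `spark.sql(...)`.

```python
def exploration_queries(volume_path):
    """Build the row count, schema, and preview queries for one source volume."""
    source = f"read_files('{volume_path}')"
    return {
        "row_count": f"SELECT count(*) AS TotalRows FROM {source}",
        "schema": f"DESCRIBE SELECT * FROM {source}",
        "preview": f"SELECT * FROM {source} LIMIT 5",
    }

# Hypothetical base path; in the notebook, use my_vol_path instead.
base_path = "/Volumes/my_catalog/multi_flows_1_bronze"
queries = exploration_queries(f"{base_path}/northstar_outfitters_orders")

for name, sql in queries.items():
    print(f"{name}: {sql}")
    # In a Databricks notebook you could run: display(spark.sql(sql))
```

The helper only assembles strings, so it can be checked outside Databricks before being used against real volumes.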

### B4. Raw Data Exploration Summary

Below is a quick overview of what we discovered after examining the raw source volumes. 

#### Raw Cloud Storage Overview
Each source volume contains one file with a small number of sales for our demonstration.

| Source Data (volume)            | File Format | # of Rows | # of Files | Sales Date |
|---------------------------------|-------------|-----------|------------|------------|
| Bright Home Orders              | CSV         | 157       | 1          | 2025-11-01|
| Lumina Sports Orders            | CSV         | 110       | 1          | 2025-11-01|
| Northstar Outfitters Orders     | JSON        | 182       | 1          | 2025-11-01|


<br>

#### Schema Differences Comparison and Issues

- Each raw data source uses its own structure (`CSV` or `JSON`).  
- When ingesting `CSV`, the inferred column types often differ from `JSON`.  
- Since each format infers schemas independently, combining these three flows into a single target table will introduce **schema mismatches that lead to ingestion conflicts**.

| **Column Name**     | **bright_home_orders (CSV)** | **lumina_sports_orders (CSV)** | **northstar_outfitters_orders (JSON)** |
|---------------------|------------------------|---------------------------|---------------------------------|
| subsidiary_id       | string                 | string                    | string                          |
| order_id            | string                 | string                    | string                          |
| **order_timestamp** | **timestamp**          | **timestamp**             | **string**                      |
| customer_id         | string                 | string                    | string                          |
| region              | string                 | string                    | string                          |
| country             | string                 | string                    | string                          |
| city                | string                 | string                    | string                          |
| channel             | string                 | string                    | string                          |
| sku                 | string                 | string                    | string                          |
| category            | string                 | string                    | string                          |
| **qty**             | **int**                | **int**                   | **bigint**                      |
| unit_price          | double                 | double                    | double                          |
| **discount_pct**    | **int**                | **int**                   | **bigint**                      |
| coupon_code         | string                 | string                    | string                          |
| total_amount        | double                 | double                    | double                          |
| **order_date**      | **date**               | **date**                  | **string**                      |
| _rescued_data       | string                 | string                    | string                          |

<br>
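
The mismatches highlighted in the table can also be found programmatically. The sketch below uses plain Python and a hand-copied subset of the inferred types from the table; `find_type_conflicts` is a hypothetical helper, not part of any Databricks API.

```python
# Inferred types copied by hand from the comparison table (subset of columns).
csv_types = {
    "order_id": "string", "order_timestamp": "timestamp", "qty": "int",
    "unit_price": "double", "discount_pct": "int", "order_date": "date",
}
json_types = {
    "order_id": "string", "order_timestamp": "string", "qty": "bigint",
    "unit_price": "double", "discount_pct": "bigint", "order_date": "string",
}
inferred = {
    "bright_home_orders": csv_types,
    "lumina_sports_orders": csv_types,
    "northstar_outfitters_orders": json_types,
}

def find_type_conflicts(schemas):
    """Return {column: {source: type}} for columns whose type differs across sources."""
    conflicts = {}
    columns = sorted({col for schema in schemas.values() for col in schema})
    for col in columns:
        types_by_source = {src: schema.get(col) for src, schema in schemas.items()}
        if len(set(types_by_source.values())) > 1:
            conflicts[col] = types_by_source
    return conflicts

conflicts = find_type_conflicts(inferred)
for col, types_by_source in conflicts.items():
    print(col, types_by_source)
```

Only the columns shown in bold in the table are reported, which is the set that would clash if the three flows wrote their inferred types into one target.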

#### Goal: Ingest All Raw Source Files Into One Bronze Streaming Table

To successfully combine all three sources into a single bronze streaming table, we will need to standardize the schema. 

To do this, we will **ingest every column as a `STRING`** into the bronze table.

This avoids data type conflicts between `CSV` and `JSON` files, since each format infers types differently. Normalizing everything to `STRING` keeps the bronze layer predictable, prevents ingestion failures, and lets us apply the correct data types later in the silver layer.
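
To see why this works, consider a toy model in plain Python. The sample values are made up, and `to_bronze` and `to_silver` are hypothetical stand-ins for the casts the pipeline performs in SQL. Two rows that arrive with different inferred types become identical once stored as strings, and the intended types are applied once afterwards.

```python
from datetime import date

# Made-up sample values: the CSV reader inferred typed values, the JSON reader kept a string date.
csv_row = {"qty": 2, "discount_pct": 10, "order_date": date(2025, 11, 1)}
json_row = {"qty": 2, "discount_pct": 10, "order_date": "2025-11-01"}

def to_bronze(row):
    """Bronze: store every value as a string, whatever type the reader inferred."""
    return {col: str(value) for col, value in row.items()}

def to_silver(row):
    """Silver: apply the intended types once, in one place."""
    return {
        "qty": int(row["qty"]),
        "discount_pct": int(row["discount_pct"]),
        "order_date": date.fromisoformat(row["order_date"]),
    }

bronze_rows = [to_bronze(csv_row), to_bronze(json_row)]
silver_rows = [to_silver(row) for row in bronze_rows]

print(bronze_rows)
print(silver_rows)
```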

## C. Create the Spark Declarative Pipeline

### C1. Enable the Lakeflow Pipelines Editor

Complete the following steps to confirm or enable the **Lakeflow Pipelines Editor**:

1. In the top-right corner of the workspace, select your **account icon** ![Account Icon](./Includes/images/account_icon.png) (*Your icon letter will differ*).  

2. Right-click **Settings** and choose **Open link in new tab**.  

3. In the left sidebar, select **Developer** under **User**.  

4. In the **Experimental features** section, locate **Lakeflow Pipelines Editor** and toggle it **on**.

### C2. Create a Lakeflow Spark Declarative Pipeline using the Lakeflow Pipelines Editor
Complete the following steps to create your Spark Declarative Pipeline:

1. In the main navigation pane, right-click **Jobs & Pipelines** and select **Open link in New Tab**.  

2. In the new tab, select **Create → ETL Pipeline**.  

   **NOTE:** If prompted to **Try the new Lakeflow Pipelines Editor**, choose **Enable Lakeflow Pipelines Editor**. This appears only if you did not complete the previous step.  

3. At the top, complete the following:
   - Name your pipeline `demo_multi_flow_yourname`
   - Select your default **catalog** and **schema**:  
        - **Catalog:** The catalog you specified for this notebook  
        - **Schema:** **multi_flows_1_bronze**  

4. Select **Start with an empty file**. In the pop-up window, specify the following:  
   - For **Location where the pipeline folder will be created** - Specify the folder this notebook resides in.
   - Select **SQL**
   - Select **Create**

5. Rename the **transformations** folder to `ingest_multiple_flows`.

6. Rename the **my_transformations.sql** file to `flow_ingestion.sql`.

7. Leave the **Lakeflow Pipelines Editor** page open.

#### Checkpoint
![Create SDP Checkpoint](./Includes/images/multi_flow/pipeline_creation.png)

## D. Using Multiple Flows to Write to a Single Target

In many enterprise environments, data arrives from several systems that must be consolidated into a single table for downstream processing.

In this example, the company has **three subsidiaries, each producing transaction data in slightly different raw file formats**. 

With Spark Declarative Pipelines, you can define multiple flows that write to the same target streaming table, allowing all raw files in cloud storage to be **incrementally ingested into one unified destination**.

### D1. Create the Bronze Target Table

1. Start by creating the **bronze streaming table** that will serve as the landing zone for all incoming transaction data. 

    This table ingests every column as `STRING` to ensure compatibility across the different source systems.


    Copy the SQL code below and paste it into your `flow_ingestion.sql` file.

<button onclick="copyBlock()">Copy to clipboard</button>

<pre id="copy-block" style="font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace; border:1px solid #e5e7eb; border-radius:10px; background:#f8fafc; padding:14px 16px; font-size:0.85rem; line-height:1.35; white-space:pre;">
<code>
------------------------------------------
-- CREATE THE BRONZE TABLE STRUCTURE
------------------------------------------
CREATE OR REPLACE STREAMING TABLE multi_flows_1_bronze.orders_bronze_flows_demo
(
  subsidiary_id   STRING,
  order_id        STRING,
  order_timestamp STRING,
  customer_id     STRING,
  region          STRING,
  country         STRING,
  city            STRING,
  channel         STRING,
  sku             STRING,
  category        STRING,
  qty             STRING,
  unit_price      STRING,
  discount_pct    STRING,
  coupon_code     STRING,
  total_amount    STRING,
  order_date      STRING,
  source_file     STRING,   -- Added by the _metadata column to return the source file name
  file_mod_time   TIMESTAMP -- Added by the _metadata column to return file modification time of the file. Returns a consistent value
)
COMMENT "Creates a single bronze streaming table with orders from all subsidiaries using multiple flows."
TBLPROPERTIES (
  'pipelines.reset.allowed' = false    -- prevent full table refreshes on the bronze table
);
</code></pre>

<script>
function copyBlock() {
  const el = document.getElementById("copy-block");
  if (!el) return;

  const text = el.innerText;

  // Preferred modern API
  if (navigator.clipboard && navigator.clipboard.writeText) {
    navigator.clipboard.writeText(text)
      .then(() => alert("Copied to clipboard"))
      .catch(err => {
        console.error("Clipboard write failed:", err);
        fallbackCopy(text);
      });
  } else {
    fallbackCopy(text);
  }
}

function fallbackCopy(text) {
  const textarea = document.createElement("textarea");
  textarea.value = text;
  textarea.style.position = "fixed";
  textarea.style.left = "-9999px";
  document.body.appendChild(textarea);
  textarea.select();
  try {
    document.execCommand("copy");
    alert("Copied to clipboard");
  } catch (err) {
    console.error("Fallback copy failed:", err);
    alert("Could not copy to clipboard. Please copy manually.");
  } finally {
    document.body.removeChild(textarea);
  }
}
</script>

**Review the code**
- The `COMMENT` clause adds descriptive metadata to the table for documentation purposes.

- The `TBLPROPERTIES` statement configures the following setting:
  - **Reset Protection**: The `'pipelines.reset.allowed' = false` property prevents full refreshes on the streaming table, which helps avoid accidentally removing checkpoints and truncating the streaming table data.

#### IMPORTANT: Understanding Full Table Refresh Protection

This protection is particularly important when your raw data source automatically removes files after a certain timeframe. A full refresh clears the table and its checkpoints, then reprocesses only the files still present in the source directory. Without this setting, rows from files that have since been removed would be lost during a **Run pipeline with full table refresh** operation.

**NOTE:** For guidance on when to use full refreshes, see the [Should I use a full refresh?](https://docs.databricks.com/aws/en/ldp/updates#should-i-use-a-full-refresh) documentation.
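
The risk can be illustrated with a toy model in plain Python. This is a simplified sketch, not how the pipeline engine is implemented: the file names, the `processed` set standing in for checkpoint state, and both functions are assumptions made for illustration.

```python
def incremental_update(table, processed, source_files):
    """Ingest only files not seen before (a stand-in for checkpointed streaming reads)."""
    for name, rows in source_files.items():
        if name not in processed:
            table.extend(rows)
            processed.add(name)

def full_refresh(table, processed, source_files):
    """Clear the table and the checkpoint state, then re-read what the source still holds."""
    table.clear()
    processed.clear()
    incremental_update(table, processed, source_files)

table, processed = [], set()

# Day 1: one file lands and is ingested.
source = {"orders_day1.csv": ["a", "b"]}
incremental_update(table, processed, source)

# Day 2: a new file lands, and the source system has already purged day 1.
source = {"orders_day2.csv": ["c"]}
incremental_update(table, processed, source)
rows_before_refresh = list(table)

# A full refresh can only re-read day 2, so the day 1 rows are gone.
full_refresh(table, processed, source)
rows_after_refresh = list(table)

print(rows_before_refresh, rows_after_refresh)
```

In this model the incremental runs keep every row ever ingested, while the full refresh leaves only the rows from files that still exist in the source.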

### D2. Configure the Pipeline Parameters

1. Run the cell below to retrieve the key-value pairs needed to set your pipeline configuration parameters for each **raw data source volume**.

In [0]:
config_parameters = [
    ('bright_home_orders_source',        f'{my_vol_path}/bright_home_orders'),
    ('lumina_sports_orders_source',      f'{my_vol_path}/lumina_sports_orders'),
    ('northstar_outfitters_orders_source', f'{my_vol_path}/northstar_outfitters_orders')
]

for key, value in config_parameters:
    print(f"Key: {key}\nValue: {value}\n")

2. Copy each key and path above and add them one at a time as configuration parameters in your **Spark Declarative Pipeline**.

   This will allow your pipeline to reference each volume through parameters.

   a. Select **Settings** in your pipeline tab.  

   b. Under **Configuration**, select **Add configuration**. 

   c. For each **Key**, enter the key name shown above.  

   d. For each **Value**, enter the corresponding volume path.  

   e. Select **Save**.

   **NOTE:** For more details on configuration parameters, see the Databricks documentation: [Use parameters with Lakeflow Declarative Pipelines](https://docs.databricks.com/aws/en/ldp/parameters)


#### Checkpoint (your path will vary)
<img src="./Includes/images/multi_flow/checkpoint_config_params.png" alt="Config Parameter Checkpoint" width="600">
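
Conceptually, each `${key}` placeholder in the pipeline's SQL is replaced with the value configured for that key. The sketch below imitates that substitution locally with Python's `string.Template`, which happens to use the same `${...}` syntax. It is only an illustration of the idea, not the mechanism the pipeline engine uses, and the volume paths are hypothetical.

```python
from string import Template

# Hypothetical values; in the demo they come from the cell that prints your key/value pairs.
configuration = {
    "bright_home_orders_source": "/Volumes/my_catalog/multi_flows_1_bronze/bright_home_orders",
    "lumina_sports_orders_source": "/Volumes/my_catalog/multi_flows_1_bronze/lumina_sports_orders",
    "northstar_outfitters_orders_source": "/Volumes/my_catalog/multi_flows_1_bronze/northstar_outfitters_orders",
}

def resolve(sql_text, params):
    """Replace each ${key} placeholder with its configured value."""
    return Template(sql_text).substitute(params)

sql_text = "FROM STREAM read_files('${bright_home_orders_source}', format => 'csv', header => true)"
resolved = resolve(sql_text, configuration)
print(resolved)

# A key that was never configured (for example, a typo) cannot be resolved.
try:
    resolve("FROM STREAM read_files('${bright_home_source}')", configuration)
    missing_key = None
except KeyError as err:
    missing_key = err.args[0]
print("Missing parameter:", missing_key)
```

The second call shows why key names must match exactly between the pipeline settings and the SQL that references them.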


### D3. Configure Flow from the Bright Home Orders Volume

1. Copy the code below and paste into your `flow_ingestion.sql` file.

<button onclick="copyBlock()">Copy to clipboard</button>

<pre id="copy-block" style="font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace; border:1px solid #e5e7eb; border-radius:10px; background:#f8fafc; padding:14px 16px; font-size:0.85rem; line-height:1.35; white-space:pre;">
<code>
<!-------------------ADD SOLUTION CODE BELOW------------------->
------------------------------------------
-- BRONZE FLOW - BRIGHT HOME
------------------------------------------
-- Read CSV files from the bright_home_orders volume
CREATE FLOW bright_home_orders_flow
AS INSERT INTO multi_flows_1_bronze.orders_bronze_flows_demo BY NAME
SELECT
  CAST(subsidiary_id AS STRING) AS subsidiary_id,
  CAST(order_id AS STRING) AS order_id,
  CAST(order_timestamp AS STRING) AS order_timestamp,
  CAST(customer_id AS STRING) AS customer_id,
  CAST(region AS STRING) AS region,
  CAST(country AS STRING) AS country,
  CAST(city AS STRING) AS city,
  CAST(channel AS STRING) AS channel,
  CAST(sku AS STRING) AS sku,
  CAST(category AS STRING) AS category,
  CAST(qty AS STRING) AS qty,
  CAST(unit_price AS STRING) AS unit_price,
  CAST(discount_pct AS STRING) AS discount_pct,
  CAST(coupon_code AS STRING) AS coupon_code,
  CAST(total_amount AS STRING) AS total_amount,
  CAST(order_date AS STRING) AS order_date,
  _metadata.file_name AS source_file,
  _metadata.file_modification_time AS file_mod_time
FROM STREAM read_files(
    '${bright_home_orders_source}',   -- Uses the configuration parameter to point to the bright_home_orders volume
    format => 'csv',
    header => true
);
<!-------------------END SOLUTION CODE------------------->
</code></pre>

<script>
function copyBlock() {
  const el = document.getElementById("copy-block");
  if (!el) return;

  const text = el.innerText;

  // Preferred modern API
  if (navigator.clipboard && navigator.clipboard.writeText) {
    navigator.clipboard.writeText(text)
      .then(() => alert("Copied to clipboard"))
      .catch(err => {
        console.error("Clipboard write failed:", err);
        fallbackCopy(text);
      });
  } else {
    fallbackCopy(text);
  }
}

function fallbackCopy(text) {
  const textarea = document.createElement("textarea");
  textarea.value = text;
  textarea.style.position = "fixed";
  textarea.style.left = "-9999px";
  document.body.appendChild(textarea);
  textarea.select();
  try {
    document.execCommand("copy");
    alert("Copied to clipboard");
  } catch (err) {
    console.error("Fallback copy failed:", err);
    alert("Could not copy to clipboard. Please copy manually.");
  } finally {
    document.body.removeChild(textarea);
  }
}
</script>


**Review the code**
- All business columns are cast to `STRING` so this flow aligns with the unified bronze schema, while the **metadata** columns keep their native types.  
- The **metadata columns** capture the source **file name and modification time**, which helps with lineage and debugging.  
- The `FROM STREAM read_files('${bright_home_orders_source}', ...)` clause uses your configuration parameter to reference the volume path and relies on Auto Loader for incremental ingestion.

### D4. Configure Flow from the Lumina Sports Orders Volume

1. Copy the code below and paste into your `flow_ingestion.sql` file.

<button onclick="copyBlock()">Copy to clipboard</button>

<pre id="copy-block" style="font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace; border:1px solid #e5e7eb; border-radius:10px; background:#f8fafc; padding:14px 16px; font-size:0.85rem; line-height:1.35; white-space:pre;">
<code>
<!-------------------ADD SOLUTION CODE BELOW------------------->
------------------------------------------
-- BRONZE FLOW - LUMINA SPORTS
------------------------------------------
-- Read CSV files from the lumina_sports_orders volume
CREATE FLOW lumina_sports_orders_flow
AS INSERT INTO multi_flows_1_bronze.orders_bronze_flows_demo BY NAME
SELECT
  CAST(subsidiary_id AS STRING) AS subsidiary_id,
  CAST(order_id AS STRING) AS order_id,
  CAST(order_timestamp AS STRING) AS order_timestamp,
  CAST(customer_id AS STRING) AS customer_id,
  CAST(region AS STRING) AS region,
  CAST(country AS STRING) AS country,
  CAST(city AS STRING) AS city,
  CAST(channel AS STRING) AS channel,
  CAST(sku AS STRING) AS sku,
  CAST(category AS STRING) AS category,
  CAST(qty AS STRING) AS qty,
  CAST(unit_price AS STRING) AS unit_price,
  CAST(discount_pct AS STRING) AS discount_pct,
  CAST(coupon_code AS STRING) AS coupon_code,
  CAST(total_amount AS STRING) AS total_amount,
  CAST(order_date AS STRING) AS order_date,
  _metadata.file_name AS source_file,
  _metadata.file_modification_time AS file_mod_time
FROM STREAM read_files(
  '${lumina_sports_orders_source}',   -- Uses the configuration parameter to point to the lumina sports volume
  format => 'csv',
  header => true
);
<!-------------------END SOLUTION CODE------------------->
</code></pre>

<script>
function copyBlock() {
  const el = document.getElementById("copy-block");
  if (!el) return;

  const text = el.innerText;

  // Preferred modern API
  if (navigator.clipboard && navigator.clipboard.writeText) {
    navigator.clipboard.writeText(text)
      .then(() => alert("Copied to clipboard"))
      .catch(err => {
        console.error("Clipboard write failed:", err);
        fallbackCopy(text);
      });
  } else {
    fallbackCopy(text);
  }
}

function fallbackCopy(text) {
  const textarea = document.createElement("textarea");
  textarea.value = text;
  textarea.style.position = "fixed";
  textarea.style.left = "-9999px";
  document.body.appendChild(textarea);
  textarea.select();
  try {
    document.execCommand("copy");
    alert("Copied to clipboard");
  } catch (err) {
    console.error("Fallback copy failed:", err);
    alert("Could not copy to clipboard. Please copy manually.");
  } finally {
    document.body.removeChild(textarea);
  }
}
</script>


**Review the code**
- All business columns are cast to `STRING` so this flow aligns with the unified bronze schema, while the **metadata** columns keep their native types.  
- The **metadata columns** capture the source **file name and modification time**, which helps with lineage and debugging.  
- The `FROM STREAM read_files('${lumina_sports_orders_source}', ...)` clause uses your configuration parameter to reference the volume path and relies on Auto Loader for incremental ingestion.

### D5. Configure Flow from Northstar Outfitters Orders Volume

1. Copy the code below and paste into your `flow_ingestion.sql` file.

<button onclick="copyBlock()">Copy to clipboard</button>

<pre id="copy-block" style="font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace; border:1px solid #e5e7eb; border-radius:10px; background:#f8fafc; padding:14px 16px; font-size:0.85rem; line-height:1.35; white-space:pre;">
<code>
<!-------------------ADD SOLUTION CODE BELOW------------------->
------------------------------------------
-- BRONZE FLOW - NORTHSTAR OUTFITTERS
------------------------------------------
-- Read JSON files from the northstar_outfitters_orders volume
CREATE FLOW northstar_outfitters_orders_flow
AS INSERT INTO multi_flows_1_bronze.orders_bronze_flows_demo BY NAME
SELECT
  CAST(subsidiary_id AS STRING) AS subsidiary_id,
  CAST(order_id AS STRING) AS order_id,
  CAST(order_timestamp AS STRING) AS order_timestamp,
  CAST(customer_id AS STRING) AS customer_id,
  CAST(region AS STRING) AS region,
  CAST(country AS STRING) AS country,
  CAST(city AS STRING) AS city,
  CAST(channel AS STRING) AS channel,
  CAST(sku AS STRING) AS sku,
  CAST(category AS STRING) AS category,
  CAST(qty AS STRING) AS qty,
  CAST(unit_price AS STRING) AS unit_price,
  CAST(discount_pct AS STRING) AS discount_pct,
  CAST(coupon_code AS STRING) AS coupon_code,
  CAST(total_amount AS STRING) AS total_amount,
  CAST(order_date AS STRING) AS order_date,
  _metadata.file_name AS source_file,
  _metadata.file_modification_time AS file_mod_time
FROM STREAM read_files(
  '${northstar_outfitters_orders_source}',  -- Uses the configuration parameter to point to the northstar volume
  format => 'json'
);
<!-------------------END SOLUTION CODE------------------->
</code></pre>

<script>
function copyBlock() {
  const el = document.getElementById("copy-block");
  if (!el) return;

  const text = el.innerText;

  // Preferred modern API
  if (navigator.clipboard && navigator.clipboard.writeText) {
    navigator.clipboard.writeText(text)
      .then(() => alert("Copied to clipboard"))
      .catch(err => {
        console.error("Clipboard write failed:", err);
        fallbackCopy(text);
      });
  } else {
    fallbackCopy(text);
  }
}

function fallbackCopy(text) {
  const textarea = document.createElement("textarea");
  textarea.value = text;
  textarea.style.position = "fixed";
  textarea.style.left = "-9999px";
  document.body.appendChild(textarea);
  textarea.select();
  try {
    document.execCommand("copy");
    alert("Copied to clipboard");
  } catch (err) {
    console.error("Fallback copy failed:", err);
    alert("Could not copy to clipboard. Please copy manually.");
  } finally {
    document.body.removeChild(textarea);
  }
}
</script>


**Review the code**
- All business columns are cast to `STRING` so this flow aligns with the unified bronze schema, while the **metadata** columns keep their native types.  
- The **metadata columns** capture the source **file name and modification time**, which helps with lineage and debugging.  
- The `FROM STREAM read_files('${northstar_outfitters_orders_source}', ...)` clause uses your configuration parameter to reference the volume path and relies on Auto Loader for incremental ingestion.
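
The three flows differ only in the flow name, the configuration parameter, and the reader options. As a sketch of that pattern, the Python below renders equivalent SQL text from one template. The helper `render_flow` is hypothetical and the output is meant for comparison with the statements above; pasting the three statements by hand works just as well.

```python
COLUMNS = [
    "subsidiary_id", "order_id", "order_timestamp", "customer_id",
    "region", "country", "city", "channel", "sku", "category",
    "qty", "unit_price", "discount_pct", "coupon_code",
    "total_amount", "order_date",
]

TARGET = "multi_flows_1_bronze.orders_bronze_flows_demo"

# (flow name, configuration parameter, read_files options)
FLOWS = [
    ("bright_home_orders_flow", "bright_home_orders_source",
     {"format": "'csv'", "header": "true"}),
    ("lumina_sports_orders_flow", "lumina_sports_orders_source",
     {"format": "'csv'", "header": "true"}),
    ("northstar_outfitters_orders_flow", "northstar_outfitters_orders_source",
     {"format": "'json'"}),
]

def render_flow(flow_name, source_param, options):
    """Render one CREATE FLOW statement that casts every business column to STRING."""
    casts = ",\n".join(f"  CAST({col} AS STRING) AS {col}" for col in COLUMNS)
    opts = ",\n".join(f"  {key} => {value}" for key, value in options.items())
    return (
        f"CREATE FLOW {flow_name}\n"
        f"AS INSERT INTO {TARGET} BY NAME\n"
        f"SELECT\n{casts},\n"
        f"  _metadata.file_name AS source_file,\n"
        f"  _metadata.file_modification_time AS file_mod_time\n"
        f"FROM STREAM read_files(\n"
        f"  '${{{source_param}}}',\n"
        f"{opts}\n"
        f");"
    )

flow_sql = [render_flow(*flow) for flow in FLOWS]
print("\n\n".join(flow_sql))
```

Keeping the column list in one place makes it harder for the three flows to drift apart if a column is added later.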

### D6. Run and Explore the Pipeline

1. Run the Spark Declarative Pipeline and confirm it runs successfully. 

2. Explore the run in the Lakeflow Pipelines Editor.
  - Confirm **449** rows were ingested into the bronze table from the three volumes (**157 + 110 + 182**)
  - Preview the data in the editor (Select **orders_bronze_flows_demo** -> **Data** tab). Notice data from each volume was incrementally ingested into the bronze table.

> **TROUBLESHOOTING:** If your pipeline does not run successfully, confirm that your volumes were created and that your configuration parameters are set correctly.

#### Checkpoint

<img src="./Includes/images/multi_flow/checkpoint_bronze_flows.png" alt="Bronze Flow" width="1200">

### D7. Query the Bronze Streaming Table
1. The query below groups records by the captured **source_file** column so you can see the number of ingested rows per file.

    Review how many rows were ingested from each source file to confirm that each flow loaded its data successfully.

#### Checkpoint
| Source File                 | Total Ingested by Source  |
|-----------------------------|---------------------------|
| nso_orders_2025-11-01.json  | 182                       |
| bsh_orders_2025-11-01.csv   | 157                       |
| lms_orders_2025-11-01.csv   | 110                       |

In [0]:
%sql
SELECT source_file, count(*) AS `Total Ingested by Source`
FROM multi_flows_1_bronze.orders_bronze_flows_demo
GROUP BY source_file

## E. Silver Table Data Quality, Optimization, and Transformation

In the silver layer, we refine the **raw bronze table** into a clean and consistent **single source of truth (silver table)**. 

This is where we standardize fields, enforce data quality, and prepare the dataset for downstream analytics.

In this section, you will:

- Apply data quality constraints  
- Clean and standardize fields  
- Enable liquid clustering on the streaming table for optimized performance  

### E1. Create a New SQL File in your Pipeline

1. Click the kebab menu and select **New File** in the **ingest_multiple_flows** folder.
2. Select the language as **SQL**.
3. Name the file `silver_transformation.sql`.
4. Keep the dataset type as `None Selected`.


### E2. Create Silver Transactions Table with Liquid Clustering and Data Quality Rules

1. Add the following code to your `silver_transformation.sql` file to create a Silver table that:
- uses a defined schema,
- applies data quality constraints,
- enforces consistent column types through `TRY_CAST`,
- and enables liquid clustering for performance.

<button onclick="copyBlock()">Copy to clipboard</button>

<pre id="copy-block" style="font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace; border:1px solid #e5e7eb; border-radius:10px; background:#f8fafc; padding:14px 16px; font-size:0.85rem; line-height:1.35; white-space:pre;">
<code>
<!-------------------ADD SOLUTION CODE BELOW------------------->
CREATE OR REFRESH STREAMING TABLE multi_flows_2_silver.orders_silver_flows_demo
(
  -- A: Define a fixed schema to prevent schema evolution.
  subsidiary_id   STRING,
  order_id        STRING,
  order_timestamp TIMESTAMP,
  order_date      DATE,
  customer_id     STRING,
  region          STRING,
  country         STRING,
  city            STRING,
  channel         STRING,
  sku             STRING,
  category        STRING,
  qty             INT,
  unit_price      DOUBLE,
  discount_pct    DOUBLE,
  total_amount    DOUBLE,
  coupon_code     STRING,

  -- B: Data quality constraints that drop invalid rows or fail the update.
  CONSTRAINT qty_valid          EXPECT (qty >= 0) ON VIOLATION DROP ROW,
  CONSTRAINT total_amount_valid EXPECT (total_amount >= 0) ON VIOLATION DROP ROW,
  CONSTRAINT timestamp_not_null EXPECT (order_timestamp IS NOT NULL) ON VIOLATION FAIL UPDATE
)
-- C: Adds a table comment
COMMENT 'Clean and standardize data from the multiple-flow bronze table'

-- D: Enable liquid clustering to improve performance on common filters.
CLUSTER BY AUTO

AS
-- E: Select and clean data from the Bronze table. Uses TRY_CAST to enforce consistent types across all subsidiaries.
SELECT
  subsidiary_id,
  order_id,
  TRY_CAST(order_timestamp AS TIMESTAMP) AS order_timestamp, 
  TRY_CAST(order_date      AS DATE)      AS order_date,
  customer_id,
  region,
  country,
  city,
  channel,
  sku,
  category,
  TRY_CAST(qty          AS INT)    AS qty,
  TRY_CAST(unit_price   AS DOUBLE) AS unit_price,
  TRY_CAST(discount_pct AS DOUBLE) AS discount_pct,
  TRY_CAST(total_amount AS DOUBLE) AS total_amount,
  coupon_code
-- F: Incrementally reads data from the bronze table that contains data from three volumes
FROM STREAM multi_flows_1_bronze.orders_bronze_flows_demo;
<!-------------------END SOLUTION CODE------------------->
</code></pre>

<script>
function copyBlock() {
  const el = document.getElementById("copy-block");
  if (!el) return;

  const text = el.innerText;

  // Preferred modern API
  if (navigator.clipboard && navigator.clipboard.writeText) {
    navigator.clipboard.writeText(text)
      .then(() => alert("Copied to clipboard"))
      .catch(err => {
        console.error("Clipboard write failed:", err);
        fallbackCopy(text);
      });
  } else {
    fallbackCopy(text);
  }
}

function fallbackCopy(text) {
  const textarea = document.createElement("textarea");
  textarea.value = text;
  textarea.style.position = "fixed";
  textarea.style.left = "-9999px";
  document.body.appendChild(textarea);
  textarea.select();
  try {
    document.execCommand("copy");
    alert("Copied to clipboard");
  } catch (err) {
    console.error("Fallback copy failed:", err);
    alert("Could not copy to clipboard. Please copy manually.");
  } finally {
    document.body.removeChild(textarea);
  }
}
</script>

**Review the code**
- A: `CREATE OR REFRESH STREAMING TABLE` - Creates or refreshes the **multi_flows_2_silver.orders_silver_flows_demo** Silver table and applies a fixed schema to prevent schema drift across multiple flows.

- B: `CONSTRAINT ... EXPECT`  Applies data quality rules: rows with a negative quantity or a negative total are dropped (`ON VIOLATION DROP ROW`), while a missing timestamp fails the update (`ON VIOLATION FAIL UPDATE`).

- C: `COMMENT`  Adds descriptive metadata explaining the purpose of the Silver table.

- D: `CLUSTER BY AUTO`  Enables automatic liquid clustering where Databricks intelligently chooses clustering keys to optimize your query performance. You can also specify your own clustering keys if you'd like.

- E: `TRY_CAST`  Enforces consistent column types across all subsidiaries by converting the bronze `STRING` values into standardized data types. A value that cannot be converted becomes `NULL` instead of raising an error.

- F: `SELECT ... FROM STREAM`  Incrementally reads, selects, and cleans records from the **multi_flows_1_bronze.orders_bronze_flows_demo** Bronze streaming table, which contains data from three separate volumes.
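
To make the interaction between `TRY_CAST` and the two violation actions concrete, here is a small pure-Python model of the silver rules. It is a sketch under stated assumptions, not how the pipeline engine works: the helper names and sample rows are hypothetical, Python's conversion rules differ from Spark's, and treating a value that failed to cast as a violation is an assumption of this sketch.

```python
from datetime import datetime


class UpdateFailed(Exception):
    """Stands in for an update that stops on a FAIL UPDATE violation."""


def try_cast(value, converter):
    """Return the converted value, or None when conversion fails (like TRY_CAST)."""
    try:
        return converter(value)
    except (TypeError, ValueError):
        return None


def apply_silver_rules(bronze_rows):
    """Cast columns, then apply the three expectations from the silver table."""
    silver_rows, dropped = [], 0
    for raw in bronze_rows:
        row = dict(raw)
        row["order_timestamp"] = try_cast(raw.get("order_timestamp"), datetime.fromisoformat)
        row["qty"] = try_cast(raw.get("qty"), int)
        row["total_amount"] = try_cast(raw.get("total_amount"), float)

        # timestamp_not_null: ON VIOLATION FAIL UPDATE stops the whole run.
        if row["order_timestamp"] is None:
            raise UpdateFailed(f"order_timestamp is NULL for order {row.get('order_id')}")

        # qty_valid and total_amount_valid: ON VIOLATION DROP ROW.
        # Assumption of this sketch: None (a failed cast) counts as a violation.
        qty_ok = row["qty"] is not None and row["qty"] >= 0
        total_ok = row["total_amount"] is not None and row["total_amount"] >= 0
        if not (qty_ok and total_ok):
            dropped += 1
            continue
        silver_rows.append(row)
    return silver_rows, dropped


# Invented sample rows: the second one has a negative quantity.
sample = [
    {"order_id": "A1", "order_timestamp": "2025-11-01T09:15:00", "qty": "2", "total_amount": "39.98"},
    {"order_id": "A2", "order_timestamp": "2025-11-01T10:00:00", "qty": "-1", "total_amount": "10.00"},
]
clean, dropped = apply_silver_rules(sample)
print(len(clean), "kept,", dropped, "dropped")  # 1 kept, 1 dropped
```

The difference between the two actions is the point of the sketch: a dropped row only reduces the output, whereas a failed update stops the run so the problem has to be fixed at the source.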

### E3. Run and Explore the Pipeline 
1. Select **Run pipeline** to create the Silver table.  
   - This run executes the transformation logic and applies the data quality expectations.

2. Explore the pipeline in the Lakeflow Pipelines Editor. Notice the following:
   - Since the pipeline has already ingested the source files into Bronze, **0** rows are processed in the Bronze table.
   - All **449** rows are processed in the Silver table.
   - In the **Expectations** column, select `3 met` to view the data quality rules. All rows pass the expectations.
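
The reason a rerun processes **0** bronze rows can be pictured with a toy model. The class below is a hypothetical, heavily simplified stand-in for a streaming table that remembers which files it has already ingested; it is not how Auto Loader is implemented. The row counts reuse the Bright Home file sizes from this demo (157 and 191 rows).

```python
class ToyStreamingTable:
    """Toy model: ingest each file at most once, however often update() runs."""

    def __init__(self):
        self.rows = []
        self._seen_files = set()

    def update(self, volume):
        """Ingest only files not seen before and return the number of new rows."""
        new_rows = 0
        for file_name, file_rows in sorted(volume.items()):
            if file_name in self._seen_files:
                continue  # already ingested in an earlier run
            self.rows.extend(file_rows)
            self._seen_files.add(file_name)
            new_rows += len(file_rows)
        return new_rows


# A "volume" is modeled as a dict of file name -> list of rows.
volume = {"bsh_orders_2025-11-01.csv": [{"order_id": f"row-{i}"} for i in range(157)]}

bronze = ToyStreamingTable()
first_run = bronze.update(volume)   # ingests the first file
second_run = bronze.update(volume)  # nothing new to ingest

volume["bsh_orders_2025-11-02.csv"] = [{"order_id": f"row-{i}"} for i in range(191)]
third_run = bronze.update(volume)   # ingests only the newly landed file

print(first_run, second_run, third_run)  # 157 0 191
```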



#### Checkpoint

![Silver SDP Checkpoint](./Includes/images/multi_flow/checkpoint_silver.png)

### E4. Explore the Silver Streaming table

1. Run the command to view the table metadata. When the results appear, notice the following:

- The silver table has the exact **column data types** that were defined.
- **Liquid clustering** is enabled:
  - In the **# Clustering Information** section, no columns are listed yet because Databricks has not selected clustering keys (automatic selection relies on analyzing the table's query history).
  - In the **Table Properties** row, `clusterByAuto=true` confirms that automatic clustering is enabled.

**NOTE:** SDP supports liquid clustering, which organizes data based on frequently filtered columns to improve query performance. For more information, see the [Use liquid clustering for tables](https://docs.databricks.com/aws/en/delta/clustering) documentation.


In [0]:
%sql
DESCRIBE TABLE EXTENDED multi_flows_2_silver.orders_silver_flows_demo;

2. Run the code below to view the data in your **silver table**. 

    Confirm that the records look clean, standardized, and ready for gold-level analysis. 
    
    Look for consistent data types, valid numeric values, and properly cast timestamps and dates.


In [0]:
%sql
SELECT *
FROM multi_flows_2_silver.orders_silver_flows_demo;

## F. Business Intelligence Materialized Views

Materialized views in Lakeflow provide precomputed results that power fast, reliable analytics for downstream users. Unlike regular views, which recompute their query every time they are read, materialized views store their results and are refreshed when the pipeline runs, so stakeholders see up-to-date insights without on-demand computation.

**NOTE:** This section assumes prior familiarity with materialized views.


### F1. Create a New SQL File in your Pipeline

1. Click the kebab menu and select **New File** in the **ingest_multiple_flows** folder.
2. Select the language as **SQL**.
3. Name the file `gold_mvs.sql`.
4. Keep the dataset type as `None Selected`.

### F2. Create Simple Gold Materialized Views

1. Next, you will add two small, simple materialized views for your consumers. They are used only to confirm that your silver data is clean and ready for analysis. Materialized views are not the focus here, so we keep them simple.

   a. The first materialized view gives a **daily summary by subsidiary**, letting you quickly check revenue, units, and order counts over time.  

   b. The second view highlights **basic product performance** so you can see which categories and SKUs are selling within each subsidiary.


2. Copy the code below into your `gold_mvs.sql` file to create both materialized views.

<button onclick="copyBlock()">Copy to clipboard</button>

<pre id="copy-block" style="font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace; border:1px solid #e5e7eb; border-radius:10px; background:#f8fafc; padding:14px 16px; font-size:0.85rem; line-height:1.35; white-space:pre;">
<code>
<!-------------------ADD SOLUTION CODE BELOW------------------->
------------------------------------------
-- a. GOLD MATERIALIZED VIEW: DAILY SUBSIDIARY SCORECARD
-- Simple daily summary by subsidiary
------------------------------------------
CREATE OR REPLACE MATERIALIZED VIEW multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo
AS
SELECT
  order_date,
  subsidiary_id,
  COUNT(DISTINCT order_id)    AS order_count,   -- how many unique orders occurred
  ROUND(SUM(total_amount),2)  AS total_revenue, -- total revenue for the day
  SUM(qty)                    AS total_units    -- total units sold
FROM multi_flows_2_silver.orders_silver_flows_demo
WHERE order_date IS NOT NULL
GROUP BY order_date, subsidiary_id;


------------------------------------------
-- b. GOLD MATERIALIZED VIEW: PRODUCT PERFORMANCE BY SUBSIDIARY
-- Basic units and revenue by product and subsidiary
------------------------------------------
CREATE OR REPLACE MATERIALIZED VIEW multi_flows_3_gold.mv_product_performance_by_subsidiary_demo
AS
SELECT
  subsidiary_id,
  category,
  sku,
  SUM(qty)                   AS units_sold,  -- total units sold for each SKU
  ROUND(SUM(total_amount),2) AS revenue      -- total revenue for each SKU
FROM multi_flows_2_silver.orders_silver_flows_demo
GROUP BY subsidiary_id, category, sku;
<!-------------------END SOLUTION CODE------------------->
</code></pre>

<script>
function copyBlock() {
  const el = document.getElementById("copy-block");
  if (!el) return;

  const text = el.innerText;

  // Preferred modern API
  if (navigator.clipboard && navigator.clipboard.writeText) {
    navigator.clipboard.writeText(text)
      .then(() => alert("Copied to clipboard"))
      .catch(err => {
        console.error("Clipboard write failed:", err);
        fallbackCopy(text);
      });
  } else {
    fallbackCopy(text);
  }
}

function fallbackCopy(text) {
  const textarea = document.createElement("textarea");
  textarea.value = text;
  textarea.style.position = "fixed";
  textarea.style.left = "-9999px";
  document.body.appendChild(textarea);
  textarea.select();
  try {
    document.execCommand("copy");
    alert("Copied to clipboard");
  } catch (err) {
    console.error("Fallback copy failed:", err);
    alert("Could not copy to clipboard. Please copy manually.");
  } finally {
    document.body.removeChild(textarea);
  }
}
</script>
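
As a rough illustration of what the daily scorecard view computes, the pure-Python sketch below reproduces the same grouping logic on a handful of invented rows. The function name, subsidiary IDs, and order values are hypothetical; only the column names and the aggregation rules come from the SQL above. The product performance view follows the same pattern with `(subsidiary_id, category, sku)` as the grouping key.

```python
from collections import defaultdict


def daily_scorecard(rows):
    """Group by (order_date, subsidiary_id) like mv_daily_subsidiary_scorecard_demo."""
    groups = defaultdict(lambda: {"orders": set(), "revenue": 0.0, "units": 0})
    for row in rows:
        if row["order_date"] is None:  # WHERE order_date IS NOT NULL
            continue
        group = groups[(row["order_date"], row["subsidiary_id"])]
        group["orders"].add(row["order_id"])   # COUNT(DISTINCT order_id)
        group["revenue"] += row["total_amount"]  # SUM(total_amount)
        group["units"] += row["qty"]             # SUM(qty)
    return {
        key: {
            "order_count": len(group["orders"]),
            "total_revenue": round(group["revenue"], 2),
            "total_units": group["units"],
        }
        for key, group in groups.items()
    }


# Invented silver rows; order O1 spans two line items, O3 has no order date.
silver_rows = [
    {"order_date": "2025-11-01", "subsidiary_id": "BSH", "order_id": "O1", "qty": 2, "total_amount": 20.0},
    {"order_date": "2025-11-01", "subsidiary_id": "BSH", "order_id": "O1", "qty": 1, "total_amount": 10.5},
    {"order_date": "2025-11-01", "subsidiary_id": "NSO", "order_id": "O2", "qty": 3, "total_amount": 30.0},
    {"order_date": None, "subsidiary_id": "NSO", "order_id": "O3", "qty": 1, "total_amount": 5.0},
]

scorecard = daily_scorecard(silver_rows)
for key, metrics in sorted(scorecard.items()):
    print(key, metrics)
```

Counting distinct order IDs matters here: two line items of the same order add to units and revenue but count as a single order.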


### F3. Run and Explore the Pipeline

1. Select **Run pipeline** to create the two Gold materialized views.

2. Explore the pipeline in the Lakeflow Pipelines Editor. Notice the following:

   - Since the pipeline has already ingested the raw data into the **Bronze** and **Silver** streaming tables, and no new files were added, there are no new records for the streaming tables to process.

   - The two materialized views are created with **3** and **15** rows, respectively.

   - In the **Tables** window at the bottom:  
     - Select the **Show and hide columns** button ![Show and Hide Columns](./Includes/images/show_hide_columns_icon.png)  
     - Then show the **Incrementalization** column to see whether the materialized views were **fully recomputed** or **incrementally computed**.

   - On this initial run, both materialized views show **Full recompute** because they are being created for the first time. Later, we will see these views compute incrementally.



#### Checkpoint

![MVs SDP Checkpoint](./Includes/images/multi_flow/checkpoint_mvs_pipeline.png)

### F4. Display the Materialized Views

1. Querying the **multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo** materialized view returns a daily scorecard for each subsidiary. 

    It summarizes order volume, total revenue, and total units so you can quickly compare performance across the three subsidiaries by order date.


In [0]:
%sql
SELECT *
FROM multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo

2. The **multi_flows_3_gold.mv_product_performance_by_subsidiary_demo**  materialized view provides a simple product performance breakdown, showing which categories and SKUs are selling within each subsidiary. 

    It helps you compare units sold and revenue across product lines.


In [0]:
%sql
SELECT *
FROM multi_flows_3_gold.mv_product_performance_by_subsidiary_demo
ORDER BY subsidiary_id, category;

## G. Run the Spark Declarative Pipeline with New Files

Now that the pipeline is built, let's add the daily drop for the **2025-11-02** orders for each subsidiary.

### G1. Land a New File in Each Volume

1. Run the cell below to add the next daily file (**2025-11-02**) to each subsidiary volume.  

2. After the cell runs, confirm that each volume now contains two files: **2025-11-01** and **2025-11-02**.

In [0]:
## Copy a second CSV file into the bright_home_orders volume
marketplace_share_path = f'/Volumes/{your_marketplace_share_catalog_name}/v02/subsidiary_daily_orders'

copy_files(
    copy_from = f'{marketplace_share_path}/bright_home_orders', 
    copy_to = f'/Volumes/{my_catalog}/multi_flows_1_bronze/bright_home_orders', 
    n = 2
)

## Copy a second CSV file into the lumina_sports_orders volume
copy_files(
    copy_from = f'{marketplace_share_path}/lumina_sports_orders', 
    copy_to = f'/Volumes/{my_catalog}/multi_flows_1_bronze/lumina_sports_orders', 
    n = 2
)

## Copy a second JSON file into the northstar_outfitters_orders volume
copy_files(
    copy_from = f'{marketplace_share_path}/northstar_outfitters_orders', 
    copy_to = f'/Volumes/{my_catalog}/multi_flows_1_bronze/northstar_outfitters_orders', 
    n = 2
)


## List files in your volumes (confirm two files exist)
spark.sql(f'LIST "{my_vol_path}/bright_home_orders"').display()
spark.sql(f'LIST "{my_vol_path}/lumina_sports_orders"').display()
spark.sql(f'LIST "{my_vol_path}/northstar_outfitters_orders"').display()

### G2. Explore the New Daily Drop (2025-11-02) File in Each Volume
1. Simply run the cell below to count the number of records in the raw daily drop file **2025-11-02** within each volume.

2. Confirm that the output shows **502** total rows across all three subsidiaries for the **2025-11-02** orders drop.

| Volume                      | TotalRows | FileName                    |
|-----------------------------|-----------|------------------------------|
| bright_home_orders          | 191       | bsh_orders_2025-11-02.csv    |
| lumina_sports_orders        | 170       | lms_orders_2025-11-02.csv    |
| northstar_outfitters_orders | 141       | nso_orders_2025-11-02.json   |
| TOTAL                       | 502       |                              |
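
The totals in the table above, and the cumulative bronze row count expected after both daily drops, can be checked with quick arithmetic. This small sketch only restates the per-file counts reported in this demo; the dictionary layout is an illustrative choice.

```python
# Row counts per volume for each daily drop, as reported in this demo.
drops = {
    "2025-11-01": {
        "bright_home_orders": 157,
        "lumina_sports_orders": 110,
        "northstar_outfitters_orders": 182,
    },
    "2025-11-02": {
        "bright_home_orders": 191,
        "lumina_sports_orders": 170,
        "northstar_outfitters_orders": 141,
    },
}

# Total rows per drop, then the running total across both drops.
totals_by_drop = {day: sum(counts.values()) for day, counts in drops.items()}
cumulative_rows = sum(totals_by_drop.values())

print(totals_by_drop)   # {'2025-11-01': 449, '2025-11-02': 502}
print(cumulative_rows)  # 951
```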

In [0]:
from pyspark.sql.functions import lit, sum as _sum

df_all = spark.sql(f"""
    SELECT 
        'bright_home_orders' AS Volume,
        COUNT(*) AS TotalRows,
        'bsh_orders_2025-11-02.csv' AS FileName
    FROM read_files('{my_vol_path}/bright_home_orders/bsh_orders_2025-11-02.csv')

    UNION ALL
    SELECT 
        'lumina_sports_orders' AS Volume,
        COUNT(*) AS TotalRows,
        'lms_orders_2025-11-02.csv' AS FileName
    FROM read_files('{my_vol_path}/lumina_sports_orders/lms_orders_2025-11-02.csv')

    UNION ALL
    SELECT 
        'northstar_outfitters_orders' AS Volume,
        COUNT(*) AS TotalRows,
        'nso_orders_2025-11-02.json' AS FileName
    FROM read_files('{my_vol_path}/northstar_outfitters_orders/nso_orders_2025-11-02.json')
""")

# Build TOTAL row
total_row = (
    df_all
    .agg(_sum("TotalRows").alias("TotalRows"))
    .withColumn("Volume", lit("TOTAL"))
    .withColumn("FileName", lit(""))
    .select("Volume", "TotalRows", "FileName")  # match column order
)

# Append TOTAL row to the bottom
df_with_total = df_all.unionByName(total_row)

display(df_with_total)


### G3. Run and Explore the Pipeline

1. Run the Spark Declarative Pipeline to **incrementally** ingest, process, and aggregate the **new daily sales drop**. Confirm it runs successfully. 

2. Explore the run in the Lakeflow Pipelines Editor.
  - Confirm **502** rows were ingested into the **bronze table** from the new daily orders drop in the three volumes.
  - Confirm all **502** rows were processed and passed the data quality checks in the **silver table**.
  - Confirm the materialized views:
    - Contain **6** and **15** rows, respectively.
    - Were both incrementally refreshed (**Incremental**).
      - **NOTE:** For more information, see the [Incremental refresh for materialized views](https://docs.databricks.com/aws/en/optimizations/incremental-refresh) documentation.

> **TROUBLESHOOTING:** If your pipeline does not match the output below, make sure you have landed the second file in each volume as described in the 'Land a New File in Each Volume' section above.

#### Checkpoint

![SDP Run 2 Files in All Volumes](./Includes/images/multi_flow/checkpoint_run_daily_drop_2.png)


### G4. Explore the Final Pipeline Objects

1. Run the cell below to view the **bronze** table. 

    Notice that the table contains **951 rows**, the total after both runs (**449 + 502**).

In [0]:
%sql
SELECT *
FROM multi_flows_1_bronze.orders_bronze_flows_demo;

2. Count the number of rows ingested by each **source_file** in the **bronze** streaming table. 

    Notice that we can easily examine how many rows were ingested by each source file (**daily orders drop**).

In [0]:
%sql
SELECT source_file, count(*) AS TotalRows
FROM multi_flows_1_bronze.orders_bronze_flows_demo
GROUP BY source_file
ORDER BY source_file;

3. View the data in the **silver table**. Notice that the data is clean, adheres to our defined schema and contains our 'single source of truth'.

In [0]:
%sql
SELECT *
FROM multi_flows_2_silver.orders_silver_flows_demo;

4. View the gold level materialized view **mv_daily_subsidiary_scorecard_demo**. 

    Notice downstream consumers can easily examine orders by date for each subsidiary.

In [0]:
%sql
SELECT *
FROM multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo
ORDER BY order_date, subsidiary_id;

5. View the gold level materialized view **mv_product_performance_by_subsidiary_demo**. 

    Notice downstream consumers can easily examine detailed order metrics by each **subsidiary_id** and **sku**. 

In [0]:
%sql
SELECT *
FROM multi_flows_3_gold.mv_product_performance_by_subsidiary_demo
ORDER BY subsidiary_id, category;

## H. Introduction to Adding Tags to Bronze, Silver and Gold Objects
#### This requires the necessary permissions to add tags

In this step you add semantic metadata to your tables and materialized views. Tags make it easier to organize, search and govern objects in Unity Catalog. They help downstream teams quickly understand what each object represents and how it should be used.

You apply two types of tags:

- **Custom demo tags**  
  These describe the department that owns the data and the quality level in the medallion architecture.  
  Examples:  
  - `demo_tag_Department = 'Sales'`  
  - `demo_tag_Quality = 'bronze' | 'silver' | 'gold'`

- **System tags**  
  These are built-in Unity Catalog tags. Here you add the `system.Certified` tag to identify trusted, production-ready objects.


**NOTE**: For more information, see the [Apply tags to Unity Catalog securable objects](https://docs.databricks.com/aws/en/database-objects/tags) documentation.

1. The code below creates the following tags:
  - Updates the **bronze table** with department and quality tags so users can see this is raw, ingested data owned by Sales.
  - Updates the **silver table** with the same tags, then adds the `system.Certified` tag to mark it as a clean, trusted dataset.
  - Updates both **gold materialized views** with department and quality tags, then marks each one as `system.Certified` because these are curated analytics objects intended for broad consumption.

**NOTE:** The tags are applied with `ALTER TABLE ... SET TAGS` statements.

In [0]:
%sql

------------------------------------
-- Bronze table tags
----------------------------------
ALTER TABLE multi_flows_1_bronze.orders_bronze_flows_demo
SET TAGS (
  'demo_tag_Department' = 'Sales',
  'demo_tag_Quality' = 'bronze'
);


----------------------------------
-- Silver table tags
----------------------------------
ALTER TABLE multi_flows_2_silver.orders_silver_flows_demo
SET TAGS (
  'demo_tag_Department' = 'Sales',
  'demo_tag_Quality' = 'silver'
);

-- Add certified system tags
ALTER TABLE multi_flows_2_silver.orders_silver_flows_demo
SET TAGS ('system.Certified');


----------------------------------
-- Materialized views table tags
------------------------------------
ALTER TABLE multi_flows_3_gold.mv_product_performance_by_subsidiary_demo
SET TAGS (
  'demo_tag_Department' = 'Sales',
  'demo_tag_Quality' = 'gold'
);

ALTER TABLE multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo
SET TAGS (
  'demo_tag_Department' = 'Sales',
  'demo_tag_Quality' = 'gold'
);

-- Add certified system tags
ALTER TABLE multi_flows_3_gold.mv_product_performance_by_subsidiary_demo
SET TAGS ('system.Certified');

ALTER TABLE multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo
SET TAGS ('system.Certified');

2. After you add tags to a table or materialized view, you can query them through the **information_schema.table_tags** view in your catalog, or through the **system.information_schema** schema.

    The query below uses **your-catalog.information_schema**.

In [0]:
%sql
SELECT *
FROM information_schema.table_tags;

3. You can also view tags in **Catalog Explorer**. Follow these steps to inspect the tags on one of your objects:

   a. Right click **Catalog** and select **Open in New Tab**  

   b. Navigate to your catalog  

   c. Open the **multi_flows_3_gold** schema  

   d. Select the **mv_daily_subsidiary_scorecard_demo** materialized view  

   e. In the right pane, locate the **Tags** section and confirm the three tags that were added

#### Checkpoint
![Tagging Checkpoint](./Includes/images/multi_flow/checkpoint_tagging.png)
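
The tagging statements in this section follow a repeating pattern, so they could also be generated rather than typed by hand. The sketch below is one possible approach: `build_tag_statements` is a hypothetical helper (it does not escape quotes in tag values), while the object names, tag keys, and `SET TAGS` syntax are taken from the SQL cell above.

```python
def build_tag_statements(object_name, tags, certified=False):
    """Hypothetical helper: return ALTER TABLE ... SET TAGS statements for one object."""
    pairs = ",\n  ".join(f"'{key}' = '{value}'" for key, value in tags.items())
    statements = [f"ALTER TABLE {object_name}\nSET TAGS (\n  {pairs}\n);"]
    if certified:
        # Key-only system tag, added as a separate statement like in the SQL cell.
        statements.append(f"ALTER TABLE {object_name}\nSET TAGS ('system.Certified');")
    return statements


# (object name, quality level, whether to add the system.Certified tag)
objects = [
    ("multi_flows_1_bronze.orders_bronze_flows_demo", "bronze", False),
    ("multi_flows_2_silver.orders_silver_flows_demo", "silver", True),
    ("multi_flows_3_gold.mv_product_performance_by_subsidiary_demo", "gold", True),
    ("multi_flows_3_gold.mv_daily_subsidiary_scorecard_demo", "gold", True),
]

statements = []
for name, quality, certified in objects:
    tags = {"demo_tag_Department": "Sales", "demo_tag_Quality": quality}
    statements.extend(build_tag_statements(name, tags, certified))

print("\n\n".join(statements))
```

The printed statements can be pasted into a `%sql` cell. Keeping the object list in one place makes it easier to tag additional tables consistently.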

## I. (OPTIONAL) Land Additional Files in your Volumes

If you'd like to continue practicing, use the demonstration function `copy_files` to add another file (file number 3) to each of your volumes.  

**NOTES:** 
- Ensure the variables you defined at the start of this lab (`your_marketplace_share_catalog_name` and `my_catalog`) are still active so the function works properly.
- There are a total of 7 available files you can continue practicing with.

In [0]:
%skip

## Copy a third CSV file into the bright_home_orders volume
marketplace_share_path = f'/Volumes/{your_marketplace_share_catalog_name}/v02/subsidiary_daily_orders'

copy_files(
    copy_from = f'{marketplace_share_path}/bright_home_orders', 
    copy_to = f'/Volumes/{my_catalog}/multi_flows_1_bronze/bright_home_orders', 
    n = 3 # <-- Add a third file to the volume
)

## Copy a third CSV file into the lumina_sports_orders volume
copy_files(
    copy_from = f'{marketplace_share_path}/lumina_sports_orders', 
    copy_to = f'/Volumes/{my_catalog}/multi_flows_1_bronze/lumina_sports_orders', 
    n = 3 # <-- Add a third file to the volume
)

## Copy a third JSON file into the northstar_outfitters_orders volume
copy_files(
    copy_from = f'{marketplace_share_path}/northstar_outfitters_orders', 
    copy_to = f'/Volumes/{my_catalog}/multi_flows_1_bronze/northstar_outfitters_orders', 
    n = 3 # <-- Add a third file to the volume
)


## List files in your volumes
spark.sql(f'LIST "{my_vol_path}/bright_home_orders"').display()
spark.sql(f'LIST "{my_vol_path}/lumina_sports_orders"').display()
spark.sql(f'LIST "{my_vol_path}/northstar_outfitters_orders"').display()

## J. Clean up
1. Feel free to delete the schemas you created in this demonstration by running the cell below and confirming the delete (**Y**).

In [0]:
delete_schemas(
    catalog = my_catalog, ## <--- Your catalog name using the variable you set earlier
    schemas = ['multi_flows_1_bronze', 'multi_flows_2_silver', 'multi_flows_3_gold']
)

2. Delete your Spark Declarative Pipeline through the **Jobs & Pipelines** UI.

## K. Summary and Key Takeaways 

- **Multi-Flow Ingestion**: Successfully ingested data from three different subsidiaries (`CSV` and `JSON` formats) into a single bronze streaming table using separate flows
- **Schema Standardization**: Resolved format conflicts by casting all business columns to `STRING` in bronze, then applying proper data types in silver
- **Data Quality & Performance**: Implemented constraint-based data quality checks and enabled liquid clustering for optimized query performance
- **Incremental Processing**: Demonstrated true incremental processing across the entire medallion architecture with automatic materialized view refresh

#### Business Value Delivered

The pipeline processed **951 total records** from six source files across two daily drops, creating a unified view of sales data that enables:
- Cross-subsidiary performance analysis
- Up-to-date business intelligence through materialized views that refresh incrementally with each pipeline run
- Scalable data quality enforcement as new subsidiaries are added

This architecture provides a foundation for enterprise-scale data consolidation while maintaining data lineage, quality, and performance optimization.