# DLT pipeline

This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/ub_dlt_demo_pipeline.yml.

In [None]:
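# Import DLT and the project package from src/ub_dlt_demo
import sys

import dlt
import pyspark.sql.functions as F

# Put the bundle's source directory on the import path so ub_dlt_demo can be imported;
# fall back to the current directory when "bundle.sourcePath" is not set
sys.path.append(spark.conf.get("bundle.sourcePath", "."))
from ub_dlt_demo import main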

### Setting variables to identify the landing zone location

In [None]:
storage_account = "ubsadatabrickspocnpl2"
storage_container = "umpquapocdev"
lz_base_path = "umpqua_poc/landing_zone"
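These three variables are combined into an ADLS Gen2 `abfss://` URI in the bronze table definition. If more landing-zone folders are added, a small helper can keep that format in one place. The sketch below assumes one folder per dataset under `lz_base_path`; `landing_zone_path` is a hypothetical helper, not part of `ub_dlt_demo`.

```python
def landing_zone_path(storage_container: str, storage_account: str, lz_base_path: str, dataset: str) -> str:
    """Build the abfss:// URI of one dataset folder in the landing zone (ADLS Gen2)."""
    # Format: abfss://<container>@<account>.dfs.core.windows.net/<path>
    base = lz_base_path.strip("/")
    return f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net/{base}/{dataset}"


path = landing_zone_path("umpquapocdev", "ubsadatabrickspocnpl2", "umpqua_poc/landing_zone", "customerpiidata")
print(path)
# abfss://umpquapocdev@ubsadatabrickspocnpl2.dfs.core.windows.net/umpqua_poc/landing_zone/customerpiidata
```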

### Ingest Data From Landing Zone

#### DLT Decorator Function (Invokes DLT & Adds Metadata to Table)
- The table name is set with the "name=" argument of the @dlt.table() decorator
- A comment for the Unity Catalog (UC) table is set with the "comment=" argument of the @dlt.table() decorator
- Table properties are set with the "table_properties={}" argument of the @dlt.table() decorator

#### Table Function (Defines logic used by DLT for the table generated)
- Specifies the type of data source:
    - "readStream" means the data is ingested incrementally as a stream
    - The "cloudFiles" format means files are ingested from object storage with Auto Loader
- Several options control how the data is read:
    - The format of the data files is set to CSV with "cloudFiles.format": "csv"
    - The first line of each file is treated as a header with "header": "true"
    - Fields are separated by the two-character delimiter "||" with "delimiter": "||"
    - Values that do not match the schema are saved in a column named "_rescued_data" with "rescuedDataColumn": "_rescued_data"
    - Column types are inferred from the data (instead of reading every CSV column as a string) with "cloudFiles.inferColumnTypes": "true"
    - All specified options are checked for validity with "cloudFiles.validateOptions": "true"
    - Directory listing mode is used instead of file notification mode with "cloudFiles.useNotifications": "false"
    - An asynchronous backfill runs once a day, so files missed by incremental file discovery are eventually processed, with "cloudFiles.backfillInterval": "1 day"
    - Schema evolution mode is set to rescue with "cloudFiles.schemaEvolutionMode": "rescue": the schema is not evolved, the stream does not fail on schema changes, and new columns are captured in the rescued data column
    - Files that are overwritten after being ingested are not reprocessed with "cloudFiles.allowOverwrites": "false"
- The path to the data source is set:
    - f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net/{lz_base_path}/customerpiidata"
    - The storage container, storage account, and landing zone path come from the variables declared earlier
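The options above mix CSV reader settings ("header", "delimiter") with Auto Loader settings. The toy model below shows in plain Python what a header line, a multi-character delimiter, and a rescued data column do to each record under rescue mode. It is a simplified illustration, not Auto Loader's implementation. `EXPECTED_SCHEMA` stands in for a schema Auto Loader might have inferred from earlier files, and the column names other than `tax_id` and `is_company` are hypothetical. As in Auto Loader, a value with the wrong type becomes null and is kept in the rescued column, and the rescued data also records the source file path.

```python
import json

# Hypothetical schema that Auto Loader might have inferred from earlier files
# (with "cloudFiles.inferColumnTypes": "true"): column name -> parser.
EXPECTED_SCHEMA = {"customer_id": int, "tax_id": str, "is_company": int}


def parse_file(text: str, file_path: str, delimiter: str = "||") -> list:
    """Toy model of header-based CSV parsing with a rescued data column."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = lines[0].split(delimiter)  # "header": "true": the first line names the columns
    rows = []
    for line in lines[1:]:
        values = line.split(delimiter)
        row = {col: None for col in EXPECTED_SCHEMA}
        rescued = {}
        for col, raw in zip(header, values):
            if col not in EXPECTED_SCHEMA:
                rescued[col] = raw  # new column: kept in the rescued data (rescue mode)
            elif raw == "":
                row[col] = None  # empty field -> null (simplification)
            else:
                try:
                    row[col] = EXPECTED_SCHEMA[col](raw)
                except ValueError:
                    rescued[col] = raw  # type mismatch: value rescued, row not dropped
        if rescued:
            rescued["_file_path"] = file_path  # record which file the rescued values came from
        row["_rescued_data"] = json.dumps(rescued) if rescued else None
        rows.append(row)
    return rows


sample = (
    "customer_id||tax_id||is_company||region\n"
    "1||123-45-6789||0||west\n"
    "abc||98-7654321||1||east\n"
)
rows = parse_file(sample, "landing_zone/customerpiidata/file1.csv")
for row in rows:
    print(row)
```

Here `region` plays the role of a column that appeared after the schema was inferred. In rescue mode it does not change the schema; it shows up only inside `_rescued_data`.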

In [None]:
@dlt.table(
    name="customerpiidata",
    comment="Raw customer data captured for customerpiidata",
    table_properties={
        "quality": "bronze",
    },
)
def customerpiidata_dlt():
    return (
        spark.readStream.format("cloudFiles")
        .options(
            **{
                "cloudFiles.format": "csv",
                "header": "true",
                "delimiter": "||",
                "rescuedDataColumn": "_rescued_data",
                "cloudFiles.validateOptions": "true",
                "cloudFiles.useNotifications": "false",
                "cloudFiles.inferColumnTypes": "true",
                "cloudFiles.backfillInterval": "1 day",
                "cloudFiles.schemaEvolutionMode": "rescue",
                "cloudFiles.allowOverwrites": "false",
            }
        )
        .load(
            f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net/{lz_base_path}/customerpiidata"
        )
    )

### Further process raw bronze data to silver

#### Additional DLT Decorator Is Added
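- The @dlt.expect_or_drop() decorator enforces a data quality rule at the silver level: rows where "tax_id" is null fail the "valid_tax_id" expectation and are dropped from the table, and the number of failing rows is recorded in the pipeline's data quality metrics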
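`@dlt.expect_or_drop` is one of three expectation decorators (`@dlt.expect`, `@dlt.expect_or_drop`, `@dlt.expect_or_fail`). They differ only in what happens to a record that fails the constraint. The plain-Python sketch below models those three behaviors for the `valid_tax_id` rule. `apply_expectation` and its metrics dictionary are hypothetical. The metrics are loosely modelled on the passed/failed counts that DLT reports for expectations, not on its actual event log format.

```python
from typing import Callable, Iterable, List, Tuple

# Hypothetical action names mapped to the DLT decorator each one imitates.
ACTIONS = {"warn": "@dlt.expect", "drop": "@dlt.expect_or_drop", "fail": "@dlt.expect_or_fail"}


def apply_expectation(
    rows: Iterable[dict], name: str, predicate: Callable[[dict], bool], action: str
) -> Tuple[List[dict], dict]:
    """Toy model of how a DLT expectation treats records that fail its constraint."""
    if action not in ACTIONS:
        raise ValueError(f"unknown action {action!r}")
    kept, passed, failed = [], 0, 0
    for row in rows:
        if predicate(row):
            passed += 1
            kept.append(row)
            continue
        failed += 1
        if action == "fail":
            # expect_or_fail: a single invalid record stops the update
            raise RuntimeError(f"expectation {name!r} violated by {row!r}")
        if action == "warn":
            # expect: the invalid record is still written; only the metric changes
            kept.append(row)
        # drop: the invalid record is discarded
    return kept, {"name": name, "passed_records": passed, "failed_records": failed}


def valid_tax_id(row: dict) -> bool:
    # Python equivalent of the SQL constraint "tax_id IS NOT NULL"
    return row.get("tax_id") is not None


rows = [{"tax_id": "123-45-6789"}, {"tax_id": None}, {"tax_id": "98-7654321"}]
kept, metrics = apply_expectation(rows, "valid_tax_id", valid_tax_id, "drop")
print(len(kept), metrics)
# 2 {'name': 'valid_tax_id', 'passed_records': 2, 'failed_records': 1}
```

Switching the action to `"warn"` keeps all three rows while still counting one failure, which is the trade-off between `@dlt.expect` and `@dlt.expect_or_drop`.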

#### Table Function (Defines logic used by DLT for the table generated)
- The data source differs from the bronze table:
    - Instead of using Auto Loader to ingest files from object storage, the bronze "customerpiidata" table defined earlier in this pipeline is read as a stream with dlt.read_stream()
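`dlt.read_stream("customerpiidata")` reads the bronze table as a streaming source, so each pipeline update processes only the rows appended since the previous update. `dlt.read` would instead read the table as a complete batch input. The toy model below illustrates that incremental behavior in plain Python. `AppendOnlyTable` and `StreamingReader` are hypothetical classes, and a single integer offset is a simplification of the checkpoint that Structured Streaming actually maintains.

```python
class AppendOnlyTable:
    """Toy stand-in for the bronze Delta table: rows are only ever appended."""

    def __init__(self):
        self.rows = []

    def append(self, *new_rows):
        self.rows.extend(new_rows)


class StreamingReader:
    """Toy streaming reader that remembers how far it has read, like a checkpoint offset."""

    def __init__(self, source: AppendOnlyTable):
        self.source = source
        self.offset = 0  # number of source rows already processed

    def read_new(self) -> list:
        end = len(self.source.rows)
        new_rows = self.source.rows[self.offset:end]
        self.offset = end  # "commit" the new offset after reading
        return new_rows


bronze = AppendOnlyTable()
reader = StreamingReader(bronze)

bronze.append({"tax_id": "123-45-6789"}, {"tax_id": "98-7654321"})
first_update = reader.read_new()   # both rows
bronze.append({"tax_id": "111-22-3333"})
second_update = reader.read_new()  # only the newly appended row
third_update = reader.read_new()   # nothing new
```

By default, a Delta streaming source also expects the upstream table to be append-only, which matches this model: a row changed in place behind the offset would not be read again.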

In [None]:
@dlt.table(
    name="customerpiidata_clean",
    comment="Cleaned customerpiidata",
    table_properties={
        "quality": "silver",
    },
)
@dlt.expect_or_drop("valid_tax_id", "tax_id IS NOT NULL")
def customerpiidata_clean_dlt():
    return dlt.read_stream("customerpiidata")

### Further process silver data into two gold tables

#### Table Function (Defines logic used by DLT for the table generated)
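- The silver table is read as a stream and filtered on the "is_company" column to create two gold tables for specific business purposes: corporate customers ("is_company" = 1) and consumer customers ("is_company" = 0)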
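The two gold tables are complementary filters on the same silver stream. The plain-Python sketch below mirrors the two `filter` calls and also collects rows that match neither. The function `split_customers` and the sample rows are hypothetical. In Spark SQL, `F.col("is_company") == 1` evaluates to NULL when `is_company` is NULL, and `filter` keeps only rows where the condition is true, so such rows land in neither gold table.

```python
def split_customers(rows):
    """Route silver rows the way the two gold filters do (is_company == 1 / == 0)."""
    corporate = [r for r in rows if r.get("is_company") == 1]
    consumer = [r for r in rows if r.get("is_company") == 0]
    # Rows matching neither filter (NULL or any other value) reach no gold table.
    unrouted = [r for r in rows if r.get("is_company") not in (0, 1)]
    return corporate, consumer, unrouted


silver = [
    {"customer_id": 1, "is_company": 1},
    {"customer_id": 2, "is_company": 0},
    {"customer_id": 3, "is_company": None},
]
corporate, consumer, unrouted = split_customers(silver)
print([r["customer_id"] for r in corporate], [r["customer_id"] for r in consumer], [r["customer_id"] for r in unrouted])
# [1] [2] [3]
```

If those rows should be surfaced instead of silently skipped, one option is an additional expectation on the silver table, such as `@dlt.expect("valid_is_company", "is_company IN (0, 1)")`. This records violations without dropping the rows.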

In [None]:
@dlt.table(
    name="corporate_customer_data",
    comment="Clean corporate customer data",
    table_properties={
        "quality": "gold",
    },
)
def corporate_customer_data_dlt():
    return dlt.read_stream("customerpiidata_clean").filter(F.col("is_company") == 1)

In [None]:
@dlt.table(
    name="consumer_customer_data",
    comment="Clean consumer customer data",
    table_properties={
        "quality": "gold",
    },
)
def consumer_customer_data_dlt():
    return dlt.read_stream("customerpiidata_clean").filter(F.col("is_company") == 0)