# DLT pipeline

This Delta Live Tables (DLT) definition is executed using a pipeline defined in `resources/tpch/pipeline.yml`.

## Ingest into Delta

Define sources to ingest.

In [None]:
source_catalog = "samples"
source_schema = "tpch"

# Map each source table name to the name of the raw table it is ingested into.
tpch_tables = {
    "customer": "customer_raw",
    "lineitem": "lineitem_raw",
    "nation": "nation_raw",
    "orders": "orders_raw",
    "part": "part_raw",
    "partsupp": "partsupp_raw",
    "region": "region_raw",
    "supplier": "supplier_raw",
}

Define a Python factory function with an inner table function. This uses the [DLT metaprogramming pattern](https://docs.databricks.com/en/delta-live-tables/create-multiple-tables.html) to create one table per entry in `tpch_tables`.

In [None]:
import dlt


def create_table(src_catalog, src_schema, src_table, target_table):
    # Arguments are bound per call, so each registered table reads its own source.
    @dlt.table(name=target_table)
    def t():
        return spark.read.table(f"{src_catalog}.{src_schema}.{src_table}")


for source_table, target_table in tpch_tables.items():
    create_table(source_catalog, source_schema, source_table, target_table)
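
The loop above calls a factory function instead of defining the table function directly in the loop body. The sketch below illustrates the reason in plain Python, under the assumption that the decorated functions are invoked after the loop has finished: a function defined directly in a loop looks up the loop variable when it is called, not when it is defined. The names `late_bound`, `bound`, and `make_reader` are hypothetical and only stand in for DLT's table registration; no DLT runtime is involved.

```python
# Two entries from the mapping are enough to show the effect.
tables = {"customer": "customer_raw", "orders": "orders_raw"}

# Without a factory: every lambda reads `src` when it is called,
# which happens after the loop has already finished.
late_bound = {}
for src, target in tables.items():
    late_bound[target] = lambda: f"samples.tpch.{src}"


# With a factory: `src_table` is a parameter, so each inner function
# keeps the value it was created with.
def make_reader(src_table):
    def read():
        return f"samples.tpch.{src_table}"

    return read


bound = {target: make_reader(src) for src, target in tables.items()}

print(late_bound["customer_raw"]())  # samples.tpch.orders (wrong source)
print(bound["customer_raw"]())       # samples.tpch.customer
```

Passing the names as arguments to `create_table` plays the same role as `make_reader` here.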

## Apply Transformations

In [None]:
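# Use an explicit name: `target_table` still holds the last value from the
# ingestion loop ("supplier_raw") and would collide with that table.
# "customer" is a placeholder; the original does not specify a name.
@dlt.table(name="customer")
def t():
    df = spark.read.table("LIVE.customer_raw")
    return df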