# `dlt` Demo

Resource: https://dlthub.com/docs/intro

## Basic

`dlt` can be used in a Jupyter notebook or from the command line (with configuration).

You can create your own [transformer](https://dlthub.com/docs/dlt-ecosystem/verified-sources/filesystem/advanced#example-read-data-from-excel-files) to load Excel files.

In [399]:
import dlt
import duckdb

# for data validation
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Literal, Optional
from decimal import Decimal

# using read_csv_duckdb is much more efficient than read_csv, which uses pandas
from dlt.sources.filesystem import filesystem, read_csv_duckdb

In [2]:
class BusinessRecord(BaseModel):
    """Represents a single business record in the dataset."""

    id: int = Field(gt=0, description="Unique identifier for the record")
    value: Decimal = Field(decimal_places=2, description="Business metric value")
    timestamp: datetime = Field(description="Timestamp of the record")
    description: str = Field(
        min_length=1, description="Description of the business activity"
    )
    category: Literal[
        "Finance", "Sales", "Customer Service", "Marketing", "HR", "IT"
    ] = Field(description="Business department category")

Fetch data from the [file system](https://dlthub.com/docs/dlt-ecosystem/verified-sources/filesystem/basic).

In [3]:
filesystem_resource_topic = filesystem(bucket_url="file:data/normal", file_glob="*.csv")

You can add filters (e.g., by file name or size) at this stage.

In [4]:
filesystem_resource_topic.add_filter(
    lambda item: item["file_name"]
    not in ["normal3.csv", "normal4_1.csv", "normal4_2.csv"]
)

<dlt.extract.resource.DltResource at 0x103d19c40>
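The filter passed to `add_filter` is just a callable that receives one file item and returns a boolean, so name and size filters can be written and tested as plain functions first. The sketch below uses plain dicts as stand-ins for file items; the `size_in_bytes` key, the 10 MiB cap, and the helper names are assumptions made for illustration, not part of the original notebook.

```python
from typing import Any, Callable, Mapping

FileItem = Mapping[str, Any]

# Same exclusion list as in the cell above.
EXCLUDED_NAMES = {"normal3.csv", "normal4_1.csv", "normal4_2.csv"}
# Hypothetical size cap, chosen only for this example.
MAX_SIZE_BYTES = 10 * 1024 * 1024


def keep_by_name(item: FileItem) -> bool:
    """Keep files whose name is not on the exclusion list."""
    return item["file_name"] not in EXCLUDED_NAMES


def keep_by_size(item: FileItem) -> bool:
    """Keep files at or below the size cap (assumes a `size_in_bytes` key)."""
    return item["size_in_bytes"] <= MAX_SIZE_BYTES


def all_of(*predicates: Callable[[FileItem], bool]) -> Callable[[FileItem], bool]:
    """Combine predicates so that a file is kept only if every one accepts it."""

    def combined(item: FileItem) -> bool:
        return all(predicate(item) for predicate in predicates)

    return combined


keep_file = all_of(keep_by_name, keep_by_size)

# Plain dicts standing in for the items the filesystem resource yields.
samples = [
    {"file_name": "normal1.csv", "size_in_bytes": 2_048},
    {"file_name": "normal3.csv", "size_in_bytes": 2_048},
    {"file_name": "big.csv", "size_in_bytes": 20 * 1024 * 1024},
]
kept = [sample["file_name"] for sample in samples if keep_file(sample)]
print(kept)
```

The combined predicate could then be passed as `filesystem_resource_topic.add_filter(keep_file)` in place of the lambda.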

In [5]:
filesystem_pipe_topic = filesystem_resource_topic | read_csv_duckdb()

You can apply hints at this stage, e.g., [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading) (load only the new data), the table name, and the table schema.

In [6]:
filesystem_pipe_topic.apply_hints(
    write_disposition="replace", table_name="normal", columns=BusinessRecord
)

<dlt.extract.resource.DltResource at 0x107756690>
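The cell above uses `write_disposition="replace"`, so every run reloads everything. To make the incremental-loading hint more concrete, here is a toy model of the idea in plain Python: remember the newest `modification_date` seen so far and pass through only newer files on the next run. This is a sketch of the concept, not `dlt`'s implementation; the `ModificationCursor` class is hypothetical. In `dlt` itself the same intent is normally expressed as a hint such as `incremental=dlt.sources.incremental("modification_date")` rather than hand-written code.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, List, Mapping, Optional

FileItem = Mapping[str, Any]


class ModificationCursor:
    """Toy cursor that remembers the newest modification_date seen so far."""

    def __init__(self) -> None:
        self.last_value: Optional[datetime] = None

    def only_new(self, items: Iterable[FileItem]) -> List[FileItem]:
        """Return items newer than the stored cursor, then advance the cursor."""
        fresh = [
            item
            for item in items
            if self.last_value is None or item["modification_date"] > self.last_value
        ]
        if fresh:
            self.last_value = max(item["modification_date"] for item in fresh)
        return fresh


def utc(day: int) -> datetime:
    """Build a January 2024 UTC timestamp for the sample data."""
    return datetime(2024, 1, day, tzinfo=timezone.utc)


file_a = {"file_name": "a.csv", "modification_date": utc(1)}
file_b = {"file_name": "b.csv", "modification_date": utc(2)}
file_c = {"file_name": "c.csv", "modification_date": utc(3)}

cursor = ModificationCursor()
run1 = [f["file_name"] for f in cursor.only_new([file_a, file_b])]
run2 = [f["file_name"] for f in cursor.only_new([file_a, file_b, file_c])]
run3 = [f["file_name"] for f in cursor.only_new([file_a, file_b, file_c])]
print(run1, run2, run3)
```

The first run picks up both files, the second only the file added since, and the third nothing, which is the behaviour an incremental load is meant to give.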

The code below generates an `example.duckdb` file. This file can be used in dbt via dbt-duckdb; see [this doc](https://dlthub.com/docs/dlt-ecosystem/destinations/duckdb).

In [7]:
pipeline_topic = dlt.pipeline(
    pipeline_name="csv_load",
    destination=dlt.destinations.duckdb("example.duckdb"),
    dataset_name="mydata",
)

load_info = pipeline_topic.run(
    filesystem_pipe_topic
)  # the hints can be passed here as well

In [8]:
# print(load_info)

In [9]:
# print(pipeline_topic.default_schema.to_pretty_yaml())

In [10]:
db = duckdb.connect(database="example.duckdb")

In [11]:
db.sql("DESCRIBE;")

┌──────────┬───────────────────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│ database │        schema         │        name         │                                           column_names                                           │                                      column_types                                       │ temporary │
│ varchar  │        varchar        │       varchar       │                                            varchar[]                                             │                                        varchar[]                                        │  boolean  │
├──────────┼───────────────────────┼─────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────┼───────────────────────────────────────────────────────────────

In [12]:
# db.sql("SELECT id, COUNT(*) FROM mydata.normal GROUP BY id;")

In [13]:
db.close()

If you want to use Pandas, the data can be accessed via [`ReadableDataset`](https://dlthub.com/docs/general-usage/dataset-access/dataset).

In addition to transforming data using `duckdb` explicitly, you can use [the `dlt` SQL client](https://dlthub.com/docs/dlt-ecosystem/transformations/sql) as well.

In [14]:
# with pipeline_topic.sql_client() as p:
#     ans = p.execute_sql(
#         "SELECT category, COUNT(*) FROM mydata.normal GROUP BY category;"
#     )

# ans

## Join Transformation

In [15]:
filesystem_resource_task = filesystem(
    bucket_url="file:data/joinable", file_glob="*.csv"
)

filesystem_resource_task.add_filter(lambda item: item["file_name"] != "j03.csv")

filesystem_pipe_task = filesystem_resource_task | read_csv_duckdb()

filesystem_pipe_task.apply_hints(write_disposition="replace", table_name="join")

pipeline_task = dlt.pipeline(
    pipeline_name="csv_load_join",
    destination=dlt.destinations.duckdb("example.duckdb"),
    dataset_name="mydata",
    dev_mode=True,
)

load_info_task = pipeline_task.run(
    filesystem_pipe_task
)  # the hints can be passed here as well

In [16]:
# with pipeline_task.sql_client() as p:
#     ans = p.execute_sql(
#         "SELECT n.id, value, category, assigned_to, status FROM mydata.normal AS n JOIN mydata.join AS j ON n.id=j.id;"
#     )

# print(ans)

## Schema

In addition to creating a Pydantic schema, you can [adjust a schema](https://dlthub.com/docs/walkthroughs/adjust-a-schema) based on the autogenerated one.

To be specific, a [schema](https://dlthub.com/docs/general-usage/schema) "describes the structure of normalized data (e.g., tables, columns, data types, etc.) and provides instructions on how the data should be processed and loaded." Here, "normalized" means that `dlt` adjusts the structure of the input data, such as column names and data types, so that it can be loaded into the destination.
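As an illustration of what name normalization can look like, the sketch below converts arbitrary column headers to snake_case. It assumes a snake_case naming convention and only approximates it: the `to_snake_case` helper and the sample headers are hypothetical, and `dlt`'s real rules cover more cases than this.

```python
import re


def to_snake_case(name: str) -> str:
    """Approximate snake_case normalization of a column name (toy version)."""
    # Insert an underscore at lower-to-upper boundaries, e.g. orderID -> order_ID.
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Collapse every run of non-alphanumeric characters into one underscore.
    name = re.sub(r"[^0-9A-Za-z]+", "_", name)
    # Drop leading/trailing underscores and lowercase the result.
    return name.strip("_").lower()


headers = ["Customer Name", "orderID", "Value (USD)", "timestamp"]
normalized = [to_snake_case(header) for header in headers]
print(normalized)
```

Comparing such a result with the exported schema is a quick way to see which names `dlt` actually produced for your data.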

In [17]:
pipeline_topic_schema = dlt.pipeline(
    import_schema_path="schemas/import",  # path to the schema file (imported schema)
    export_schema_path="schemas/export",  # path to the schema file (exported schema)
    pipeline_name="csv_load_schema",
    destination=dlt.destinations.duckdb("example_schema.duckdb"),
    dataset_name="mydata_schema",
    dev_mode=True,
)

In [18]:
pipeline_topic_schema.run(filesystem_pipe_topic)

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x110020d40>, metrics={'1737363945.662146': [{'started_at': DateTime(2025, 1, 20, 9, 5, 45, 776708, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 1, 20, 9, 5, 45, 865676, tzinfo=Timezone('UTC')), 'job_metrics': {'_dlt_pipeline_state.287409741f.insert_values': LoadJobMetrics(job_id='_dlt_pipeline_state.287409741f.insert_values', file_path='/Users/alex/.dlt/pipelines/csv_load_schema/load/normalized/1737363945.662146/started_jobs/_dlt_pipeline_state.287409741f.0.insert_values', table_name='_dlt_pipeline_state', started_at=DateTime(2025, 1, 20, 9, 5, 45, 834119, tzinfo=Timezone('UTC')), finished_at=DateTime(2025, 1, 20, 9, 5, 45, 835030, tzinfo=Timezone('UTC')), state='completed', remote_url=None), 'normal.688ee9342f.insert_values': LoadJobMetrics(job_id='normal.688ee9342f.insert_values', file_path='/Users/alex/.dlt/pipelines/csv_load_schema/load/normalized/1737363945.662146/started_jobs/normal.688ee9342f.0.insert_values'

In [19]:
db = duckdb.connect(database="example_schema.duckdb")
db.sql("DESCRIBE;")

┌────────────────┬──────────────────────────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│    database    │            schema            │        name         │                                           column_names                                           │                                         column_types                                         │ temporary │
│    varchar     │           varchar            │       varchar       │                                            varchar[]                                             │                                          varchar[]                                           │  boolean  │
├────────────────┼──────────────────────────────┼─────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────

In [20]:
db.close()

Once the folders and schema files have been created, edit the schema in the `import` folder; the changes take effect on the next run.

Acceptable data types: `['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']`

Note: You should keep the import schema as simple as possible and let dlt do the rest.
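Before editing the import schema by hand, it can help to check the data types you plan to use against the list above. The sketch below does that for a draft set of column hints. The dictionary shape (column name mapped to a dict with a `data_type` key) and the `find_invalid_types` helper are assumptions for illustration; the accepted types are copied from the list above.

```python
from typing import Dict, Mapping

# Copied from the list of acceptable data types above.
ACCEPTED_DATA_TYPES = frozenset(
    [
        "text", "double", "bool", "timestamp", "bigint", "binary",
        "json", "decimal", "wei", "date", "time",
    ]
)


def find_invalid_types(columns: Mapping[str, Mapping[str, str]]) -> Dict[str, str]:
    """Return {column: data_type} for every explicitly set type that is not accepted."""
    return {
        name: hints["data_type"]
        for name, hints in columns.items()
        if "data_type" in hints and hints["data_type"] not in ACCEPTED_DATA_TYPES
    }


# Hypothetical draft of column hints for the `normal` table.
draft_columns = {
    "id": {"data_type": "bigint"},
    "value": {"data_type": "decimal"},
    "timestamp": {"data_type": "datetime"},  # not in the list; "timestamp" is
    "description": {},  # no type given, left for dlt to infer
}

invalid = find_invalid_types(draft_columns)
print(invalid)
```

Columns without an explicit type are skipped on purpose, in line with the advice to keep the import schema simple.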

In [21]:
pipeline_topic_schema.run(filesystem_pipe_topic)

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x110020d40>, metrics={'1737363945.95684': [{'started_at': DateTime(2025, 1, 20, 9, 5, 46, 29876, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 1, 20, 9, 5, 46, 82863, tzinfo=Timezone('UTC')), 'job_metrics': {'normal.6e2610c9c1.insert_values': LoadJobMetrics(job_id='normal.6e2610c9c1.insert_values', file_path='/Users/alex/.dlt/pipelines/csv_load_schema/load/normalized/1737363945.95684/started_jobs/normal.6e2610c9c1.0.insert_values', table_name='normal', started_at=DateTime(2025, 1, 20, 9, 5, 46, 51939, tzinfo=Timezone('UTC')), finished_at=DateTime(2025, 1, 20, 9, 5, 46, 53279, tzinfo=Timezone('UTC')), state='completed', remote_url=None)}}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:////Users/alex/CAPE/dlt-demo/example_schema.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerp

In [22]:
db = duckdb.connect(database="example_schema.duckdb")
db.sql("DESCRIBE;")

┌────────────────┬──────────────────────────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│    database    │            schema            │        name         │                                           column_names                                           │                                         column_types                                         │ temporary │
│    varchar     │           varchar            │       varchar       │                                            varchar[]                                             │                                          varchar[]                                           │  boolean  │
├────────────────┼──────────────────────────────┼─────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────

In [23]:
db.close()

## Contract

[Contracts](https://dlthub.com/docs/general-usage/schema-contracts) define how a schema evolves as new data arrives. There are three levels (`tables`, `columns`, `data_type`) and four actions (`evolve`, `freeze`, `discard_row`, `discard_value`).

|  | `evolve` | `freeze` | `discard_row` | `discard_value` |
|---|---|---|---|---|
| `tables` | New tables are allowed | Error: new tables are not allowed | Only metadata is added; all data is discarded | Only metadata is added; all data is discarded |
| `columns` | New columns are allowed | Error: new columns are not allowed | Rows with new column(s) are discarded | The new column(s) are discarded; the rest of the row is loaded |
| `data_type` | A [variant column](https://dlthub.com/docs/general-usage/schema#variant-columns) is used | Error: variant columns are not allowed | Rows with a value of a different data type are discarded | Values of a different data type are discarded |

Note: Under the `tables` scope, data for a table that already exists is still accepted. The constraint applies only to tables with a new name.

Note: For coercible data types, `dlt` coerces the values implicitly regardless of the contract. The table above shows how `dlt` handles non-coercible data types under each contract.
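To make the `columns` row of the table concrete, here is a toy model of the four actions in plain Python. It only mimics the documented outcome for rows that carry a column the schema does not know yet; it is not how `dlt` implements contracts. The `apply_column_contract` function, the `ContractViolation` exception, and the `region` column are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


class ContractViolation(Exception):
    """Raised by the toy model when a frozen schema would have to change."""


def apply_column_contract(
    rows: Iterable[Row], known_columns: Iterable[str], mode: str
) -> Tuple[List[Row], List[str]]:
    """Return (loaded_rows, resulting_columns) under a `columns` contract mode."""
    if mode not in {"evolve", "freeze", "discard_row", "discard_value"}:
        raise ValueError(f"unknown contract mode: {mode!r}")

    columns = set(known_columns)
    loaded: List[Row] = []
    for row in rows:
        new_columns = [name for name in row if name not in columns]
        if not new_columns:
            loaded.append(dict(row))  # row fits the schema as-is
        elif mode == "evolve":
            columns.update(new_columns)  # schema grows, row is loaded
            loaded.append(dict(row))
        elif mode == "freeze":
            raise ContractViolation(f"new column(s) not allowed: {new_columns}")
        elif mode == "discard_row":
            continue  # the whole row is dropped
        else:  # discard_value: drop only the unknown values
            loaded.append({k: v for k, v in row.items() if k in columns})
    return loaded, sorted(columns)


existing = ["id", "value"]
incoming = [
    {"id": 1, "value": "157.23"},
    {"id": 2, "value": "293.45", "region": "EU"},  # `region` is new
]

for mode in ("evolve", "discard_row", "discard_value"):
    print(mode, apply_column_contract(incoming, existing, mode))

try:
    apply_column_contract(incoming, existing, "freeze")
except ContractViolation as exc:
    print("freeze ->", exc)
```

In the real pipeline, the mode is chosen through the `schema_contract` argument, as in the commented-out cells below.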

In [324]:
# demo for table and column contract
# filesystem_resource_task_con = filesystem(
#     bucket_url="file:data/normal", file_glob="*.csv"
# )

# filesystem_resource_task_con.add_filter(lambda item: item["file_name"] not in ["normal3.csv", "normal4_1.csv", "normal4_2.csv"])

# filesystem_pipe_task_con = filesystem_resource_task_con | read_csv_duckdb()

# filesystem_pipe_task_con.apply_hints(write_disposition="append", table_name="join")

# pipeline_task_con = dlt.pipeline(
#     pipeline_name="csv_load_join_con",
#     destination=dlt.destinations.duckdb("example_con.duckdb"),
#     dataset_name="mydata",
# )

# load_info_task_con = pipeline_task_con.run(
#     filesystem_pipe_task_con,
#     schema_contract={"tables": "evolve", "columns": "evolve"},
# )



In [325]:
# demo for table contract
# filesystem_resource_task_con2 = filesystem(
#     bucket_url="file:data/normal", file_glob="normal3.csv"
# )

# filesystem_pipe_task_con2 = filesystem_resource_task_con2 | read_csv_duckdb()

# filesystem_pipe_task_con2.apply_hints(write_disposition="append", table_name="join")

# load_info_task_con2 = pipeline_task_con.run(
#     filesystem_pipe_task_con2, schema_contract={"tables": "freeze"}
# )

In [326]:
# demo for column contract
# filesystem_resource_task_con2 = filesystem(
#     bucket_url="file:data/normal", file_glob="normal3.csv"
# )

# filesystem_pipe_task_con2 = filesystem_resource_task_con2 | read_csv_duckdb()

# filesystem_pipe_task_con2.apply_hints(write_disposition="append", table_name="join")

# load_info_task_con2 = pipeline_task_con.run(
#     filesystem_pipe_task_con2, schema_contract={"tables": "evolve", "columns": "discard_value"}
# )

In [438]:
# demo for data type contract
# class BusinessRecord2(BaseModel):
#     """Represents a single business record in the dataset."""

#     id: Optional[int] = Field(
#         gt=0, description="Unique identifier for the record"
#     )  # to focus on data types, use Optional to allow NULL
#     value: Decimal = Field(decimal_places=2, description="Business metric value")
#     timestamp: datetime = Field(description="Timestamp of the record")
#     description: str = Field(
#         min_length=1, description="Description of the business activity"
#     )
#     category: Literal[
#         "Finance", "Sales", "Customer Service", "Marketing", "HR", "IT"
#     ] = Field(description="Business department category")


# filesystem_resource_task_con = filesystem(
#     bucket_url="file:data/normal", file_glob="*.csv"
# )

# filesystem_resource_task_con.add_filter(
#     lambda item: item["file_name"]
#     not in ["normal3.csv", "normal4_1.csv", "normal4_2.csv"]
# )

# filesystem_pipe_task_con = filesystem_resource_task_con | read_csv_duckdb()

# filesystem_pipe_task_con.apply_hints(
#     write_disposition="append", table_name="join", columns=BusinessRecord2
# )

# pipeline_task_con = dlt.pipeline(
#     pipeline_name="csv_load_join_con",
#     destination=dlt.destinations.duckdb("example_con.duckdb"),
#     dataset_name="mydata",
# )

# load_info_task_con = pipeline_task_con.run(
#     filesystem_pipe_task_con,
#     schema_contract={"tables": "evolve", "columns": "evolve", "data_type": "evolve"},
# )



In [None]:
# filesystem_resource_task_con2 = filesystem(
#     bucket_url="file:data/normal", file_glob="normal4_2.csv"
# )

# filesystem_pipe_task_con2 = filesystem_resource_task_con2 | read_csv_duckdb()

# filesystem_pipe_task_con2.apply_hints(write_disposition="append", table_name="join")

# load_info_task_con2 = pipeline_task_con.run(
#     filesystem_pipe_task_con2,
#     schema_contract={
#         "tables": "evolve",
#         "columns": "evolve",
#         "data_type": "discard_value",
#     },
# )

In [440]:
db = duckdb.connect(database="example_con.duckdb")
db.sql("DESCRIBE;")

┌─────────────┬─────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│  database   │ schema  │        name         │                                           column_names                                           │                                      column_types                                       │ temporary │
│   varchar   │ varchar │       varchar       │                                            varchar[]                                             │                                        varchar[]                                        │  boolean  │
├─────────────┼─────────┼─────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────┼─────────────────────────────────────────────────────────────────────────────────────────┼───────────┤
│ ex

In [441]:
db.sql("SELECT * FROM mydata.join;")

┌───────┬───────────────┬──────────────────────────┬────────────────────────────────┬──────────────────┬───────────────────┬────────────────┐
│  id   │     value     │        timestamp         │          description           │     category     │   _dlt_load_id    │    _dlt_id     │
│ int64 │ decimal(38,9) │ timestamp with time zone │            varchar             │     varchar      │      varchar      │    varchar     │
├───────┼───────────────┼──────────────────────────┼────────────────────────────────┼──────────────────┼───────────────────┼────────────────┤
│     1 │ 157.230000000 │ 2024-01-01 17:00:00+08   │ Annual financial report review │ Finance          │ 1737445109.842436 │ UbH7MEYu9WMJlw │
│     2 │ 293.450000000 │ 2024-01-01 18:30:00+08   │ Quarterly sales analysis       │ Sales            │ 1737445109.842436 │ x8IDMupsTc/A8Q │
│     3 │ 432.180000000 │ 2024-01-01 19:45:00+08   │ Customer feedback summary      │ Customer Service │ 1737445109.842436 │ yv7SB8JAjqJAhw │
│     

In [442]:
db.close()

## Incremental Config