In [None]:
%%capture
# Install dlt with all the necessary DuckDB dependencies.
# The requirement is quoted so the shell cannot interpret the square brackets
# as a glob pattern, and %pip installs into the environment of the running kernel.
%pip install "dlt[duckdb]"
!dlt --version

# Dlt Schema evolution demo:


## Assume we have the following data, which we can load and inspect. Take particular note of the building column: it is loaded into DuckDB as `BIGINT`. Later this column will arrive as a string, and we will see what happens.

In [None]:
def citizens_pipeline():
    """Yield the initial citizen records, one dict per person.

    Both records carry ``Building`` as an integer.
    """
    max_record = {"name": "Max", "age": 34, "Nationality": "Germany", "Street": "Müllerstr.", "Building": 131}
    aisha_record = {"name": "Aisha", "age": 19, "Nationality": "Ukraine", "Street": "Karl Marx Str.", "Building": 26}
    # Records are emitted in the same order in which they are declared above.
    yield from (max_record, aisha_record)

In [None]:
import dlt

# Create a pipeline that targets DuckDB and writes into the "people" dataset.
pipeline = dlt.pipeline(
    destination="duckdb",
    dataset_name="people",
)

# Run the generator into the "people" table, requesting the 'replace' write disposition.
info = pipeline.run(
    citizens_pipeline,
    table_name="people",
    write_disposition='replace',
)

In [None]:
import duckdb
from google.colab import data_table

# Turn on Colab's DataFrame formatter for frames displayed below.
data_table.enable_dataframe_formatter()

# Open the DuckDB file named after the pipeline. The context manager closes the
# connection when the block ends, so no database handle is left open.
with duckdb.connect(f"{pipeline.pipeline_name}.duckdb") as conn:
    # Resolve unqualified table names against the dataset the pipeline wrote to.
    conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

    # List the tables together with their column names and column types.
    display(conn.sql("DESCRIBE"))

    # Materialise the table as a DataFrame while the connection is still open.
    people_table = conn.sql("SELECT * FROM people").df()

display(people_table)

┌─────────────────────┬──────────────────────┬─────────────────────────────────────────────────────────────┬───────────┐
│     table_name      │     column_names     │                        column_types                         │ temporary │
│       varchar       │      varchar[]       │                          varchar[]                          │  boolean  │
├─────────────────────┼──────────────────────┼─────────────────────────────────────────────────────────────┼───────────┤
│ _dlt_loads          │ [load_id, schema_n…  │ [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME ZONE]        │ false     │
│ _dlt_pipeline_state │ [version, engine_v…  │ [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP WITH TIME ZO…  │ false     │
│ _dlt_version        │ [version, engine_v…  │ [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VARCHAR, VARCH…  │ false     │
│ people              │ [name, age, nation…  │ [VARCHAR, BIGINT, VARCHAR, VARCHAR, BIGINT, VARCHAR, VARC…  │ false     │
└─────────────────────┴─────────

Unnamed: 0,name,age,nationality,street,building,_dlt_load_id,_dlt_id
0,Max,34,Germany,Müllerstr.,131,1684260729.81942,kY1EJLY8gxRKrQ
1,Aisha,19,Ukraine,Karl Marx Str.,26,1684260729.81942,egGe6fl4ld2PSA


## Something changed. The data now contains a building number that is a string rather than a number as before, and a new vaccinations column. Let's load the data and watch the schema evolve.

In [None]:
def citizens_pipeline():
    """Yield the follow-up citizen record as a single dict.

    Here ``Building`` is a string and the record has an extra
    ``Vaccinations`` key.
    """
    rahul_record = {"name": "Rahul", "age": 28, "Nationality": "India", "Street": "Brunnenstr.", "Building": "67A", "Vaccinations": 2}
    yield rahul_record

In [None]:
import dlt

# Create a pipeline that targets DuckDB and writes into the "people" dataset.
pipeline = dlt.pipeline(
    destination="duckdb",
    dataset_name="people",
)

# Run the generator into the "people" table, requesting the 'append' write disposition.
info = pipeline.run(
    citizens_pipeline,
    table_name="people",
    write_disposition='append',
)

## As you can now see, the schemas were merged: the vaccinations column was added, and the text variant of the building column was created as `building__v_text`.

In [None]:
import duckdb
from google.colab import data_table

# Turn on Colab's DataFrame formatter for frames displayed below.
data_table.enable_dataframe_formatter()

# Open the DuckDB file named after the pipeline. The context manager closes the
# connection when the block ends, so no database handle is left open.
with duckdb.connect(f"{pipeline.pipeline_name}.duckdb") as conn:
    # Resolve unqualified table names against the dataset the pipeline wrote to.
    conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

    # List the tables together with their column names and column types.
    display(conn.sql("DESCRIBE"))

    # Materialise the table as a DataFrame while the connection is still open.
    people_table = conn.sql("SELECT * FROM people").df()

display(people_table)

┌─────────────────────┬──────────────────────┬─────────────────────────────────────────────────────────────┬───────────┐
│     table_name      │     column_names     │                        column_types                         │ temporary │
│       varchar       │      varchar[]       │                          varchar[]                          │  boolean  │
├─────────────────────┼──────────────────────┼─────────────────────────────────────────────────────────────┼───────────┤
│ _dlt_loads          │ [load_id, schema_n…  │ [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME ZONE]        │ false     │
│ _dlt_pipeline_state │ [version, engine_v…  │ [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP WITH TIME ZO…  │ false     │
│ _dlt_version        │ [version, engine_v…  │ [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VARCHAR, VARCH…  │ false     │
│ people              │ [name, age, nation…  │ [VARCHAR, BIGINT, VARCHAR, VARCHAR, BIGINT, VARCHAR, VARC…  │ false     │
└─────────────────────┴─────────

Unnamed: 0,name,age,nationality,street,building,_dlt_load_id,_dlt_id,building__v_text,vaccinations
0,Max,34,Germany,Müllerstr.,131.0,1684260729.81942,kY1EJLY8gxRKrQ,,
1,Aisha,19,Ukraine,Karl Marx Str.,26.0,1684260729.81942,egGe6fl4ld2PSA,,
2,Rahul,28,India,Brunnenstr.,,1684260774.626208,qZQXsHUZ+k6zng,67A,2.0


## Finally, the teams that produce, curate, or consume the data should be notified of any change. We can read the load outcome and send it to a Slack webhook with dlt.

In [None]:
from dlt.common.runtime.slack import send_slack_message

In [None]:
import os

# Slack incoming-webhook URL. It is read from the SLACK_WEBHOOK_URL environment
# variable so a real webhook never has to be saved in the notebook; when the
# variable is not set, the placeholder URL below is used.
hook = os.environ.get("SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/xxx/xxx/xxx")

In [None]:
# Walk every load package of the last run and report each column listed in its
# schema update to the Slack webhook, one message per column.
for load_package in info.load_packages:
    for table_name, table_schema in load_package.schema_update.items():
        for column_name, column in table_schema["columns"].items():
            message = f"\tTable updated: {table_name}: Column changed: {column_name}: {column['data_type']}"
            send_slack_message(hook, message=message)