In [None]:
pip install dlt[duckdb]

In [6]:
import dlt

In [7]:
def load_first_people():
    for i in range(1, 6):
        yield {"ID": i, "Name": f"Person_{i}", "Age": 25 + i, "City": "City_A"}

for person in load_first_people():
    print(person)

{'ID': 1, 'Name': 'Person_1', 'Age': 26, 'City': 'City_A'}
{'ID': 2, 'Name': 'Person_2', 'Age': 27, 'City': 'City_A'}
{'ID': 3, 'Name': 'Person_3', 'Age': 28, 'City': 'City_A'}
{'ID': 4, 'Name': 'Person_4', 'Age': 29, 'City': 'City_A'}
{'ID': 5, 'Name': 'Person_5', 'Age': 30, 'City': 'City_A'}


In [8]:
#create pipeline
pipeline = dlt.pipeline(pipeline_name="workshop_dlt",
						destination='duckdb', 
						dataset_name='people_data')

In [10]:
# run the pipeline with default settings, and capture the outcome
info = pipeline.run(load_first_people(), 
                    table_name="people", 
                    write_disposition="replace")

In [11]:
import duckdb

#connect to duckdb
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ people              │
└─────────────────────┘

In [12]:
#sum of ages of all people
sum_ages = conn.sql("SELECT SUM(Age) AS sum_of_ages FROM people").df()
display(sum_ages)

Unnamed: 0,sum_of_ages
0,140.0


In [13]:
def load_second_people():
    for i in range(3, 9):
        yield {"ID": i, "Name": f"Person_{i}", "Age": 30 + i, "City": "City_B", "Occupation": f"Job_{i}"}

for person in load_second_people():
    print(person)

{'ID': 3, 'Name': 'Person_3', 'Age': 33, 'City': 'City_B', 'Occupation': 'Job_3'}
{'ID': 4, 'Name': 'Person_4', 'Age': 34, 'City': 'City_B', 'Occupation': 'Job_4'}
{'ID': 5, 'Name': 'Person_5', 'Age': 35, 'City': 'City_B', 'Occupation': 'Job_5'}
{'ID': 6, 'Name': 'Person_6', 'Age': 36, 'City': 'City_B', 'Occupation': 'Job_6'}
{'ID': 7, 'Name': 'Person_7', 'Age': 37, 'City': 'City_B', 'Occupation': 'Job_7'}
{'ID': 8, 'Name': 'Person_8', 'Age': 38, 'City': 'City_B', 'Occupation': 'Job_8'}


In [14]:
#append data
info = pipeline.run(load_second_people(), 
                    table_name="people", 
                    write_disposition="append")

In [15]:
#sum of ages of all people after append
sum_ages = conn.sql("SELECT SUM(Age) AS sum_of_ages FROM people").df()
display(sum_ages)

Unnamed: 0,sum_of_ages
0,353.0


In [16]:
#same table with primary key
info = pipeline.run(load_first_people(), 
                    table_name="people_with_primarykey", 
                    write_disposition="merge",
                    primary_key="ID")

In [None]:
#same table with primary key
info = pipeline.run(load_second_people(), 
                    table_name="people_with_primarykey", 
                    write_disposition="merge",
                    primary_key="ID")

In [21]:
#total records
total_records = conn.sql("SELECT COUNT(*) AS TOTAL_RECORDS FROM people_with_primarykey").df()
display(total_records)

Unnamed: 0,TOTAL_RECORDS
0,8


In [22]:
#Age of person with ID = 3
age = conn.sql("SELECT Age AS Age FROM people_with_primarykey WHERE ID = 3").df()
display(age)

Unnamed: 0,Age
0,33


In [23]:
#sum of ages of all people
sum_ages = conn.sql("SELECT SUM(Age) AS sum_of_ages FROM people_with_primarykey").df()
display(sum_ages)

Unnamed: 0,sum_of_ages
0,266.0
