In [1]:
!pip install dlt[duckdb]

Collecting dlt[duckdb]
  Downloading dlt-0.4.2-py3-none-any.whl.metadata (9.5 kB)
Collecting astunparse>=1.6.3 (from dlt[duckdb])
  Downloading astunparse-1.6.3-py2.py3-none-any.whl (12 kB)
Collecting duckdb<0.10.0,>=0.6.1 (from dlt[duckdb])
  Downloading duckdb-0.9.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (760 bytes)
Collecting gitpython>=3.1.29 (from dlt[duckdb])
  Downloading GitPython-3.1.41-py3-none-any.whl.metadata (14 kB)
Collecting giturlparse>=0.10.0 (from dlt[duckdb])
  Downloading giturlparse-0.12.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting hexbytes>=0.2.2 (from dlt[duckdb])
  Downloading hexbytes-1.0.0-py3-none-any.whl.metadata (5.3 kB)
Collecting humanize>=4.4.0 (from dlt[duckdb])
  Downloading humanize-4.9.0-py3-none-any.whl.metadata (7.9 kB)
Collecting jsonpath-ng>=1.5.3 (from dlt[duckdb])
  Downloading jsonpath_ng-1.6.1-py3-none-any.whl.metadata (18 kB)
Collecting makefun>=1.15.0 (from dlt[duckdb])
  Downloading makefun-1.15.2-py2.py

## 1. Use a generator

In [1]:
def square_root_generator(limit):
    n = 1
    while n <= limit:
        yield n ** 0.5
        n += 1

### Question 1: What is the sum of the outputs of the generator for limit = 5?

In [3]:
generator = square_root_generator(5)
all_output = list(square_root_generator(5))
print(sum(all_output))

8.382332347441762


### Question 2: What is the 13th number yielded by the generator?

In [4]:
generator_list = list(square_root_generator(13))
print(generator_list[-1])

3.605551275463989


## 2. Append a generator to a table with existing data

In [1]:
def people_1():
    for i in range(1, 6):
        yield {"ID": i, "Name": f"Person_{i}", "Age": 25 + i, "City": "City_A"}

for person in people_1():
    print(person)


def people_2():
    for i in range(3, 9):
        yield {"ID": i, "Name": f"Person_{i}", "Age": 30 + i, "City": "City_B", "Occupation": f"Job_{i}"}


# for person in people_2():
#     print(person)

{'ID': 1, 'Name': 'Person_1', 'Age': 26, 'City': 'City_A'}
{'ID': 2, 'Name': 'Person_2', 'Age': 27, 'City': 'City_A'}
{'ID': 3, 'Name': 'Person_3', 'Age': 28, 'City': 'City_A'}
{'ID': 4, 'Name': 'Person_4', 'Age': 29, 'City': 'City_A'}
{'ID': 5, 'Name': 'Person_5', 'Age': 30, 'City': 'City_A'}


### Question 3: Append the 2 generators. After correctly appending the data, calculate the sum of all ages of people.

In [2]:
def append_generators(gen_1, gen_2):
    for person in gen_1:
        yield person
    for person in gen_2:
        yield person
        
sum_of_ages = 0
for person in append_generators(people_1(), people_2()):
    print(person)
    sum_of_ages += person['Age']

print(f'Sum of all ages: {sum_of_ages}')

{'ID': 1, 'Name': 'Person_1', 'Age': 26, 'City': 'City_A'}
{'ID': 2, 'Name': 'Person_2', 'Age': 27, 'City': 'City_A'}
{'ID': 3, 'Name': 'Person_3', 'Age': 28, 'City': 'City_A'}
{'ID': 4, 'Name': 'Person_4', 'Age': 29, 'City': 'City_A'}
{'ID': 5, 'Name': 'Person_5', 'Age': 30, 'City': 'City_A'}
{'ID': 3, 'Name': 'Person_3', 'Age': 33, 'City': 'City_B', 'Occupation': 'Job_3'}
{'ID': 4, 'Name': 'Person_4', 'Age': 34, 'City': 'City_B', 'Occupation': 'Job_4'}
{'ID': 5, 'Name': 'Person_5', 'Age': 35, 'City': 'City_B', 'Occupation': 'Job_5'}
{'ID': 6, 'Name': 'Person_6', 'Age': 36, 'City': 'City_B', 'Occupation': 'Job_6'}
{'ID': 7, 'Name': 'Person_7', 'Age': 37, 'City': 'City_B', 'Occupation': 'Job_7'}
{'ID': 8, 'Name': 'Person_8', 'Age': 38, 'City': 'City_B', 'Occupation': 'Job_8'}
Sum of all ages: 353


## 3. Merge a generator

### Question 4: Merge the 2 generators using the ID column. Calculate the sum of ages of all the people loaded as described above.

In [3]:
import dlt
import duckdb

# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
pipeline = dlt.pipeline(destination='duckdb', dataset_name='people')

# run with merge write disposition.
# This is so scaffolding is created for the next example,
# where we look at merging data

info = pipeline.run(people_1(),
                    table_name="person_detail",
                    write_disposition="merge",
                    primary_key="id"
                   )

print(info)



Pipeline dlt_ipykernel_launcher load step completed in 0.41 seconds
1 load package(s) were loaded to destination duckdb and into dataset people
The duckdb destination used duckdb:////home/faisal/my_projects/data-engineering-zoomcamp/week_03_dlt_workshop/dlt_ipykernel_launcher.duckdb location to store data
Load package 1707266184.2675464 is LOADED and contains no failed jobs


In [4]:
# show the outcome

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))

print("\n\n\n person_detail table below: Note the times are properly typed")
person_detail = conn.sql("SELECT * FROM person_detail").df()
display(person_detail)

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ person_detail       │
└─────────────────────┘




 person_detail table below: Note the times are properly typed


Unnamed: 0,id,name,age,city,_dlt_load_id,_dlt_id,occupation
0,5,Person_5,30,City_A,1707266184.2675464,W9ND2/Ernojqhw,
1,4,Person_4,29,City_A,1707266184.2675464,ZQ3AETPU2ZeYIw,
2,2,Person_2,27,City_A,1707266184.2675464,a7yECG2P50AU9Q,
3,3,Person_3,28,City_A,1707266184.2675464,B3ZB6GyqRsTxzw,
4,1,Person_1,26,City_A,1707266184.2675464,IzMk8qYewyE80A,


In [5]:
# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
pipeline = dlt.pipeline(destination='duckdb', dataset_name='people')

# run with merge write disposition.
# This is so scaffolding is created for the next example,
# where we look at merging data

info = pipeline.run(people_2(),
                    table_name="person_detail",
                    write_disposition="merge",
                    primary_key="id"
                   )

print(info)



Pipeline dlt_ipykernel_launcher load step completed in 0.29 seconds
1 load package(s) were loaded to destination duckdb and into dataset people
The duckdb destination used duckdb:////home/faisal/my_projects/data-engineering-zoomcamp/week_03_dlt_workshop/dlt_ipykernel_launcher.duckdb location to store data
Load package 1707266190.1726866 is LOADED and contains no failed jobs


In [6]:
# show the outcome

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))

print("\n\n\n person_detail table below: Note the times are properly typed")
person_detail = conn.sql("SELECT * FROM person_detail").df()
display(person_detail)

Loaded tables: 


┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ person_detail       │
└─────────────────────┘




 person_detail table below: Note the times are properly typed


Unnamed: 0,id,name,age,city,_dlt_load_id,_dlt_id,occupation
0,2,Person_2,27,City_A,1707266184.2675464,a7yECG2P50AU9Q,
1,1,Person_1,26,City_A,1707266184.2675464,IzMk8qYewyE80A,
2,8,Person_8,38,City_B,1707266190.1726866,1TB2pPDM8KTtyw,Job_8
3,5,Person_5,35,City_B,1707266190.1726866,r4auqp56T1NUQg,Job_5
4,7,Person_7,37,City_B,1707266190.1726866,PDEQoMPSgFqCDw,Job_7
5,4,Person_4,34,City_B,1707266190.1726866,v2DKs48KW/9dwg,Job_4
6,3,Person_3,33,City_B,1707266190.1726866,Z/6IMlhuWhN8QA,Job_3
7,6,Person_6,36,City_B,1707266190.1726866,AyZbwrJaYf5/IQ,Job_6


In [9]:
sum_of_age_result = conn.sql("SELECT sum(age) as sum_of_age FROM person_detail").df()
display(sum_of_age_result)

Unnamed: 0,sum_of_age
0,266.0


## Done