In [2]:
import dlt
from dlt.sources.helpers import requests

In [3]:
# QUICK START
# Load a tiny in-memory list of user records into the `users` table of a
# local DuckDB-backed pipeline.

data = [
    {'id': user_id, 'name': user_name}
    for user_id, user_name in enumerate(['Alice', 'Bob', 'Charlie'], start=1)
]

pipeline = dlt.pipeline(
    pipeline_name='quick_start',
    destination='duckdb',
    dataset_name='quick_start_data',
)

load_data = pipeline.run(data=data, table_name='users')
print(load_data)

Pipeline quick_start load step completed in 0.44 seconds
1 load package(s) were loaded to destination duckdb and into dataset quick_start_data
The duckdb destination used duckdb:////workspaces/zoomcamp-data-engineer-2024/workshop_dlt/quick_start.duckdb location to store data
Load package 1707716032.2552636 is LOADED and contains no failed jobs


### LOAD DATA FROM API
> Retrieve and load data from the GitHub API into DuckDB. Specifically, we will load issues from our dlt-hub/dlt repository.

In [4]:
# BASIC LOAD
# A single, unpaginated request to the issues endpoint; whatever it returns
# fully replaces the `github_issues` table on every run.

url = "https://api.github.com/repos/dlt-hub/dlt/issues"

# Fail fast on HTTP error statuses before handing anything to dlt.
response = requests.get(url)
response.raise_for_status()

pipeline = dlt.pipeline(
    dataset_name='github_issues_data',
    destination='duckdb',
    pipeline_name='github_issues',
)

load_data = pipeline.run(
    response.json(),
    write_disposition='replace',
    table_name='github_issues',
)
print(load_data)

Pipeline github_issues load step completed in 0.70 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_issues_data
The duckdb destination used duckdb:////workspaces/zoomcamp-data-engineer-2024/workshop_dlt/github_issues.duckdb location to store data
Load package 1707716033.6118846 is LOADED and contains no failed jobs


In [5]:
# INCREMENTAL LOAD

# We can pass a generator to the run method directly or use the @dlt.resource decorator to turn the generator into a dlt resource. The decorator allows you to specify the loading behavior and relevant resource parameters.

# Let's improve our GitHub API example to fetch only the issues created since the last load. Instead of using the replace write disposition and downloading all issues every time the pipeline runs, we use a Python generator with an incremental cursor on created_at and the append write disposition.

@dlt.resource(table_name='github_issues', write_disposition='append')
def get_issues(
    created_at=dlt.sources.incremental(
        'created_at', initial_value="1970-01-01T00:00:00Z")
):
    """Yield pages of open issues from the dlt-hub/dlt GitHub repository.

    Issues are requested newest-first by creation date and appended to the
    ``github_issues`` table. ``created_at`` is a dlt incremental cursor on the
    ``created_at`` field: dlt skips items older than its start value, and this
    generator additionally stops paging once
    ``created_at.start_out_of_range`` reports that such items were reached,
    so no API calls are spent on pages that would be discarded.

    Yields:
        The decoded JSON body of one page of the issues endpoint.
    """
    # NOTE: we read only open issues to minimize the number of calls to the
    # API. There's a limit of ~50 calls for unauthenticated GitHub users.
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    # Newest-first ordering is what makes the early stop below valid.
    # GitHub's parameter is `direction` (singular); `desc` is also its default.
    params = {
        "per_page": 100,
        "sort": "created",
        "direction": "desc",
        "state": "open",
    }

    while True:
        res = requests.get(url, params=params)
        res.raise_for_status()

        yield res.json()

        # Stop requesting pages if the last element was already older than
        # the initial value. Incremental would skip those items anyway; we
        # just do not want to use up the API limits.
        if created_at.start_out_of_range:
            break

        # Follow GitHub's Link header to the next page, if there is one.
        if 'next' not in res.links:
            break
        url = res.links['next']['url']
        # The "next" URL already carries the full query string.
        params = None


pipeline = dlt.pipeline(
    dataset_name='github_issues_incremental_data',
    pipeline_name='github_issues_incremental',
    destination='duckdb',
)

# Run the incremental resource, then read the per-table row counts that the
# normalize step recorded in the pipeline's last trace.
load_data_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info

print(
    f'row counts => {row_counts}',
    '=============================',
    f'load_data_info => {load_data_info}',
    sep='\n',
)

row counts => Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- github_issues: 3 row(s)

Load package 1707716035.0498624 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs
load_data_info => Pipeline github_issues_incremental load step completed in 0.69 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_issues_incremental_data
The duckdb destination used duckdb:////workspaces/zoomcamp-data-engineer-2024/workshop_dlt/github_issues_incremental.duckdb location to store data
Load package 1707716035.0498624 is LOADED and contains no failed jobs


In [6]:
# UPDATE AND DEDUPLICATE DATA

# The script above finds new issues and adds them to the database, but it ignores any updates to existing issues (edited text, emoji reactions, etc.). To always get fresh content for all issues, combine incremental loading with the merge write disposition, as in the script below.

# We add a primary_key argument to dlt.resource(); it tells dlt how to identify issues already in the database, so it can find duplicates and merge their content.

# NOTE: we now track the updated_at field, so we pick up all issues updated since the last pipeline run (which also includes newly created ones).

# Pay attention to how we combine the GitHub API's since parameter with updated_at.last_value to ask GitHub only for issues updated after the date we pass. updated_at.last_value holds the last updated_at value seen in the previous run.

@dlt.resource(
    table_name="github_issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental(
        "updated_at", initial_value="1970-01-01T00:00:00Z")
):
    """Yield pages of open dlt-hub/dlt issues updated since the last run.

    ``updated_at`` is a dlt incremental cursor on the ``updated_at`` field.
    Its ``last_value`` is the initial value on the first run and, afterwards,
    the last ``updated_at`` seen by the previous run. It is sent to GitHub as
    ``since`` so that only recently updated issues are fetched. Rows are
    merged into ``github_issues`` on ``id``, so re-fetched issues update
    existing rows rather than being added as duplicates.

    Yields:
        The decoded JSON body of one page of the issues endpoint.
    """
    # NOTE: we read only open issues to minimize the number of calls to
    # the API. There's a limit of ~50 calls for unauthenticated GitHub users.
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    # Passing the query via `params` lets requests URL-encode `since`
    # (a `+HH:MM` offset pasted raw into a URL would decode as a space).
    # GitHub's parameter is `direction` (singular).
    params = {
        "since": updated_at.last_value,
        "per_page": 100,
        "sort": "updated",
        "direction": "desc",
        "state": "open",
    }

    while True:
        response = requests.get(url, params=params)
        response.raise_for_status()
        yield response.json()

        # Follow GitHub's Link header to the next page, if there is one.
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]
        # The "next" URL already carries the full query string.
        params = None


pipeline = dlt.pipeline(
    destination="duckdb",
    dataset_name="github_issues_data_merge",
    pipeline_name="github_issues_merge",
)

load_data_info = pipeline.run(get_issues)
# Per-table row counts recorded by the normalize step of this run.
row_counts = pipeline.last_trace.last_normalize_info

print("\n".join([
    f'row counts => {row_counts}',
    '=============================',
    f'load_data_info => {load_data_info}',
]))

row counts => Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- github_issues: 2 row(s)

Load package 1707716037.018522 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs
load_data_info => Pipeline github_issues_merge load step completed in 0.99 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_issues_data_merge
The duckdb destination used duckdb:////workspaces/zoomcamp-data-engineer-2024/workshop_dlt/github_issues_merge.duckdb location to store data
Load package 1707716037.018522 is LOADED and contains no failed jobs
