# **Install `dlt`⏳**

What is dlt?

* dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.
* You can install it using pip and there's no need to start any backends or containers. You can simply import dlt in your Python script and write a simple pipeline to load data from sources like APIs, databases, files, etc. into a destination of your choice.

Here are a few reasons why you should use dlt:

* Automated maintenance: With schema inference and evolution and alerts, and with short declarative code, maintenance becomes simple.
* Run it where Python runs: You can use dlt on Airflow, serverless functions, notebooks. It doesn't require external APIs, backends or containers, and scales on both micro and large infrastructures.
* User-friendly, declarative interface: dlt provides a user-friendly interface that removes knowledge obstacles for beginners while empowering senior professionals.

Benefits: As a data engineer, dlt offers several benefits:

* Efficient Data Extraction and Loading: dlt simplifies the process of extracting and loading data. It allows you to decorate your data-producing functions with loading or incremental extraction metadata, enabling dlt to extract and load data according to your custom logic. This is particularly useful when dealing with large datasets, as dlt supports scalability through iterators, chunking, and parallelization. Read more

* Automated Schema Management: dlt automatically infers a schema from data and loads the data to the destination. It can easily adapt and structure data as it evolves, reducing the time spent on maintenance and development. This ensures data consistency and quality. Read more
* Data Governance Support: dlt pipelines offer robust governance support through pipeline metadata utilization, schema enforcement and curation, and schema change alerts. This promotes data consistency, traceability, and control throughout the data processing lifecycle. Read more

* Flexibility and Scalability: dlt can be used on Airflow, serverless functions, notebooks, and scales on both micro and large infrastructures. It also offers several mechanisms and configuration options to scale up and fine-tune pipelines. Read more

* Post-Loading Transformations: dlt provides several options for transformations after loading the data, including using dbt, the dlt SQL client, or Pandas. This allows you to shape and manipulate the data before or after loading it, allowing you to meet specific requirements and ensure data quality and consistency. Read more



In [None]:
%%capture
!pip install dlt[duckdb] # Install dlt with all the necessary DuckDB dependencies

# Part 1: Data Extraction


## Example 1: Extracting API data with a generator

Premise:

For this example, we created a simple http api that returns json "page by page",  1000 records per page.

It accepts a parameter called `page`, representing the page number.
If we request a larger page number than there is data, we get an empty response.

To get the pages, we write a loop that asks for pages starting from 1 and increasing, until we receive an empty page.

As we do not know ahead of time how many pages have data and if they fit in memory, yielding the data so it can be handled page by page scales better than first collecting all pages in memory and then returning them.

In [None]:
import requests



BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"

# I call this a paginated getter
# as it's a function that gets data
# and also paginates until there is no more data
# by yielding pages, we "microbatch", which speeds up downstream processing

def paginated_getter():
    page_number = 1

    while True:
        # Set the query parameters
        params = {'page': page_number}

        # Make the GET request to the API
        response = requests.get(BASE_API_URL, params=params)
        response.raise_for_status()  # Raise an HTTPError for bad responses
        page_json = response.json()
        print(f'got page number {page_number} with {len(page_json)} records')

        # if the page has no records, stop iterating
        if page_json:
            yield page_json
            page_number += 1
        else:
            # No more data, break the loop
            break


if __name__ == '__main__':
    # Use the generator to iterate over pages
    for page_data in paginated_getter():
        # Process each page as needed
        print(page_data)


## Example 2: The "bad" way to download a file

In this example we download a json lines file.

Since the download is text but we want to work with iterable data strutures for loading, we convert the contents to list of jsons.

This is a less than ideal approach because if the file size is unknown, we run the risk of running out of memory. In the case of machines that run multiple jobs, an out of memory error runs the risk of killing not just the current jobs but also anything else running on the machine at the time - a situation most data engineers **really really** like to avoid.

In [None]:
import requests
import json


def download_and_read_jsonl(url):
    response = requests.get(url)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    data = response.text.splitlines()
    parsed_data = [json.loads(line) for line in data]
    return parsed_data


# time the download
import time
start = time.time()

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"
downloaded_data = download_and_read_jsonl(url)

if downloaded_data:
    # Process or print the downloaded data as needed
    print(downloaded_data[:5])  # Print the first 5 entries as an example

# time the download
end = time.time()
print(end - start)

## Example 3: Extracting file data with a generator "the best practice way"

"The best practice way" here refers to the most scalable way to do it, but if you are confident scale will not be an issue, then the right way might be the simplest :)

In this example we download a jsonl (like json, but lines) file.
Since it's jsonl, it has lines so we can process it line by line.

We stream download it and yield the data.

If this file were json and not jsonl, we could use ijson library to break it into lines without loading to memory.



In [None]:
import requests
import json

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"

def stream_download_jsonl(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    for line in response.iter_lines():
        if line:
            yield json.loads(line)

# time the download
import time
start = time.time()

# Use the generator to iterate over rows with minimal memory usage
row_counter = 0
for row in stream_download_jsonl(url):
    print(row)
    row_counter += 1
    if row_counter >= 5:
        break

# time the download
end = time.time()
print(end - start)

### Loading the generator (any of the above)

We have 3 ways to download the same data. Let's use the fast and reliable way to load some data and inspect it in DuckDB.

In this example, we are using `dlt` library to do the loading, which will process data from the generators incrementally, following the same memory management paradigm.

We will discuss more details about `dlt` or "data load tool" later.

In [None]:
import dlt

# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later
generators_pipeline = dlt.pipeline(destination='duckdb', dataset_name='generators')


# we can load any generator to a table at the pipeline destnation as follows:
info = generators_pipeline.run(paginated_getter(),
										table_name="http_download",
										write_disposition="replace")

# the outcome metadata is returned by the load and we can inspect it by printing it.
print(info)

# we can load the next generator to the same or to a different table.
info = generators_pipeline.run(stream_download_jsonl(url),
										table_name="stream_download",
										write_disposition="replace")

print(info)


In [None]:
# show outcome

import duckdb

conn = duckdb.connect(f"{generators_pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{generators_pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))

# and the data

print("\n\n\n http_download table below:")

rides = conn.sql("SELECT * FROM http_download").df()
display(rides)

print("\n\n\n stream_download table below:")

passengers = conn.sql("SELECT * FROM stream_download").df()
display(passengers)

# As you can see, the same data was loaded in both cases.