# Part One

## Data Engineering with Python & AI:
### Data Loading 

1. Extracting Data from APIs & Handling API Challenges - Work with REST APIs, authentication, rate limits, retries, and pagination to extract data efficiently.
2. Schema Management & Automatic Normalization - Use dlt to infer schemas, flatten nested JSON, extract lists into child tables, and handle schema evolution automatically.
3. Incremental Data Extraction & State Tracking - Load only new or modified records, avoiding unnecessary reprocessing and improving pipeline efficiency.
4. Loading Data Into Various Destinations - Store data in DuckDB or Postgres, BigQuery, Snowflake, or a Data Lake while ensuring efficient schema mapping and performance.
5. Automating and Orchestrating Pipelines - Deploy, schedule, and maintain ingestion workflows with Dagster, GitHub Actions, and cron jobs.
6. Scaling Data Pipelines Efficiently - Handle large-scale data ingestion while optimizing performance, retries, and parallel execution.

After this exercise, you won't just know ingestion - you'll be able to build an API ingestion pipeline that `auto-detects schema changes`, `retries intelligently`, and `scales with demand`. In short: a resilient, scalable, efficient, and reliable pipeline.

# Data Ingestion
Data ingestion means extracting data from a source. It often also includes normalizing, cleaning, and adding metadata.

## Extracting Data: 
Data Streaming and Batching
* Batching - Processing data in chunks at scheduled intervals. It's suitable for scheduled tasks and reduces system load.
* Streaming - Processing data continuously as it arrives. It's ideal for real-time data processing and immediate insights.

Choosing the right approach depends on factors like `data volume`, `latency requirements` and `system architecture`.


1. Batch processing
Batch processing is best when you can wait for data to accumulate before processing it in large chunks. It is `cost-efficient` and works well for `non-time-sensitive` workloads.
- Common use cases
    * Nightly database updates.
    * Generating daily or weekly reports.

2. Streaming data processing
Streaming is useful when you need to `process data in real-time` or `with minimal delay`. Instead of waiting for a batch, events are processed continuously.
- Common use cases
    * Fraud detection (e.g. analyzing transactions in real-time)
    * IoT device monitoring (e.g. temperature sensors)
    * Event-driven applications (e.g. user activity tracking)
    * Log and telemetry data ingestion

3. When to use Batch vs Streaming

|Factor      |        Batch processing      |              Streaming processing        |
|------------|------------------------------|------------------------------------------|
|Latency     |   High (minutes, hours)      |   Low (milliseconds, seconds)            |
|Data volume |   Large batches              |   Continuous small events                |
|Use case    |   Reports, ETL, backups      |   Real-time analytics, event-driven apps |
|Complexity  |   Easier to manage           |   Requires event-driven architecture     |
|Cost        |   Lower for periodic runs    |   Higher for always-on processing        |
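The difference between the two columns can be sketched in a few lines of Python. This is only an illustration: the `events` list and the `in_batches` helper are hypothetical stand-ins for a real source, not part of any library.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def in_batches(events: Iterable, batch_size: int) -> Iterator[List]:
    """Group an event stream into fixed-size batches (the last one may be smaller)."""
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical events, standing in for records arriving from a source.
events = [{"id": i, "amount": i * 10} for i in range(1, 8)]

# Streaming style: handle each event as soon as it arrives.
running_total = 0
for event in events:
    running_total += event["amount"]

# Batch style: wait until a chunk has accumulated, then process it at once.
batch_totals = [sum(e["amount"] for e in batch) for batch in in_batches(events, 3)]

print(running_total)
print(batch_totals)
```

Both styles see the same data; what changes is when the work happens and how much is held in memory at a time.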

4. Tools
Many tools support both `batch` and `streaming` data extraction. Some tools are optimized for one approach, while others provide flexibility for both.

`Message queues & Event streaming`
These tools enable real-time data ingestion and processing but can also buffer data for mini-batch processing.

  * `Apache Kafka` - Distributed event streaming platform for real-time and batch workloads.
  * `RabbitMQ` - Message broker that supports real-time message passing.
  * `AWS Kinesis` - Cloud-native alternative to Kafka for real-time ingestion.
  * `Google Pub/Sub` - Managed messaging service for real-time and batch workloads.

`ETL & ELT Pipelines`
These tools handle extraction, transformation and loading (ETL) for both batch and streaming pipelines.
  
  * `Apache Spark` - Supports batch processing and structured streaming.
  * `dbt (Data Build Tool)` - Focuses on batch transformations but can be used with streaming inputs.
  * `Flink` - Real-time stream processing but can also handle mini-batch workloads.
  * `NiFi` - A data flow tool for moving and transforming data in real time or batch.
  * `AWS Glue` - Serverless ETL service for batch workloads, with limited streaming support.
  * `Google Cloud Dataflow` - Managed ETL platform supporting both batch and streaming.
  * `dlt` - Automates API extraction, incremental ingestion, and schema evolution for both batch and streaming pipelines. 

### Working with REST APIs

#### APIs as a data source: Batch vs. Streaming approaches

APIs are a major source of data ingestion. Depending on how APIs provide data, they can be used in both `batch` and `streaming` workflows.

1. **APIs for batch extraction**

Some APIs return large datasets at once. This data is often fetched on a schedule or as part of an ETL process.

**Common batch API examples:**

   - **CRM APIs (Salesforce, HubSpot)** - Export customer data daily.
   - **E-commerce APIs (Shopify, Amazon)** - Download product catalogs or sales reports periodically.
   - **Public APIs (Weather, Financial Data)** - Retrieve daily stock market updates.

**How batch API extraction works:**
1. Call an API at **scheduled intervals** (e.g. every hour or day).
2. Retrieve all available data (e.g. last 24 hours of records).
3. Store results in a database, data warehouse, or file storage.

In [3]:
#%pip install requests

In [4]:
'''
import requests
import json

def fetch_batch_data():
    url = "https://api.example.com/daily_reports"
    response = requests.get(url) 
    data = response.json()

    with open("daily_report.json", "w") as file:
        json.dump(data, file)


fetch_batch_data()

'''


In [5]:
import requests


url = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomCamp/events"

response = requests.get(url)
response.json()

[{'id': '5579109808',
  'type': 'WatchEvent',
  'actor': {'id': 232700169,
   'login': 'reneej6573',
   'display_login': 'reneej6573',
   'gravatar_id': '',
   'url': 'https://api.github.com/users/reneej6573',
   'avatar_url': 'https://avatars.githubusercontent.com/u/232700169?'},
  'repo': {'id': 419661684,
   'name': 'DataTalksClub/data-engineering-zoomcamp',
   'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
  'payload': {'action': 'started'},
  'public': True,
  'created_at': '2026-01-02T15:43:01Z',
  'org': {'id': 72699292,
   'login': 'DataTalksClub',
   'gravatar_id': '',
   'url': 'https://api.github.com/orgs/DataTalksClub',
   'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}},
 {'id': '5578936787',
  'type': 'WatchEvent',
  'actor': {'id': 230978005,
   'login': 'eveliinavesala',
   'display_login': 'eveliinavesala',
   'gravatar_id': '',
   'url': 'https://api.github.com/users/eveliinavesala',
   'avatar_url': 'https://avatars

2. **APIs for streaming data extraction**

Some APIs support **event-driven** data extraction, where updates are pushed in real-time. This method is used for systems that require immediate action on new data.

**Common streaming API examples:**

   - **Webhooks (Stripe, GitHub, Slack)** - Real-time event notifications.
   - **Social Media APIs (Twitter Streaming, Reddit Firehose)** - Continuous data from user interactions.
   - **Financial Market APIs (Binance WebSocket, AlphaVantage Streaming)** - Live stock prices and cryptocurrency trades.

**How streaming API extraction works:**

1. API sends **real-time updates** as data changes.
2. A webhook or WebSocket **listens for events**.
3. Data is **processed immediately** instead of being stored in bulk.

In [6]:
#%pip install websocket-client

In [7]:
'''
import websocket

def on_message(ws, message):
    print("Received event:", message)

ws = websocket.WebSocketApp("wss://api.example.com/stream", on_message=on_message) # We don't send repeated requests; we subscribe and the server pushes events to us.
ws.run_forever()
'''


As an engineer, you will need to build pipelines that "just work".
Here's what you need to consider during extraction to prevent pipelines from breaking and to keep them running smoothly:

  1. **Hardware limits:** Be mindful of memory (RAM) and storage (disk space). Overloading these can crash your system.
  2. **Network reliability:** Networks can fail! Always account for retries to make your pipelines more robust.
        * Tip: Use libraries like dlt that have built-in retry mechanisms.
  3. **API rate limits:** APIs often restrict the number of requests you can make in a given time.
        * Tip: Check the API documentation to understand its limits (e.g. Zendesk, Shopify).

There are even more challenges to consider when working with APIs - such as **pagination and authentication**. Let's explore how to handle these effectively when working with **REST APIs.**

### Common Challenges: API Interaction Challenges

1. **Authentication**
  - API keys
  - OAuth Tokens
  - Basic Authentication

2. **Memory Management**
  - Limited Memory
  - Streaming

3. **Pagination**
  - Pages
  - Data Chunks

4. **Rate limits**
  - Pause requests
  - Retry-After Header

To stay within API rate limits:

* Monitor Rate Limits - Check API headers for remaining requests.
* Pause Requests - Delay further requests if limits are near.
* Implement Retries - Retry failed requests after a delay.
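The "retry after a delay" idea can be sketched as a small helper with exponential backoff. This is a minimal sketch, not a production retry policy: `call_with_retries` and `flaky_fetch` are hypothetical names, and the exception types to retry on are an assumption you would adapt (with `requests`, for instance, you might pass `requests.exceptions.RequestException`).

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, then 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


# Hypothetical flaky call: fails twice, then succeeds.
attempts = {"count": 0}


def flaky_fetch() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary network failure")
    return "ok"


delays = []  # record the delays instead of really sleeping
result = call_with_retries(flaky_fetch, sleep=delays.append)
print(result, delays)
```

Passing `sleep` as a parameter keeps the helper easy to test; in a real pipeline the default `time.sleep` would be used.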

#### 1. API Rate Limit

In [8]:
import time

url = 'https://api.github.com/rate_limit'  # some APIs expose a dedicated endpoint that reports how many requests remain

remaining = requests.get(url).json()['rate']['remaining']

if remaining == 0:
    time.sleep(30)

remaining

59
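Instead of sleeping for a fixed 30 seconds, the pause can be derived from the response headers. The sketch below assumes GitHub-style headers (`x-ratelimit-remaining`, `x-ratelimit-reset` as epoch seconds) and a `Retry-After` value given in seconds; other APIs may name or format these differently, so check their documentation. `seconds_to_wait` is a hypothetical helper.

```python
import time
from typing import Mapping, Optional


def seconds_to_wait(headers: Mapping[str, str], now: Optional[float] = None) -> float:
    """Return how long to pause before the next request, based on response headers."""
    now = time.time() if now is None else now
    lowered = {key.lower(): value for key, value in headers.items()}

    # Retry-After, when present, is assumed to state the pause in seconds.
    if "retry-after" in lowered:
        return float(lowered["retry-after"])

    # Otherwise wait until the reset time, but only if no requests remain.
    remaining = int(lowered.get("x-ratelimit-remaining", "1"))
    if remaining > 0:
        return 0.0
    reset_at = float(lowered.get("x-ratelimit-reset", now))
    return max(0.0, reset_at - now)


# Hypothetical header values for illustration.
print(seconds_to_wait({"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1030"}, now=1000.0))
```

With `requests`, the `response.headers` mapping could be passed in directly, followed by `time.sleep(...)` on the returned value.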

#### 2. Authentication

Many APIs require an **API key or token** to access data securely. Without authentication, requests may be limited or denied.

**Types of Authentication in APIs:**

  - **API Keys** - A simple token included in the request header or URL.
  - **OAuth Tokens** - A more secure authentication method requiring user authorization.
  - **Basic Authentication** - Using a username and password (less common today).

Never share your API token publicly! Store it in environment variables or use a secure secrets manager.
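As a rough sketch, the three authentication types differ mainly in which header is sent. The helper names below are hypothetical, and the `X-API-Key` header name is an assumption (each API documents its own); `ACCESS_TOKEN` matches the environment variable used later in this notebook.

```python
import base64
import os
from typing import Dict


def api_key_headers(api_key: str, header_name: str = "X-API-Key") -> Dict[str, str]:
    """API key: a plain token in a header whose name depends on the API."""
    return {header_name: api_key}


def bearer_headers(env_var: str = "ACCESS_TOKEN") -> Dict[str, str]:
    """OAuth-style token: read it from the environment and send it as a Bearer token."""
    token = os.getenv(env_var)
    if not token:
        # Fail early with a clear message instead of sending an unauthenticated request.
        raise RuntimeError(f"Environment variable {env_var} is not set")
    return {"Authorization": f"Bearer {token}"}


def basic_headers(username: str, password: str) -> Dict[str, str]:
    """Basic authentication: base64 of 'username:password'."""
    credentials = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {credentials}"}


print(basic_headers("user", "pass"))
```

Any of these dictionaries could then be passed as `headers=` to `requests.get`.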


Example:
    In this example, we'll request data from the GitHub API.

In [9]:
url = 'https://api.github.com/user'

requests.get(url).json()

{'message': 'Requires authentication',
 'documentation_url': 'https://docs.github.com/rest',
 'status': '401'}

In [10]:
# %pip install python-dotenv

In [11]:
# If we were in google colab

'''from google.colab import userdata
API_TOKEN = userdata.get('ACCESS_TOKEN')

headers = {
            'Authorization': f'Bearer {API_TOKEN}'
}
'''
# Here in vs code

import os
from dotenv import load_dotenv, dotenv_values

# The equivalent of userdata.get in colab

load_dotenv()
API_TOKEN = os.getenv('ACCESS_TOKEN')

#print(API_TOKEN)
#print(dotenv_values('.env'))

headers = {
            'Authorization': f'Bearer {API_TOKEN}'
}

url = 'https://api.github.com/user'

requests.get(url, headers=headers).json()



{'login': 'M21x1',
 'id': 123788365,
 'node_id': 'U_kgDOB2DcTQ',
 'avatar_url': 'https://avatars.githubusercontent.com/u/123788365?v=4',
 'gravatar_id': '',
 'url': 'https://api.github.com/users/M21x1',
 'html_url': 'https://github.com/M21x1',
 'followers_url': 'https://api.github.com/users/M21x1/followers',
 'following_url': 'https://api.github.com/users/M21x1/following{/other_user}',
 'gists_url': 'https://api.github.com/users/M21x1/gists{/gist_id}',
 'starred_url': 'https://api.github.com/users/M21x1/starred{/owner}{/repo}',
 'subscriptions_url': 'https://api.github.com/users/M21x1/subscriptions',
 'organizations_url': 'https://api.github.com/users/M21x1/orgs',
 'repos_url': 'https://api.github.com/users/M21x1/repos',
 'events_url': 'https://api.github.com/users/M21x1/events{/privacy}',
 'received_events_url': 'https://api.github.com/users/M21x1/received_events',
 'type': 'User',
 'user_view_type': 'public',
 'site_admin': False,
 'name': 'Max Jeffer',
 'company': None,
 'blog': '',

#### 3. Pagination

Many APIs return data in **chunks (or pages)** rather than sending everything at once. This prevents **overloading the server** and improves performance, especially for large datasets. To retrieve **all the data**, we need to make multiple requests and keep track of pages until we reach the last one.

**API Data Retrieval Process**

Initiate Data Request >>> Check for More Data >>> Repeat Process


- **Pagination:** When there's no more data, the API returns no links to the next page.
- **Details:**

    * **Method:** GET
    * **URL:** https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events
    * **Parameters:**
        - page: Integers (page number), defaults to 1.
        - per_page: The number of results per page (max 100). Defaults to 30.

  When the response is paginated, the response headers will include a link header. If the endpoint does not support pagination, or if all results fit on a single page, the link header will be omitted.

  The link header contains URLs that you can use to fetch additional pages of results. For example, the previous, next, first, and last page of results.

This script demonstrates how to handle paginated responses by automatically requesting the next page of data until all pages are retrieved:

In [12]:
url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events?page=9'

response = requests.get(url)

response

<Response [200]>

In [13]:
response.headers['Link']

'<https://api.github.com/repositories/419661684/events?page=8>; rel="prev", <https://api.github.com/repositories/419661684/events?page=10>; rel="next", <https://api.github.com/repositories/419661684/events?page=10>; rel="last", <https://api.github.com/repositories/419661684/events?page=1>; rel="first"'

In [14]:
response.links # requests parses the Link header for us, so we don't have to parse the string ourselves

{'prev': {'url': 'https://api.github.com/repositories/419661684/events?page=8',
  'rel': 'prev'},
 'next': {'url': 'https://api.github.com/repositories/419661684/events?page=10',
  'rel': 'next'},
 'last': {'url': 'https://api.github.com/repositories/419661684/events?page=10',
  'rel': 'last'},
 'first': {'url': 'https://api.github.com/repositories/419661684/events?page=1',
  'rel': 'first'}}

In [15]:
response.links['next']['url'] # If there is no next page, this raises a KeyError

'https://api.github.com/repositories/419661684/events?page=10'

In [16]:
url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events' # ?page=9

while True:
    response = requests.get(url)
    data = response.json()
    print(len(data))
    
    if 'next' not in response.links:
        break
    url = response.links['next']['url']

30
30
30
30
30
30
30
30
29
29


**What happens here:**
   - It starts at **page 1** and makes a **GET request** to the API.
   - It retrieves **JSON data.**
   - It looks for the **"next" page URL** in the response headers.
   - If a **next page exists**, it updates `url` and requests more data.
   - If there's **no next page**, stops fetching and ends the loop.


Different APIs handle pagination differently (some use offsets, cursors, page numbers, or tokens instead of links). Always check the API documentation for the correct method!
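For comparison, here is a minimal sketch of cursor-based pagination. Everything in it is hypothetical: `fake_cursor_api` stands in for an HTTP call, and the `items` / `next_cursor` field names are assumptions, since each API defines its own.

```python
from typing import Callable, Dict, Iterator, List, Optional

# Hypothetical in-memory "API": 7 records served through a cursor.
RECORDS = [{"id": i} for i in range(1, 8)]


def fake_cursor_api(cursor: Optional[int] = None, limit: int = 3) -> Dict:
    """Stand-in for an HTTP call; returns one page plus the cursor for the next."""
    start = cursor or 0
    items = RECORDS[start:start + limit]
    next_cursor = start + limit if start + limit < len(RECORDS) else None
    return {"items": items, "next_cursor": next_cursor}


def paginate(fetch_page: Callable[..., Dict]) -> Iterator[List[Dict]]:
    """Yield pages until the API stops returning a cursor."""
    cursor = None
    while True:
        page = fetch_page(cursor=cursor)
        yield page["items"]
        cursor = page["next_cursor"]
        if cursor is None:
            break


page_sizes = [len(items) for items in paginate(fake_cursor_api)]
print(page_sizes)
```

The loop has the same shape as the link-header version above; only the way the "next" pointer is obtained changes.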

#### 4. Avoiding memory issues during extraction

To prevent your pipeline from crashing, you need to control memory usage.

**Challenges with memory**
  * Many pipelines run on systems with limited memory, like serverless functions or shared clusters.
  * If you try to load all the data into memory at once, it can crash the entire system.
  * Even disk space can become an issue if you're storing large amounts of data.


Memory Limitations : Pipelines may crash if data is loaded all at once.

Disk Space Issues : Storing large amounts of data can lead to problems.

**The solution: batch processing / streaming data**

**Streaming** means processing data in small chunks or events, rather than loading everything at once. This keeps memory usage low and ensures your pipeline remains efficient.

As a data engineer, you'll use streaming to transfer data between buffers, such as:

  * from APIs to local files.
  * from Webhooks to event queues.
  * from Event queues (like Kafka) to storage buckets.
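The first transfer in the list, from an API to a local file, could be sketched as follows. `hypothetical_pages` is a stand-in for a paginated API, and JSON Lines (one record per line) is just one reasonable file format for this, not the only one.

```python
import json
import os
import tempfile
from typing import Dict, Iterable, Iterator, List


def write_pages_as_jsonl(pages: Iterable[List[Dict]], path: str) -> int:
    """Write each page to a JSON Lines file as it arrives; return the record count."""
    count = 0
    with open(path, "w", encoding="utf-8") as file:
        for page in pages:  # only one page is held in memory at a time
            for record in page:
                file.write(json.dumps(record) + "\n")
                count += 1
    return count


def hypothetical_pages() -> Iterator[List[Dict]]:
    """Stand-in for a paginated API that yields one page at a time."""
    yield [{"id": 1}, {"id": 2}]
    yield [{"id": 3}]


path = os.path.join(tempfile.mkdtemp(), "events.jsonl")
written = write_pages_as_jsonl(hypothetical_pages(), path)
print(written)
```

Because the pages come from a generator, memory use stays roughly at the size of one page regardless of how many pages the source returns.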


In [17]:
def events_getter():
    url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events'

    while True:
        response = requests.get(url)
        data = response.json()
        yield data
        
        if 'next' not in response.links:
            break
        url = response.links['next']['url']

In [18]:
events_pages = events_getter()

for events_page in events_pages:
    print(events_page)

[{'id': '5579109808', 'type': 'WatchEvent', 'actor': {'id': 232700169, 'login': 'reneej6573', 'display_login': 'reneej6573', 'gravatar_id': '', 'url': 'https://api.github.com/users/reneej6573', 'avatar_url': 'https://avatars.githubusercontent.com/u/232700169?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2026-01-02T15:43:01Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '5578936787', 'type': 'WatchEvent', 'actor': {'id': 230978005, 'login': 'eveliinavesala', 'display_login': 'eveliinavesala', 'gravatar_id': '', 'url': 'https://api.github.com/users/eveliinavesala', 'avatar_url': 'https://avatars.githubusercontent.com/u/230978005?'}, 'repo': {'id': 419661684, 'n

The snippet above demonstrates streaming (chunked) data processing, where data is processed page by page instead of being loaded into memory all at once. This method helps optimize performance and reduce memory usage.

In this approach to grabbing data from APIs, there are both pros and cons:

✅ Pros: **Easy memory management** since the API returns data in small pages or events.

❌ Cons: **Low throughput** because data transfer is limited by API constraints (rate limits, response time).

To simplify data extraction, use specialized tools that follow best practices like streaming - for example, dlt (data load tool). It efficiently processes data while **keeping memory usage low** and **leveraging parallelism** for better performance.

Well, you've successfully **extracted** the data - great! But raw data isn't always ready to use. Now, you need to **process**, **clean** and **structure** it before it can be loaded into a data lake or data warehouse.

## Normalizing data

You often hear that data professionals spend most of their time **"cleaning" data** - but what does that actually mean?

Data cleaning typically involves two key steps:

 1. **Normalizing data** - Structuring and standardizing data **without changing its meaning.**
 2. **Filtering data for a specific use case** - Selecting or modifying data **in a way that changes its meaning** to fit the analysis.


 ### Data cleaning: more than just fixing errors

 A big part of **data cleaning** is actually **metadata work** - ensuring data is structured and standardized so it can be used effectively.

 **Metadata tasks in data cleaning:**

✅ **Add types** - Convert strings to numbers, timestamps, etc.

✅ **Rename columns** - Ensure names follow a standard format (e.g. no special characters).

✅ **Flatten nested dictionaries** - Bring values from nested dictionaries into the top-level row. Simplify data structure.

✅ **Unnest lists/arrays** - Convert lists into **child tables** since they can't be stored directly in a flat format.

**We'll look at a practical example next, as these concepts are easier to understand with real data**


**Why prepare data? Why not use JSON directly?**

While JSON is a great format for **data transfer**, it's not ideal for analysis.

- **No enforced schema** - We don't always know what fields exist in a JSON document.
- **Inconsistent data types** - A field like age might appear as 25, "twenty five", or 25.00, which can break downstream applications. 
- **Hard to process** - If we need to group data by day, we must manually convert date strings to timestamps.
- **Memory-heavy** - Parsing a JSON document usually means reading the whole file into memory, unlike databases or columnar formats that allow scanning just the necessary fields.
- **Slow for aggregation and search** - JSON is not optimized for quick lookups or aggregations like columnar formats (e.g. Parquet).

JSON is great for **data exchange** but **not for direct analytical use**. To make data useful, we need to **normalize it** - flattening, typing, and structuring it for efficiency.
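Two of these steps, standardizing column names and adding types, can be sketched in plain Python. The `raw` record and both helper functions are hypothetical; note that `to_int_or_none` truncates decimals and maps anything non-numeric (such as "twenty five") to `None`, which is one possible policy among several.

```python
import re
from typing import Any, Dict, Optional


def normalize_column_name(name: str) -> str:
    """Lower-case a column name and replace runs of special characters with '_'."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_")
    return cleaned.lower()


def to_int_or_none(value: Any) -> Optional[int]:
    """Coerce values such as 25, '25' or 25.00 to int; return None if not numeric."""
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return None


# Hypothetical raw record with inconsistent names and a string-typed number.
raw = {"User Name": "Ada", "Age (years)": "25", "Signup-Date": "2024-06-12"}

clean: Dict[str, Any] = {normalize_column_name(k): v for k, v in raw.items()}
clean["age_years"] = to_int_or_none(clean["age_years"])
print(clean)
```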

#### Normalization Example

To understand what we're working with, let's look at a sample record from the API:

In [19]:
event = events_page[0]
event

{'id': '5450978582',
 'type': 'ForkEvent',
 'actor': {'id': 247011054,
  'login': 'patelvipulkumar',
  'display_login': 'patelvipulkumar',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/patelvipulkumar',
  'avatar_url': 'https://avatars.githubusercontent.com/u/247011054?'},
 'repo': {'id': 419661684,
  'name': 'DataTalksClub/data-engineering-zoomcamp',
  'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
 'payload': {'action': 'forked',
  'forkee': {'id': 1121221248,
   'node_id': 'R_kgDOQtR6gA',
   'name': 'data-engineering-zoomcamp',
   'full_name': 'patelvipulkumar/data-engineering-zoomcamp',
   'private': False,
   'owner': {'login': 'patelvipulkumar',
    'id': 247011054,
    'node_id': 'U_kgDODrkW7g',
    'avatar_url': 'https://avatars.githubusercontent.com/u/247011054?v=4',
    'gravatar_id': '',
    'url': 'https://api.github.com/users/patelvipulkumar',
    'html_url': 'https://github.com/patelvipulkumar',
    'followers_url': 'https://

The data we retrieved from the API has **a nested JSON format.** Let's unnest it!

Unnesting means that any **nested structures** (like dictionaries and lists) are flattened, which makes the data easier to store and query in a database or a dataframe.

 #### How to process this data?

 1. **Flatten nested fields:**

 - For example, fields actor, repo, payload and org are nested and we should extract all the necessary data:

 'actor': {

    'id': 198386041,

    'login': 'Anqi0607',

    ...
 }

 to

 'actor__id': 198386041,

 'actor__login': 'Anqi0607',
 
 ...
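The same `parent__child` naming can be produced for any depth with a small recursive helper. This is a generic sketch (the `flatten` function is hypothetical, and the sample is a trimmed copy of the event shown above); lists are deliberately left untouched because they belong in child tables.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], parent_key: str = "", sep: str = "__") -> Dict[str, Any]:
    """Flatten nested dictionaries into a single level, joining keys with `sep`."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            # Lists are left as-is here; they need child tables, not flattening.
            flat[full_key] = value
    return flat


sample = {
    "id": "5450978582",
    "actor": {"id": 247011054, "login": "patelvipulkumar"},
    "payload": {"action": "forked", "forkee": {"private": False}},
}
flat_sample = flatten(sample)
print(flat_sample)
```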

In [20]:
event

{'id': '5450978582',
 'type': 'ForkEvent',
 'actor': {'id': 247011054,
  'login': 'patelvipulkumar',
  'display_login': 'patelvipulkumar',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/patelvipulkumar',
  'avatar_url': 'https://avatars.githubusercontent.com/u/247011054?'},
 'repo': {'id': 419661684,
  'name': 'DataTalksClub/data-engineering-zoomcamp',
  'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
 'payload': {'action': 'forked',
  'forkee': {'id': 1121221248,
   'node_id': 'R_kgDOQtR6gA',
   'name': 'data-engineering-zoomcamp',
   'full_name': 'patelvipulkumar/data-engineering-zoomcamp',
   'private': False,
   'owner': {'login': 'patelvipulkumar',
    'id': 247011054,
    'node_id': 'U_kgDODrkW7g',
    'avatar_url': 'https://avatars.githubusercontent.com/u/247011054?v=4',
    'gravatar_id': '',
    'url': 'https://api.github.com/users/patelvipulkumar',
    'html_url': 'https://github.com/patelvipulkumar',
    'followers_url': 'https://

In [21]:
def process_event(event):
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']
    result['created_at'] = event['created_at'] 

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    return result

In [22]:
# process_event(event)

processed_events = []

for event in events_page:
    processed_event = process_event(event)
    processed_events.append(processed_event)

processed_events

[{'id': '5450978582',
  'type': 'ForkEvent',
  'public': True,
  'created_at': '2025-12-22T16:21:02Z',
  'actor__id': 247011054,
  'actor__login': 'patelvipulkumar'},
 {'id': '5449341020',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-12-22T15:05:01Z',
  'actor__id': 103828365,
  'actor__login': 'Jackie00765'},
 {'id': '5449199049',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-12-22T14:58:01Z',
  'actor__id': 65438190,
  'actor__login': 'aungkyawmoenaing'},
 {'id': '5449009875',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-12-22T14:47:52Z',
  'actor__id': 124211281,
  'actor__login': 'Mennah-Elsheikh'},
 {'id': '5448262705',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-12-22T14:11:59Z',
  'actor__id': 135984277,
  'actor__login': 'plijtmaer'},
 {'id': '5447425684',
  'type': 'ForkEvent',
  'public': True,
  'created_at': '2025-12-22T13:26:27Z',
  'actor__id': 140600757,
  'actor__login': 'Shaflovescoffee19'},
 {

2. **Convert timestamps:**

- In the API response, timestamps are ISO 8601 datetime strings:

"created_at": "2024-06-12T14:28:46Z"

You can store them as they are, but in some cases, you may need to **convert** them to timestamps.

- After conversion, the same moment is a **Unix timestamp** (seconds since 1970-01-01 UTC):

"created_at": 1718202526

In [23]:
from datetime import datetime
# datetime.fromisoformat() parses the ISO string; .timestamp() converts it to Unix seconds

def process_event(event):
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']
    
    
    # Note: parsing the trailing 'Z' with fromisoformat() requires Python 3.11+
    parsed_timestamp = datetime.fromisoformat(event['created_at'])
    result['created_at'] = parsed_timestamp.timestamp()
    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    return result

process_event(event)

{'id': '5430563520',
 'type': 'WatchEvent',
 'public': True,
 'created_at': 1766304828.0,
 'actor__id': 205358434,
 'actor__login': 'trucoshan'}

In [24]:
all_data = []

pages = events_getter()

for page in pages:
    all_data.extend(page)

len(all_data)

298

3. **Unnest lists:**

- The original structure might include a nested list:

'is_template': False,

'web_commit_signoff_required': False,

'topics': ['data-engineering', 'dbt', 'docker', 'kafka', 'kestra', 'spark'],

'visibility': 'public',

'forks': 6093,

* Since lists **cannot be stored directly in a flat database table**, they are usually **moved to a separate child table** that links back to the parent row.

In [25]:
def process_event(event):
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.fromisoformat(event['created_at'])
    result['created_at'] = parsed_timestamp.timestamp()

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('repo', {}).get('topics', [])

    processed_topics = []
    for topic in topics:
        # One child row per topic, linked to the parent event by its id
        processed_topic = {
            'event_id': event['id'],
            'topic': topic
        }
        processed_topics.append(processed_topic)

    return result, processed_topics

In [26]:
processed_events = []
processed_topics = []

for event in all_data:
    processed_event, topics = process_event(event)
    processed_events.append(processed_event)
    processed_topics.extend(topics)

print(processed_events[:5])
print(processed_topics[:5])

[{'id': '5579109808', 'type': 'WatchEvent', 'public': True, 'created_at': 1767368581.0, 'actor__id': 232700169, 'actor__login': 'reneej6573'}, {'id': '5578936787', 'type': 'WatchEvent', 'public': True, 'created_at': 1767367668.0, 'actor__id': 230978005, 'actor__login': 'eveliinavesala'}, {'id': '5578582817', 'type': 'WatchEvent', 'public': True, 'created_at': 1767365923.0, 'actor__id': 101908599, 'actor__login': 'claudiacp1999'}, {'id': '5578500372', 'type': 'WatchEvent', 'public': True, 'created_at': 1767365470.0, 'actor__id': 143253669, 'actor__login': 'kimodri'}, {'id': '5578250424', 'type': 'WatchEvent', 'public': True, 'created_at': 1767364182.0, 'actor__id': 215745950, 'actor__login': 'awoyemiemmanuel1900'}]
[]
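The second list printed above is empty, which suggests that none of the fetched events carried a `topics` list. To see what the child table would look like, here is a sketch on a hypothetical record whose field names mirror the snippet shown earlier; `split_topics` is an illustrative helper, not part of the notebook's pipeline.

```python
from typing import Any, Dict, List, Tuple


def split_topics(repo: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Split a record into a parent row and child rows, one per list element."""
    parent = {key: value for key, value in repo.items() if key != "topics"}
    children = [
        {"repo_id": repo["id"], "topic": topic}  # repo_id links each child to its parent
        for topic in repo.get("topics", [])
    ]
    return parent, children


# Hypothetical record for illustration.
repo = {"id": 419661684, "visibility": "public", "topics": ["data-engineering", "dbt", "docker"]}
parent_row, topic_rows = split_topics(repo)
print(parent_row)
print(topic_rows)
```

The parent row can go into one table and the topic rows into another, joined on `repo_id`.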


**Real-world data is rarely clean!** We often receive raw, nested, and inconsistent data. This is why the **normalization process** is so important. It **prepares** the data for efficient storage and analysis. dlt (data load tool) simplifies the **normalization process**, automatically transforming raw data into a **structured, clean format** that is ready for storage and analysis.

### Loading data

Now that we've covered **extracting** and **normalizing** data, the final step is **loading** the data **into a destination**. This is where the processed data is stored, making it ready for querying, analysis, or further transformations.

##### How data loading usually happens

Before dlt, data engineers had to manually handle **schema validation, batch processing, error handling, and retries** for every destination. This process becomes especially complex when loading data into **data warehouses and data lakes**, where performance optimization, partitioning and incremental updates are critical.

**Example: Loading data into database**

Here's a DuckDB script that creates a `github_events` table and loads the flattened events into it.

A basic pipeline requires:
  1. Setting up a database connection.
  2. Creating tables and defining schemas.
  3. Writing queries to insert/update data.
  4. Handling schema changes manually. 

In [27]:
#%pip install duckdb

In [28]:
import duckdb

# 1. Create a connection to a DuckDB database

conn = duckdb.connect('github_events.duckdb')

In [29]:
processed_events[0]

{'id': '5579109808',
 'type': 'WatchEvent',
 'public': True,
 'created_at': 1767368581.0,
 'actor__id': 232700169,
 'actor__login': 'reneej6573'}

In [30]:
# 2. Create the `github_events` table

conn.execute("""
CREATE TABLE IF NOT EXISTS github_events (
             id TEXT PRIMARY KEY,
             type TEXT,
             public BOOLEAN,
             created_at DOUBLE,
             actor__id BIGINT,
             actor__login TEXT
);
""")

<_duckdb.DuckDBPyConnection at 0x24fc014b4f0>

In [31]:
flattened_data = [
    (
        record['id'],
        record['type'],
        record['public'],
        record['created_at'],
        record['actor__id'],
        record['actor__login']

    )
    for record in processed_events
]

# 3. Insert data into the `github_events` table

conn.executemany("""
INSERT INTO github_events (id, type, public, created_at, actor__id, actor__login)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO NOTHING;
""", flattened_data)

<_duckdb.DuckDBPyConnection at 0x24fc014b4f0>

In [32]:
df = conn.execute("SELECT * FROM github_events LIMIT 5").df()
df.head()

|   |id        |type      |public|created_at  |actor__id|actor__login   |repo__id |
|---|----------|----------|------|------------|---------|---------------|---------|
|0  |5415617638|WatchEvent|True  |1766163000.0|98461747 |Heinmyatko1996 |419661684|
|1  |5415481638|ForkEvent |True  |1766162000.0|67496079 |tessadt        |419661684|
|2  |5414379751|WatchEvent|True  |1766159000.0|13095925 |piyush-24sharma|419661684|
|3  |5411726096|WatchEvent|True  |1766151000.0|1312845  |qivhou         |419661684|
|4  |5410423440|WatchEvent|True  |1766146000.0|130214920|s0nnyc         |419661684|


In [33]:
conn.close()

Problems:
* **Schema management is manual** - If the schema changes, you need to update table structures manually.
* **No automatic retries** - If the network fails, data may be lost.
* **No incremental loading** - Every run reloads everything, making it slow and expensive.
* **More code to maintain** - A simple pipeline quickly becomes complex.
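
To make the "no automatic retries" problem concrete, here is a minimal sketch of what a hand-written retry wrapper might look like. It assumes transient failures surface as `ConnectionError`; the helper name `with_retries`, its parameters, and the `flaky_fetch` function are hypothetical and not part of this notebook.

```python
import time


def with_retries(func, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(); on ConnectionError, wait and try again with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            # The delay doubles each time: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))


# Usage: a hypothetical fetch that fails twice before succeeding.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return ["event-1", "event-2"]


delays = []
# Record the delays instead of actually sleeping, so the example runs instantly.
result = with_retries(flaky_fetch, sleep=delays.append)
print(result, delays)
```

Even this small helper adds code to maintain, which is the point of the last bullet above.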

## **Dynamic schema handling in DuckDB**

Many engineers take shortcuts when loading data, thinking: it works, so why overcomplicate it?

But this mindset leads to technical debt and painful maintenance cycles:

 * **You will be stuck doing maintenance** - Quick scripts break as requirements evolve, turning you into a permanent firefighter.
 * **Fixing ingestion in production is 10x harder** - Once data is in use, breaking changes mean lost trust and late-night emergencies.
 * **Bad data loading is like bad architecture** - Just as a poor data model causes downstream issues, unreliable ingestion causes instability across the entire stack.

In real-world data engineering, incoming data structures often change - new columns appear, and old ones may disappear. Instead of manually updating schemas, we can automate schema evolution to ensure our pipelines keep running smoothly.

In [34]:
from datetime import datetime


def process_event(event):
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.fromisoformat(event['created_at'])
    result['created_at'] = parsed_timestamp.timestamp()

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    result['repo__id'] = event['repo']['id']


    topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('repo', {}).get('topics', [])

    processed_topics = []
    for topic in topics:   
        processed_topic = {
            'event_id': event['id'],
            'topic': topic
        }
        processed_topics.append(processed_topic)
    
    return result, processed_topics

In [35]:
processed_events = []
processed_topics = []

for event in all_data:
    processed_event, topics = process_event(event)
    processed_events.append(processed_event)
    processed_topics.extend(topics)

print(processed_events[:5])
print(processed_topics[:5])

[{'id': '5579109808', 'type': 'WatchEvent', 'public': True, 'created_at': 1767368581.0, 'actor__id': 232700169, 'actor__login': 'reneej6573', 'repo__id': 419661684}, {'id': '5578936787', 'type': 'WatchEvent', 'public': True, 'created_at': 1767367668.0, 'actor__id': 230978005, 'actor__login': 'eveliinavesala', 'repo__id': 419661684}, {'id': '5578582817', 'type': 'WatchEvent', 'public': True, 'created_at': 1767365923.0, 'actor__id': 101908599, 'actor__login': 'claudiacp1999', 'repo__id': 419661684}, {'id': '5578500372', 'type': 'WatchEvent', 'public': True, 'created_at': 1767365470.0, 'actor__id': 143253669, 'actor__login': 'kimodri', 'repo__id': 419661684}, {'id': '5578250424', 'type': 'WatchEvent', 'public': True, 'created_at': 1767364182.0, 'actor__id': 215745950, 'actor__login': 'awoyemiemmanuel1900', 'repo__id': 419661684}]
[]


In [36]:
# 1. Create a connection to a DuckDB database

conn = duckdb.connect('github_events.duckdb')

In [37]:
current_columns = {row[1] for row in conn.execute("PRAGMA table_info('github_events');").fetchall()}
print(current_columns)

{'actor__id', 'created_at', 'id', 'actor__login', 'public', 'repo__id', 'type'}


In [38]:
# 3. Detect and add new columns dynamically

for record in processed_events[10:]:
    for key in record.keys():
        if key not in current_columns:
            col_type = 'TEXT'  # Default data type
            if isinstance(record[key], bool):
                col_type = 'BOOLEAN'
            elif isinstance(record[key], int):
                col_type = 'BIGINT'
            elif isinstance(record[key], float):
                col_type = 'DOUBLE'
            alter_query = f"ALTER TABLE github_events ADD COLUMN {key} {col_type};"
            print(alter_query)
            conn.execute(alter_query)
            print(f"Added new column: {key} ({col_type})")
            current_columns.add(key) # Update schema tracking

In [39]:
# 4. Prepare data for insertion (handle missing fields)
columns = sorted(current_columns)    # Maintain consistent column order
flattened_data = [
    tuple(record.get(col, None) for col in columns) # Fill missing values with NULL
    for record in processed_events
]

# 5. Construct dynamic SQL for insertion
placeholders = ', '.join(["?" for _ in columns])
columns_str = ', '.join(columns)

insert_query = f"""
INSERT INTO github_events ({columns_str})
VALUES ({placeholders})
ON CONFLICT (id) DO UPDATE SET {', '.join([f"{col}=excluded.{col}" for col in columns if col != 'id'])};
"""

In [40]:
# 6. Insert data into DuckDB
conn.executemany(insert_query, flattened_data)

<_duckdb.DuckDBPyConnection at 0x24fc089cb30>

In [41]:
# 7. Query the table
df = conn.execute("""SELECT * FROM github_events""").df() #  LIMIT 5

df.head()

Unnamed: 0,id,type,public,created_at,actor__id,actor__login,repo__id
0,5415617638,WatchEvent,True,1766163000.0,98461747,Heinmyatko1996,419661684
1,5415481638,ForkEvent,True,1766162000.0,67496079,tessadt,419661684
2,5414379751,WatchEvent,True,1766159000.0,13095925,piyush-24sharma,419661684
3,5411726096,WatchEvent,True,1766151000.0,1312845,qivhou,419661684
4,5410423440,WatchEvent,True,1766146000.0,130214920,s0nnyc,419661684


In [42]:
# conn.close()

**What's happening here?**

| Step | Action |
|------|--------|
| 1 | Connects to DuckDB. |
| 2 | Fetches the current schema from DuckDB. |
| 3 | Detects and **adds new columns dynamically**. |
| 4 | Fills missing values with NULL to ensure smooth inserts. |
| 5 | Dynamically constructs an **INSERT or UPDATE (UPSERT)** query. |
| 6 | Inserts new data while updating existing records. |
| 7 | Queries the data. |
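
Steps 3 and 5 can be isolated into two small helpers. This is a minimal sketch of the same logic as the cells above, runnable without a database; the function names `infer_sql_type` and `build_upsert` are hypothetical. Column names are interpolated directly into the SQL string, so the sketch assumes they come from a trusted source.

```python
def infer_sql_type(value):
    """Map a Python value to a DuckDB column type, defaulting to TEXT."""
    # bool must be checked before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    return "TEXT"


def build_upsert(table, columns, key="id"):
    """Build a parameterised INSERT ... ON CONFLICT DO UPDATE statement."""
    placeholders = ", ".join("?" for _ in columns)
    updates = ", ".join(f"{col}=excluded.{col}" for col in columns if col != key)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates};"
    )


# Usage with a made-up record.
record = {"id": "1", "public": True, "actor__id": 7, "created_at": 1.5}
types = {name: infer_sql_type(value) for name, value in record.items()}
sql = build_upsert("github_events", sorted(record))
print(types)
print(sql)
```

The order of the `isinstance` checks matters: if `int` were tested first, `True` and `False` would be typed as `BIGINT`.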

## Welcome to real-world Data Engineering

This example shows how to **dynamically handle schema evolution** in DuckDB. But this is just the beginning!

### **What else can you do?**

 * **Dynamic table creation** - Automatically generate tables for new datasets.
 * **Detect data type changes** - Ensure schema consistency as data evolves.
 * **Enable incremental loading** - Efficiently update data instead of full reloads.
 * **Optimize performance** - Use **parallelization** for faster ingestion.
 * **Ensure atomicity** - Make updates fail-safe with **transactions**.


### ⚠️ **The reality: it's a full-time job!**

Imagine how your **codebase will grow** as you extend this logic to production destinations (PostgreSQL, BigQuery, etc.).

**Production challenges:**

❌ **Manual work & boilerplate code** - Schema tracking, type conversion, migrations...

❌ **Performance tuning** - Parallel execution, indexing, partitioning...

❌ **Testing & validation** - Unit tests, integration tests, rollback strategies...

❌ **Scaling gets messy** - What works for 1M rows breaks at 100M.

❌ **Maintaining & keeping it up-to-date** - Because data never stops changing.


### 💡 **Skip all these struggles: use dlt**

Instead of **reinventing the wheel**, you can **skip all this manual work** and just use **dlt**:

✅ **Automatic schema management** - Handles schema changes for you. New columns, type changes? Handled.

✅ **Incremental loading** - Ingest only new or updated records efficiently.

✅ **Works with your stack** - PostgreSQL, BigQuery, Snowflake, DuckDB, and more.

✅ **Built-in parallelization** - Faster ingestion with zero manual tuning.

✅ **Automatic & reliable** - Ensures data consistency with automatic retries.

This is the **real work of a data platform engineer** - making pipelines scalable, robust, and maintainable. But with dlt, you get all of this **out of the box**.


So, what's your next step? Schema versioning? Building data lakes? Automatic retry mechanisms?

The deeper you go, the more you realize: building a production-grade data pipeline is not just a script - **it's an entire platform.**

🔥 **Or you can just use dlt and focus on what really matters - your data.** 🔥


# Part two: 
# Data engineering for senior data professionals

In [43]:
#%pip install dlt[duckdb] # install dlt with all the necessary DuckDB dependencies

### Overview


* Extraction (use the REST API client step by step, dlt resource, dlt configuration and secrets)

* Normalizing (adjusting the normalization, setting a data contract step by step, alerting on schema changes to Slack step by step)

* Loading (configure write disposition in the resource, understanding how replace works step by step)

* Incremental loading (understanding how incremental extract, state, and write disposition work together; step-by-step incremental loading with a cursor field)

* SQL to SQL copy specifics (skipping type inference via Arrow or ConnectorX, backfilling, SCD2)

* Performance tuning (memory management, asynchronous requests, parallelism)

* Loading to Data Lakes & Lakehouses (step by step: load Parquet to the local filesystem; step by step: load Iceberg to AWS Athena & understanding partitions)

* Loading to Data Warehouses/MPPs (Understanding staging, what is staging?)


### Deployment and orchestration

- Cron job

- Airflow

- Dagster

#### 2.1 Extracting data with dlt

#### 2.1.1 Extraction: Use the Rest API client

What is the dlt REST client? It's a declarative API client that can automatically handle authentication, pagination, retries, etc. It was built for use in the YAML no-code API source, but you can use it directly in your Python code to benefit from its automation while mixing in Pythonic customisations.

Read more:

 * Docs rest client: https://dlthub.com/docs/general-usage/http/rest-client
 * Docs rest api source (self learn): https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/basic


        Previously: Python imperative - spell out step by step what to do, implement your own client.
        Now: Hybrid (fast and customisable) - use RESTClient in Python.

        Configure RESTClient in the YAML REST API source.

* Step by step:

1. Copy a rest client code sample https://dlthub.com/docs/general-usage/http/rest-client
2. Configure base url, auth, paginator as we had them before

    * Base URL: "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp"
    * Auth: Bearer
    * Paginator: The pagination links are in the response headers, so we need to use the header link paginator.
    * Resource: Base URL/events.
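
To see what "links in headers" means, the sketch below parses a `Link` response header and picks out the URL tagged `rel="next"`. It is a simplified illustration of the idea behind header-based pagination, not dlt's implementation; the function name `parse_link_header` and the example URLs are hypothetical, and the regular expression only handles the common `<url>; rel="name"` form.

```python
import re

# Matches entries of the form: <https://...>; rel="next"
LINK_PATTERN = re.compile(r'<([^>]+)>\s*;\s*rel="([^"]+)"')


def parse_link_header(header_value):
    """Return a {rel: url} mapping from a Link header value."""
    return {rel: url for url, rel in LINK_PATTERN.findall(header_value or "")}


header = (
    '<https://api.example.com/events?page=2>; rel="next", '
    '<https://api.example.com/events?page=5>; rel="last"'
)
links = parse_link_header(header)
next_url = links.get("next")  # None would mean there are no more pages
print(next_url)
```

A paginator built on this idea keeps requesting `next_url` until the header no longer contains a `next` entry.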

In [44]:
# Let's implement here

import os

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator #JSONLinkPaginator

BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp" # /events
API_TOKEN = os.getenv('ACCESS_TOKEN')

client = RESTClient(
    base_url=BASE_URL,
    # headers={"User-Agent": "MyApp/1.0"}, # we dont need this
    auth=BearerTokenAuth(token=API_TOKEN),  # type: ignore
    paginator=HeaderLinkPaginator(links_next_key="next"),
    # data_selector="data", # we dont need this
    # session=MyCustomSession() # we dont need this
)


for page in client.paginate("/events"):
    print(page)

[{'id': '5579109808', 'type': 'WatchEvent', 'actor': {'id': 232700169, 'login': 'reneej6573', 'display_login': 'reneej6573', 'gravatar_id': '', 'url': 'https://api.github.com/users/reneej6573', 'avatar_url': 'https://avatars.githubusercontent.com/u/232700169?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2026-01-02T15:43:01Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '5578936787', 'type': 'WatchEvent', 'actor': {'id': 230978005, 'login': 'eveliinavesala', 'display_login': 'eveliinavesala', 'gravatar_id': '', 'url': 'https://api.github.com/users/eveliinavesala', 'avatar_url': 'https://avatars.githubusercontent.com/u/230978005?'}, 'repo': {'id': 419661684, 'n

#### 2.1.2 dlt resource

* A resource is a function that yields data.
* To create a resource, we add the @dlt.resource decorator to that function.
* It allows for customization (e.g. naming, primary keys, incremental loading, parallel execution) which simplifies integration into data pipelines.

Step by step: 
  * Add a decorator (we will build on that later)

In [45]:
# implement here
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator #JSONLinkPaginator

BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp" # /events
API_TOKEN = os.getenv('ACCESS_TOKEN')


@dlt.resource()
def paginated_getter():

    client = RESTClient(
        base_url=BASE_URL,
        # headers={"User-Agent": "MyApp/1.0"}, # we dont need this
        auth=BearerTokenAuth(token=API_TOKEN),  # type: ignore
        paginator=HeaderLinkPaginator(links_next_key="next"),
        # data_selector="data", # we dont need this
        # session=MyCustomSession() # we dont need this
    )


    for page in client.paginate("/events"):
        yield page
    
for record in paginated_getter():
    print(record)

{'id': '5579109808', 'type': 'WatchEvent', 'actor': {'id': 232700169, 'login': 'reneej6573', 'display_login': 'reneej6573', 'gravatar_id': '', 'url': 'https://api.github.com/users/reneej6573', 'avatar_url': 'https://avatars.githubusercontent.com/u/232700169?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2026-01-02T15:43:01Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}
{'id': '5578936787', 'type': 'WatchEvent', 'actor': {'id': 230978005, 'login': 'eveliinavesala', 'display_login': 'eveliinavesala', 'gravatar_id': '', 'url': 'https://api.github.com/users/eveliinavesala', 'avatar_url': 'https://avatars.githubusercontent.com/u/230978005?'}, 'repo': {'id': 419661684, 'nam

#### Extracting data with dlt: Credentials

#### 2.1.3 dlt configuration and secrets

In dlt, configurations and secrets are essential for setting up data pipelines.

 * Configurations are non-sensitive settings.
 * Secrets are sensitive data like passwords, API keys, and private keys, which should never be hard-coded to avoid security risks.

These can be set up in various ways:

 * Environment variables
 * In code using dlt.secrets and dlt.config
 * Configuration files (secrets.toml and config.toml)

To retrieve the config or secrets from their storage and pass them in code, we will use dlt.secrets.value to define credentials in resources and sources.

Step by step:

  - Set access token in environment
  - Retrieve it from the environment via dlt's secrets mechanism
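
Before handing the token to dlt, it can help to fail fast when the environment variable is missing, rather than sending an unauthenticated request. The helper below is a plain-Python sketch and not part of dlt; `require_env` and the variable name `DEMO_ACCESS_TOKEN` are hypothetical, chosen so that running the example does not overwrite a real `ACCESS_TOKEN`.

```python
import os


def require_env(name):
    """Return the value of an environment variable or raise a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Usage with a placeholder value; a real token would be exported in the shell instead.
os.environ["DEMO_ACCESS_TOKEN"] = "placeholder-token"
token = require_env("DEMO_ACCESS_TOKEN")
print(token)
```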

In [46]:
# implement here
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator #JSONLinkPaginator
import os

os.environ["ACCESS_TOKEN"] = os.getenv('ACCESS_TOKEN')

BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp" # /events
API_TOKEN = os.getenv('ACCESS_TOKEN')


@dlt.resource()
def paginated_getter(access_token=dlt.secrets.value):

    client = RESTClient(
        base_url=BASE_URL,
        # headers={"User-Agent": "MyApp/1.0"}, # we dont need this
        auth=BearerTokenAuth(token=access_token),  # type: ignore  # use the secret injected by dlt
        paginator=HeaderLinkPaginator(links_next_key="next"),
        # data_selector="data", # we dont need this
        # session=MyCustomSession() # we dont need this
    )


    for page in client.paginate("/events"):
        yield page
    
for record in paginated_getter():
    print(record)

{'id': '5579109808', 'type': 'WatchEvent', 'actor': {'id': 232700169, 'login': 'reneej6573', 'display_login': 'reneej6573', 'gravatar_id': '', 'url': 'https://api.github.com/users/reneej6573', 'avatar_url': 'https://avatars.githubusercontent.com/u/232700169?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2026-01-02T15:43:01Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}
{'id': '5578936787', 'type': 'WatchEvent', 'actor': {'id': 230978005, 'login': 'eveliinavesala', 'display_login': 'eveliinavesala', 'gravatar_id': '', 'url': 'https://api.github.com/users/eveliinavesala', 'avatar_url': 'https://avatars.githubusercontent.com/u/230978005?'}, 'repo': {'id': 419661684, 'nam

#### 2.2 Normalizing data with dlt

#### 2.2.1 Normalizing

Why use dlt for normalization?

  * Automatically detects and evolves schema - No need to define column types manually, no manual maintenance.
  * Flattens nested JSON - Converts complex structures into table-ready formats.
  * Handles data type conversion - Converts dates, numbers, and booleans correctly.
  * Splits lists into child tables - Ensures relational integrity for better analysis.
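
The flattening and child-table ideas can be sketched in plain Python. This is an illustration only, not dlt's normaliser: the function name `normalize` and the `_parent_id` / `_list_idx` / `value` column names are hypothetical, type conversion and schema tracking are left out, and dictionaries inside lists are copied as-is rather than flattened further.

```python
def normalize(record, table="events", sep="__"):
    """Flatten nested dicts into one row and move lists into child-table rows.

    Returns a mapping of table name to a list of rows.
    """
    tables = {table: []}
    row = {}

    def walk(value, prefix):
        for key, item in value.items():
            column = f"{prefix}{sep}{key}" if prefix else key
            if isinstance(item, dict):
                # Nested object: recurse and prefix the column names.
                walk(item, column)
            elif isinstance(item, list):
                # List: each element becomes a row in a child table.
                child = f"{table}{sep}{column}"
                for index, element in enumerate(item):
                    child_row = {"_parent_id": record["id"], "_list_idx": index}
                    if isinstance(element, dict):
                        child_row.update(element)
                    else:
                        child_row["value"] = element
                    tables.setdefault(child, []).append(child_row)
            else:
                row[column] = item

    walk(record, "")
    tables[table].append(row)
    return tables


# Usage with a made-up event that contains a nested object and a nested list.
event = {
    "id": "1",
    "actor": {"id": 7, "login": "ana"},
    "payload": {"commits": [{"sha": "abc"}, {"sha": "def"}]},
}
result = normalize(event)
print(result)
```

The parent row keeps scalar fields under `__`-joined names, while each list element lands in a child table that points back to its parent.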


Step by step

  1. Demonstrate the default normalisation. It happens automatically, so there is nothing to do; let's just discuss it.

In [47]:
# One record of data looked like this:

import json

print(json.dumps(record, indent=2))

{
  "id": "5430563520",
  "type": "WatchEvent",
  "actor": {
    "id": 205358434,
    "login": "trucoshan",
    "display_login": "trucoshan",
    "gravatar_id": "",
    "url": "https://api.github.com/users/trucoshan",
    "avatar_url": "https://avatars.githubusercontent.com/u/205358434?"
  },
  "repo": {
    "id": 419661684,
    "name": "DataTalksClub/data-engineering-zoomcamp",
    "url": "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp"
  },
  "payload": {
    "action": "started"
  },
  "public": true,
  "created_at": "2025-12-21T08:13:48Z",
  "org": {
    "id": 72699292,
    "login": "DataTalksClub",
    "gravatar_id": "",
    "url": "https://api.github.com/orgs/DataTalksClub",
    "avatar_url": "https://avatars.githubusercontent.com/u/72699292?"
  }
}


Instead of manually flattening fields and extracting nested lists, we can load it directly into DuckDB and let dlt auto normalise it:

In [48]:
import dlt

# Define a dlt pipeline with automatic normalization
pipeline = dlt.pipeline(
    pipeline_name="github_data",
    destination="duckdb",
    dataset_name="events",
)

# Run the pipeline with raw nested data
info = pipeline.run(paginated_getter(), table_name="events", write_disposition="replace")

# Print the load summary
print(info)

# Print the tables
# print(pipeline.dataset(dataset_type="default").schema.data_table_names()) # old way
print(pipeline.default_schema.data_table_names())

# Print events table
# pipeline.dataset(dataset_type="default").events.df() # old way
pipeline.dataset().events.df()


Pipeline github_data load step completed in 1.23 seconds
1 load package(s) were loaded to destination duckdb and into dataset events
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_data.duckdb location to store data
Load package 1767368732.416199 is LOADED and contains no failed jobs
['events', 'github_events']


Unnamed: 0,id,type,actor__id,actor__login,actor__display_login,actor__gravatar_id,actor__url,actor__avatar_url,repo__id,repo__name,...,payload__pull_request__head__ref,payload__pull_request__head__sha,payload__pull_request__head__repo__id,payload__pull_request__head__repo__url,payload__pull_request__head__repo__name,payload__pull_request__base__ref,payload__pull_request__base__sha,payload__pull_request__base__repo__id,payload__pull_request__base__repo__url,payload__pull_request__base__repo__name
0,5579109808,WatchEvent,232700169,reneej6573,reneej6573,,https://api.github.com/users/reneej6573,https://avatars.githubusercontent.com/u/232700...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
1,5578936787,WatchEvent,230978005,eveliinavesala,eveliinavesala,,https://api.github.com/users/eveliinavesala,https://avatars.githubusercontent.com/u/230978...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
2,5578582817,WatchEvent,101908599,claudiacp1999,claudiacp1999,,https://api.github.com/users/claudiacp1999,https://avatars.githubusercontent.com/u/101908...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
3,5578500372,WatchEvent,143253669,kimodri,kimodri,,https://api.github.com/users/kimodri,https://avatars.githubusercontent.com/u/143253...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
4,5578250424,WatchEvent,215745950,awoyemiemmanuel1900,awoyemiemmanuel1900,,https://api.github.com/users/awoyemiemmanuel1900,https://avatars.githubusercontent.com/u/215745...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
293,5432793388,WatchEvent,149037461,Gorghs,Gorghs,,https://api.github.com/users/Gorghs,https://avatars.githubusercontent.com/u/149037...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
294,5431931641,WatchEvent,88236364,adinfahru,adinfahru,,https://api.github.com/users/adinfahru,https://avatars.githubusercontent.com/u/88236364?,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
295,5431743515,WatchEvent,45501530,tushar-punjabi,tushar-punjabi,,https://api.github.com/users/tushar-punjabi,https://avatars.githubusercontent.com/u/45501530?,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
296,5430685374,WatchEvent,84562427,florim-agimi,florim-agimi,,https://api.github.com/users/florim-agimi,https://avatars.githubusercontent.com/u/84562427?,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,


In [49]:
import dlt

# 1. Define the resource with type hints for the empty columns
@dlt.resource(
    name="events",
    columns={
        "payload__forkee__language": {"data_type": "text"},
        "payload__forkee__mirror_url": {"data_type": "text"},
        "payload__forkee__license": {"data_type": "text"},
    }
)

def github_events():
    yield from paginated_getter()

# 2. Setup the pipeline
pipeline = dlt.pipeline(
    pipeline_name="github_data",
    destination="duckdb",
    dataset_name="events",
)

# 3. Run using the resource
info = pipeline.run(github_events, write_disposition="replace")

print(info)

# 4. Correct way to access schema and data
print("Tables found:", pipeline.default_schema.data_table_names())
df = pipeline.dataset().events.df()
#print(df.head())
df.head()

Pipeline github_data load step completed in 1.20 seconds
1 load package(s) were loaded to destination duckdb and into dataset events
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_data.duckdb location to store data
Load package 1767368740.5610538 is LOADED and contains no failed jobs
Tables found: ['events', 'github_events']


Unnamed: 0,id,type,actor__id,actor__login,actor__display_login,actor__gravatar_id,actor__url,actor__avatar_url,repo__id,repo__name,...,payload__pull_request__head__ref,payload__pull_request__head__sha,payload__pull_request__head__repo__id,payload__pull_request__head__repo__url,payload__pull_request__head__repo__name,payload__pull_request__base__ref,payload__pull_request__base__sha,payload__pull_request__base__repo__id,payload__pull_request__base__repo__url,payload__pull_request__base__repo__name
0,5579109808,WatchEvent,232700169,reneej6573,reneej6573,,https://api.github.com/users/reneej6573,https://avatars.githubusercontent.com/u/232700...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
1,5578936787,WatchEvent,230978005,eveliinavesala,eveliinavesala,,https://api.github.com/users/eveliinavesala,https://avatars.githubusercontent.com/u/230978...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
2,5578582817,WatchEvent,101908599,claudiacp1999,claudiacp1999,,https://api.github.com/users/claudiacp1999,https://avatars.githubusercontent.com/u/101908...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
3,5578500372,WatchEvent,143253669,kimodri,kimodri,,https://api.github.com/users/kimodri,https://avatars.githubusercontent.com/u/143253...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
4,5578250424,WatchEvent,215745950,awoyemiemmanuel1900,awoyemiemmanuel1900,,https://api.github.com/users/awoyemiemmanuel1900,https://avatars.githubusercontent.com/u/215745...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,


In [50]:
# This lists all tables, including those created for nested lists
print("All tables in schema:", pipeline.default_schema.tables.keys())

All tables in schema: dict_keys(['_dlt_version', '_dlt_loads', 'events', '_dlt_pipeline_state', 'github_events'])


In [51]:
'''
# Print the unnested table
# pipeline.dataset(dataset_type="default").events_payload_commits.df() # "__"events_payload_commits is a table created for the nested 'commits' list in the payload
pipeline.dataset().events__payload__commits.df()
'''

'\n# Print the unnested table\n# pipeline.dataset(dataset_type="default").events_payload_commits.df() # "__"events_payload_commits is a table created for the nested \'commits\' list in the payload\npipeline.dataset().events__payload__commits.df()\n'

In [52]:
# Checks pipeline execution
info = pipeline.run(github_events, write_disposition="replace")
print(info)  # Check if there are any warnings/errors

# Inspect 'github_events' function
def github_events():
    data = paginated_getter()  # Collect data from the API
    for item in data:
        print(item)  # Print each item to check the data
        yield item



Pipeline github_data load step completed in 1.15 seconds
1 load package(s) were loaded to destination duckdb and into dataset events
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_data.duckdb location to store data
Load package 1767368748.4869373 is LOADED and contains no failed jobs


In [53]:
# Run the pipeline
info = pipeline.run(github_events, write_disposition="replace")
print(info)

# After running, check for tables again
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM information_schema.tables") as cursor:
        tables = cursor.fetchall()
        print("Tables in database:", tables)


{'id': '5579109808', 'type': 'WatchEvent', 'actor': {'id': 232700169, 'login': 'reneej6573', 'display_login': 'reneej6573', 'gravatar_id': '', 'url': 'https://api.github.com/users/reneej6573', 'avatar_url': 'https://avatars.githubusercontent.com/u/232700169?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2026-01-02T15:43:01Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}
{'id': '5578936787', 'type': 'WatchEvent', 'actor': {'id': 230978005, 'login': 'eveliinavesala', 'display_login': 'eveliinavesala', 'gravatar_id': '', 'url': 'https://api.github.com/users/eveliinavesala', 'avatar_url': 'https://avatars.githubusercontent.com/u/230978005?'}, 'repo': {'id': 419661684, 'nam

  - payload__forkee__language
  - payload__forkee__mirror_url
  - payload__forkee__license

Unless type hints are provided, these columns will not be materialized in the destination.
One way to provide type hints is to use the 'columns' argument in the '@dlt.resource' decorator.  For example:

@dlt.resource(columns={'payload__forkee__language': {'data_type': 'text'}})



Pipeline github_data load step completed in 1.24 seconds
1 load package(s) were loaded to destination duckdb and into dataset events
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_data.duckdb location to store data
Load package 1767368756.2321224 is LOADED and contains no failed jobs
Tables in database: [('github_data', 'events', 'events', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None, None), ('github_data', 'events', 'github_events', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None, None), ('github_data', 'events', '_dlt_loads', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None, None), ('github_data', 'events', '_dlt_pipeline_state', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None, None), ('github_data', 'events', '_dlt_version', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None, None)]


In [54]:
# VERIFY TABLE EXISTENCE

sql_check = """
SELECT *
FROM information_schema.tables
WHERE table_name = 'events'
"""
with pipeline.sql_client() as client:
    with client.execute_query(sql_check) as cursor:
        tables = cursor.fetchall()
        print("Tables in database:", tables)


Tables in database: [('github_data', 'events', 'events', 'BASE TABLE', None, None, None, None, None, 'YES', 'NO', None, None)]


In [55]:
sql1 = """ 
SELECT *
FROM events
"""

with pipeline.sql_client() as client:
    with client.execute_query(sql1) as cursor:
        # Get all data from the cursor as a list of tuples
        print(cursor.fetchall())

[('5579109808', 'WatchEvent', 232700169, 'reneej6573', 'reneej6573', '', 'https://api.github.com/users/reneej6573', 'https://avatars.githubusercontent.com/u/232700169?', 419661684, 'DataTalksClub/data-engineering-zoomcamp', 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp', 'started', True, datetime.datetime(2026, 1, 2, 15, 43, 1, tzinfo=<UTC>), 72699292, 'DataTalksClub', '', 'https://api.github.com/orgs/DataTalksClub', 'https://avatars.githubusercontent.com/u/72699292?', '1767368748.4869373', 'Db/UlbhvagUzmQ', None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, 

In [56]:
sql = """ 
SELECT *
FROM events e 
LEFT JOIN events__payload__commits c
ON e._dlt_id = c._dlt_parent_id
"""



In [57]:
# Print the default schema in a pretty YAML format
print(pipeline.default_schema.to_pretty_yaml())

version: 18
version_hash: xYb7halvoDmb/ril91u6X4DPbi9PmcQHUcDvEs+MjMg=
engine_version: 11
name: github_data
tables:
  _dlt_version:
    columns:
      version:
        data_type: bigint
        nullable: false
      engine_version:
        data_type: bigint
        nullable: false
      inserted_at:
        data_type: timestamp
        nullable: false
      schema_name:
        data_type: text
        nullable: false
      version_hash:
        data_type: text
        nullable: false
      schema:
        data_type: text
        nullable: false
    write_disposition: skip
    resource: _dlt_version
    description: Created by DLT. Tracks schema updates
  _dlt_loads:
    columns:
      load_id:
        data_type: text
        nullable: false
        precision: 64
      schema_name:
        data_type: text
        nullable: true
      status:
        data_type: bigint
        nullable: false
      inserted_at:
        data_type: timestamp
        nullable: false
      schema_version_hash

#### 2.2.2 Adjusting the normalization

This can be done in a couple of ways:

   * You can customise how resources handle nested data via max_table_nesting https://dlthub.com/docs/general-usage/resource#reduce-the-nesting-level-of-generated-tables 

   * dlt infers and saves a schema from the data, which also contains configuration for the normalizer, which can be adjusted https://dlthub.com/docs/walkthroughs/adjust-a-schema
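
To get a feel for what a nesting limit does, here is a minimal plain-Python sketch. It is not dlt's normalizer, and the way dlt counts levels may differ; the `flatten` helper and the `event` record are made up for illustration. Nested dictionaries become `parent__child` columns until the limit is reached, and anything deeper is kept as a JSON string.

```python
import json


def flatten(record, max_nesting, _depth=0, _prefix=""):
    """Flatten nested dicts into `parent__child` columns, up to `max_nesting` levels.

    Anything nested deeper than the limit is stored as a JSON string.
    """
    flat = {}
    for key, value in record.items():
        column = f"{_prefix}{key}"
        if isinstance(value, dict):
            if _depth < max_nesting:
                # Still within the limit: unpack the child dict into columns
                flat.update(flatten(value, max_nesting, _depth + 1, f"{column}__"))
            else:
                # Limit reached: keep the nested value as JSON text
                flat[column] = json.dumps(value, sort_keys=True)
        else:
            flat[column] = value
    return flat


# Hypothetical record, loosely shaped like a GitHub event
event = {"id": "1", "actor": {"login": "eskrav", "meta": {"plan": "free"}}}

print(flatten(event, max_nesting=0))
print(flatten(event, max_nesting=1))
print(flatten(event, max_nesting=2))
```

A lower limit gives fewer, wider-typed columns (JSON blobs); a higher limit gives more columns that are easier to query.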

#### 2.2.3 Setting a `data contract`

Finally, we can use the data contract functionality to fix a schema and use it as a data contract. This can be configured to prevent automatic schema evolution. https://dlthub.com/docs/general-usage/schema-contracts

In [58]:
# Example data contract
@dlt.resource(schema_contract={"tables":"discard_row", "columns":"freeze", "data_type":"evolve"}) 
def events():
    ...
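
To make the contract modes more concrete, here is a rough plain-Python sketch of how a column-level contract could treat a row that carries an unknown column. It illustrates the idea only and is not how dlt implements contracts; `apply_column_contract`, `ContractViolation`, and the sample row are hypothetical names.

```python
class ContractViolation(Exception):
    """Raised when a frozen schema receives a column it does not know."""


def apply_column_contract(row, known_columns, mode):
    """Return the row to load (or None to drop it) under a column contract mode."""
    unknown = [name for name in row if name not in known_columns]
    if not unknown or mode == "evolve":
        # evolve: new columns are simply added to the schema
        known_columns.update(row)
        return row
    if mode == "freeze":
        # freeze: the load fails loudly
        raise ContractViolation(f"unexpected columns: {unknown}")
    if mode == "discard_row":
        # discard_row: the whole row is skipped
        return None
    if mode == "discard_value":
        # discard_value: only the offending values are dropped
        return {k: v for k, v in row.items() if k in known_columns}
    raise ValueError(f"unknown contract mode: {mode}")


schema = {"id", "type"}
row = {"id": "1", "type": "WatchEvent", "extra_data33": 123}

print(apply_column_contract(row, set(schema), "evolve"))
print(apply_column_contract(row, set(schema), "discard_row"))
print(apply_column_contract(row, set(schema), "discard_value"))
```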

Step by step

  * Load data and set a contract
  * Add a column to the incoming data set (append it in Python) and see it get rejected
  * Remove the contract and see it get accepted

In [59]:
import dlt
import os

API_TOKEN = os.getenv('ACCESS_TOKEN')
BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp" # /events

from dlt.sources.helpers.rest_client import RESTClient  
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

# On the second run, switch to a stricter contract, for example:
# schema_contract={"tables":"discard_row", "columns":"freeze", "data_type":"freeze"}

# "evolve", "freeze" and "discard_row" are contract modes.
# Setting data_type to "freeze" would prevent data type changes.
@dlt.resource(name="events", schema_contract={"tables":"evolve", "columns":"evolve", "data_type":"evolve"})
def paginated_getter():

    client = RESTClient(
        base_url=BASE_URL,
        auth=BearerTokenAuth(token=API_TOKEN),  # type: ignore
        paginator=HeaderLinkPaginator(links_next_key="next"),
    )

    for page in client.paginate("/events"):
        for record in page:
            # Add a column that is not in the original data.
            # The contract is set to "evolve", so the new column is accepted
            # and shows up as an additional column in the table after the run.
            record['extra_data33'] = 123
            yield record

pipeline = dlt.pipeline(
    pipeline_name="github_data_contract",
    destination="duckdb",
    dataset_name="events",
)

# Run the pipeline with raw nested data
info = pipeline.run(paginated_getter, table_name="events", write_disposition="replace")  # "replace" keeps only the data from the latest run
print(info)

# Explore loaded data
pipeline.dataset().events.df().head()

  - payload__forkee__language
  - payload__forkee__mirror_url
  - payload__forkee__license

Unless type hints are provided, these columns will not be materialized in the destination.
One way to provide type hints is to use the 'columns' argument in the '@dlt.resource' decorator.  For example:

@dlt.resource(columns={'payload__forkee__language': {'data_type': 'text'}})



Pipeline github_data_contract load step completed in 1.13 seconds
1 load package(s) were loaded to destination duckdb and into dataset events
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_data_contract.duckdb location to store data
Load package 1767368764.6659305 is LOADED and contains no failed jobs


Unnamed: 0,id,type,actor__id,actor__login,actor__display_login,actor__gravatar_id,actor__url,actor__avatar_url,repo__id,repo__name,...,payload__pull_request__head__ref,payload__pull_request__head__sha,payload__pull_request__head__repo__id,payload__pull_request__head__repo__url,payload__pull_request__head__repo__name,payload__pull_request__base__ref,payload__pull_request__base__sha,payload__pull_request__base__repo__id,payload__pull_request__base__repo__url,payload__pull_request__base__repo__name
0,5579109808,WatchEvent,232700169,reneej6573,reneej6573,,https://api.github.com/users/reneej6573,https://avatars.githubusercontent.com/u/232700...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
1,5578936787,WatchEvent,230978005,eveliinavesala,eveliinavesala,,https://api.github.com/users/eveliinavesala,https://avatars.githubusercontent.com/u/230978...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
2,5578582817,WatchEvent,101908599,claudiacp1999,claudiacp1999,,https://api.github.com/users/claudiacp1999,https://avatars.githubusercontent.com/u/101908...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
3,5578500372,WatchEvent,143253669,kimodri,kimodri,,https://api.github.com/users/kimodri,https://avatars.githubusercontent.com/u/143253...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
4,5578250424,WatchEvent,215745950,awoyemiemmanuel1900,awoyemiemmanuel1900,,https://api.github.com/users/awoyemiemmanuel1900,https://avatars.githubusercontent.com/u/215745...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,


#### 2.2.4 Alerting schema changes (to Slack)

If you are loading your data to a raw staging layer, you might just want to load everything that comes, and be alerted of schema evolution.

docs: https://dlthub.com/docs/general-usage/schema-evolution#alert-schema-changes-to-curate-new-data

Step by step

  1. Visit docs to copy the snippet
  2. Adjust your code and load with a new pipeline to see the schema evolve
  3. Look at the emitted data and discuss how we could leverage that for lineage

In [60]:
import dlt
import os

API_TOKEN = os.getenv('ACCESS_TOKEN')
BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp" # /events

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth        
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator #JSONLinkPaginator

@dlt.resource(name="events")
def paginated_getter():                                                                                    

    client = RESTClient(
        base_url=BASE_URL,
        auth=BearerTokenAuth(token=API_TOKEN),  # type: ignore
        paginator=HeaderLinkPaginator(links_next_key="next"),
    )

    for page in client.paginate("/events"):
        yield page


@dlt.resource(
    name="events",
    columns={
        "payload__forkee__language": {"data_type": "text"},
        "payload__forkee__mirror_url": {"data_type": "text"},
        "payload__forkee__license": {"data_type": "text"},
    }
)

def github_events():
    yield from paginated_getter()





pipeline = dlt.pipeline(
    pipeline_name="github_data_evolved",
    destination="duckdb",
    dataset_name="events",
)

# Run the pipeline with raw nested data
load_info = pipeline.run(paginated_getter, table_name="events", write_disposition="replace")

for package in load_info.load_packages:
    # Iterate over each table in the schema_update of the current package
    for table_name, table in package.schema_update.items():
        # print(f"Table: {table_name}")
        # Iterate over each column in the current table
        for column_name, column in table["columns"].items(): # table["columns"]
            print(column_name, column, table)
            # Send a message to the Slack channel with the table
    

In [61]:
import dlt
import os
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth        
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

API_TOKEN = os.getenv('ACCESS_TOKEN')
BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp"

@dlt.resource(
    name="events",
    write_disposition="replace",
    columns={
        "payload__forkee__language": {"data_type": "text"},
        "payload__forkee__mirror_url": {"data_type": "text"},
        "payload__forkee__license": {"data_type": "text"},
    }
)
def github_events():
    client = RESTClient(
        base_url=BASE_URL,
        auth=BearerTokenAuth(token=API_TOKEN) if API_TOKEN else None,
        paginator=HeaderLinkPaginator(),
    )

    # Note: GitHub events are often under "/events"
    for page in client.paginate("/events"):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="github_data_evolved",
    destination="duckdb",
    dataset_name="github_events_data",
)

# CRITICAL: run 'github_events' (which carries the column hints), NOT 'paginated_getter'.
# write_disposition="replace" is already set on the resource.
load_info = pipeline.run(github_events)

for package in load_info.load_packages:
    # Iterate over each table in the schema_update of the current package
    for table_name, table in package.schema_update.items():
        # print(f"Table: {table_name}")
        # Iterate over each column in the current table
        for column_name, column in table["columns"].items(): # table["columns"]
            print(column_name, column, table)
            # Send a message to the Slack channel with the table
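
The loop above leaves the Slack step as a comment. A minimal sketch of that step, using only the standard library, could look like this. It assumes a Slack incoming webhook, which accepts a JSON body with a `text` field. The names `describe_schema_update`, `post_to_slack`, and `example_update` are hypothetical, and the dlt docs linked earlier show their own helper for the same job.

```python
import json
import urllib.request


def describe_schema_update(schema_update):
    """Turn a {table: {"columns": {name: {...}}}} mapping into alert lines."""
    messages = []
    for table_name, table in schema_update.items():
        for column_name, column in table["columns"].items():
            data_type = column.get("data_type", "unknown")
            messages.append(
                f"Table updated: {table_name} | new column: {column_name} ({data_type})"
            )
    return messages


def post_to_slack(webhook_url, text):
    """POST a plain-text message to a Slack incoming webhook and return the HTTP status."""
    body = json.dumps({"text": text}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return response.status


# Hypothetical schema update, shaped like package.schema_update in the loop above
example_update = {
    "events": {"columns": {"extra_data33": {"name": "extra_data33", "data_type": "bigint"}}}
}

for line in describe_schema_update(example_update):
    # Swap print for post_to_slack(<your webhook URL>, line) once a webhook is configured
    print(line)
```

Keeping the message formatting separate from the HTTP call makes it easy to test the alert text without touching Slack.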

#### 2.2 Loading data with dlt

#### Loading

#### Configure write disposition in resource, understanding how replace works

To leverage dlt fully, we give it end-to-end control over data by using the @resource decorator at extraction, and the pipeline object for loading.

The pipeline can also load non-resource iterable objects like lists of dicts, pandas DataFrames, Arrow tables, etc., but you benefit from the full capabilities and scalability of dlt by wrapping the iterable with a @resource decorator. This gives you more configuration options, higher granularity of control, and core functionality.

A simple example: most data sources have multiple resources. We can package multiple resources together into a @source function. Each @resource can have its own configuration, while the pipeline loads the @source and configures behavior only for the source as a whole.

Step by step:

  1. Copy your code from the previous example.
  2. Move the write disposition declaration from the pipeline to the resource
  3. Run it end to end with write disposition "replace"
  4. Discuss how Replace happens under the hood (it can be configured via dlt destination config) https://dlthub.com/docs/general-usage/full-loading#choosing-the-correct-replace-strategy-for-your-full-load

        * Default is "good enough for most cases"
        * You can sometimes set a more effective but risky mode
        * or you can choose a safer but slower and more expensive mode
  
  5. Discuss pipeline last_trace 
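
For step 4, the linked docs describe how to pick a replace strategy in the destination config. The sketch below shows one way this might be set from Python through an environment variable. The strategy names and the `DESTINATION__REPLACE_STRATEGY` variable are given from memory of those docs and should be checked there; `set_replace_strategy` is a hypothetical helper.

```python
import os

# Strategy names as recalled from the dlt full-loading docs (verify against the link above)
REPLACE_STRATEGIES = {
    "truncate-and-insert": "default; fast, but a failed load can leave tables incomplete",
    "insert-from-staging": "safer; loads via staging tables, slower and more expensive",
    "staging-optimized": "staging plus destination-specific shortcuts; faster, with side effects",
}


def set_replace_strategy(strategy):
    """Select the replace strategy through an environment variable read as dlt config."""
    if strategy not in REPLACE_STRATEGIES:
        raise ValueError(f"unknown replace strategy: {strategy!r}")
    # Assumed to be equivalent to `replace_strategy = "..."` under [destination] in config.toml
    os.environ["DESTINATION__REPLACE_STRATEGY"] = strategy


# Call this before creating the pipeline so the setting is picked up
set_replace_strategy("insert-from-staging")
print(os.environ["DESTINATION__REPLACE_STRATEGY"])
```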

In [62]:
import os
import dlt

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

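# `access_token=dlt.secrets.value` below is resolved by dlt from the ACCESS_TOKEN environment variable
if not os.getenv("ACCESS_TOKEN"):
    raise RuntimeError("Set the ACCESS_TOKEN environment variable before running this cell")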

@dlt.resource(name="events", 
              write_disposition="replace", # <----- Set write_disposition here
              columns={
                        "payload__forkee__language": {"data_type": "text"},
                        "payload__forkee__mirror_url": {"data_type": "text"},
                        "payload__forkee__license": {"data_type": "text"},
    }) 
def paginated_getter(
    access_token=dlt.secrets.value,
):
    client = RESTClient(
        base_url="https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp",
        auth=BearerTokenAuth(token=access_token),  # type: ignore
        paginator=HeaderLinkPaginator(links_next_key="next"),
    )

    for page in client.paginate("/events"):
        yield page

# define new dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="events_dataset",
)

# Run the pipeline with the new resource
load_info = pipeline.run(paginated_getter)  # write_disposition is no longer passed here; it is set in the resource decorator

print(pipeline.last_trace)

#print the event table
pipeline.dataset().events.df().head()

Run started at 2026-01-02 15:46:29.666058+00:00 and COMPLETED in 8.41 seconds with 4 steps.
Step extract COMPLETED in 6.76 seconds.

Load package 1767368789.8231077 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.31 seconds.
Normalized data for the following tables:
- events: 298 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1767368789.8231077 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 1.22 seconds.
Pipeline github_pipeline load step completed in 1.16 seconds
1 load package(s) were loaded to destination duckdb and into dataset events_dataset
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_pipeline.duckdb location to store data
Load package 1767368789.8231077 is LOADED and contains no failed jobs

Step run COMPLETED in 8.41 seconds.
Pipeline github_pipeline load step completed in 1.16 seco

Unnamed: 0,id,type,actor__id,actor__login,actor__display_login,actor__gravatar_id,actor__url,actor__avatar_url,repo__id,repo__name,...,payload__pull_request__head__repo__url,payload__pull_request__head__repo__name,payload__pull_request__base__ref,payload__pull_request__base__sha,payload__pull_request__base__repo__id,payload__pull_request__base__repo__url,payload__pull_request__base__repo__name,payload__forkee__language,payload__forkee__mirror_url,payload__forkee__license
0,5579109808,WatchEvent,232700169,reneej6573,reneej6573,,https://api.github.com/users/reneej6573,https://avatars.githubusercontent.com/u/232700...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
1,5578936787,WatchEvent,230978005,eveliinavesala,eveliinavesala,,https://api.github.com/users/eveliinavesala,https://avatars.githubusercontent.com/u/230978...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
2,5578582817,WatchEvent,101908599,claudiacp1999,claudiacp1999,,https://api.github.com/users/claudiacp1999,https://avatars.githubusercontent.com/u/101908...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
3,5578500372,WatchEvent,143253669,kimodri,kimodri,,https://api.github.com/users/kimodri,https://avatars.githubusercontent.com/u/143253...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
4,5578250424,WatchEvent,215745950,awoyemiemmanuel1900,awoyemiemmanuel1900,,https://api.github.com/users/awoyemiemmanuel1900,https://avatars.githubusercontent.com/u/215745...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,


#### 2.3 Incremental loading data with dlt

#### Understanding incremental Extract, State, and Write disposition

To master incremental loading, you need to understand the following concepts and how they relate:

Incremental loading consists of three parts: keeping track of where we left off (the state), extracting only the increment, and adding or updating that increment in the destination in a specific way (the write disposition).

- **Incremental State:** A marker or checkpoint from the last load (like a timestamp or record ID) is maintained to track where the last extraction ended.

- **Incremental Extract:** Only new or changed data since the last extraction is processed, rather than the whole dataset. It needs the incremental state to know the increment boundaries.

- **Write Disposition:** Defines how new data is written to the destination (replace, append, or merge).

**How they work together:**

  1. The **incremental state** tracks progress from the **last load.**
  2. On pipeline run, the next **extract** checks **that state** to extract **only new data.** If this is the first run, you might provide starting "defaults" for the state.
  3. After extraction, **write disposition** manages how data is inserted or replaced. The state is updated when the new increment is loaded.
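
The three steps above can be simulated in a few lines of plain Python. This is only a toy model of the idea, not dlt's implementation: `run_incremental`, the `state` dict, and the sample rows are hypothetical, and it uses a strict `>` comparison for simplicity.

```python
def run_incremental(source_rows, destination, state,
                    cursor_field="created_at",
                    initial_value="1970-01-01T00:00:00Z"):
    """One pipeline run: read state, extract only newer rows, append, update state."""
    # 1. Incremental state: where did the last load stop?
    last_value = state.get("last_value", initial_value)
    # 2. Incremental extract: keep only rows newer than the stored cursor
    increment = [row for row in source_rows if row[cursor_field] > last_value]
    # 3. Write disposition "append": add the increment to what is already there
    destination.extend(increment)
    # The state is updated only after the increment has been written
    if increment:
        state["last_value"] = max(row[cursor_field] for row in increment)
    return len(increment)


source = [
    {"id": 1, "created_at": "2024-06-15T12:34:56Z"},
    {"id": 2, "created_at": "2024-06-16T08:00:00Z"},
]
destination, state = [], {}

first = run_incremental(source, destination, state)    # first run loads both rows
source.append({"id": 3, "created_at": "2024-06-17T09:30:00Z"})
second = run_incremental(source, destination, state)   # second run loads only id 3

print(first, second, state)
```

With "replace" instead of "append", step 3 would overwrite `destination` with the increment alone, which is why the write disposition has to match the extraction mode.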

Now we can mix, match, and configure these parts. Let's have a look at dlt's incremental loading docs: https://dlthub.com/docs/general-usage/incremental-loading

As you can see, there are multiple ways to put it together. Let's do it for our example.

**Step by Step: Incremental loading with a cursor field**

One way to configure our incremental extraction together with state is via a **cursor** field.

Most REST APIs and data sources allow you to fetch only new or updated records by sending the last known timestamp or ID, or what we call a **cursor** field. Here's how it works:

   1. Identify the cursor field: find the field that tracks changes (for example, "created_at" or "updated_at").
   2. Use the last value: let dlt take the last cursor value from the previous incremental run and reuse it as the starting point for the next one.
   3. Use the "append" write disposition so that new data is appended. If we keep "replace", each run overwrites the old data with just the new increment.

dlt then automatically updates the cursor value and manages the state for the next load.
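
A cursor on `created_at` works with plain strings because ISO 8601 timestamps in one consistent format and timezone sort in chronological order when compared as text. The sketch below illustrates that property with made-up rows. `advance_cursor` is a hypothetical helper, and taking the maximum as the new cursor value is an assumption that mirrors the usual "last value" behaviour.

```python
def advance_cursor(last_value, page, cursor_field="created_at"):
    """Keep rows at or after the cursor and return them with the new cursor value."""
    fresh = [row for row in page if row[cursor_field] >= last_value]
    # The new cursor is the largest value seen so far (string comparison)
    new_last_value = max([last_value] + [row[cursor_field] for row in fresh])
    return fresh, new_last_value


# Hypothetical page of events
page = [
    {"id": "a", "created_at": "2025-02-24T23:59:59Z"},
    {"id": "b", "created_at": "2025-02-25T00:00:00Z"},
    {"id": "c", "created_at": "2026-01-02T15:43:01Z"},
]

# A date-only initial value still compares correctly against full timestamps
fresh, cursor = advance_cursor("2025-02-25", page)
print([row["id"] for row in fresh], cursor)
```

Mixed formats or timezones would break this ordering, so it is worth checking what the API actually returns before relying on string comparison.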

In [63]:
# One record of data looked like this:
# page_data[0]

{'id':'46578901234',
 'type':'WatchEvent',
 'actor':{'id':123456, 
          'login':'eskrav', 
          'display_login':'eskrav',
          'gravatar_id':'',
          'url':'https://api.github.com/users/eskrav',
          'avatar_url':'https://avatars.githubusercontent.com/u/123456?'},
 'repo':{'id':987654,
            'name':'DataTalksClub/data-engineering-zoomcamp',
            'url':'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
 'payload':{'action':'started'},
 'public':True,
 'created_at':'2024-06-15T12:34:56Z',   # It will be our point of reference for incremental loading: cursor_date = dlt.sources.incremental(...)
 'org':{'id':1357924,
         'login':'DataTalksClub',
         'gravatar_id':'',
         'url':'https://api.github.com/orgs/DataTalksClub',
         'avatar_url':'https://avatars.githubusercontent.com/u/1357924?'}

}

{'id': '46578901234',
 'type': 'WatchEvent',
 'actor': {'id': 123456,
  'login': 'eskrav',
  'display_login': 'eskrav',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/eskrav',
  'avatar_url': 'https://avatars.githubusercontent.com/u/123456?'},
 'repo': {'id': 987654,
  'name': 'DataTalksClub/data-engineering-zoomcamp',
  'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
 'payload': {'action': 'started'},
 'public': True,
 'created_at': '2024-06-15T12:34:56Z',
 'org': {'id': 1357924,
  'login': 'DataTalksClub',
  'gravatar_id': '',
  'url': 'https://api.github.com/orgs/DataTalksClub',
  'avatar_url': 'https://avatars.githubusercontent.com/u/1357924?'}}

In [64]:
import os
import dlt

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

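# `access_token=dlt.secrets.value` below is resolved by dlt from the ACCESS_TOKEN environment variable
if not os.getenv("ACCESS_TOKEN"):
    raise RuntimeError("Set the ACCESS_TOKEN environment variable before running this cell")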

@dlt.resource(name="events", 
              write_disposition="append", # <----- changed from "replace" to "append"
              columns={
                        "payload__forkee__language": {"data_type": "text"},
                        "payload__forkee__mirror_url": {"data_type": "text"},
                        "payload__forkee__license": {"data_type": "text"},
    }) 
def paginated_getter(
    access_token=dlt.secrets.value,
    cursor_date = dlt.sources.incremental(
        "created_at",       # 1 <----- field to track, our timestamp field
        initial_value="2025-02-25")     # 2 <----- Set a default for the first run
):
    client = RESTClient(
        base_url="https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp",
        auth=BearerTokenAuth(token=access_token),  # type: ignore
        paginator=HeaderLinkPaginator(links_next_key="next"),
    )

    print(cursor_date.last_value)  # to see the last value of our cursor_date

    for page in client.paginate("/events"):
        yield page

# define new dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name="github_pipeline",
    destination="duckdb",
    dataset_name="events_dataset",
)

# Run the pipeline with the new resource
load_info = pipeline.run(paginated_getter)  # write_disposition is set in the resource decorator, not here

print(pipeline.last_trace)

#print the event table
pipeline.dataset().events.df().head()

2025-02-25
Run started at 2026-01-02 15:46:38.418060+00:00 and COMPLETED in 8.27 seconds with 4 steps.
Step extract COMPLETED in 6.62 seconds.

Load package 1767368798.5697963 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.33 seconds.
Normalized data for the following tables:
- events: 298 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1767368798.5697963 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 1.21 seconds.
Pipeline github_pipeline load step completed in 1.15 seconds
1 load package(s) were loaded to destination duckdb and into dataset events_dataset
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\github_pipeline.duckdb location to store data
Load package 1767368798.5697963 is LOADED and contains no failed jobs

Step run COMPLETED in 8.27 seconds.
Pipeline github_pipeline load step completed i

Unnamed: 0,id,type,actor__id,actor__login,actor__display_login,actor__gravatar_id,actor__url,actor__avatar_url,repo__id,repo__name,...,payload__pull_request__head__repo__url,payload__pull_request__head__repo__name,payload__pull_request__base__ref,payload__pull_request__base__sha,payload__pull_request__base__repo__id,payload__pull_request__base__repo__url,payload__pull_request__base__repo__name,payload__forkee__language,payload__forkee__mirror_url,payload__forkee__license
0,5579109808,WatchEvent,232700169,reneej6573,reneej6573,,https://api.github.com/users/reneej6573,https://avatars.githubusercontent.com/u/232700...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
1,5578936787,WatchEvent,230978005,eveliinavesala,eveliinavesala,,https://api.github.com/users/eveliinavesala,https://avatars.githubusercontent.com/u/230978...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
2,5578582817,WatchEvent,101908599,claudiacp1999,claudiacp1999,,https://api.github.com/users/claudiacp1999,https://avatars.githubusercontent.com/u/101908...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
3,5578500372,WatchEvent,143253669,kimodri,kimodri,,https://api.github.com/users/kimodri,https://avatars.githubusercontent.com/u/143253...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,
4,5578250424,WatchEvent,215745950,awoyemiemmanuel1900,awoyemiemmanuel1900,,https://api.github.com/users/awoyemiemmanuel1900,https://avatars.githubusercontent.com/u/215745...,419661684,DataTalksClub/data-engineering-zoomcamp,...,,,,,,,,,,


#### Loading data from SQL database to SQL database

#### 2.4 SQL to SQL copy specifics

Transferring data between SQL databases is a common task in data engineering. Since SQL to SQL transfer is a little different from API calls, let's look at the specifics.

dlt has a built-in SQL database source implemented via SQLAlchemy, making it compatible with most SQL databases. Because it's built into dlt, you don't have to initialize it like other sources; just **import** it from dlt.sources.

**Example: Loading data from MySQL database into DuckDB**

The example below shows how you can use dlt to load data from a SQL database (PostgreSQL, MySQL, SQLite, Oracle, IBM DB2, etc.) into a destination.

To make it easy to reproduce, we will be loading data from the public MySQL RFam database into a local DuckDB instance.

In [65]:
# !pip install dlt pymysql
# !pip install pyarrow

In [66]:
import dlt
from dlt.sources.sql_database import sql_database

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["family",],
    #backend="pyarrow"
)

pipeline = dlt.pipeline(
    pipeline_name="sql_database_example",
    destination="duckdb",
    dataset_name="sql_data",
    dev_mode=True
)

load_info = pipeline.run(source)
print(load_info)

Pipeline sql_database_example load step completed in 3.30 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data_20260102034652
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\sql_database_example.duckdb location to store data
Load package 1767368812.6184466 is LOADED and contains no failed jobs


#### 2.4.1 Skipping type inference via Arrow or ConnectorX

We previously discussed normalising data. This is possibly the most resource-intensive part of the loading operation, so if you don't have to do it, skipping it is much more efficient.

Luckily, we can leverage technologies like Arrow and ConnectorX to transfer data in a strongly structured format (unlike JSON).

By reading out the structured data as an Arrow table for example, we preserve all the types and can convert them into the appropriate destination types without having to first scan the data.

This makes the loading operation much faster, because this way we no longer scan types value by value.
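
As a rough illustration of why typed input is cheaper, the toy comparison below counts how many values have to be inspected in each case. It does not use Arrow, ConnectorX, or dlt; the function names, the `num_seed` column, and the declared schema are hypothetical.

```python
def infer_types(rows):
    """Schema-less input (e.g. JSON): every value is inspected to find column types."""
    types, inspected = {}, 0
    for row in rows:
        for column, value in row.items():
            inspected += 1
            if value is not None:
                # Remember the first non-null type seen for each column
                types.setdefault(column, type(value).__name__)
    return types, inspected


def types_from_schema(declared_schema):
    """Typed input (e.g. an Arrow table): the types travel with the data, no scan needed."""
    return dict(declared_schema), 0


# Hypothetical rows and the schema a typed backend would hand over directly
rows = [{"rfam_id": f"RF{i:05d}", "num_seed": i} for i in range(1000)]
declared = {"rfam_id": "str", "num_seed": "int"}

print(infer_types(rows))
print(types_from_schema(declared))
```

Both paths arrive at the same column types, but the first one touches every value to get there.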

Let's look at some options that skip deserialisation: https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database/configuration#configuring-the-backend

To exemplify, let's look at this blog post: https://dlthub.com/blog/sql-benchmark-saas#speed-analysis

#### 2.4.2 Backfilling

Sometimes you have to load a very large table, like an event log, from one SQL database to another. The challenge with very large loads is that they take a very long time, and many things can go wrong along the way or cut the load short.

For example, many runners have time limits, database connections have timeouts, and buffers have a maximum capacity, so you cannot just run `select * from giant_table`.

To deal with this problem, we recommend splitting your load into multiple smaller chunks that can be handled individually. For example, if you need to backfill 10 years of data, you might split it into days or months and load roughly 3650 daily or 120 monthly chunks.

There are multiple ways to do chunking on extraction - for example you can use start/end interval dates to be able to do it in parallel, or you could grab a number of rows and continue from where you left off serially.
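
A minimal sketch of the start/end interval approach: the generator below splits a date range into calendar-month chunks that could each be loaded independently, and therefore in parallel. `month_chunks` and the example dates are hypothetical, and each `(start, end)` pair is meant as a half-open interval.

```python
from datetime import date


def month_chunks(start, end):
    """Yield (chunk_start, chunk_end) pairs covering [start, end), one calendar month at a time."""
    current = start
    while current < end:
        # First day of the following month
        if current.month == 12:
            next_month = date(current.year + 1, 1, 1)
        else:
            next_month = date(current.year, current.month + 1, 1)
        # Never run past the requested end date
        yield current, min(next_month, end)
        current = next_month


backfill_start, backfill_end = date(2015, 1, 1), date(2025, 1, 1)
chunks = list(month_chunks(backfill_start, backfill_end))

print(len(chunks))                           # monthly chunks for 10 years
print(chunks[0], chunks[-1])
print((backfill_end - backfill_start).days)  # number of daily chunks, leap days included
```

Each pair can then be turned into a `WHERE created >= :start AND created < :end` style filter, so a failed chunk can be retried without restarting the whole backfill.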

In [67]:
import dlt
from dlt.sources.sql_database import sql_database

import pandas as pd

if __name__ == "__main__":
    # NOTE: this is a live table in the Rfam database, so the final number of rows may change
    TOTAL_TABLE_ROWS = 4178
    RFAM_CONNECTION_STRING = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"

    # Create a sql_database source that loads only the family table, in chunks of 2000 rows (changed from 1000)
    source = sql_database(RFAM_CONNECTION_STRING, table_names=["family"], chunk_size=2000)

    # We apply some hints to the table, we know the rfam_id is unique and that we can order 
    # and load incrementally on the created datetime column
    source.family.apply_hints(
        primary_key="rfam_id",
        incremental=dlt.sources.incremental(
            cursor_path="created", initial_value=None, row_order="asc"
            ),
    )

    # add_limit caps the number of chunks per run: with a chunk size of 2000 and a limit of 1,
    # each pipeline run loads at most 2000 rows
    source.add_limit(1)

    # create dlt pipeline
    pipeline = dlt.pipeline(
        pipeline_name="rfam",
        destination="duckdb",
        dataset_name="rfam_data_incremental",
    )

    info = pipeline.run(source)
    print(pipeline.last_trace)

    info = pipeline.run(source)
    print(pipeline.last_trace)

    info = pipeline.run(source)
    print(pipeline.last_trace)

    # NOTE: in a production environment you will likely:
    # * use much larger chunk sizes and limits
    # * run the pipeline in a loop to load all the rows
    # * programmatically check whether the table is fully loaded and stop the loop once it is

Run started at 2026-01-02 15:47:21.634219+00:00 and COMPLETED in 4.98 seconds with 4 steps.
Step extract COMPLETED in 4.80 seconds.

Load package 1767368841.7729077 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.03 seconds.
No data found to normalize

Step load COMPLETED in 0.03 seconds.
Pipeline rfam load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\rfam.duckdb location to store data

Step run COMPLETED in 4.98 seconds.
Pipeline rfam load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\rfam.duckdb location to store data
Run started at 2026-01-02 15:47:26.622057+00:00 and COMPLETED in 2.30 seconds with 4 st

#### 2.4.3 SCD2 (SCD=slowly changing dimension)

A slowly changing dimension (SCD) is a way of materialising data that keeps track of changes.

Similar to how the merge write disposition identifies a record in order to update it, SCD identifies a record and creates a new valid version, while invalidating the old version and preserving it for history.

`dlt` can create Slowly Changing Dimensions Type 2 (SCD2) destination tables for dimension tables that change in the source.

The resource is expected to provide a full extract of the source table each run.

A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted.

Before running the pipeline, let's just create a small toy dataset representing students at Hogwarts:

In [68]:
data = [
    {"name": "Vincent Crabbe", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
    {"name": "Gregory Goyle", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
    {"name": "Draco Malfoy", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
]

data_updated = [
    {"name": "Vincent Crabbe", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
    {"name": "Gregory Goyle", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
    {"name": "Draco Malfoy", "designation": "expelled", "date_started": "1991-09-01T09:00:00Z"},
]

Now, run the pipeline with merge disposition and SCD2 strategy:

In [69]:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="hogwarts_pipeline",
    destination="duckdb",
    dataset_name="hogwarts",
)

load_info = pipeline.run(
            data, 
            table_name="creatures", 
            write_disposition={
                "disposition":"merge",  # <----- specifies that existing data should be merged rather than replaced
                "strategy":"scd2",  # <----- enables SCD2 tracking, which keeps historical records of changes
            }          
        )

print(load_info)

pipeline.dataset().creatures.df()

Pipeline hogwarts_pipeline load step completed in 0.43 seconds
1 load package(s) were loaded to destination duckdb and into dataset hogwarts
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\hogwarts_pipeline.duckdb location to store data
Load package 1767368851.784491 is LOADED and contains no failed jobs


| | `_dlt_valid_from` | `_dlt_valid_to` | `name` | `designation` | `date_started` | `_dlt_load_id` | `_dlt_id` |
|---|---|---|---|---|---|---|---|
| 0 | 2026-01-02 15:47:31.784491+00:00 | NaT | Vincent Crabbe | student | 1991-09-01 09:00:00+00:00 | 1767368851.784491 | P94fUt8jUkzx6A |
| 1 | 2026-01-02 15:47:31.784491+00:00 | NaT | Gregory Goyle | student | 1991-09-01 09:00:00+00:00 | 1767368851.784491 | dogfjv05onJLRQ |
| 2 | 2026-01-02 15:47:31.784491+00:00 | NaT | Draco Malfoy | student | 1991-09-01 09:00:00+00:00 | 1767368851.784491 | r7u0vNojh+gvEQ |


New columns were created:

* `_dlt_valid_from` - The timestamp when this record was first inserted into the table.
   * All records have the same value, which is when the pipeline first processed them.
* `_dlt_valid_to` - The timestamp when this record was considered outdated.
   * `NaT` (Not a Time) means that these records are currently active and have not been superseded by newer versions.
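The two validity columns together define a time window for each row version. A minimal sketch of how such a window can be checked is shown below. It assumes a half-open interval (`valid_from` inclusive, `valid_to` exclusive) and uses `None` where pandas shows `NaT`. The `is_valid_at` helper and the timestamps are hypothetical, not part of dlt.

```python
from datetime import datetime, timezone
from typing import Optional


def is_valid_at(valid_from: datetime, valid_to: Optional[datetime], moment: datetime) -> bool:
    """Return True if a row version was in effect at `moment`.

    Assumes a half-open window [valid_from, valid_to); None means still open.
    """
    if moment < valid_from:
        return False
    return valid_to is None or moment < valid_to


# Illustrative timestamps (hypothetical, not taken from a real load).
loaded_at = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
closed_at = datetime(2026, 1, 2, 12, 0, tzinfo=timezone.utc)
probe = datetime(2026, 1, 3, 12, 0, tzinfo=timezone.utc)

print(is_valid_at(loaded_at, None, probe))       # open-ended version: still active
print(is_valid_at(loaded_at, closed_at, probe))  # closed version: no longer in effect
```

The same predicate answers "which version was current at time T?", which is the main reason to keep history with SCD2 rather than overwrite rows.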


Now run the pipeline again with the updated dataset:

In [70]:
load_info = pipeline.run(
    data_updated, 
    table_name="creatures",
    write_disposition={
        "disposition":"merge",  # <----- specifies that existing data should be merged rather than replaced
        "strategy":"scd2",  # <----- enables SCD2 tracking, which keeps historical records of changes
    }
)

print(load_info)

pipeline.dataset().creatures.df()

Pipeline hogwarts_pipeline load step completed in 0.41 seconds
1 load package(s) were loaded to destination duckdb and into dataset hogwarts
The duckdb destination used duckdb:///d:\cg_inter_stuff\Data_engineer\PipeLiningEverythingProjetcs\data_loading\hogwarts_pipeline.duckdb location to store data
Load package 1767368852.6806989 is LOADED and contains no failed jobs


| | `_dlt_valid_from` | `_dlt_valid_to` | `name` | `designation` | `date_started` | `_dlt_load_id` | `_dlt_id` |
|---|---|---|---|---|---|---|---|
| 0 | 2026-01-02 15:47:31.784491+00:00 | NaT | Vincent Crabbe | student | 1991-09-01 09:00:00+00:00 | 1767368851.784491 | P94fUt8jUkzx6A |
| 1 | 2026-01-02 15:47:31.784491+00:00 | NaT | Gregory Goyle | student | 1991-09-01 09:00:00+00:00 | 1767368851.784491 | dogfjv05onJLRQ |
| 2 | 2026-01-02 15:47:31.784491+00:00 | 2026-01-02 15:47:32.680699+00:00 | Draco Malfoy | student | 1991-09-01 09:00:00+00:00 | 1767368851.784491 | r7u0vNojh+gvEQ |
| 3 | 2026-01-02 15:47:32.680699+00:00 | NaT | Draco Malfoy | expelled | 1991-09-01 09:00:00+00:00 | 1767368852.6806989 | KCWs+o6jOW/aLQ |


SCD2 created a new row for Draco Malfoy with the updated designation while keeping the historical record.

1. **Vincent Crabbe & Gregory Goyle (Rows 0 & 1)**
   * Their records remain unchanged because their data did **not** change in the second run.
   * `_dlt_valid_to = NaT`, meaning their records are still active.
   * `_dlt_load_id` remains the same (1767368851.784491), indicating they were loaded in the first run.

2. **Draco Malfoy - Previous Record (Row 2)**
   * `_dlt_valid_from` --> This was his original record when he was a **student.**
   * `_dlt_valid_to` --> This timestamp marks when this record was **closed** (i.e. it became outdated due to the update).
   * `_dlt_id` --> This is the original hash, uniquely identifying the record.
   * **This row is now a historical record, no longer active.**

3. **Draco Malfoy - New Record (Row 3)**
   * `_dlt_valid_from` --> Marks when this new version of the record was inserted.
   * `_dlt_valid_to = NaT` --> This means it is the **current active record.**
   * `designation = "expelled"` --> The change is now reflected.
   * `_dlt_load_id` --> This is from the second pipeline run, tracking when this new row was inserted.
   * `_dlt_id` --> A new hash is generated because the record has changed.
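The behavior described above can be imitated in plain Python. The sketch below is a simplified simulation of an SCD2 merge, not dlt's implementation: `scd2_merge`, `row_hash`, the `valid_from` / `valid_to` / `row_id` field names, and the rounded run timestamps are all assumptions made for illustration.

```python
import hashlib
import json
from datetime import datetime, timezone

TRACKED = ("name", "designation", "date_started")


def row_hash(row: dict) -> str:
    """Content hash over the tracked columns (illustrative, not dlt's algorithm)."""
    payload = json.dumps({k: row[k] for k in TRACKED}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:14]


def scd2_merge(table: list, incoming: list, now: datetime) -> list:
    """Close active rows missing from `incoming`, then append rows with new hashes."""
    incoming_by_hash = {row_hash(r): r for r in incoming}
    still_active = set()
    for row in table:
        if row["valid_to"] is not None:
            continue  # already historical, never touched again
        if row["row_id"] in incoming_by_hash:
            still_active.add(row["row_id"])  # unchanged: leave as-is
        else:
            row["valid_to"] = now  # changed or deleted at the source: close it
    for h, r in incoming_by_hash.items():
        if h not in still_active:
            table.append({**r, "valid_from": now, "valid_to": None, "row_id": h})
    return table


data = [
    {"name": "Vincent Crabbe", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
    {"name": "Gregory Goyle", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
    {"name": "Draco Malfoy", "designation": "student", "date_started": "1991-09-01T09:00:00Z"},
]
# Same snapshot, except Draco's designation changes.
data_updated = [dict(r) for r in data]
data_updated[2]["designation"] = "expelled"

# Run timestamps rounded to whole seconds for readability.
first_run = datetime(2026, 1, 2, 15, 47, 31, tzinfo=timezone.utc)
second_run = datetime(2026, 1, 2, 15, 47, 32, tzinfo=timezone.utc)

table = scd2_merge([], data, first_run)
table = scd2_merge(table, data_updated, second_run)

for row in table:
    print(row["name"], row["designation"], row["valid_from"], row["valid_to"])
```

After the second call, the table holds four rows: two untouched rows, one closed row for Draco as a student, and one open row for Draco as expelled, matching the shape of the output above.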