# **Understanding Pipeline Metadata and State**

## Pipeline Metadata 
- Data about your data pipeline
    - When your pipeline first ran
    - When your pipeline last ran
    - Information about your source or destination
    - Processing time
    - Or information that you yourself may want to add to the metadata

- You can check the meta data through
    - Load info
    - trace
    - state

In [4]:
import dlt
from dlt.sources.helpers import requests
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
import os

os.environ['SOURCES__SECRET_KEY'] = os.getenv('GITHUB_TOKEN')

@dlt.source
def github_source(secret_key=dlt.secrets.value):
    client = RESTClient(
            base_url="https://api.github.com",
            auth=BearerTokenAuth(token=secret_key),
            paginator=HeaderLinkPaginator(),
    )

    @dlt.resource
    def github_pulls(cursor_date=dlt.sources.incremental("updated_at", initial_value="2025-20-01")):
        params = {
            "since": cursor_date.last_value,
            "status": "open"
        }
        for page in client.paginate("repos/dlt-hub/dlt/pulls", params=params):
            yield page


    return github_pulls


# define new dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name="lesson_8",
    destination="duckdb",
    dataset_name="github_data",
)


# run the pipeline with the new resource
load_info = pipeline.run(github_source())
print(load_info)

Pipeline lesson_8 load step completed in 0.34 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data
The duckdb destination used duckdb:///c:\Users\HP\OneDrive\Desktop\Data Engg\dlt\Lesson Notebooks\lesson_8.duckdb location to store data
Load package 1740566505.9691741 is LOADED and contains no failed jobs


## **Load Info**

`Load info`: collection of useful info for the recently loaded data - details like the pipeline and dataset name, destination information, and a list of loaded packages with their statuses, file sizes, types, and error messages (if any).

`Load Package` : collection of jobs with specific data specific tables, generated during each execution of the pipeline. Each package is uniquely identified by a `load_id`.

> The `load_id` of a particular package is added to the top data tables (parent tables) and to the special `_dlt_loads` table with a status of 0 when the load process is fully completed. The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them. We can also see load package info with a specific load id:

`!dlt pipeline lesson_8 load-package 1734535481.3936043`

In [5]:
print(load_info.load_packages[0])

The package with load id 1740566505.9691741 for schema github_source is in LOADED state. It updated schema for 3 tables. The package was LOADED at 2025-02-26 10:41:47.804483+00:00.
Jobs details:
Job: _dlt_pipeline_state.e9f9555659.insert_values, table: _dlt_pipeline_state in completed_jobs. File type: insert_values, size: 586B. Started on: 2025-02-26 10:41:47.418064+00:00 and completed in 0.39 seconds.


In [6]:
# This code snippet just prints out the public methods and attributes of the schema object in load info
all_attributes_methods = dir(load_info.load_packages[0])
public_attributes_methods = [attr for attr in all_attributes_methods if not attr.startswith('_')]

print(f"{'Attribute/Method':<50} {'Type':<10}")
print("-" * 40)
for attr in public_attributes_methods:
    attr_value = getattr(load_info.load_packages[0], attr)
    if callable(attr_value):
        print(f"{attr:<50} {'method':<10}")
    else:
        print(f"{attr:<50} {'attribute':<10}")

Attribute/Method                                   Type      
----------------------------------------
asdict                                             method    
asstr                                              method    
completed_at                                       attribute 
count                                              method    
index                                              method    
jobs                                               attribute 
load_id                                            attribute 
package_path                                       attribute 
schema                                             attribute 
schema_hash                                        attribute 
schema_name                                        attribute 
schema_update                                      attribute 
state                                              attribute 


## **Trace**

- A trace is a detailed record of the execution of a pipeline. 
- It provides rich information on the pipeline processing steps: extract, normalize, and load. 
- It also shows the last load_info.

### Accessing Trac

**1. CLI**

`!dlt pipeline <pipeline_name> trace`


**2. Python**

```python
print(pipeline.last_trace)

print(pipeline.last_trace.last_extract_info) # prints trace for extract stage

print(pipeline.last_trace.last_normalize_info) # prints trace for normalize stage
# access row counts dictionary of normalize info
print(pipeline.last_trace.last_normalize_info.row_counts)

print(pipeline.last_trace.last_load_info) # prints trace for load stage
```

## **State**

- It's a Python dictionary that lives alongside the data
- It can store values during the pipeline run and then retreive it for the next run
- It's used for tasks like preserving the "last value" or similar loading checkpoints, and it gets committed atomically with the data
- The state is stored locally in the pipeline working directory and is also stored at the destination for future runs
- The pipeline state is stored locally in the pipeline working directory and, as a consequence, it cannot be shared with pipelines with different names.

**When to use pipeline state**
- dlt uses the state internally to implement last value incremental loading. This use case should cover around 90% of your needs to use the pipeline state.
- Store a list of already requested entities if the list is not much bigger than 100k elements.
- Store large dictionaries of last values if you are not able to implement it with the standard incremental construct.
- Store the custom fields dictionaries, dynamic configurations and other source-scoped state.

**When not to use pipeline state**

Do not use dlt state when it may grow to millions of elements. Do you plan to store modification timestamps of all of your millions of user records? This is probably a bad idea! In that case you could:

- Store the state in dynamo-db, redis etc. taking into the account that if the extract stage fails you'll end with invalid state.
- Use your loaded data as the state. dlt exposes the current pipeline via dlt.current.pipeline() from which you can obtain sqlclient and load the data of interest. In that case try at least to process your user records in batches.

### Accessing State

**1. CLI**

`!dlt pipeline -v <pipeline_name> info`

**2. Python**

```python
# helper function to read state
import json

def read_state(filepath):
    with open(filepath, 'r') as file:
        data = json.load(file)
        pretty_json = json.dumps(data, indent=4)
        return pretty_json


# stored in your default pipelines folder
print(read_state("/var/dlt/pipelines/<pipeline_name>/state.json"))
```

In [10]:
import dlt
from dlt.sources.helpers import requests
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

import os

os.environ["SOURCES__SECRET_KEY"] = os.getenv("GITHUB_TOKEN")


@dlt.source
def github_source(secret_key=dlt.secrets.value):
    client = RESTClient(
            base_url="https://api.github.com",
            auth=BearerTokenAuth(token=secret_key),
            paginator=HeaderLinkPaginator(),
    )

    @dlt.resource
    def github_pulls(cursor_date=dlt.sources.incremental("updated_at", initial_value="2024-12-01")):

        # Let's set some custom state information
        dlt.current.resource_state().setdefault("new_key", ["first_value", "second_value"]) # <--- new item in the state

        params = {
            "since": cursor_date.last_value,
            "status": "open"
        }
        for page in client.paginate("repos/dlt-hub/dlt/pulls", params=params):
            yield page


    return github_pulls


# define new dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name="lesson_8_state_pipeline",
    destination="duckdb",
    dataset_name="github_data",
)


# run the pipeline with the new resource
load_info = pipeline.run(github_source())
print(load_info)

Pipeline lesson_8_state_pipeline load step completed in 0.94 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data
The duckdb destination used duckdb:///c:\Users\HP\OneDrive\Desktop\Data Engg\dlt\Lesson Notebooks\lesson_8_state_pipeline.duckdb location to store data
Load package 1740569632.9830878 is LOADED and contains no failed jobs


### Source scoped states

You can access the source-scoped state with `dlt.current.source_state()` which can be shared across resources of a particular source and is also available **read-only** in the source-decorated functions. The most common use case for the source-scoped state is to store mapping of custom fields to their displayable names.


Let's read some custom keys from the state:
```python
# Let's read some custom state information
source_new_keys = dlt.current.source_state().get("resources", {}).get("github_pulls", {}).get("new_key")
```
Full example: