In [None]:
%%capture
!pip install dlt[duckdb]

# Pipeline Metadata

Metadata is basically data about data.

Pipeline Metadata is data about your data pipeline. This can be useful if you want to know things like:

- When your pipeline first ran
- When your pipeline last ran
- Information about your source or destination
- Processing time
- Or information that you yourself may want to add to the metadata
- And much more!



## `dlt` lets you view all of this metadata in several ways

This notebook will walk you through those options. Namely:

- Load info
- Trace
- State

# Crafting a pipeline to view its metadata

In [None]:
import dlt
from dlt.sources.helpers import requests

BASE_URL = "https://api.github.com/repos/dlt-hub/dlt/issues"

def pagination(url):
    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

        # Get next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]


@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # GitHub's issues endpoint expects the query parameter `direction` (singular)
    url = (
        f"{BASE_URL}?since={updated_at.last_value}&per_page=100&sort=updated"
        "&direction=desc&state=open"
    )
    yield pagination(url)



pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
print(load_info)

Pipeline github_issues_merge load step completed in 1.64 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data_merge
The duckdb destination used duckdb:////content/github_issues_merge.duckdb location to store data
Load package 1723206930.5175498 is LOADED and contains no failed jobs


## Look into the data

In [None]:
import duckdb
from google.colab import data_table
data_table.enable_dataframe_formatter()

# a database named '<pipeline_name>.duckdb' was created in the working directory, so just connect to it
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
stats_table = conn.sql("SELECT * FROM issues LIMIT 5").df()
display(stats_table)



Unnamed: 0,url,repository_url,labels_url,comments_url,events_url,html_url,id,node_id,number,title,...,performed_via_github_app__permissions__actions,performed_via_github_app__permissions__checks,performed_via_github_app__permissions__contents,performed_via_github_app__permissions__deployments,performed_via_github_app__permissions__discussions,performed_via_github_app__permissions__issues,performed_via_github_app__permissions__metadata,performed_via_github_app__permissions__pull_requests,performed_via_github_app__permissions__repository_projects,performed_via_github_app__permissions__statuses
0,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://github.com/dlt-hub/dlt/issues/1530,2383150738,I_kwDOGvRYu86OC_6S,1530,"Additional paginator for offset without a ""tot...",...,,,,,,,,,,
1,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://github.com/dlt-hub/dlt/pull/1677,2457631625,PR_kwDOGvRYu8537vNZ,1677,RESTClient: stops pagination after empty page ...,...,,,,,,,,,,
2,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://github.com/dlt-hub/dlt/pull/1674,2455801852,PR_kwDOGvRYu8531niS,1674,adds full ci for motherduck and updates docs,...,,,,,,,,,,
3,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://github.com/dlt-hub/dlt/pull/1576,2400541203,PR_kwDOGvRYu8509MPc,1576,Fix/1571 Incremental: Optionally load or ignor...,...,,,,,,,,,,
4,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://api.github.com/repos/dlt-hub/dlt/issue...,https://github.com/dlt-hub/dlt/issues/1637,2428525113,I_kwDOGvRYu86QwFo5,1637,Update RESTClient range paginators to support ...,...,,,,,,,,,,


# Introducing Load Info

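Load info describes the load packages that a pipeline run sent to the destination. In the package information you can see the list of all tables and columns created at the destination while that package was loaded. The commands below display the package state and the details of its jobs.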



## Inspect Load Info

### CLI

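To see the most recent load package info (this example inspects the `github_issues_merge_trace` pipeline, which is created in the Trace section below):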

In [None]:
!dlt pipeline github_issues_merge_trace load-package

Found pipeline github_issues_merge_trace in /var/dlt/pipelines
Package 1722343054.148941 found in /var/dlt/pipelines/github_issues_merge_trace/load/loaded/1722343054.148941
The package with load id 1722343054.148941 for schema github_issues_merge_trace is in LOADED state. It updated schema for 7 tables. The package was LOADED at 2024-07-30 12:37:38.147890+00:00.
Jobs details:
Job: issues__assignees.ea84ca5dd6.insert_values, table: issues__assignees in completed_jobs. File type: insert_values, size: 4.0K. Started on: 2024-07-30 12:37:35.785707+00:00 and completed in 2.36 seconds.
Job: issues.9a51a5dcc7.sql, table: issues in completed_jobs. File type: sql, size: 9.2K. Started on: 2024-07-30 12:37:37.309825+00:00 and completed in 0.84 seconds.
Job: issues__labels.3fc08a7c7c.insert_values, table: issues__labels in completed_jobs. File type: insert_values, size: 3.8K. Started on: 2024-07-30 12:37:35.776706+00:00 and completed in 2.37 seconds.
Job: _dlt_pipeli

The `load_id` of a particular package is added to the top data tables and to the special `_dlt_loads` table with a status of 0 when the load process is fully completed. The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them.
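A minimal sketch of how a downstream transformation could restrict itself to fully completed loads. An in-memory SQLite database stands in for the real destination, the tables are reduced to the columns needed here, and the rows are hypothetical:

```python
import sqlite3

# In-memory SQLite is only a stand-in for the real destination (DuckDB in this notebook).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE _dlt_loads (load_id TEXT, status INTEGER);
    CREATE TABLE issues (id INTEGER, title TEXT, _dlt_load_id TEXT);
    -- Hypothetical rows: one completed load and one load that never finished.
    INSERT INTO _dlt_loads VALUES ('1722343054.148941', 0);
    INSERT INTO issues VALUES (1, 'completed load', '1722343054.148941');
    INSERT INTO issues VALUES (2, 'incomplete load', '1722343999.000000');
    """
)

# Keep only rows whose load package is recorded as completed (status 0).
completed_rows = conn.execute(
    """
    SELECT i.id, i.title
    FROM issues AS i
    JOIN _dlt_loads AS l ON l.load_id = i._dlt_load_id
    WHERE l.status = 0
    ORDER BY i.id
    """
).fetchall()
print(completed_rows)
```

Joining on `_dlt_load_id` means rows from a package that has no completed entry in `_dlt_loads` are left out of the transformation.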

To see package info with a given load id:

In [None]:
!dlt pipeline github_issues_merge_trace load-package 1722343054.148941

Found pipeline github_issues_merge_trace in /var/dlt/pipelines
Package 1722343054.148941 found in /var/dlt/pipelines/github_issues_merge_trace/load/loaded/1722343054.148941
The package with load id 1722343054.148941 for schema github_issues_merge_trace is in LOADED state. It updated schema for 7 tables. The package was LOADED at 2024-07-30 12:37:38.147890+00:00.
Jobs details:
Job: issues__assignees.ea84ca5dd6.insert_values, table: issues__assignees in completed_jobs. File type: insert_values, size: 4.0K. Started on: 2024-07-30 12:37:35.785707+00:00 and completed in 2.36 seconds.
Job: issues.9a51a5dcc7.sql, table: issues in completed_jobs. File type: sql, size: 9.2K. Started on: 2024-07-30 12:37:37.309825+00:00 and completed in 0.84 seconds.
Job: issues__labels.3fc08a7c7c.insert_values, table: issues__labels in completed_jobs. File type: insert_values, size: 3.8K. Started on: 2024-07-30 12:37:35.776706+00:00 and completed in 2.37 seconds.
Job: _dlt_pipeli
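The load ids in the output above look like Unix timestamps. That reading is an assumption based on the values themselves, not something the output states. Under that assumption, a load id can be turned into a readable time with the standard library; `load_id_to_datetime` is a hypothetical helper name:

```python
from datetime import datetime, timezone


def load_id_to_datetime(load_id: str) -> datetime:
    """Interpret a load id as seconds since the Unix epoch (an assumption)."""
    return datetime.fromtimestamp(float(load_id), tz=timezone.utc)


started = load_id_to_datetime("1722343054.148941")
print(started.strftime("%Y-%m-%d %H:%M:%S %Z"))
```

For the package shown above this gives a time a few seconds before the reported `LOADED at` timestamp, which is consistent with the id being assigned when the package was created.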

# Introducing dlt State

The pipeline state is a Python dictionary that lives alongside your data; you can store values in it and, on the next pipeline run, request them back.

[Documentation](https://dlthub.com/docs/general-usage/state)


## When to use pipeline state
- dlt uses the state internally to implement last-value incremental loading. This use case should cover around 90% of your needs for pipeline state.
- Store a list of already requested entities if the list is not much bigger than 100k elements.
- Store large dictionaries of last values if you are not able to implement it with the standard incremental construct.
- Store the custom fields dictionaries, dynamic configurations and other source-scoped state.
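As an illustration of the second use case, here is a small sketch that remembers already requested entities between runs. A plain dictionary stands in for `dlt.current.resource_state()`, and both the `seen_ids` key and the `only_new_entities` helper are hypothetical names:

```python
from typing import Any, Dict, Iterable, Iterator


def only_new_entities(state: Dict[str, Any], entity_ids: Iterable[int]) -> Iterator[int]:
    """Yield ids not seen in earlier runs and remember them in the state."""
    seen = state.setdefault("seen_ids", [])  # "seen_ids" is a hypothetical key
    known = set(seen)  # set lookup keeps the membership test cheap
    for entity_id in entity_ids:
        if entity_id not in known:
            known.add(entity_id)
            seen.append(entity_id)
            yield entity_id


# A plain dict stands in for dlt.current.resource_state() here.
state: Dict[str, Any] = {}
first_run = list(only_new_entities(state, [1, 2, 3]))
second_run = list(only_new_entities(state, [2, 3, 4]))
print(first_run, second_run, state)
```

The list is stored rather than the set so that the state stays JSON-serializable, which matches how the state file is shown later in this notebook.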

## When not to use pipeline state
Do not use dlt state when it may grow to millions of elements. Do you plan to store modification timestamps of all of your millions of user records? This is probably a bad idea! In that case you could:

- Store the state in DynamoDB, Redis, etc., taking into account that if the extract stage fails you will end up with an invalid state.
- Use your loaded data as the state. dlt exposes the current pipeline via `dlt.current.pipeline()`, from which you can obtain a SQL client (`sql_client()`) and load the data of interest. In that case, try at least to process your user records in batches.
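Processing records in batches can be sketched with the standard library alone. `in_batches` is a hypothetical helper, and the integers below stand in for user records read from the destination:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def in_batches(records: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Integers stand in for user records here.
batches = list(in_batches(range(7), 3))
print(batches)
```

Only one batch is held in memory at a time, so the approach also works when the records come from a lazily evaluated query result.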

## Inspect state

### Method 1: Through CLI

In [None]:
!dlt pipeline -v github_issues_merge info

Attaching to pipeline github_issues_merge
Found pipeline github_issues_merge in /var/dlt/pipelines
Synchronized state:
_state_version: 1
_state_engine_version: 4
pipeline_name: github_issues_merge
dataset_name: github_data_merge
default_schema_name: github_issues_merge
schema_names: ['github_issues_merge']
destination_type: dlt.destinations.duckdb
destination_name: duckdb
_version_hash: 9zLE+fA8W+p7TO9tXuwvTS1rb3aPturqGfHGvOsZVGs=

sources:
{
  "github_issues_merge": {
    "resources": {
      "get_issues": {
        "incremental": {
          "updated_at": {
            "initial_value": "1970-01-01T00:00:00Z",
            "last_value": "2024-08-09T12:27:54Z",
            "unique_hashes": [
              "Aj6+vMs5fyGEOqGyW1cY"
            ]
          }
        }
      }
    }
  }
}

Local state:
first_run: False
_last_extracted_at: 2024-08-09 12:35:31.2111

### Method 2: Through Python

In [None]:
import json

def read_state(filepath):
    with open(filepath, 'r') as file:
        data = json.load(file)
        pretty_json = json.dumps(data, indent=4)
        return pretty_json
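
Once the state is available as a dictionary, a single value can be looked up by walking the nesting shown in the CLI output above (`sources`, then the source name, `resources`, the resource name, `incremental`, and the cursor field). The helper below is a hypothetical sketch, and `sample_state` copies only the relevant part of that output:

```python
from typing import Any, Dict, Optional


def incremental_last_value(
    state: Dict[str, Any], source: str, resource: str, cursor: str
) -> Optional[Any]:
    """Return the stored last_value for a cursor, or None if any level is missing."""
    node: Any = state
    for key in ("sources", source, "resources", resource, "incremental", cursor):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node.get("last_value") if isinstance(node, dict) else None


# Trimmed copy of the state structure printed by the CLI above.
sample_state = {
    "sources": {
        "github_issues_merge": {
            "resources": {
                "get_issues": {
                    "incremental": {
                        "updated_at": {
                            "initial_value": "1970-01-01T00:00:00Z",
                            "last_value": "2024-08-09T12:27:54Z",
                        }
                    }
                }
            }
        }
    }
}

print(incremental_last_value(sample_state, "github_issues_merge", "get_issues", "updated_at"))
```

Returning `None` for a missing level avoids a `KeyError` on a pipeline that has not run yet.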

In [None]:
# stored in your default pipelines folder
print(read_state("/var/dlt/pipelines/github_issues_merge/state.json"))

{
    "_state_version": 1,
    "_state_engine_version": 4,
    "_local": {
        "first_run": false,
        "_last_extracted_at": "\uf0272024-08-09T12:35:31.211177+00:00",
        "_last_extracted_hash": "9zLE+fA8W+p7TO9tXuwvTS1rb3aPturqGfHGvOsZVGs="
    },
    "pipeline_name": "github_issues_merge",
    "dataset_name": "github_data_merge",
    "default_schema_name": "github_issues_merge",
    "schema_names": [
        "github_issues_merge"
    ],
    "destination_type": "dlt.destinations.duckdb",
    "destination_name": "duckdb",
    "_version_hash": "9zLE+fA8W+p7TO9tXuwvTS1rb3aPturqGfHGvOsZVGs=",
    "sources": {
        "github_issues_merge": {
            "resources": {
                "get_issues": {
                    "incremental": {
                        "updated_at": {
                            "initial_value": "1970-01-01T00:00:00Z",
                            "last_value": "2024-08-09T12:27:54Z",
                            "unique_hashes": [
                       

## Modify state

### Resource state

In [None]:
@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # GitHub's issues endpoint expects the query parameter `direction` (singular)
    url = (
        f"{BASE_URL}?since={updated_at.last_value}&per_page=100&sort=updated"
        "&direction=desc&state=open"
    )
    # get a value from the state
    last_value = dlt.current.resource_state().get("incremental", {}).get("updated_at", {}).get("last_value")
    # Let's set some custom state information
    dlt.current.resource_state().setdefault("new_key", ["first_value", "second_value"])

    yield pagination(url)


load_info = pipeline.run(get_issues)

In [None]:
print(read_state("/var/dlt/pipelines/github_issues_merge/state.json"))

{
    "_state_version": 2,
    "_state_engine_version": 4,
    "_local": {
        "first_run": false,
        "_last_extracted_at": "\uf0272024-08-09T12:44:09.515788+00:00",
        "_last_extracted_hash": "RZy6JLHlAe7bQFvOFEtb/AN/HUDeqvK9kPrfFYsMUVY="
    },
    "pipeline_name": "github_issues_merge",
    "dataset_name": "github_data_merge",
    "default_schema_name": "github_issues_merge",
    "schema_names": [
        "github_issues_merge"
    ],
    "destination_type": "dlt.destinations.duckdb",
    "destination_name": "duckdb",
    "_version_hash": "RZy6JLHlAe7bQFvOFEtb/AN/HUDeqvK9kPrfFYsMUVY=",
    "sources": {
        "github_issues_merge": {
            "resources": {
                "get_issues": {
                    "incremental": {
                        "updated_at": {
                            "initial_value": "1970-01-01T00:00:00Z",
                            "last_value": "2024-08-09T12:27:54Z",
                            "unique_hashes": [
                       

In [None]:
@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
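    # GitHub's issues endpoint expects the query parameter `direction` (singular)
    url = (
        f"{BASE_URL}?since={updated_at.last_value}&per_page=100&sort=updated"
        "&direction=desc&state=open"
    )
    # Let's set some custom state information
    new_keys = dlt.current.resource_state().setdefault("new_key", ["first_value", "second_value"])
    # Placeholder condition: a non-empty string is always truthy, so this branch always runs
    if "something_happened":
        new_keys.append("third_value")

    # Add a custom key next to the values dlt keeps for incremental loading
    incremental_dict = dlt.current.resource_state().get("incremental")
    incremental_dict.update({"second_new_key": "fourth_value"})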

    yield pagination(url)


load_info = pipeline.run(get_issues)

In [None]:
print(read_state("/var/dlt/pipelines/github_issues_merge/state.json"))

{
    "_state_version": 3,
    "_state_engine_version": 4,
    "_local": {
        "first_run": false,
        "_last_extracted_at": "\uf0272024-08-09T12:44:32.905886+00:00",
        "_last_extracted_hash": "qANGdwkvFkah9tfXIm75yJ+h3XeV7vBtAJAoYExaKMg="
    },
    "pipeline_name": "github_issues_merge",
    "dataset_name": "github_data_merge",
    "default_schema_name": "github_issues_merge",
    "schema_names": [
        "github_issues_merge"
    ],
    "destination_type": "dlt.destinations.duckdb",
    "destination_name": "duckdb",
    "_version_hash": "qANGdwkvFkah9tfXIm75yJ+h3XeV7vBtAJAoYExaKMg=",
    "sources": {
        "github_issues_merge": {
            "resources": {
                "get_issues": {
                    "incremental": {
                        "updated_at": {
                            "initial_value": "1970-01-01T00:00:00Z",
                            "last_value": "2024-08-09T12:27:54Z",
                            "unique_hashes": [
                       

### Source state

You can also access the source-scoped state with `dlt.current.source_state()`. It can be shared across the resources of a particular source and is also available read-only in source-decorated functions. The most common use case for the source-scoped state is to store a mapping of custom fields to their displayable names. You can take a look at the pipedrive source for an example of state passed across resources.
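The custom-field mapping use case can be sketched as follows. A plain dictionary stands in for `dlt.current.source_state()`, and the `custom_fields_mapping` key, both helper functions, and the sample field are hypothetical:

```python
from typing import Any, Dict, Iterable, Iterator


def remember_field_names(source_state: Dict[str, Any], fields: Iterable[Dict[str, str]]) -> None:
    """Store a mapping from opaque field keys to displayable names in the state."""
    mapping = source_state.setdefault("custom_fields_mapping", {})  # hypothetical key
    for field in fields:
        mapping[field["key"]] = field["name"]


def rename_custom_fields(
    source_state: Dict[str, Any], rows: Iterable[Dict[str, Any]]
) -> Iterator[Dict[str, Any]]:
    """Rename keys in each row using the mapping another resource stored earlier."""
    mapping = source_state.get("custom_fields_mapping", {})
    for row in rows:
        yield {mapping.get(key, key): value for key, value in row.items()}


# A plain dict stands in for dlt.current.source_state() here.
source_state: Dict[str, Any] = {}
remember_field_names(source_state, [{"key": "a1b2", "name": "Deal size"}])
renamed = list(rename_custom_fields(source_state, [{"id": 1, "a1b2": "large"}]))
print(renamed)
```

One function writes the mapping and another reads it, which mirrors how two resources of the same source could share it through the source-scoped state.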

In [None]:
@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # GitHub's issues endpoint expects the query parameter `direction` (singular)
    url = (
        f"{BASE_URL}?since={updated_at.last_value}&per_page=100&sort=updated"
        "&direction=desc&state=open"
    )
    # Let's set some custom state information
    source_new_keys = dlt.current.source_state().setdefault("source_new_key", "fifth_value")

    yield pagination(url)


load_info = pipeline.run(get_issues)

In [None]:
print(read_state("/var/dlt/pipelines/github_issues_merge/state.json"))

{
    "_state_version": 4,
    "_state_engine_version": 4,
    "_local": {
        "first_run": false,
        "_last_extracted_at": "\uf0272024-08-09T12:45:43.078262+00:00",
        "_last_extracted_hash": "o6cvM8PJKtfOourJeM4oTIfDCaKChS1rCkDgwdMNwbI="
    },
    "pipeline_name": "github_issues_merge",
    "dataset_name": "github_data_merge",
    "default_schema_name": "github_issues_merge",
    "schema_names": [
        "github_issues_merge"
    ],
    "destination_type": "dlt.destinations.duckdb",
    "destination_name": "duckdb",
    "_version_hash": "o6cvM8PJKtfOourJeM4oTIfDCaKChS1rCkDgwdMNwbI=",
    "sources": {
        "github_issues_merge": {
            "resources": {
                "get_issues": {
                    "incremental": {
                        "updated_at": {
                            "initial_value": "1970-01-01T00:00:00Z",
                            "last_value": "2024-08-09T12:27:54Z",
                            "unique_hashes": [
                       

## Syncing state with destination

What if you run your pipeline on, for example, Airflow, where every task gets a clean filesystem and the pipeline working directory is always deleted?

dlt loads your state into the destination together with all other data and when faced with a clean start, it will try to restore state from the destination.

The remote state is identified by pipeline name, the destination location (as given by the credentials) and destination dataset. To re-use the same state, use the same pipeline name and destination.

The state is stored in the `_dlt_pipeline_state` table at the destination and contains information about the pipeline, pipeline run (that the state belongs to) and state blob.

dlt provides the `dlt pipeline <pipeline name> sync` command, which requests the state back from that table.

💡 If you can keep the pipeline working directory across runs, you can disable the state sync by setting `restore_from_destination=false`, for example in your `config.toml`.

In [None]:
import duckdb
from google.colab import data_table
data_table.enable_dataframe_formatter()

# a database named '<pipeline_name>.duckdb' was created in the working directory, so just connect to it
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
stats_table = conn.sql("SELECT * FROM _dlt_pipeline_state").df()
display(stats_table)

Unnamed: 0,version,engine_version,pipeline_name,state,created_at,version_hash,_dlt_load_id,_dlt_id
0,1,4,github_issues_merge,eNp9kUFPwkAQhf/LXi24bYFqEy/EgxzUQAhGDNkM7UBX2q...,2024-07-30 09:01:42.700012+00:00,3ryk81R3QO6V52DWm/TQdyV2iap2BRmevzYcHInIq7o=,1722330101.7498035,7FG7K+UjKN5kOg
1,2,4,github_issues_merge,eNp9kW9PwjAQxr9LXw/sBmSwxDcmRowEFwQFjGnKerCGrs...,2024-07-30 09:16:04.439816+00:00,pOpUXPxEfLnv50RHdGc21k17npPt2shXN8KPLYyYRZ4=,1722330964.0833151,ZBw2yBMiBTg3vA
2,3,4,github_issues_merge,eNp9kVFPwjAUhf9LXx3QweJkiQ/qgySaYHCozJCmrHdbQ1...,2024-07-30 09:21:51.795841+00:00,X4TiZf3emGBHZRxs/UekZKYURnLh96f7Ta+0Y2SOAvw=,1722331311.4542563,anfzTXtTUiuIKw
3,4,4,github_issues_merge,eNp9kV9rwjAUxb9LXq0urTKxsJfJYOxh1eE2dUhIm2sTbG...,2024-07-30 09:29:57.942690+00:00,C2XkOy+erOyhEQTPQ+XRJZN1hbq2mUY2YOssB9i6y1c=,1722331797.610587,IuU3rynqij5gIg
4,5,4,github_issues_merge,eNqdkVtPwkAQhf/LvlpwuYW0iQ+KD2gURIkGDNks7dCubZ...,2024-07-30 09:34:07.673203+00:00,rDHQOpvJvuYCBQzkAjRxkLYHlreMSd7+fYRiqz9NNIk=,1722332047.2697608,vOcAIREiySHGRQ


The `state` column holds the state dictionary as compressed JSON.
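The values in the `state` column start with `eN`, which is typical of a zlib stream encoded as base64. Assuming that encoding (zlib-compressed JSON, then base64), a round trip can be sketched with the standard library. `compress_state` and `decompress_state` are hypothetical helpers, not dlt functions, and dlt's own JSON handling may differ in details such as typed values:

```python
import base64
import json
import zlib


def compress_state(state: dict) -> str:
    """Serialize to JSON, compress with zlib, and encode as base64 text."""
    raw = json.dumps(state).encode("utf-8")
    return base64.b64encode(zlib.compress(raw, 9)).decode("ascii")


def decompress_state(blob: str) -> dict:
    """Reverse compress_state: base64-decode, decompress, and parse the JSON."""
    return json.loads(zlib.decompress(base64.b64decode(blob)))


# Hypothetical miniature state used only to demonstrate the round trip.
original = {"_state_version": 1, "pipeline_name": "github_issues_merge"}
blob = compress_state(original)
restored = decompress_state(blob)
print(blob)
print(restored)
```

The blobs in the table above are truncated by the dataframe display, so a full value has to be read from the column before it can be decoded.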

In [None]:
!dlt --non-interactive pipeline github_issues_merge sync

usage: dlt pipeline [-h] [--list-pipelines] [--hot-reload] [--pipelines-dir PIPELINES_DIR]
                    [--verbose]
                    [pipeline_name]
                    {info,show,failed-jobs,drop-pending-packages,sync,trace,schema,drop,load-package}
                    ...


In [None]:
print(read_state("/var/dlt/pipelines/github_issues_merge/state.json"))

{
    "_state_version": 4,
    "_state_engine_version": 4,
    "_local": {
        "first_run": false,
        "_last_extracted_at": "\uf0272024-07-30T09:34:07.885072+00:00",
        "_last_extracted_hash": "rDHQOpvJvuYCBQzkAjRxkLYHlreMSd7+fYRiqz9NNIk="
    },
    "dataset_name": "github_data_merge",
    "schema_names": [
        "github_issues_merge"
    ],
    "default_schema_name": "github_issues_merge",
    "pipeline_name": "github_issues_merge",
    "destination_type": "dlt.destinations.duckdb",
    "destination_name": "duckdb",
    "_version_hash": "rDHQOpvJvuYCBQzkAjRxkLYHlreMSd7+fYRiqz9NNIk=",
    "sources": {
        "github_issues_merge": {
            "resources": {
                "get_issues": {
                    "incremental": {
                        "updated_at": {
                            "initial_value": "1970-01-01T00:00:00Z",
                            "last_value": "2024-07-30T09:32:59Z",
                            "unique_hashes": [
                       

## Reset the pipeline state: full or partial
**To fully reset the state:**

- Drop the destination dataset to fully reset the pipeline.
- Set the `dev_mode` flag when creating the pipeline.
- Use the `dlt pipeline drop --drop-all` command to drop state and tables for a given schema name.

**To partially reset the state:**

- Use the `dlt pipeline drop <resource_name>` command to drop state and tables for a given resource.
- Use the `dlt pipeline drop --state-paths` command to reset the state at a given path without touching the tables and data.

# Trace

In [None]:
pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge_trace",
    destination="duckdb",
    dataset_name="github_data_merge_trace",
)
load_info = pipeline.run(get_issues)
print(load_info)

{'fifth_value': 'sixth_value'}
Pipeline github_issues_merge_trace load step completed in 1.98 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data_merge_trace
The duckdb destination used duckdb:////content/github_issues_merge_trace.duckdb location to store data
Load package 1722343054.148941 is LOADED and contains no failed jobs


## Inspect trace

### CLI

dlt stores the trace of the most recent data load. The trace contains information on the pipeline processing steps: extract, normalize, and load. It also shows the last load_info. You can access this information using the command `dlt pipeline <pipeline_name> trace`.

In [None]:
!dlt pipeline github_issues_merge_trace trace

Found pipeline github_issues_merge_trace in /var/dlt/pipelines
Run started at 2024-07-30 12:37:34.049247+00:00 and COMPLETED in 4.17 seconds with 4 steps.
Step extract COMPLETED in 1.41 seconds.

Load package 1722343054.148941 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.58 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- issues: 169 row(s)
- issues__labels: 112 row(s)
- issues__assignees: 75 row(s)
- issues__performed_via_github_app__events: 25 row(s)

Load package 1722343054.148941 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 2.08 seconds.
Pipeline github_issues_merge_trace load step completed in 1.98 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data_merge_trace
The duckdb destination used duckdb:////content/github_issues_merge_trace.duckdb location to store data
Load pack

### Python

In [None]:
# print human friendly trace information
print(pipeline.last_trace)

Run started at 2024-07-30 12:37:34.049247+00:00 and COMPLETED in 4.17 seconds with 4 steps.
Step extract COMPLETED in 1.41 seconds.

Load package 1722343054.148941 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.58 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- issues: 169 row(s)
- issues__labels: 112 row(s)
- issues__assignees: 75 row(s)
- issues__performed_via_github_app__events: 25 row(s)

Load package 1722343054.148941 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 2.08 seconds.
Pipeline github_issues_merge_trace load step completed in 1.98 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data_merge_trace
The duckdb destination used duckdb:////content/github_issues_merge_trace.duckdb location to store data
Load package 1722343054.148941 is LOADED and contains no failed jobs

Step run COMPLETED

In [None]:
# print human friendly extract information
print(pipeline.last_trace.last_extract_info)


Load package 1722343054.148941 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs


In [None]:
# print human friendly normalization information
print(pipeline.last_trace.last_normalize_info)

Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- issues: 169 row(s)
- issues__labels: 112 row(s)
- issues__assignees: 75 row(s)
- issues__performed_via_github_app__events: 25 row(s)

Load package 1722343054.148941 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs


In [None]:
# access row counts dictionary of normalize info
print(pipeline.last_trace.last_normalize_info.row_counts)

{'_dlt_pipeline_state': 1, 'issues': 169, 'issues__labels': 112, 'issues__assignees': 75, 'issues__performed_via_github_app__events': 25}


In [None]:
# print human friendly load information
print(pipeline.last_trace.last_load_info)

Pipeline github_issues_merge_trace load step completed in 1.98 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data_merge_trace
The duckdb destination used duckdb:////content/github_issues_merge_trace.duckdb location to store data
Load package 1722343054.148941 is LOADED and contains no failed jobs


### Python

You can access all this information in your pipeline script and, for example, save `load_info` and the trace to the destination.

For each load, you can test and alert on anomalies (e.g., no data, too much loaded to a table).

You can save complete lineage info for a particular `load_id` including a list of loaded files, error messages (if any), elapsed times, schema changes. This can be helpful, for example, when troubleshooting problems.
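The anomaly checks mentioned above can be sketched as a comparison of row counts against expected ranges. The dictionary has the same shape as `pipeline.last_trace.last_normalize_info.row_counts`; the thresholds, the `issues__events` table name, and the `find_row_count_anomalies` helper are hypothetical:

```python
from typing import Dict, List, Tuple


def find_row_count_anomalies(
    row_counts: Dict[str, int], expected_ranges: Dict[str, Tuple[int, int]]
) -> List[str]:
    """Return a message for every table whose row count is outside its range."""
    problems = []
    for table, (low, high) in expected_ranges.items():
        count = row_counts.get(table, 0)  # a missing table counts as zero rows
        if count < low:
            problems.append(f"{table}: {count} row(s), expected at least {low}")
        elif count > high:
            problems.append(f"{table}: {count} row(s), expected at most {high}")
    return problems


# Same shape as the row_counts dictionary printed earlier in this notebook.
row_counts = {"_dlt_pipeline_state": 1, "issues": 169, "issues__labels": 112}
# Hypothetical thresholds chosen only for the example.
expected = {"issues": (1, 1000), "issues__labels": (1, 100), "issues__events": (1, 50)}

problems = find_row_count_anomalies(row_counts, expected)
for problem in problems:
    print(problem)
```

In a pipeline script, a non-empty result could be sent to an alerting channel or used to fail the run.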

In [None]:
print(load_info.load_packages[0])

The package with load id 1722343054.148941 for schema github_issues_merge_trace is in LOADED state. It updated schema for 7 tables. The package was LOADED at 2024-07-30 12:37:38.147890+00:00.
Jobs details:
Job: issues__assignees.ea84ca5dd6.insert_values, table: issues__assignees in completed_jobs. File type: insert_values, size: 4.0K. Started on: 2024-07-30 12:37:35.785707+00:00 and completed in 2.36 seconds.
Job: issues.9a51a5dcc7.sql, table: issues in completed_jobs. File type: sql, size: 9.2K. Started on: 2024-07-30 12:37:37.309825+00:00 and completed in 0.84 seconds.
Job: issues__labels.3fc08a7c7c.insert_values, table: issues__labels in completed_jobs. File type: insert_values, size: 3.8K. Started on: 2024-07-30 12:37:35.776706+00:00 and completed in 2.37 seconds.
Job: _dlt_pipeline_state.3273b65a40.insert_values, table: _dlt_pipeline_state in completed_jobs. File type: insert_values, size: 680B. Started on: 2024-07-30 12:37:35.700700+00:00 and completed in 2.45 seconds.
Job: issue

In [None]:
all_attributes_methods = dir(load_info.load_packages[0])
public_attributes_methods = [attr for attr in all_attributes_methods if not attr.startswith('_')]

print("Public attributes and methods with their types:")
for attr in public_attributes_methods:
    attr_value = getattr(load_info.load_packages[0], attr)
    if callable(attr_value):
        print(f"{attr} (method)")
    else:
        print(f"{attr} (attribute)")

Public attributes and methods with their types:
asdict (method)
asstr (method)
completed_at (attribute)
count (method)
index (method)
jobs (attribute)
load_id (attribute)
package_path (attribute)
schema (attribute)
schema_hash (attribute)
schema_name (attribute)
schema_update (attribute)
state (attribute)


In [None]:
load_info.load_packages[0].asdict()