# Transforming and filtering the data [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dlt-hub/dlt/blob/master/docs/education/dlt-advanced-course/lesson_5_transform_data_before_and_after_loading.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/dlt-hub/dlt/blob/master/docs/education/dlt-advanced-course/lesson_5_transform_data_before_and_after_loading.ipynb)

In this lesson, we will look at several ways to transform and filter data during and after ingestion.

dlt provides several ways to do this:
1. With a custom query (applicable to the `sql_database` source).
2. With dlt's special functions (`add_map` and `add_filter`).
3. Via `@dlt.transformer`.
4. With `pipeline.dataset()` (after the load).

Let's review and compare those methods.

## What you’ll learn:

- How to limit rows at the source with SQL queries.
- How to apply custom Python logic per record.
- How to write transformations using functional and declarative APIs.
- How to access and query your loaded data using `.dataset()`.

## Setup and initial load

We will use the `sql_database` source as an example and connect to the public [MySQL RFam](https://docs.rfam.org/en/latest/database.html) database. The RFam database contains publicly accessible scientific data on RNA structures.

Let's perform an initial load:

In [None]:
%%capture
!pip install -U "dlt[sql_database, duckdb]"
!pip install pymysql

In [None]:
import dlt

from dlt.sources.sql_database import sql_database

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["family", "genome"],
)

pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline",
    destination="duckdb",
    dataset_name="sql_data",
)
load_info = pipeline.run(source)
print(load_info)

Pipeline sql_database_pipeline load step completed in 9.91 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline.duckdb location to store data
Load package 1753090463.4197385 is LOADED and contains no failed jobs


In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM genome") as table:
        genome = table.df()
genome

You can check your data count using `sql_client`:

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT COUNT(*) AS total_rows FROM genome") as table:
        print(table.df())

## **1. Filtering the data during the ingestion with `query_adapter_callback`**

Imagine a use case where we're only interested in the genome data for bacteria. In this case, ingesting the whole `genome` table would be an unnecessary use of time and compute resources.

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT COUNT(*) AS total_rows FROM genome WHERE kingdom='bacteria'"
    ) as table:
        print(table.df())

When ingesting data with the `sql_database` source, dlt runs a `SELECT` statement behind the scenes. The `query_adapter_callback` parameter lets you add a `WHERE` clause to that underlying `SELECT` statement.

In this example, only the table `genome` is filtered, on the column `kingdom`.

In [None]:
from dlt.sources.sql_database.helpers import Table, SelectAny, SelectClause


def query_adapter_callback(query: SelectAny, table: Table) -> SelectAny:
    if table.name == "genome":
        # Only select rows where the column kingdom has value "bacteria"
        return query.where(table.c.kingdom == "bacteria")
    # Use the original query for other tables
    return query

Attach it:

In [None]:
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"],
    query_adapter_callback=query_adapter_callback,
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data",
)

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

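In the snippet above, we used `query_adapter_callback` to add a `WHERE` clause to the query that dlt generates. An alternative is to create a SQL VIEW in your source database and extract data from it. In that case, dlt will infer all column types and read the data in the shape you define in the view, without any further customization.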

If creating a view is not feasible, you can fully rewrite the automatically generated query with an extended version of `query_adapter_callback`:

In [None]:
import dlt
import sqlalchemy as sa

from dlt.sources.sql_database import sql_database


def query_adapter_callback(query: SelectAny, table: Table) -> SelectClause:
    if table.name == "genome":
        return sa.text(f"SELECT * FROM {table.fullname} WHERE kingdom='bacteria'")
    return query


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome", "clan"],
    query_adapter_callback=query_adapter_callback,
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data",
)

load_info = pipeline.run(source, write_disposition="replace")

print(load_info)

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM clan"
    ) as table:
        print("Table clan:")
        print(table.df())
        print("\n")

    with client.execute_query(
        "SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM genome"
    ) as table:
        print("Table genome:")
        print(table.df())
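
To make the difference between filtering inside the source query and filtering after the rows have been read more concrete, here is a minimal sketch that uses Python's built-in `sqlite3` module instead of the RFam database. The in-memory table and its four rows are made up for illustration, and dlt is not involved.

```python
import sqlite3

# Hypothetical stand-in for the source database: an in-memory SQLite table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE genome (upid TEXT, kingdom TEXT)")
conn.executemany(
    "INSERT INTO genome VALUES (?, ?)",
    [
        ("UP1", "bacteria"),
        ("UP2", "archaea"),
        ("UP3", "bacteria"),
        ("UP4", "viruses"),
    ],
)

# Option 1: the filter is part of the query, so only matching rows are read.
pushed_down = conn.execute(
    "SELECT upid, kingdom FROM genome WHERE kingdom = ?", ("bacteria",)
).fetchall()

# Option 2: every row is read first and filtered in Python afterwards.
all_rows = conn.execute("SELECT upid, kingdom FROM genome").fetchall()
filtered_later = [row for row in all_rows if row[1] == "bacteria"]

print(len(all_rows), len(pushed_down), len(filtered_later))
conn.close()
```

Both options end with the same rows, but only the first avoids transferring the rows that are thrown away, which is the point of adapting the query at the source.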

## **2. Transforming the data after extract and before load**

Since dlt is a Python library, it gives you a lot of control over the extracted data.  

You can attach any number of transformations that are evaluated on an item-per-item basis to your resource. The available transformation types:

* `map` - transform the data item (`resource.add_map`).
* `filter` - filter the data item (`resource.add_filter`).
* `yield map` - a map that returns an iterator, so a single row may generate many rows (`resource.add_yield_map`).
* `limit` - limit the amount of data processed by a resource (`resource.add_limit`). Useful for testing or reducing data volume during development.

For example, if we want to anonymize sensitive data before it is loaded into the destination, we can write a Python function for it and apply it to a resource using the `.add_map()` method.

See the [dlt documentation](https://dlthub.com/docs/general-usage/resource#filter-transform-and-pivot-data) for details.
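
As a rough mental model of the four transformation types, the sketch below chains their plain-Python counterparts over a small generator. It does not use dlt; the rows and the helper names (`rows`, `upper_clan`, `has_members`, `explode_members`) are hypothetical and only mimic what `add_map`, `add_filter`, `add_yield_map`, and `add_limit` do conceptually.

```python
from itertools import islice
from typing import Any, Dict, Iterator

Row = Dict[str, Any]


def rows() -> Iterator[Row]:
    # Made-up input items
    yield {"clan": "cl1", "members": ["rf5", "rf23"]}
    yield {"clan": "cl2", "members": []}
    yield {"clan": "cl3", "members": ["rf7"]}


def upper_clan(row: Row) -> Row:
    # map: one item in, exactly one item out
    return {**row, "clan": row["clan"].upper()}


def has_members(row: Row) -> bool:
    # filter: the item is kept only if this returns True
    return len(row["members"]) > 0


def explode_members(row: Row) -> Iterator[Row]:
    # yield map: one item in, zero or more items out
    for member in row["members"]:
        yield {"clan": row["clan"], "member": member}


mapped = map(upper_clan, rows())
kept = filter(has_members, mapped)
exploded = (out for row in kept for out in explode_members(row))
limited = list(islice(exploded, 2))  # limit: stop after two yielded items

print(limited)
```

Every step is evaluated lazily, item by item, so the third input row is never touched once the limit is reached.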

### Using `add_map`

In the table `clan`, we notice that there is a column `author` that we would like to anonymize.

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        print(table.df())

We write a Python function that pseudonymizes the `author` value of each row:

In [None]:
import hashlib
from dlt.common.typing import TDataItem


def pseudonymize_name(row: TDataItem) -> TDataItem:
    """
    Pseudonymization is a deterministic type of PII-obscuring.
    Its role is to allow identifying users by their hash,
    without revealing the underlying info.
    """
    # add a constant salt so that the same author always maps to the same hash
    salt = "WI@N57%zZrmk#88c"
    salted_string = row["author"] + salt
    sh = hashlib.sha256()
    sh.update(salted_string.encode())
    hashed_string = sh.digest().hex()
    row["author"] = hashed_string
    return row
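
Before attaching the function to a resource, it can help to check the property that pseudonymization relies on: the same input always yields the same hash. The sketch below uses only `hashlib`; the helper `pseudonymize` and the salt value are hypothetical and chosen for illustration, not taken from the lesson.

```python
import hashlib

SALT = "example-salt"  # hypothetical value, not the salt used in the lesson


def pseudonymize(value: str, salt: str = SALT) -> str:
    # SHA-256 over the value plus a constant salt, returned as a hex string
    return hashlib.sha256((value + salt).encode()).hexdigest()


first = pseudonymize("Jane Doe")
second = pseudonymize("Jane Doe")
other = pseudonymize("John Doe")

print(first == second)  # compares two hashes of the same input
print(first == other)   # compares hashes of two different inputs
print(len(first))       # length of the hex digest
```

Because the mapping is deterministic, rows from the same author can still be grouped or joined after loading, even though the name itself is no longer stored.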

In [None]:
import dlt
import hashlib
from dlt.sources.sql_database import sql_database


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_anonymized",
    destination="duckdb",
    dataset_name="sql_data",
)


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", table_names=["clan"]
)

source.clan.add_map(
    pseudonymize_name
)  # Apply the anonymization function to the extracted data

info = pipeline.run(source)
print(info)

After the pipeline has run, we can observe that the author column has been anonymized.

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        clan = table.df()

clan

**Note:** If you're using the `pyarrow` or `connectorx` backend, the data is not processed item by item. Instead, it is processed in batches, so your function must be adjusted accordingly. For example, for PyArrow chunks the function could be changed as follows:

In [None]:
import hashlib
import pyarrow as pa
import pyarrow.compute as pc


def pseudonymize_name_pyarrow(table: pa.Table) -> pa.Table:
    """
    Pseudonymizes the 'author' column in a PyArrow Table.
    """
    salt = "WI@N57%zZrmk#88c"

    # Convert PyArrow Table to Pandas DataFrame for hashing
    df = table.to_pandas()

    # Apply SHA-256 hashing
    df["author"] = (
        df["author"]
        .astype(str)
        .apply(lambda x: hashlib.sha256((x + salt).encode()).hexdigest())
    )

    # Convert back to PyArrow Table
    new_table = pa.Table.from_pandas(df)

    return new_table


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_anonymized1",
    destination="duckdb",
    dataset_name="sql_data",
)


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["clan"],
    backend="pyarrow",
)

source.clan.add_map(
    pseudonymize_name_pyarrow
)  # Apply the anonymization function to the extracted data

info = pipeline.run(source)
print(info)

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        print(table.df())

### `add_map` vs `add_yield_map`

The difference between `add_map` and `add_yield_map` matters when a transformation returns multiple records from a single input.

#### **`add_map`**
- Use `add_map` when you want to transform each item into exactly one item.
- Think of it like modifying or enriching a row.
- You use a regular function that returns one modified item.
- Great for adding fields or changing structure.

#### Example

In [None]:
import dlt
from dlt.common.typing import TDataItem, TDataItems


@dlt.resource
def resource() -> TDataItems:
    yield [{"name": "Alice"}, {"name": "Bob"}]


def add_greeting(item: TDataItem) -> TDataItem:
    item["greeting"] = f"Hello, {item['name']}!"
    return item


resource.add_map(add_greeting)

for row in resource():
    print(row)

#### **`add_yield_map`**
- Use `add_yield_map` when you want to turn one item into multiple items, or possibly no items.
- Your function is a generator that uses yield.
- Great for pivoting nested data, flattening lists, or filtering rows.

#### Example

In [None]:
from typing import Iterator

import dlt
from dlt.common.typing import TDataItem, TDataItems


@dlt.resource
def resource() -> TDataItems:
    yield [
        {"name": "Alice", "hobbies": ["reading", "chess"]},
        {"name": "Bob", "hobbies": ["cycling"]},
    ]


def expand_hobbies(item: TDataItem) -> Iterator[TDataItem]:
    for hobby in item["hobbies"]:
        yield {"name": item["name"], "hobby": hobby}


resource.add_yield_map(expand_hobbies)

for row in resource():
    print(row)

### Using `add_filter`
The `add_filter` method can be used in a similar way. The difference is that `add_filter` expects a function that returns a boolean value for each item: items for which it returns `True` are kept, the rest are dropped. For example, to implement the same filtering we did with the query callback, we can use:

In [None]:
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"],
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data",
)
source.genome.add_filter(lambda item: item["kingdom"] == "bacteria")

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM genome"
    ) as table:
        print("Table genome:")
        genome_count = table.df()
genome_count

### Question 1:

What is the value of `total_rows` in the example above?

### Using `add_limit`

If your resource loads thousands of pages of data from a REST API or millions of rows from a database table, you may want to sample just a fragment of it in order to quickly see the dataset with example data and test your transformations, etc.

To do this, you limit how many items will be yielded by a resource (or source) by calling the `add_limit` method. This method will close the generator that produces the data after the limit is reached.

In [None]:
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"],
    chunk_size=10,
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data",
)
source.genome.add_limit(1)

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM genome") as table:
        genome_limited = table.df()
genome_limited
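
One detail worth illustrating: as far as we understand it, `add_limit` counts the items yielded by the resource, not individual rows, and the `sql_database` source yields rows in chunks of `chunk_size`. The plain-Python sketch below mimics this with `itertools.islice`; the generator `chunked_rows` and its numbers are hypothetical, and dlt is not involved.

```python
from itertools import islice
from typing import Dict, Iterator, List


def chunked_rows(total_rows: int, chunk_size: int) -> Iterator[List[Dict[str, int]]]:
    # Hypothetical generator that yields lists of rows, one list per chunk
    for start in range(0, total_rows, chunk_size):
        stop = min(start + chunk_size, total_rows)
        yield [{"id": i} for i in range(start, stop)]


# Take a single yielded item, which here is a single chunk of rows
limited_chunks = list(islice(chunked_rows(total_rows=35, chunk_size=10), 1))
row_count = sum(len(chunk) for chunk in limited_chunks)

print(len(limited_chunks), row_count)
```

Under this model, a limit of 1 combined with a chunk size of 10 gives one chunk containing ten rows, so the number of loaded rows depends on both settings.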

## **3. Transforming data with `@dlt.transformer`**

The main purpose of transformers is to create child tables with additional data requests, but they can also be used for data transformations, especially if you want to keep the original data as well.

In [None]:
import dlt
from dlt.common.typing import TDataItem, TDataItems
from dlt.sources.sql_database import sql_database


@dlt.transformer()
def batch_stats(items: TDataItems) -> TDataItem:
    """
    Computes summary statistics for one batch (chunk) of rows:
    the number of rows and the largest `total_length` value.
    """
    yield {
        "batch_length": len(items),
        "max_length": max(item["total_length"] for item in items),
    }


genome_resource = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", chunk_size=10000
).genome


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_with_transformers1",
    destination="duckdb",
    dataset_name="sql_data",
    dev_mode=True,
)

info = pipeline.run([genome_resource, genome_resource | batch_stats])
print(pipeline.last_trace)

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM batch_stats") as table:
        res = table.df()
res
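
To see why a table like `batch_stats` ends up with one row per chunk, the sketch below reproduces the same per-batch computation in plain Python. It assumes the parent resource hands over lists of rows (chunks); the helpers `chunks` and `stats_for` and the sample values are hypothetical, and dlt is not involved.

```python
from typing import Dict, Iterator, List

Row = Dict[str, int]


def chunks(rows: List[Row], size: int) -> Iterator[List[Row]]:
    # Split the rows into consecutive batches of at most `size` rows
    for start in range(0, len(rows), size):
        yield rows[start : start + size]


def stats_for(batch: List[Row]) -> Dict[str, int]:
    # One summary record per batch
    return {
        "batch_length": len(batch),
        "max_length": max(row["total_length"] for row in batch),
    }


rows = [{"total_length": n} for n in (120, 45, 300, 80, 15, 60, 210)]
stats = [stats_for(batch) for batch in chunks(rows, size=3)]

print(stats)
```

Seven rows in batches of three produce three summary records, and the original rows are left untouched, which matches the idea of loading both the source table and the derived table.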

## **4. Transforming data after the load**

Another option is to transform the data after the load. dlt provides several ways of doing this:

* using `sql_client`,
* via `.dataset()` and ibis integration,
* via [dbt integration](https://dlthub.com/docs/dlt-ecosystem/transformations/dbt/).


### SQL client

You have already seen examples of using dlt's SQL client. It lets you connect to your destination and execute any SQL query.

In [None]:
# NOTE: this is the duckdb sql dialect, other destinations may use different expressions
with pipeline.sql_client() as client:
    client.execute_sql(
        """ CREATE OR REPLACE TABLE genome_length AS
            SELECT
                SUM(total_length) AS total_total_length,
                AVG(total_length) AS average_total_length
            FROM
                genome
    """
    )
    with client.execute_query("SELECT * FROM genome_length") as table:
        genome_length = table.df()

genome_length

### Accessing loaded data with `pipeline.dataset()`

Use `pipeline.dataset()` to inspect and work with your data in Python after loading.


In [None]:
dataset = pipeline.dataset()

# List tables
dataset.row_counts().df()

Note that `row_counts` didn't return the new table `genome_length`: it was created directly with SQL, so it is not part of the dlt schema.

In [None]:
# Access as pandas
df = dataset["genome"].df()
df

In [None]:
# Access as Arrow
arrow_table = dataset["genome_length"].arrow()
arrow_table

You can also filter, limit, and select columns:

In [None]:
df = dataset["genome"].select("kingdom", "ncbi_id").limit(10).df()
df

To iterate over large data:

In [None]:
for chunk in dataset["genome"].iter_df(chunk_size=500):
    print(chunk.head())

For more advanced users, this interface supports **Ibis expressions**, joins, and subqueries.

### Ibis integration

Ibis is a powerful portable Python dataframe library. Learn more about what it is and how to use it in the [official documentation](https://ibis-project.org/).

[dlt provides a way to use Ibis expressions natively](https://dlthub.com/docs/general-usage/dataset-access/ibis-backend) with many destinations. The supported ones are:
* Snowflake
* DuckDB
* MotherDuck
* Postgres
* Redshift
* Clickhouse
* MSSQL (including Synapse)
* BigQuery

In [None]:
!pip install "ibis-framework[duckdb]"

In [None]:
# get the dataset from the pipeline
dataset = pipeline.dataset()
dataset_name = pipeline.dataset_name

# get the native ibis connection from the dataset
ibis_connection = dataset.ibis()

# list all tables in the dataset
# NOTE: You need to provide the dataset name to ibis, in ibis datasets are named databases
print(ibis_connection.list_tables(database=dataset_name))

# get the batch_stats table
table = ibis_connection.table("batch_stats", database=dataset_name)

# print the first 2 rows
print(table.limit(2).execute())

✅ ▶ Proceed to the [next lesson](https://colab.research.google.com/drive/1XT1xUIQIWj0nPWOmTixThgdXzi4vudce#forceEdit=true&sandboxMode=true)!