# **Transform Data Before and After Load**

dlt provides several ways to transform data before and after it is loaded:
1. With a custom query (applicable to the `sql_database` source)
2. With dlt special functions (such as `add_map` and `add_filter`)
3. Via `@dlt.transformer`
4. With `pipeline.dataset()`

In [2]:
%%capture
!pip install "dlt[sql_database, duckdb]"
!pip install pymysql

## Setup

We will use the `sql_database` source as an example and connect to the public [MySQL RFam](https://docs.rfam.org/en/latest/database.html) database. The RFam database contains publicly accessible scientific data on RNA structures.

Let's perform an initial load:

In [3]:
import dlt
from dlt.sources.sql_database import sql_database

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["family","genome"]
)

pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline",
    destination="duckdb",
    dataset_name="sql_data"
)

load_info = pipeline.run(source)
print(load_info)

Pipeline sql_database_pipeline load step completed in 11.00 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline.duckdb location to store data
Load package 1745859540.164666 is LOADED and contains no failed jobs


In [4]:
# using sql_client to query the data
with pipeline.sql_client() as client:
  with client.execute_query("SELECT * FROM genome") as table:
    print(table.df())

              upid     assembly_acc  assembly_version wgs_acc  wgs_version  \
0      RG000000001             None              <NA>    None         <NA>   
1      RG000000002             None              <NA>    None         <NA>   
2      RG000000003             None              <NA>    None         <NA>   
3      RG000000004             None              <NA>    None         <NA>   
4      RG000000005             None              <NA>    None         <NA>   
...            ...              ...               ...     ...          ...   
32164  UP001295980  GCA_963583275.1                 1    None         <NA>   
32165  UP001296009  GCA_963583145.1                 1    None         <NA>   
32166  UP001296230  GCA_963583065.1                 1    None         <NA>   
32167  UP001296237  GCA_963583445.1                 1    None         <NA>   
32168            x             None              <NA>    None         <NA>   

      assembly_name assembly_level study_ref  \
0              

## 1. Transformation using `query_adapter_callback`

With the `query_adapter_callback` argument of the `sql_database` source, you can modify the SELECT statement that dlt runs against each table, for example by adding a WHERE clause.

In [5]:
# step 1 - define the query adapter callback function
def query_adapter_callback(query, table):
    if table.name == "genome":
        # only select rows with kingdom = "bacteria"
        return query.where(table.c.kingdom == "bacteria")
    # use the original query for other tables
    return query

In [8]:
# step 2 - pass the function using the query_adapter_callback argument
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome", "clan"],
    query_adapter_callback=query_adapter_callback
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

Run started at 2025-04-28 17:01:51.824455+00:00 and COMPLETED in 14.89 seconds with 4 steps.
Step extract COMPLETED in 8.76 seconds.

Load package 1745859711.9093106 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 2.46 seconds.
Normalized data for the following tables:
- genome: 13921 row(s)
- clan: 147 row(s)

Load package 1745859711.9093106 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 3.62 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 3.59 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1745859711.9093106 is LOADED and contains no failed jobs

Step run COMPLETED in 14.89 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 3.59 seconds
1 load package(s) were loaded t

## 2. Transformation Using dlt Special Methods

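- `resource.add_map()` - transforms each data item with a mapping function
- `resource.add_filter()` - keeps only the data items for which a function returns `True`
- `resource.add_yield_map()` - a map that can yield multiple items for each input item
- `resource.add_limit()` - limits the number of items a resource yields (for a chunked source, each yield is a whole chunk of rows)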
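To make the behaviour of the methods listed above more concrete, here is a plain-Python analogy built on generators. The helper names (`map_items`, `filter_items`, `yield_map_items`, `explode_tags`) and the sample rows are hypothetical and are not part of dlt. The sketch only mirrors the idea of each step, not how dlt implements it.

```python
from itertools import islice

# Plain-Python analogy only: these helpers are hypothetical, NOT dlt functions.
rows = [
    {"name": "a", "tags": ["x", "y"]},
    {"name": "b", "tags": []},
    {"name": "c", "tags": ["z"]},
]


def map_items(items, fn):
    """Like add_map: every input item produces exactly one output item."""
    for item in items:
        yield fn(item)


def filter_items(items, predicate):
    """Like add_filter: keep only items for which the predicate is true."""
    for item in items:
        if predicate(item):
            yield item


def yield_map_items(items, fn):
    """Like add_yield_map: an input item may produce zero or more items."""
    for item in items:
        yield from fn(item)


def explode_tags(row):
    # one output row per tag; rows without tags produce nothing
    for tag in row["tags"]:
        yield {"name": row["name"], "tag": tag}


upper = list(map_items(rows, lambda r: {**r, "name": r["name"].upper()}))
tagged = list(filter_items(rows, lambda r: len(r["tags"]) > 0))
exploded = list(yield_map_items(rows, explode_tags))
first_two = list(islice(rows, 2))  # like a limit of 2 on item-by-item yields
```

The main difference to notice is between the two maps: a mapping step always returns one item per input, while a yield map can fan a single item out into several, or drop it entirely.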

### 2.1. `add_map`

In [9]:
# inspect the author column before anonymizing it with add_map
with pipeline.sql_client() as client:

    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        print(table.df())

Table clan:
                            author
0                       Gardner PP
1  Gardner PP; 0000-0002-7808-1213
2                           Daub J
3                          Brown C
4                         Burge SW


In [10]:
import hashlib

def pseudonymize_name(row):
  # add a constant salt so the same author always produces the same hash
  salt = 'WI@N57%zZrmk#88c'
  salted_string = row['author'] + salt
  sh = hashlib.sha256()
  sh.update(salted_string.encode())
  hashed_string = sh.digest().hex()
  row['author'] = hashed_string
  return row
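The reason a constant salt works here is that salted SHA-256 is deterministic: the same author string always maps to the same digest, so grouping and joining on the pseudonymized column still behave as before. The sketch below checks that property with the standard library only. The `pseudonymize` helper and the `SALT` value are hypothetical stand-ins, not the function or salt used in this notebook.

```python
import hashlib

# Hypothetical salt for illustration; a real salt should not live in source code.
SALT = "example-salt"


def pseudonymize(value: str, salt: str = SALT) -> str:
    """Return the hex-encoded SHA-256 digest of value + salt."""
    return hashlib.sha256((value + salt).encode()).hexdigest()


first = pseudonymize("Gardner PP")
second = pseudonymize("Gardner PP")
other = pseudonymize("Daub J")

# hexdigest() is equivalent to digest().hex(), which the cell above uses
same_as_digest_hex = hashlib.sha256(("Gardner PP" + SALT).encode()).digest().hex()
```

One caveat to keep in mind: concatenating a string with the salt raises a `TypeError` if the column value is `None`, so a column that may contain nulls would need a guard before hashing.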

In [11]:
import dlt
import hashlib
from dlt.sources.sql_database import sql_database


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_anonymized",
    destination="duckdb",
    dataset_name="sql_data"
)

source = sql_database("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", table_names=["clan"])

source.clan.add_map(pseudonymize_name) # Apply the anonymization function to the extracted data

info = pipeline.run(source)
print(info)

Pipeline sql_database_pipeline_anonymized load step completed in 0.30 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_anonymized.duckdb location to store data
Load package 1745859877.4171293 is LOADED and contains no failed jobs


In [12]:
with pipeline.sql_client() as client:

    with client.execute_query("SELECT DISTINCT author FROM clan LIMIT 5") as table:
        print("Table clan:")
        print(table.df())

Table clan:
                                              author
0  eb00a968f270b436a506949ba46fd70a66591c5acf2b57...
1  c6723bd5737f14a8d4b2e447a063ce547be1f88c7b6450...
2  9766eaa9533fcb3684649b2e4a5db9573f1bf10f103ed4...
3  bee07cbb64f4e771a4bd9327afd9244e9a5dbc3aa957d9...
4  9d281938612b0b9646c75049e817208c236431dee86583...


### 2.2. `add_filter()`

`add_filter()` expects a function that returns a boolean value for each item; only items for which it returns `True` are kept.

In [13]:
import time
import dlt
from dlt.sources.sql_database import sql_database

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"]
)

pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)
# filter records where column kingdom has value bacteria
source.genome.add_filter(lambda item: item["kingdom"] == "bacteria")

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)

Run started at 2025-04-28 17:08:10.721430+00:00 and COMPLETED in 15.41 seconds with 4 steps.
Step extract COMPLETED in 9.04 seconds.

Load package 1745860090.8078704 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 1.75 seconds.
Normalized data for the following tables:
- genome: 13921 row(s)

Load package 1745860090.8078704 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 4.56 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 4.54 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1745860090.8078704 is LOADED and contains no failed jobs

Step run COMPLETED in 15.41 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 4.54 seconds
1 load package(s) were loaded to destination duckd

In [14]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT COUNT(*) AS total_rows, MAX(_dlt_load_id) as latest_load_id FROM genome") as table:
        print("Table genome:")
        genome_count = table.df()
genome_count

Table genome:


Unnamed: 0,total_rows,latest_load_id
0,13921,1745860090.8078704


### Question 1:

What is the value of `total_rows` in the example above?

> 13921
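Sections 1 and 2.2 both end up with 13921 `genome` rows. What differs is where the filter runs: `query_adapter_callback` changes the SELECT so the source database does the filtering, while `add_filter` reads every row and discards the unwanted ones in Python. The sketch below illustrates that difference with an in-memory SQLite table from the standard library. The table contents are made up, and no dlt code is involved.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE genome (upid TEXT, kingdom TEXT)")
conn.executemany(
    "INSERT INTO genome VALUES (?, ?)",
    [("g1", "bacteria"), ("g2", "viruses"), ("g3", "bacteria"), ("g4", "archaea")],
)

# Filter inside the database (the idea behind query_adapter_callback):
# only matching rows ever leave the source.
pushed_down = conn.execute(
    "SELECT upid, kingdom FROM genome WHERE kingdom = ? ORDER BY upid",
    ("bacteria",),
).fetchall()

# Read everything, then filter in Python (the idea behind add_filter):
# all rows are transferred before the unwanted ones are dropped.
all_rows = conn.execute(
    "SELECT upid, kingdom FROM genome ORDER BY upid"
).fetchall()
filtered_in_python = [row for row in all_rows if row[1] == "bacteria"]

conn.close()
```

Both approaches give the same result, but the first one transfers fewer rows, which tends to matter for large tables.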

### 2.3. `add_limit()`

In [15]:
import time
import dlt
from dlt.sources.sql_database import sql_database


source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table_names=["genome"],
    chunk_size=10,
)


pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_filtered",
    destination="duckdb",
    dataset_name="sql_data"
)
source.genome.add_limit(1)

load_info = pipeline.run(source, write_disposition="replace")

print(pipeline.last_trace)



Run started at 2025-04-28 17:10:48.335219+00:00 and COMPLETED in 9.15 seconds with 4 steps.
Step extract COMPLETED in 8.82 seconds.

Load package 1745860248.4216487 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.04 seconds.
Normalized data for the following tables:
- genome: 10 row(s)

Load package 1745860248.4216487 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 0.22 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 0.20 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data
The duckdb destination used duckdb:////content/sql_database_pipeline_filtered.duckdb location to store data
Load package 1745860248.4216487 is LOADED and contains no failed jobs

Step run COMPLETED in 9.15 seconds.
Pipeline sql_database_pipeline_filtered load step completed in 0.20 seconds
1 load package(s) were loaded to destination duckdb and

In [16]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM genome") as table:
        genome_limited = table.df()
genome_limited

Unnamed: 0,upid,assembly_acc,assembly_version,wgs_acc,wgs_version,assembly_name,assembly_level,study_ref,description,total_length,...,common_name,kingdom,num_rfam_regions,num_families,is_reference,is_representative,created,updated,_dlt_load_id,_dlt_id
0,RG000000001,,,,,,,,Potato spindle tuber viroid,4591,...,,viroids,0,0,1,0,2017-06-06 15:11:02+00:00,2020-04-23 11:46:08+00:00,1745860248.4216487,5rq9yl1hbgyUOg
1,RG000000002,,,,,,,,Columnea latent viroid,370,...,,viroids,0,0,1,0,2017-06-06 15:11:07+00:00,2020-04-23 11:46:08+00:00,1745860248.4216487,paUqIlwOwbyB9g
2,RG000000003,,,,,,,,Tomato apical stunt viroid-S,360,...,,viroids,0,0,1,0,2017-06-06 15:11:12+00:00,2020-04-23 11:47:10+00:00,1745860248.4216487,4ueCDwAQjmm2Dw
3,RG000000004,,,,,,,,Tomato apical stunt viroid,360,...,,viroids,0,0,1,0,2017-06-06 15:11:17+00:00,2020-04-23 11:46:08+00:00,1745860248.4216487,tFY0tmzd6LzvOQ
4,RG000000005,,,,,,,,Cucumber yellows virus,7899,...,,viruses,0,0,1,0,2017-06-06 15:11:21+00:00,2020-04-23 11:46:28+00:00,1745860248.4216487,Xr5UU7hRvtToRg
5,RG000000006,,,,,,,,Caprine arthritis encephalitis virus Roccaverano,8418,...,,viruses,0,0,1,0,2017-06-06 15:11:24+00:00,2020-04-23 11:52:32+00:00,1745860248.4216487,Mm+HLljwJ3h90g
6,RG000000007,GCA_000413255.1,1.0,,,,,,Physarum polycephalum,189961418,...,,eukaryota,0,0,1,0,2017-06-06 15:11:28+00:00,2020-04-23 11:44:55+00:00,1745860248.4216487,DbbNd0WYO8tkEw
7,RG000000009,GCF_000836845.1,1.0,,,,,,Arabis mosaic virus small satellite RNA,300,...,,viruses,0,0,1,0,2017-06-06 15:11:43+00:00,2020-04-23 11:49:54+00:00,1745860248.4216487,TRQJqpU3fwEeDw
8,RG000000010,GCF_000840125.1,1.0,,,,,,Tobacco ringspot virus satellite RNA,359,...,,viruses,0,0,1,0,2017-06-06 15:11:48+00:00,2020-04-23 11:46:26+00:00,1745860248.4216487,rmcRpiBMNptxQg
9,RG000000011,GCF_000847605.1,1.0,,,,,,Equine infectious anemia virus,8359,...,,viruses,0,0,1,0,2017-06-06 15:11:52+00:00,2020-04-23 11:46:01+00:00,1745860248.4216487,1nlURMHalXyfDg
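The run above loaded 10 rows even though the limit was 1. That is because the limit counts how many times the resource yields, and with `chunk_size=10` each yield is a list of 10 rows. A small standard-library sketch of the same arithmetic follows. `read_in_chunks` is a hypothetical stand-in for a chunked reader, not a dlt function.

```python
from itertools import islice


def read_in_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size rows, as a chunked reader might."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


table = list(range(35))  # stand-in for 35 source rows

# Keep only the first yield, which is a whole chunk rather than a single row.
limited = list(islice(read_in_chunks(table, chunk_size=10), 1))
rows_loaded = sum(len(chunk) for chunk in limited)
```

Here `rows_loaded` is 10, matching the `genome: 10 row(s)` line in the trace: one yield multiplied by a chunk size of 10.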


## 3. Transformation Using `@dlt.transformer`

In [18]:
import dlt
import hashlib
from dlt.sources.sql_database import sql_database

@dlt.transformer()
def batch_stats(items):
    # each call receives one chunk (a list of rows) from the parent resource
    yield {
        "batch_lenght": len(items),
        "max_lenght": max(item["total_length"] for item in items),
    }

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    chunk_size=10000
    ).genome

pipeline = dlt.pipeline(
    pipeline_name="sql_database_pipeline_with_transformers1",
    destination="duckdb",
    dataset_name="sql_data",
    dev_mode=True
)

info = pipeline.run([source, source | batch_stats])
print(info)

Pipeline sql_database_pipeline_with_transformers1 load step completed in 10.64 seconds
1 load package(s) were loaded to destination duckdb and into dataset sql_data_20250428052217
The duckdb destination used duckdb:////content/sql_database_pipeline_with_transformers1.duckdb location to store data
Load package 1745860937.442921 is LOADED and contains no failed jobs


In [19]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM batch_stats") as table:
        res = table.df()
res

Unnamed: 0,batch_lenght,max_lenght,_dlt_load_id,_dlt_id
0,10000,13427940695,1745860937.442921,WN4sMP7LKeUefA
1,10000,6250353185,1745860937.442921,xJ9oZSo3x+gCKg
2,10000,10237952243,1745860937.442921,MucZ//r0URDoJA
3,2169,3527038434,1745860937.442921,b9rOqnnpQNButg
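The four rows in `batch_stats` follow from the chunking: the `genome` table has 32169 rows and the source yields them in chunks of 10000, so the transformer is called four times. The sketch below reproduces that arithmetic in plain Python. The `chunked` helper, the corrected key spellings, and the synthetic `total_length` values are assumptions for illustration, and the code does not use dlt.

```python
def chunked(rows, chunk_size):
    """Yield consecutive lists of at most chunk_size rows."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def batch_stats(items):
    """Summarize one chunk: how many rows it has and its largest total_length."""
    yield {
        "batch_length": len(items),
        "max_length": max(item["total_length"] for item in items),
    }


# Synthetic stand-in for the genome table: 32169 rows with increasing lengths.
rows = [{"total_length": n} for n in range(1, 32170)]

stats = [stat for chunk in chunked(rows, 10000) for stat in batch_stats(chunk)]
batch_lengths = [stat["batch_length"] for stat in stats]
```

`batch_lengths` comes out as three full chunks and one remainder of 2169 rows, the same shape as the loaded table.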


## Transformation After Load
You can transform data after it has been loaded in the following ways:
1. Using the `sql_client`
2. Using `pipeline.dataset()` or the ibis integration
3. Using the dbt integration

In [20]:
# NOTE: this is the duckdb sql dialect, other destinations may use different expressions
with pipeline.sql_client() as client:
    client.execute_sql(
        """ CREATE OR REPLACE TABLE genome_length AS
            SELECT
                SUM(total_length) AS total_total_length,
                AVG(total_length) AS average_total_length
            FROM
                genome
    """)
    with client.execute_query("SELECT * FROM genome_length") as table:
        genome_length = table.df()

genome_length

Unnamed: 0,total_total_length,average_total_length
0,1519769000000.0,47243290.0
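The note about SQL dialects matters in practice. As a hedged illustration, the sketch below runs the same aggregation against an in-memory SQLite database from the standard library. SQLite has no `CREATE OR REPLACE TABLE`, so the table is dropped and recreated instead. The sample rows are made up and no dlt code is involved.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE genome (upid TEXT, total_length INTEGER)")
conn.executemany(
    "INSERT INTO genome VALUES (?, ?)",
    [("g1", 100), ("g2", 300), ("g3", 200)],
)

# SQLite does not support CREATE OR REPLACE TABLE, so drop and recreate.
conn.execute("DROP TABLE IF EXISTS genome_length")
conn.execute(
    """
    CREATE TABLE genome_length AS
    SELECT
        SUM(total_length) AS total_total_length,
        AVG(total_length) AS average_total_length
    FROM genome
    """
)

total, average = conn.execute(
    "SELECT total_total_length, average_total_length FROM genome_length"
).fetchone()
conn.close()
```

The aggregation itself is portable; only the statement that creates or replaces the summary table had to change for this dialect.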


In [22]:
# using pipeline.dataset()

dataset = pipeline.dataset()

# list the tables and their row counts
dataset.row_counts().df()

Unnamed: 0,table_name,row_count
0,batch_stats,4
1,genome,32169


In [24]:
# convert to a pandas dataframe
dataset.genome.df().head()

Unnamed: 0,upid,assembly_acc,assembly_version,wgs_acc,wgs_version,assembly_name,assembly_level,study_ref,description,total_length,...,common_name,kingdom,num_rfam_regions,num_families,is_reference,is_representative,created,updated,_dlt_load_id,_dlt_id
0,RG000000001,,,,,,,,Potato spindle tuber viroid,4591,...,,viroids,0,0,1,0,2017-06-06 15:11:02+00:00,2020-04-23 11:46:08+00:00,1745860937.442921,x/PDmyv+afeDKA
1,RG000000002,,,,,,,,Columnea latent viroid,370,...,,viroids,0,0,1,0,2017-06-06 15:11:07+00:00,2020-04-23 11:46:08+00:00,1745860937.442921,i387fwl31kHwzg
2,RG000000003,,,,,,,,Tomato apical stunt viroid-S,360,...,,viroids,0,0,1,0,2017-06-06 15:11:12+00:00,2020-04-23 11:47:10+00:00,1745860937.442921,s1OvzpfNPusFHQ
3,RG000000004,,,,,,,,Tomato apical stunt viroid,360,...,,viroids,0,0,1,0,2017-06-06 15:11:17+00:00,2020-04-23 11:46:08+00:00,1745860937.442921,N5JoNiElv+9d2Q
4,RG000000005,,,,,,,,Cucumber yellows virus,7899,...,,viruses,0,0,1,0,2017-06-06 15:11:21+00:00,2020-04-23 11:46:28+00:00,1745860937.442921,PWp3pXBQ7O2QsQ


In [25]:
# convert to Apache Arrow
dataset.genome.arrow()

pyarrow.Table
upid: string
assembly_acc: string
assembly_version: int64
wgs_acc: string
wgs_version: int64
assembly_name: string
assembly_level: string
study_ref: string
description: string
total_length: int64
ungapped_length: int64
circular: int64
ncbi_id: int64
scientific_name: string
common_name: string
kingdom: string
num_rfam_regions: int64
num_families: int64
is_reference: int64
is_representative: int64
created: timestamp[us, tz=UTC]
updated: timestamp[us, tz=UTC]
_dlt_load_id: string
_dlt_id: string
----
upid: [["RG000000001","RG000000002","RG000000003","RG000000004","RG000000005",...,"UP001295980","UP001296009","UP001296230","UP001296237","x"]]
assembly_acc: [[null,null,null,null,null,...,"GCA_963583275.1","GCA_963583145.1","GCA_963583065.1","GCA_963583445.1",null]]
assembly_version: [[null,null,null,null,null,...,1,1,1,1,null]]
wgs_acc: [[null,null,null,null,null,...,null,null,null,null,null]]
wgs_version: [[null,null,null,null,null,...,null,null,null,null,null]]
assembly_name