---
## **`dlt` resources**

---
### List of dicts


In the previous lesson, we simply used a list of dictionaries that essentially represents the `pokemon` table.



In [1]:
import dlt

# Sample data containing pokemon details
data = [
    {"id": "1", "name": "bulbasaur", "size": {"weight": 6.9, "height": 0.7}},
    {"id": "4", "name": "charmander", "size": {"weight": 8.5, "height": 0.6}},
    {"id": "25", "name": "pikachu", "size": {"weight": 6, "height": 0.4}},
]


# Set pipeline name, destination, and dataset name
pipeline = dlt.pipeline(
    pipeline_name="quick_start",
    destination="duckdb",
    dataset_name="mydata",
)

# Run the pipeline with data and table name
load_info = pipeline.run(data, table_name="pokemon")

print(load_info)



Pipeline quick_start load step completed in 0.06 seconds
1 load package(s) were loaded to destination duckdb and into dataset mydata
The duckdb destination used duckdb:////Users/m/code/dlt_course/quick_start.duckdb location to store data
Load package 1735310051.4616551 is LOADED and contains no failed jobs


A better way is to wrap it in the `@dlt.resource` decorator which denotes a logical grouping of data within a data source, typically holding data of similar structure and origin:

In [2]:
import dlt

# Create a dlt resource from the data
@dlt.resource(table_name='pokemon_new') # <--- we set new table name
def my_dict_list():
    yield data

Commonly used arguments:

* **`name`**: The resource name and the name of the table generated by this resource. Defaults to the decorated function name.
* **`table_name`**: the name of the table, if different from the resource name.
* **`write_disposition`**: controls how to write data to a table. Defaults to "append".

> **Why is it a better way?** This allows you to use `dlt` functionalities to the fullest that follow Data Engineering best practices, including incremental loading and data contracts.

Try running the pipeline with the my_dict_list resource:


In [3]:
# Run the pipeline and print load info
load_info = pipeline.run(my_dict_list)
print(load_info)

Pipeline quick_start load step completed in 0.04 seconds
1 load package(s) were loaded to destination duckdb and into dataset mydata
The duckdb destination used duckdb:////Users/m/code/dlt_course/quick_start.duckdb location to store data
Load package 1735311026.163569 is LOADED and contains no failed jobs


Check what was loaded to the `pokemon_new` table:

In [4]:
pipeline.dataset(dataset_type="default").pokemon_new.df()

Unnamed: 0,id,name,size__weight,size__height,_dlt_load_id,_dlt_id
0,1,bulbasaur,6.9,0.7,1735311026.163569,H8+bdTXD3lYgwg
1,4,charmander,8.5,0.6,1735311026.163569,ZoK0Da6E9IvKaA
2,25,pikachu,6.0,0.4,1735311026.163569,SX21ArFjDmvzVQ


Instead of a dict list, the data could also be a/an:
- dataframe
- database query response
- API request response
- Anything you can transform into JSON/dict format

---
### Dataframes
For creating a pipeline using dataframes, you would do:

In [5]:
import pandas as pd

# Define a resource to load data from a CSV
@dlt.resource(table_name='df_data')
def my_df():
  sample_df = pd.read_csv("https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv")
  yield sample_df


# Run the pipeline with the defined resource
load_info = pipeline.run(my_df)
print(load_info)

# Query the loaded data from 'df_data'
pipeline.dataset(dataset_type="default").df_data.df()

Pipeline quick_start load step completed in 0.05 seconds
1 load package(s) were loaded to destination duckdb and into dataset mydata
The duckdb destination used duckdb:////Users/m/code/dlt_course/quick_start.duckdb location to store data
Load package 1735311569.376041 is LOADED and contains no failed jobs


Unnamed: 0,index,height_inchesx,_weight_poundsx
0,1,65.78,112.99
1,2,71.52,136.49
2,3,69.40,153.03
3,4,68.22,142.34
4,5,67.79,144.30
...,...,...,...
195,196,65.80,120.84
196,197,66.11,115.78
197,198,68.24,128.30
198,199,68.02,127.47


In [6]:
new_df = pd.read_csv("https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv")

In [7]:
new_df.head()

Unnamed: 0,Index,"Height(Inches)""","""Weight(Pounds)"""
0,1,65.78,112.99
1,2,71.52,136.49
2,3,69.4,153.03
3,4,68.22,142.34
4,5,67.79,144.3


---
### Database

For creating a pipeline from an SQL database query you would do:

1. Install the PyMySQL package:

In [None]:
%%capture
uv add pymysql

2. Create and run a pipeline to fetch data from an SQL resource and query the loaded data as follows:

In [10]:
import dlt
from sqlalchemy import create_engine

# Define a resource to fetch genome data from the database
@dlt.resource(table_name='genome_data')
def get_genome_data():
  engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
  with engine.connect() as conn:
      query = "SELECT * FROM genome LIMIT 1000"
      rows = conn.execution_options(yield_per=100).exec_driver_sql(query)
      yield from map(lambda row: dict(row._mapping), rows)

# Run the pipeline with the genome data resource
load_info = pipeline.run(get_genome_data)
print(load_info)

# Query the loaded data from 'genome_data'
pipeline.dataset(dataset_type="default").genome_data.df()

Pipeline quick_start load step completed in 0.16 seconds
1 load package(s) were loaded to destination duckdb and into dataset mydata
The duckdb destination used duckdb:////Users/m/code/dlt_course/quick_start.duckdb location to store data
Load package 1735312042.8845701 is LOADED and contains no failed jobs


Unnamed: 0,upid,description,total_length,ncbi_id,scientific_name,kingdom,num_rfam_regions,num_families,is_reference,is_representative,...,assembly_acc,assembly_version,assembly_level,ungapped_length,assembly_name,study_ref,circular,wgs_acc,wgs_version,common_name
0,RG000000001,Potato spindle tuber viroid,4591,12892,Potato spindle tuber viroid,viroids,0,0,1,0,...,,,,,,,,,,
1,RG000000002,Columnea latent viroid,370,12901,Columnea latent viroid,viroids,0,0,1,0,...,,,,,,,,,,
2,RG000000003,Tomato apical stunt viroid-S,360,53194,Tomato apical stunt viroid-S,viroids,0,0,1,0,...,,,,,,,,,,
3,RG000000004,Tomato apical stunt viroid,360,12885,Tomato apical stunt viroid,viroids,0,0,1,0,...,,,,,,,,,,
4,RG000000005,Cucumber yellows virus,7899,32618,Cucumber yellows virus,viruses,0,0,1,0,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,UP000001264,,40275,673838,Enterococcus phage phiFL3B,viruses,0,0,0,1,...,GCA_002630665.1,1.0,complete-genome,,ASM263066v1,,,,,
996,UP000001265,Enterococcus phage phiFL4A,37856,673839,Enterococcus phage phiFL4A,viruses,2,2,0,1,...,,0.0,,0.0,,,0.0,,0.0,
997,UP000001266,Staphylococcus phage SA1,147303,694060,Staphylococcus phage SA1,viruses,0,0,0,1,...,,0.0,,0.0,,,0.0,,0.0,
998,UP000001270,Mycobacterium phage AnnaL29,53253,1076630,Mycobacterium phage AnnaL29,viruses,0,0,0,1,...,,0.0,,0.0,,,0.0,,0.0,


---
### REST API

For REST API endpoints, create a pipeline as follows:

In [None]:
from dlt.sources.helpers import requests


# Define a resource to fetch pokemons from PokeAPI
@dlt.resource(table_name='pokemon_api')
def get_pokemon():
    url = "https://pokeapi.co/api/v2/pokemon"
    response = requests.get(url)
    yield response.json()["results"]


# Run the pipeline using the defined resource
load_info = pipeline.run(get_pokemon)
print(load_info)

# Query the loaded data from 'pokemon_api' table
pipeline.dataset(dataset_type="default").pokemon_api.df()