# Systematic extraction

One of the main tasks of data engineers is systematically obtaining data to populate our analytics systems. Specialized tools exist for this, and they can read not only from corporate systems (databases) but also from APIs. Here we look at a new-generation solution that offers a wide range of options.

In [3]:
# !pip install dlt

**Data Load Tool** (dlt) is used to extract data and load it into systems that we control ourselves. As a simple example, we can fetch data from an API (we have limited the amount of data so the run stays short).

In this case we will query a paginated version of the Pokémon API: https://pokeapi.co/docs/v2#info

In [4]:
from dlt.sources.helpers.rest_client import RESTClient

# Initialize the RESTClient with the Pokémon API base URL
client = RESTClient(base_url="https://pokeapi.co/api/v2")

# Use the paginate method to handle pagination automatically
pages_left = 10
poke_list = []
for page in client.paginate("/pokemon"):
    # Each page is a list of records, so extend the accumulated list
    poke_list += page
    pages_left -= 1

    # Stop after fetching 10 pages
    if pages_left == 0:
        break
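
The manual counter above can also be expressed with `itertools.islice`, which stops consuming an iterator after a fixed number of items. The sketch below is only an illustration: `fake_pages` is a hypothetical stand-in for `client.paginate(...)` so the example runs without network access, and the page size of 20 is an assumption.

```python
from itertools import islice


def fake_pages(total_pages, page_size):
    """Hypothetical stand-in for client.paginate(...): yields lists of records."""
    for page_number in range(total_pages):
        start = page_number * page_size
        yield [{"id": start + offset} for offset in range(page_size)]


max_pages = 10
records = []
# islice stops pulling from the generator once max_pages pages were consumed
for page in islice(fake_pages(total_pages=50, page_size=20), max_pages):
    records.extend(page)

print(len(records))
```

With the real client, the same pattern would read `islice(client.paginate("/pokemon"), max_pages)`.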

With this we can build our DataFrame, operate on it, and load it into a target system.

In [5]:
import pandas as pd

poke_df = pd.DataFrame(poke_list)
poke_df

Unnamed: 0,name,url
0,bulbasaur,https://pokeapi.co/api/v2/pokemon/1/
1,ivysaur,https://pokeapi.co/api/v2/pokemon/2/
2,venusaur,https://pokeapi.co/api/v2/pokemon/3/
3,charmander,https://pokeapi.co/api/v2/pokemon/4/
4,charmeleon,https://pokeapi.co/api/v2/pokemon/5/
...,...,...
195,espeon,https://pokeapi.co/api/v2/pokemon/196/
196,umbreon,https://pokeapi.co/api/v2/pokemon/197/
197,murkrow,https://pokeapi.co/api/v2/pokemon/198/
198,slowking,https://pokeapi.co/api/v2/pokemon/199/
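
As one example of operating on the DataFrame, the numeric identifier at the end of each `url` can be pulled into its own column. This is a minimal sketch on three rows copied from the output above; the names `sample_df` and `pokemon_id` are illustrative, not part of the original notebook.

```python
import pandas as pd

# Three rows copied from the listing above
sample_df = pd.DataFrame(
    [
        {"name": "bulbasaur", "url": "https://pokeapi.co/api/v2/pokemon/1/"},
        {"name": "ivysaur", "url": "https://pokeapi.co/api/v2/pokemon/2/"},
        {"name": "venusaur", "url": "https://pokeapi.co/api/v2/pokemon/3/"},
    ]
)

# Capture the digits between "/pokemon/" and the trailing slash
sample_df["pokemon_id"] = (
    sample_df["url"].str.extract(r"/pokemon/(\d+)/$", expand=False).astype(int)
)

print(sample_df[["name", "pokemon_id"]])
```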


Pipelines are how we define these channels for ingesting and storing data so that they run continuously. We have not covered databases yet, but they are a common choice of destination.
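
To make the idea of a load step concrete, here is a toy sketch that uses only the standard library. It is a stand-in with SQLite as the destination and does not reflect how dlt works internally; `run_load`, the untyped table layout, and the `load_id` column are all hypothetical.

```python
import sqlite3
import time


def run_load(conn, table_name, rows):
    """Append rows (a list of dicts sharing the same keys) to table_name."""
    if not rows:
        return None
    columns = list(rows[0].keys())
    # Tag every row of this run with the same identifier (hypothetical convention)
    load_id = str(time.time())
    column_defs = ", ".join(f'"{name}"' for name in columns + ["load_id"])
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table_name}" ({column_defs})')
    placeholders = ", ".join("?" for _ in range(len(columns) + 1))
    conn.executemany(
        f'INSERT INTO "{table_name}" VALUES ({placeholders})',
        [tuple(row[name] for name in columns) + (load_id,) for row in rows],
    )
    conn.commit()
    return load_id


conn = sqlite3.connect(":memory:")
rows = [
    {"name": "bulbasaur", "url": "https://pokeapi.co/api/v2/pokemon/1/"},
    {"name": "ivysaur", "url": "https://pokeapi.co/api/v2/pokemon/2/"},
]
run_load(conn, "pokemons", rows)
run_load(conn, "pokemons", rows)  # a second run appends the same rows again

row_count = conn.execute('SELECT COUNT(*) FROM "pokemons"').fetchone()[0]
print(row_count)
```

Because this toy version always appends, running it twice doubles the rows; deciding whether a run appends, replaces, or merges is one of the concerns a real pipeline tool has to handle.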

In [6]:
# !pip install duckdb



In [7]:
import dlt

pipeline = dlt.pipeline(destination="duckdb", dataset_name="country_data")

data = [
    {'country': 'USA', 'population': 331449281, 'capital': 'Washington, D.C.'},
    {'country': 'Canada', 'population': 38005238, 'capital': 'Ottawa'},
    {'country': 'Germany', 'population': 83019200, 'capital': 'Berlin'}
]

info = pipeline.run(data, table_name="countries")

print(info)

Pipeline dlt_ipykernel_launcher load step completed in 0.24 seconds
1 load package(s) were loaded to destination duckdb and into dataset country_data
The duckdb destination used duckdb:////home/iraitz/TheBridge/FEB24 -BIL  DS PT/TheBridge_DSPT/2-Data Analysis/5-Fuentes de datos/Web/Teoría/dlt_ipykernel_launcher.duckdb location to store data
Load package 1717434393.5724628 is LOADED and contains no failed jobs


In [8]:
info.dataset_name

'country_data'

In [9]:
import duckdb

db = duckdb.connect(database="dlt_ipykernel_launcher.duckdb")
db.sql("DESCRIBE;")

┌──────────────────────┬──────────────┬─────────────────────┬──────────────────────┬───────────────────────┬───────────┐
│       database       │    schema    │        name         │     column_names     │     column_types      │ temporary │
│       varchar        │   varchar    │       varchar       │      varchar[]       │       varchar[]       │  boolean  │
├──────────────────────┼──────────────┼─────────────────────┼──────────────────────┼───────────────────────┼───────────┤
│ dlt_ipykernel_laun…  │ country_data │ _dlt_loads          │ [load_id, schema_n…  │ [VARCHAR, VARCHAR, …  │ false     │
│ dlt_ipykernel_laun…  │ country_data │ _dlt_pipeline_state │ [version, engine_v…  │ [BIGINT, BIGINT, VA…  │ false     │
│ dlt_ipykernel_laun…  │ country_data │ _dlt_version        │ [version, engine_v…  │ [BIGINT, BIGINT, TI…  │ false     │
│ dlt_ipykernel_laun…  │ country_data │ countries           │ [country, populati…  │ [VARCHAR, BIGINT, V…  │ false     │
└──────────────────────┴──────────────┴─────────────────────┴──────────────────────┴───────────────────────┴───────────┘

In [10]:
db.sql("SELECT * FROM country_data.countries;")

┌─────────┬────────────┬──────────────────┬────────────────────┬────────────────┐
│ country │ population │     capital      │    _dlt_load_id    │    _dlt_id     │
│ varchar │   int64    │     varchar      │      varchar       │    varchar     │
├─────────┼────────────┼──────────────────┼────────────────────┼────────────────┤
│ USA     │  331449281 │ Washington, D.C. │ 1717434393.5724628 │ iPmaGNV8zAfgww │
│ Canada  │   38005238 │ Ottawa           │ 1717434393.5724628 │ cYKUg3FqlL213g │
│ Germany │   83019200 │ Berlin           │ 1717434393.5724628 │ J07WykKqamHeVQ │
└─────────┴────────────┴──────────────────┴────────────────────┴────────────────┘

In [11]:
data_df = db.sql("SELECT * FROM country_data.countries;").df()
data_df

Unnamed: 0,country,population,capital,_dlt_load_id,_dlt_id
0,USA,331449281,"Washington, D.C.",1717434393.5724628,iPmaGNV8zAfgww
1,Canada,38005238,Ottawa,1717434393.5724628,cYKUg3FqlL213g
2,Germany,83019200,Berlin,1717434393.5724628,J07WykKqamHeVQ


In [12]:
type(data_df)

pandas.core.frame.DataFrame

We can build more complex structures that take care of fetching the data periodically and building up our database.

In [13]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator

# Client for the PokéAPI: the paginator follows the "next" URL in each response
# and data_selector picks the list of records out of the "results" field
poke_client = RESTClient(
    base_url="https://pokeapi.co/api/v2",
    paginator=JSONResponsePaginator(next_url_path="next"),
    data_selector="results",
)


@dlt.resource
def get_pokemons():
    # Yield one page (a list of records) at a time, 100 records per request
    for page in poke_client.paginate("/pokemon", params={"limit": 100}):
        yield page


pipeline = dlt.pipeline(
    pipeline_name="get_pokemons",
    destination="duckdb",
    dataset_name="pokemons",
    progress="log",
)
load_info = pipeline.run(get_pokemons)
print(load_info)

----------------------------- Extract get_pokemons -----------------------------
Resources: 0/1 (0.0%) | Time: 1.01s | Rate: 0.00/s
get_pokemons: 600  | Time: 0.54s | Rate: 1101.31/s
Memory usage: 233.46 MB (20.70%) | CPU usage: 0.00%

----------------------------- Extract get_pokemons -----------------------------
Resources: 1/1 (100.0%) | Time: 1.71s | Rate: 0.58/s
get_pokemons: 1302  | Time: 1.24s | Rate: 1048.72/s
Memory usage: 233.93 MB (20.70%) | CPU usage: 0.00%

----------------------------- Extract get_pokemons -----------------------------
Resources: 1/1 (100.0%) | Time: 0.01s | Rate: 83.02/s
_dlt_pipeline_state: 1  | Time: 0.01s | Rate: 89.71/s
Memory usage: 233.93 MB (20.70%) | CPU usage: 0.00%

----------------- Normalize get_pokemons in 1717434546.0892334 -----------------
Files: 2/2 (100.0%) | Time: 0.38s | Rate: 5.23/s
Items: 1303  | Time: 0.38s | Rate: 3409.21/s
Memory usage: 234.09 MB (20.70%) | CPU usage: 0.00%

------------------- Load get_pokemons in 1717434546.089
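
The `next_url_path="next"` and `data_selector="results"` settings used in the client map onto the shape of a PokéAPI list response, which carries the records under `results` and the URL of the following page under `next`. The sketch below shows roughly the loop that following such links involves. It is an illustration, not dlt's implementation: `FAKE_API`, `fetch`, and `paginate` are hypothetical names, and the in-memory dictionary replaces real HTTP calls.

```python
# Stand-in for the API: each "URL" maps to the JSON body it would return
FAKE_API = {
    "/pokemon?offset=0": {
        "next": "/pokemon?offset=2",
        "results": [{"name": "bulbasaur"}, {"name": "ivysaur"}],
    },
    "/pokemon?offset=2": {
        "next": None,  # the last page has no next link
        "results": [{"name": "venusaur"}],
    },
}


def fetch(url):
    """Hypothetical replacement for an HTTP GET that returns parsed JSON."""
    return FAKE_API[url]


def paginate(start_url, next_key="next", data_key="results"):
    """Yield one list of records per page until there is no next link."""
    url = start_url
    while url is not None:
        body = fetch(url)
        yield body[data_key]
        url = body[next_key]


names = [item["name"] for page in paginate("/pokemon?offset=0") for item in page]
print(names)
```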

In [14]:
db = duckdb.connect(database="get_pokemons.duckdb")
db.sql("DESCRIBE;")

┌──────────────┬──────────┬─────────────────────┬──────────────────────┬───────────────────────────────────┬───────────┐
│   database   │  schema  │        name         │     column_names     │           column_types            │ temporary │
│   varchar    │ varchar  │       varchar       │      varchar[]       │             varchar[]             │  boolean  │
├──────────────┼──────────┼─────────────────────┼──────────────────────┼───────────────────────────────────┼───────────┤
│ get_pokemons │ pokemons │ _dlt_loads          │ [load_id, schema_n…  │ [VARCHAR, VARCHAR, BIGINT, TIME…  │ false     │
│ get_pokemons │ pokemons │ _dlt_pipeline_state │ [version, engine_v…  │ [BIGINT, BIGINT, VARCHAR, VARCH…  │ false     │
│ get_pokemons │ pokemons │ _dlt_version        │ [version, engine_v…  │ [BIGINT, BIGINT, TIMESTAMP WITH…  │ false     │
│ get_pokemons │ pokemons │ get_pokemons        │ [name, url, _dlt_l…  │ [VARCHAR, VARCHAR, VARCHAR, VAR…  │ false     │
└──────────────┴──────────┴─────────────────────┴──────────────────────┴───────────────────────────────────┴───────────┘

In [15]:
response = db.sql("SELECT * FROM pokemons.get_pokemons;")
response.to_df()

Unnamed: 0,name,url,_dlt_load_id,_dlt_id
0,bulbasaur,https://pokeapi.co/api/v2/pokemon/1/,1717434546.0892334,cEd2QuCGsub8PQ
1,ivysaur,https://pokeapi.co/api/v2/pokemon/2/,1717434546.0892334,YjgDluXTjo+sAw
2,venusaur,https://pokeapi.co/api/v2/pokemon/3/,1717434546.0892334,0yUyMJkDO+TbVA
3,charmander,https://pokeapi.co/api/v2/pokemon/4/,1717434546.0892334,/3wTICZDapiWLQ
4,charmeleon,https://pokeapi.co/api/v2/pokemon/5/,1717434546.0892334,nzUz6co0cBX/bQ
...,...,...,...,...
1297,ogerpon-wellspring-mask,https://pokeapi.co/api/v2/pokemon/10273/,1717434546.0892334,/6uBh127nrhy+A
1298,ogerpon-hearthflame-mask,https://pokeapi.co/api/v2/pokemon/10274/,1717434546.0892334,hNr1RDjalSWfPg
1299,ogerpon-cornerstone-mask,https://pokeapi.co/api/v2/pokemon/10275/,1717434546.0892334,IbxuJIik0pSUtw
1300,terapagos-terastal,https://pokeapi.co/api/v2/pokemon/10276/,1717434546.0892334,6nYsFjKpJ3CeOA
