In [None]:
# required installations: dlt with duckDB as endpoint
!pip install "dlt[duckdb]"

In [3]:
# Sample data containing pokemon details
data = [
    {"id": "1", "name": "bulbasaur", "size": {"weight": 6.9, "height": 0.7}},
    {"id": "4", "name": "charmander", "size": {"weight": 8.5, "height": 0.6}},
    {"id": "25", "name": "pikachu", "size": {"weight": 6, "height": 0.4}},
]

In [4]:
import dlt

# Set pipeline name, destination, and dataset name
pipeline = dlt.pipeline(
    pipeline_name="quick_start",
    destination="duckdb",
    dataset_name="mydata",
)

> **What just happened?**:  
> The first run of a pipeline will scan the data that goes through it and generate a schema. To convert nested data into a relational format, dlt flattens dictionaries and unpacks nested lists into sub-tables.
>
> For this example `dlt` created a schema called 'mydata' with the table 'pokemon' in it and stored it in DuckDB.
>
>For detailed instructions on running a pipeline, see the documentation [here](https://dlthub.com/docs/walkthroughs/run-a-pipeline).


## **What is a `dlt` Pipeline?**

A [pipeline](https://dlthub.com/docs/general-usage/pipeline) is a connection that moves data from your Python code to a destination. The pipeline accepts dlt sources or resources, as well as generators, async generators, lists, and any iterables. Once the pipeline runs, all resources are evaluated and the data is loaded at the destination.

You instantiate a pipeline by calling the `dlt.pipeline` function with the following arguments:

* **`pipeline_name`**: This is the name you give to your pipeline. It helps you track and monitor your pipeline, and also helps to bring back its state and data structures for future runs. If you don't provide a name, dlt will use the name of the Python file you're running as the pipeline name.
* **`destination`**: a name of the destination to which dlt will load the data. It may also be provided to the run method of the pipeline.
* **`dataset_name`**: This is the name of the group of tables (or dataset) where your data will be sent. You can think of a dataset like a folder that holds many files, or a schema in a relational database. You can also specify this later when you run or load the pipeline. If you don't provide a name, it will default to the name of your pipeline.
* **`dev_mode`**: If you set this to True, dlt will add a timestamp to your dataset name every time you create a pipeline. This means a new dataset will be created each time you create a pipeline.

There are more arguments, but they are for advanced use, we skip it for now.

---

## **Run method**

To load the data, you call the `run()` method and pass your data in the data argument.

In [None]:
# Run the pipeline with data and table name
load_info = pipeline.run(data, table_name="pokemon")

print(load_info)

Commonly used arguments:

* **`data`** (the first argument) may be a dlt source, resource, generator function, or any Iterator or Iterable (i.e., a list or the result of the map function).
* **`write_disposition`** controls how to write data to a table. Defaults to the value "append".
  * `append` will always add new data at the end of the table.
  * `replace` will replace existing data with new data.
  * `skip` will prevent data from loading.
  * `merge` will deduplicate and merge data based on `primary_key` and `merge_key` hints.
* **`table_name`**: specified in cases when the table name cannot be inferred, i.e., from the resources or name of the generator function.

 ---
 ## **Explore the loaded data**

---
### **(1) DuckDB Connection**

Start a connection to your database using native `duckdb` connection and look what tables were generated:

!pip install pandas numpy matplotlib

In [9]:
import duckdb
import pandas as pd

# A database '<pipeline_name>.duckdb' was created in working directory so just connect to it

# Connect to the DuckDB database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set search path to the dataset
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset
conn.sql("DESCRIBE").df()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,quick_start,mydata,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,quick_start,mydata,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,quick_start,mydata,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,quick_start,mydata,pokemon,"[id, name, size__weight, size__height, _dlt_lo...","[VARCHAR, VARCHAR, DOUBLE, DOUBLE, VARCHAR, VA...",False


You can see:
-  `pokemon` table,

and 3 special `dlt` tables (we will discuss them later):
- `_dlt_loads`,
- `_dlt_pipeline_state`,
- `_dlt_version`.

Let's execute a query to get all data from the `pokemon` table:

In [10]:
# Fetch all data from 'pokemon' as a DataFrame
table = conn.sql("SELECT * FROM pokemon").df()

# Display the DataFrame
table

Unnamed: 0,id,name,size__weight,size__height,_dlt_load_id,_dlt_id
0,1,bulbasaur,6.9,0.7,1736674586.39347,esXz5E/o3Mulmw
1,4,charmander,8.5,0.6,1736674586.39347,whgI7vqqbzeWmA
2,25,pikachu,6.0,0.4,1736674586.39347,52fwcBTFvbfupw


 ---
 ### **(2) `dlt`'s [sql_client](https://dlthub.com/docs/general-usage/dataset-access/sql-client)**

 Most dlt destinations (even filesystem) use an implementation of the `SqlClientBase` class to connect to the physical destination to which your data is loaded. You can access the SQL client of your destination via the `sql_client` method on your pipeline.

Start a connection to your database with `pipeline.sql_client()` and execute a query to get all data from the `pokemon` table:

In [11]:
# Query data from 'pokemon' using the SQL client
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM pokemon") as cursor:
        data = cursor.df()

# Display the data
data

Unnamed: 0,id,name,size__weight,size__height,_dlt_load_id,_dlt_id
0,1,bulbasaur,6.9,0.7,1736674586.39347,esXz5E/o3Mulmw
1,4,charmander,8.5,0.6,1736674586.39347,whgI7vqqbzeWmA
2,25,pikachu,6.0,0.4,1736674586.39347,52fwcBTFvbfupw


---
### **(3) dlt [datasets](https://dlthub.com/docs/general-usage/dataset-access/dataset)**

Here's an example of how to retrieve data from a pipeline and load it into a Pandas DataFrame or a PyArrow Table.

In [12]:
dataset = pipeline.dataset(dataset_type="default")
dataset.pokemon.df()

Unnamed: 0,id,name,size__weight,size__height,_dlt_load_id,_dlt_id
0,1,bulbasaur,6.9,0.7,1736674586.39347,esXz5E/o3Mulmw
1,4,charmander,8.5,0.6,1736674586.39347,whgI7vqqbzeWmA
2,25,pikachu,6.0,0.4,1736674586.39347,52fwcBTFvbfupw


---
# **Exercise 1**

Using the code from the previous cell, fetch the data from the `pokemon` table into a dataframe and count the number of columns in the table `pokemon`.

In [14]:
data

# Count the number of columns
num_columns = data.shape[1] 
print(f"The number of columns in the table is: {num_columns}")

The number of columns in the table is: 6
