**Story:**

Let us simulate the following case:
I am a Data (ETL) Engineer and my colleague from Data Science department asks me to work with some data, they provide me with the data source, transformation rules and asks to load processed data into another database for further processing.
Then my colleague creates a ticket in Jira and assigns it to me.
I am starting...

*Another variation of the above case*: Data Scientist can perform functions of Data Engineer as well.

**Use Case:**

*Data (ETL) Engineer:*
1.  As source in this particular case uses sklearn.datasets library in Python;
2.  Works with Titanic dataset. This training set is widely used in education purposes;
3.  Performs transformation of data according to rules provided by Data Scientist;
4.  Prepares transformed data for the pipeline;
5.  Creates a pipeline;
6.  Runs it;
7.  Checks DuckDB as a destination: load is successful;
7.  Shares streamlit link with Data Scientist for further exploration;
8.  Monitors Pipeline's health

*Data Scientist:*
1. Defines source and destination;
2. Creates a set of transformation rules: convert categoricals to strings, remove certain columns, removes records with 0 values, change types of columns
2. Works with the loaded data via shared Streamlit link or directly in DuckDB

For the case I use dlt library


Now let us move to practice


Let us install the following libraries:

In [None]:
!pip install dlt
!pip install streamlit
!pip install duckdb
!pip install scikit-learn
!pip install numpy pandas


**In case one needs to UNINSTALL duckdb run the cell below. Otherwise skip!**

Just in case how to  uninstall the package, it might be needed during the below path:

In [None]:
!pip uninstall -y duckdb

Let us import all the libraries we need:

In [None]:
import pandas as pd # we use it for data pre-processing
from sklearn.datasets import fetch_openml # using fetch_openml we download titanic data set from OpenML repository
import dlt # data load tool
import duckdb # destination, where we load our pre-processed data
import streamlit # this is UI framework to have an access to results in DuckDB

Fetch dataset and make accessible for further operations:

In [10]:
titanic = fetch_openml('titanic', version=1, as_frame=True) # let us fix the version of dataset. So the data is constant all the way down.
titanic_data = titanic.frame # now it is pandas dataframe

Let us implement pre-processing with means of pandas library. With this step we prepare data according to request of Data Scientist.

At this stage also Feature Engineering could be applied.

Let us declare the source, for the case we use decorators and yield operator:


In [None]:
@dlt.resource #declaring resource, as I understand this decorator wraps up output as an input for the pipeline, at least it works like this without exceptions
def transform_data_with_pandas(titanic_data):

    # Here we do a few tricks on data: converting categorical columns to strings
    titanic_data['survived'] = titanic_data['survived'].astype(str)
    titanic_data['sex'] = titanic_data['sex'].astype(str)

    # Below we define columns to removed from the dataframe, we consider them non meaningful, just because I remember this data, but it also can be proved with scietific means
    columns_to_drop = ['embarked', 'parch', 'sibsp', 'ticket', 'boat', 'body', 'home.dest', 'name', 'cabin']
    titanic_data.drop(columns=columns_to_drop, inplace=True)

    # Here we convert age and fare into int64 for accuracy purpose, before it had double datatype and looked not friendly to human
    titanic_data['age'] = titanic_data['age'].fillna(0).astype('int64')
    titanic_data['fare'] = titanic_data['fare'].fillna(0).astype('int64')

    #This construction is required for resource declare, to feed it into pipeline it has to be a list of dictionaries.
    for record in titanic_data.to_dict(orient='records'):
        yield record

    #Initially I had this code instead yield with no return and it also worked:
    #titanic_list = titanic_data.to_dict(orient='records')
    #return titanic_list


Create and run the pipeline. At this moment schema qiuck_start and table passengers in duckDB are created, data is loaded:

In [None]:
# Create. I used name from example in docs as well as the code itself is taken from docs
pipeline = dlt.pipeline(
    pipeline_name="quick_start", destination="duckdb", dataset_name="titanic"
)
#load happens here. I use 'replace' which re-writes the whole dataset. Dataset is small compare to real-life, so it is ok.
load_info = pipeline.run(transform_data_with_pandas(titanic_data), table_name="passengers", write_disposition='replace')
print(load_info)

Present result in streamlit framework. It is a cool feature!

In [None]:
# Show the pipeline status. Here you can interact with duckDB via SQL queries, check he content of loaded data and other load info.
!dlt pipeline quick_start show