# First Steps with Kedro

<img src="static/kedro-horizontal-color-on-light.png" alt="Kedro" width=400 />

First, let's install `uv` (a super fast `pip` replacement written in Rust) and our dependencies:

In [1]:
%pip install uv

Note: you may need to restart the kernel to use updated packages.


In [2]:
!uv pip install -r requirements.txt

Audited 87 packages in 54ms


In [3]:
import sys
sys.version

'3.11.9 (main, Apr  2 2024, 08:25:04) [Clang 15.0.0 (clang-1500.3.9.4)]'

## An example with the PyPI stats dataset

https://pypi.org/help/#statistics

> You can analyze PyPI project/package metadata and download usage statistics via our public dataset on Google BigQuery.

https://packaging.python.org/en/latest/guides/analyzing-pypi-package-downloads/

The dataset can be found at https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=pypi&page=dataset

First, make sure credentials are in place:

In [4]:
import os
import pathlib

if not os.getenv("GOOGLE_APPLICATION_CREDENTIALS") or not pathlib.Path(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]).exists():
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "kedro-pypi-stats-xxx.json"
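The cell above quietly falls back to a placeholder path, so a missing key file only surfaces later as an authentication error. If you would rather fail early, a small check like the following could help. This is a minimal sketch; `find_credentials` is a hypothetical helper, not part of Kedro or the Google Cloud client.

```python
import os
import pathlib


def find_credentials(env_var: str = "GOOGLE_APPLICATION_CREDENTIALS") -> pathlib.Path:
    """Return the service-account key path stored in `env_var`, or fail loudly."""
    value = os.environ.get(env_var)
    if not value:
        # Nothing configured at all
        raise RuntimeError(f"{env_var} is not set; point it at your service-account JSON key")
    path = pathlib.Path(value).expanduser()
    if not path.is_file():
        # Configured, but the file is not where the variable says it is
        raise FileNotFoundError(f"{env_var} points to {path}, which does not exist")
    return path
```

Calling `find_credentials()` right after setting the variable turns a confusing authentication failure at query time into an explicit error message.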

Then, check that the BigQuery client is properly authenticated:

In [5]:
from google.cloud import bigquery

client = bigquery.Client()

query_job = client.query("""
SELECT COUNT(*) AS num_downloads
FROM `bigquery-public-data.pypi.file_downloads`
WHERE file.project = 'kedro'
  -- Only query yesterday and today (so far)
  AND DATE(timestamp)
    BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
    AND CURRENT_DATE()""")

results = query_job.result()  # Waits for job to complete.
for row in results:
    print("{} downloads".format(row.num_downloads))

27007 downloads


## The `DataCatalog`

Kedro’s [Data Catalog](https://docs.kedro.org/en/latest/data/) is a registry of all data sources available for use by the project. It offers a separate place to declare details of the datasets your projects use. Kedro provides built-in datasets for different file types and file systems so you don’t have to write any of the logic for reading or writing data.

Kedro offers a range of datasets, including CSV, Excel, Parquet, Feather, HDF5, JSON, Pickle, SQL tables, SQL queries, Spark DataFrames, and more, built on the APIs of pandas, Spark, NetworkX, Matplotlib, YAML, and beyond. Kedro relies on fsspec to read and save data from a variety of data stores, including local file systems, network file systems, cloud object stores, and Hadoop. You can pass arguments to load and save operations, and use versioning and credentials for data access.
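As an illustration of the load/save arguments and versioning mentioned above, here is a hypothetical catalog entry for the same Parquet file. It is a sketch under a few assumptions: that `EagerPolarsDataset` forwards `load_args` to `polars.read_parquet` and `save_args` to `DataFrame.write_parquet`, and that it supports versioning; the column subset and compression codec are arbitrary choices.

```python
# Hypothetical catalog entry showing optional keys on top of type/filepath
versioned_catalog_config = {
    "pypi_kedro_demo": {
        "type": "polars.EagerPolarsDataset",
        "file_format": "parquet",
        "filepath": "data/00_demo/pypi_kedro_demo.pq",
        # Assumed to be forwarded to polars.read_parquet when loading
        "load_args": {"columns": ["timestamp", "major_version"]},
        # Assumed to be forwarded to DataFrame.write_parquet when saving
        "save_args": {"compression": "zstd"},
        # Keep every saved version instead of overwriting the file
        "versioned": True,
    }
}
```

Such a dictionary would be passed to `DataCatalog.from_config` in the same way as the plain one used in this section. With `versioned: True`, each save goes to a timestamped subdirectory, such as `data/00_demo/pypi_kedro_demo.pq/<version>/pypi_kedro_demo.pq`.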

To start using the Data Catalog, create an instance of the `DataCatalog` class with a dictionary configuration as follows:

In [6]:
import logging

logging.getLogger().setLevel(logging.INFO)

In [7]:
from kedro.io import DataCatalog

In [8]:
catalog = DataCatalog.from_config(
    {
        "pypi_kedro_demo": {
            "type": "polars.EagerPolarsDataset",
            "file_format": "parquet",
            "filepath": "data/00_demo/pypi_kedro_demo.pq",
        }
    }
)

Each entry in the dictionary represents a **dataset**, and each dataset has a **type** as well as some extra properties. Datasets are Python classes that take care of all the I/O needs in Kedro. In this case, the type `polars.EagerPolarsDataset` refers to `kedro_datasets.polars.EagerPolarsDataset`; you can read [its full documentation](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-4.0.0/api/kedro_datasets.polars.EagerPolarsDataset.html) online.
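The short `type` string works because the dataset class is looked up under a few package prefixes. The following is a simplified illustration of that kind of lookup using only `importlib`. The helper `resolve_type` is hypothetical, and the prefix order (`kedro.io.`, `kedro_datasets.`, then no prefix) is an assumption about Kedro's default search order, not a documented contract.

```python
import importlib


def resolve_type(type_string: str, prefixes=("kedro.io.", "kedro_datasets.", "")):
    """Return the first class found by trying each package prefix in order."""
    for prefix in prefixes:
        # e.g. "kedro_datasets.polars.EagerPolarsDataset" -> module path + class name
        module_path, _, class_name = (prefix + type_string).rpartition(".")
        if not module_path:
            continue
        try:
            module = importlib.import_module(module_path)
        except ImportError:
            # Package or submodule not available under this prefix; try the next one
            continue
        if hasattr(module, class_name):
            return getattr(module, class_name)
    raise ValueError(f"Could not resolve dataset type {type_string!r}")
```

With `kedro-datasets` installed, `resolve_type("polars.EagerPolarsDataset")` would be expected to find the class under the `kedro_datasets.` prefix, while a fully qualified path also resolves through the empty prefix.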

After the catalog is created, `catalog.list()` will yield a list of the available dataset names, which you can load using the `catalog.load(<dataset_name>)` method:

In [9]:
catalog.list()

['pypi_kedro_demo']

In [10]:
import warnings

warnings.filterwarnings("ignore")

In [11]:
pypi_kedro_demo = catalog.load("pypi_kedro_demo")

In [12]:
type(pypi_kedro_demo)

polars.dataframe.frame.DataFrame

In [13]:
pypi_kedro_demo.head()

| timestamp | project_name | version | major_version | type | installer | python | major_python | implementation | distro | system | cpu |
|---|---|---|---|---|---|---|---|---|---|---|---|
| *datetime[μs, UTC]* | *str* | *str* | *str* | *str* | *struct[2]* | *str* | *str* | *struct[2]* | *struct[4]* | *struct[2]* | *str* |
| 2024-07-01 23:32:58 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:33:01 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:30:20 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:33:48 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:21:22 UTC | "kedro" | "0.18.0" | "0.18" | "bdist_wheel" | {"pip","24.1.1"} | "3.8.15" | "3.8" | {"CPython","3.8.15"} | {"Debian GNU/Linux","10","buster",{"glibc","2.28"}} | {"Linux","5.10.0-0.deb10.16-amd64"} | "x86_64" |


## The `OmegaConfigLoader`

Instead of creating the Data Catalog by hand like this, Kedro usually stores configuration in YAML files. To load them, Kedro offers a [configuration loader](https://docs.kedro.org/en/latest/configuration/configuration_basics.html) called `OmegaConfigLoader`, based on the [OmegaConf](https://omegaconf.readthedocs.io/) library. It adds several useful features, such as:

- Consolidating different configuration files into one
- Substitution, templating
- [Resolvers](https://omegaconf.readthedocs.io/en/2.3_branch/custom_resolvers.html)
- And [much more](https://docs.kedro.org/en/latest/configuration/advanced_configuration.html)

To start using it, first dump the catalog configuration to a `catalog.yml` file, and then use `OmegaConfigLoader` as follows:

In [14]:
%%writefile catalog.yml
pypi_kedro_demo:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: data/00_demo/pypi_kedro_demo.pq

Overwriting catalog.yml


In [15]:
from kedro.config import OmegaConfigLoader

config_loader = OmegaConfigLoader(
    conf_source=".",  # Directory where configuration files are located
    config_patterns={"catalog": ["catalog.yml"]},  # To avoid conflict with catalogs in conf/ for now
)

In [16]:
catalog_config = config_loader.get("catalog")
catalog_config

{'pypi_kedro_demo': {'type': 'polars.EagerPolarsDataset',
  'file_format': 'parquet',
  'filepath': 'data/00_demo/pypi_kedro_demo.pq'}}

As you can see, `config_loader.get("catalog")` gets you the same dictionary we crafted by hand earlier.

However, hardcoding the local data path like that is an invitation to trouble. Let's declare a variable `_local_data` inside the YAML file using OmegaConf interpolation syntax and load the catalog config again:

In [17]:
%%writefile catalog.yml
_local_data: data

pypi_kedro_demo:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: ${_local_data}/00_demo/pypi_kedro_demo.pq

Overwriting catalog.yml


In [18]:
catalog_config = config_loader.get("catalog")
catalog_config

{'pypi_kedro_demo': {'type': 'polars.EagerPolarsDataset',
  'file_format': 'parquet',
  'filepath': 'data/00_demo/pypi_kedro_demo.pq'}}
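Notice that `_local_data` itself is missing from the result: as the output shows, top-level keys starting with an underscore are dropped once interpolation is done, so they can act as helper variables without being mistaken for datasets. The two steps can be mimicked with a toy resolver. This is only an illustration, not how OmegaConf works internally; `resolve_catalog` is hypothetical and handles only `${name}` references to top-level keys.

```python
import re

# Matches ${name} references, capturing the referenced key
_INTERPOLATION = re.compile(r"\$\{([^}]+)\}")


def resolve_catalog(raw: dict) -> dict:
    """Substitute ${key} references to top-level keys, then drop keys starting with '_'."""

    def substitute(value):
        if isinstance(value, dict):
            return {k: substitute(v) for k, v in value.items()}
        if isinstance(value, str):
            return _INTERPOLATION.sub(lambda m: str(raw[m.group(1)]), value)
        return value

    resolved = substitute(raw)
    # Underscore-prefixed keys were only needed for substitution
    return {k: v for k, v in resolved.items() if not k.startswith("_")}
```

Feeding it the same structure as `catalog.yml` yields the dictionary printed above, with `filepath` resolved to `data/00_demo/pypi_kedro_demo.pq`.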

In [19]:
catalog = DataCatalog.from_config(catalog_config)

In [20]:
catalog.load("pypi_kedro_demo")

| timestamp | project_name | version | major_version | type | installer | python | major_python | implementation | distro | system | cpu |
|---|---|---|---|---|---|---|---|---|---|---|---|
| *datetime[μs, UTC]* | *str* | *str* | *str* | *str* | *struct[2]* | *str* | *str* | *struct[2]* | *struct[4]* | *struct[2]* | *str* |
| 2024-07-01 23:32:58 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:33:01 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:30:20 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:33:48 UTC | "kedro" | "0.17.7" | "0.17" | "bdist_wheel" | {"pip","22.0.3"} | "3.8.5" | "3.8" | {"CPython","3.8.5"} | {"Amazon Linux","2",null,{"glibc","2.26"}} | {"Linux","4.14.262-200.489.amzn2.x86_64"} | "x86_64" |
| 2024-07-01 23:21:22 UTC | "kedro" | "0.18.0" | "0.18" | "bdist_wheel" | {"pip","24.1.1"} | "3.8.15" | "3.8" | {"CPython","3.8.15"} | {"Debian GNU/Linux","10","buster",{"glibc","2.28"}} | {"Linux","5.10.0-0.deb10.16-amd64"} | "x86_64" |


## Nodes and pipelines

Now comes the interesting part. Kedro structures computation as Directed Acyclic Graphs (DAGs), which are created by instantiating `Pipeline` objects with a list of `Node`s. Because nodes are linked through the names of their inputs and outputs, Kedro can perform a topological sort to determine the order in which they run.
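To see why matching input and output names is enough to build the graph, here is a stdlib-only sketch using `graphlib.TopologicalSorter`. It is not Kedro's internal code; the three nodes and their dataset names are made up for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical three-step pipeline: each node lists the datasets it reads and writes
nodes = {
    "clean": {"inputs": ["raw_downloads"], "outputs": ["clean_downloads"]},
    "aggregate": {"inputs": ["clean_downloads"], "outputs": ["downloads_by_version"]},
    "report": {"inputs": ["downloads_by_version", "clean_downloads"], "outputs": ["report"]},
}

# Map each dataset to the node that produces it
producers = {out: name for name, spec in nodes.items() for out in spec["outputs"]}

# A node depends on the nodes producing its inputs; datasets with no producer
# (like raw_downloads) are external inputs, e.g. loaded from the catalog
graph = {
    name: {producers[i] for i in spec["inputs"] if i in producers}
    for name, spec in nodes.items()
}

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['clean', 'aggregate', 'report']
```

The order is never declared explicitly: it falls out of the dataset names alone, which is what lets you list nodes in any order when building a pipeline.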

Let's start by creating a trivial pipeline with a single node: a function that counts the number of downloads for each `major_version` of Kedro.

In [21]:
import polars as pl

def count_by_major_version(df: pl.DataFrame) -> pl.DataFrame:
    return df.group_by("major_version").agg(pl.len())

In [22]:
count_by_major_version(pypi_kedro_demo)

| major_version | len |
|---|---|
| *str* | *u32* |
| "0.17" | 4 |
| "0.18" | 1 |


Notice that this is a plain Python function that receives a Polars DataFrame and returns another Polars DataFrame.

Now, let's wrap it using the `node` convenience function from Kedro:

In [23]:
from kedro.pipeline import node

n0 = node(
    func=count_by_major_version,
    inputs="pypi_kedro_demo",
    outputs="aggregate_pypi_kedro_demo"
)
n0

Node(count_by_major_version, 'pypi_kedro_demo', 'aggregate_pypi_kedro_demo', None)

Conceptually, a `Node` is a wrapper around a Python function that defines a single step in a pipeline. It has inputs and outputs, which are the names of the Data Catalog datasets that the function will receive and return, respectively. Therefore, you could execute it as follows:

```python
n0.func(
    *[catalog.load(input_dataset) for input_dataset in n0.inputs],
)
```

Let's not do that though; Kedro will take care of it.

The next step is to assemble the pipeline. In this case, it will only have 1 node:

In [24]:
from kedro.pipeline import pipeline

pipe = pipeline([n0])
pipe

Pipeline([
Node(count_by_major_version, 'pypi_kedro_demo', 'aggregate_pypi_kedro_demo', None)
])

And finally, you can now execute the pipeline. For the purposes of this tutorial, you can use Kedro's `SequentialRunner` directly:

In [25]:
from kedro.runner import SequentialRunner

outputs = SequentialRunner().run(pipe, catalog=catalog)

The `.run(...)` method returns "any node outputs that cannot be processed by the `DataCatalog`". Since `aggregate_pypi_kedro_demo` is not declared in the Data Catalog, it appears in the returned dictionary:

In [26]:
outputs.keys()

dict_keys(['aggregate_pypi_kedro_demo'])

In [27]:
outputs["aggregate_pypi_kedro_demo"]

| major_version | len |
|---|---|
| *str* | *u32* |
| "0.17" | 4 |
| "0.18" | 1 |
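The behaviour described above (outputs registered in the catalog are saved there, while unregistered outputs that no other node consumes are handed back) can be mimicked with a toy runner. This is a simplified, stdlib-only sketch, not Kedro's `SequentialRunner`: the dict-based node format and `run_sequentially` are hypothetical, each node has exactly one output, and a plain dict stands in for the `DataCatalog`.

```python
from collections import Counter
from graphlib import TopologicalSorter
from typing import Any


def run_sequentially(nodes: list[dict], catalog: dict[str, Any]) -> dict[str, Any]:
    """Run nodes in dependency order and return the free (unregistered, unconsumed) outputs."""
    producers = {node["outputs"]: i for i, node in enumerate(nodes)}
    graph = {
        i: {producers[name] for name in node["inputs"] if name in producers}
        for i, node in enumerate(nodes)
    }
    consumed = {name for node in nodes for name in node["inputs"]}
    memory: dict[str, Any] = {}  # stands in for in-memory intermediate datasets

    for i in TopologicalSorter(graph).static_order():
        node = nodes[i]
        args = [memory[name] if name in memory else catalog[name] for name in node["inputs"]]
        result = node["func"](*args)
        if node["outputs"] in catalog:
            catalog[node["outputs"]] = result  # "save" to a registered dataset
        else:
            memory[node["outputs"]] = result

    # Free outputs: kept in memory and not consumed by any other node
    return {name: value for name, value in memory.items() if name not in consumed}


# Hypothetical usage mirroring the one-node pipeline above, with a list instead of a DataFrame
toy_catalog = {"pypi_kedro_demo": ["0.17", "0.17", "0.17", "0.17", "0.18"]}
toy_nodes = [{"func": Counter, "inputs": ["pypi_kedro_demo"], "outputs": "aggregate_pypi_kedro_demo"}]
print(run_sequentially(toy_nodes, toy_catalog))
# {'aggregate_pypi_kedro_demo': Counter({'0.17': 4, '0.18': 1})}
```

If `aggregate_pypi_kedro_demo` were added to `toy_catalog`, the result would be written there instead and the returned dictionary would be empty.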


## Connect to BigQuery

### a) Running a custom SQL query and retrieving the result as a Polars DataFrame

One way of connecting to BigQuery is to write the SQL query directly in the catalog. For that, a custom `PolarsBigQueryDataset` is available in the `kedro_pypi_monitor` package:

In [28]:
%%writefile catalog.yml
_local_data: data

pypi_kedro_raw:
  type: kedro_pypi_monitor.datasets.PolarsBigQueryDataset
  sql: >
    SELECT
      *
    FROM
      `bigquery-public-data.pypi.file_downloads`
    WHERE
      project = 'kedro'
      AND TIMESTAMP("2024-07-01") <= timestamp
      AND timestamp < TIMESTAMP("2024-07-02")
    LIMIT 5

pypi_kedro_demo:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: ${_local_data}/00_demo/pypi_kedro_demo.pq


Overwriting catalog.yml


In [29]:
catalog = DataCatalog.from_config(config_loader.get("catalog"))
catalog.list()

[1m[[0m[32m'pypi_kedro_raw'[0m, [32m'pypi_kedro_demo'[0m[1m][0m

In [30]:
pypi_kedro_raw = catalog.load("pypi_kedro_raw")
pypi_kedro_raw.head()

| timestamp | country_code | url | project | file | details | tls_protocol | tls_cipher |
|---|---|---|---|---|---|---|---|
| *datetime[μs, UTC]* | *str* | *str* | *str* | *struct[4]* | *struct[10]* | *str* | *str* |
| 2024-07-01 23:32:58 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:33:01 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:30:20 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:33:48 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:21:22 UTC | "US" | "/packages/db/2b/1a6f24002485d1… | "kedro" | {"kedro-0.18.0-py3-none-any.whl","kedro","0.18.0","bdist_wheel"} | {{"pip","24.1.1"},"3.8.15",{"CPython","3.8.15"},{"Debian GNU/Linux","10","buster",{"glibc","2.28"}},{"Linux","5.10.0-0.deb10.16-amd64"},"x86_64","OpenSSL 1.1.1u 30 May 2023","59.8.0",null,null} | "TLSv1.3" | "TLS_AES_128_GCM_SHA256" |


### b) Returning a lazy view of the dataset thanks to Ibis

Kedro recently introduced support for Ibis through `kedro_datasets.ibis.TableDataset` ([documentation](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-4.0.0/api/kedro_datasets.ibis.TableDataset.html)), which lets you leverage the rich ecosystem of SQL backends that Ibis supports.

In [31]:
%%writefile catalog.yml
pypi_kedro_raw_lazy:
  type: ibis.TableDataset
  table_name: file_downloads
  connection:
    backend: bigquery
    dataset_id: bigquery-public-data.pypi

Overwriting catalog.yml


In [32]:
catalog = DataCatalog.from_config(config_loader.get("catalog"))
catalog.list()

['pypi_kedro_raw_lazy']

In [33]:
pypi_kedro_raw_lazy = catalog.load("pypi_kedro_raw_lazy")
pypi_kedro_raw_lazy.head()



With Ibis, you manipulate tables in a way that feels familiar to both SQL and Python users, while Ibis generates the SQL query under the hood:

In [34]:
import ibis

In [35]:
final_table = (
    pypi_kedro_raw_lazy
    .filter([
        pypi_kedro_raw_lazy.project == "kedro",
        ibis.timestamp("2024-07-01", timezone="UTC") <= pypi_kedro_raw_lazy.timestamp,
        pypi_kedro_raw_lazy.timestamp < ibis.timestamp("2024-07-02", timezone="UTC"),
    ])
    .head(5)
)

In [36]:
ibis.to_sql(final_table)

```sql
SELECT
  *
FROM `bigquery-public-data`.`pypi`.`file_downloads` AS `t0`
WHERE
  `t0`.`project` = 'kedro'
  AND TIMESTAMP('2024-07-01T00:00:00+00:00') <= `t0`.`timestamp`
  AND `t0`.`timestamp` < TIMESTAMP('2024-07-02T00:00:00+00:00')
LIMIT 5
```

Finally, you can return the result of your query using the `.to_polars()` method:

In [37]:
final_table.to_polars()

| timestamp | country_code | url | project | file | details | tls_protocol | tls_cipher |
|---|---|---|---|---|---|---|---|
| *datetime[ns, UTC]* | *str* | *str* | *str* | *struct[4]* | *struct[10]* | *str* | *str* |
| 2024-07-01 23:32:58 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:33:01 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:30:20 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:33:48 UTC | "US" | "/packages/d5/0e/4009cbb8d0d391… | "kedro" | {"kedro-0.17.7-py3-none-any.whl","kedro","0.17.7","bdist_wheel"} | {{"pip","22.0.3"},"3.8.5",{"CPython","3.8.5"},{"Amazon Linux","2",null,{"glibc","2.26"}},{"Linux","4.14.262-200.489.amzn2.x86_64"},"x86_64","OpenSSL 1.0.2k-fips 26 Jan 2017","60.6.0",null,null} | "TLSv1.2" | "ECDHE-RSA-AES128-GCM-SHA256" |
| 2024-07-01 23:21:22 UTC | "US" | "/packages/db/2b/1a6f24002485d1… | "kedro" | {"kedro-0.18.0-py3-none-any.whl","kedro","0.18.0","bdist_wheel"} | {{"pip","24.1.1"},"3.8.15",{"CPython","3.8.15"},{"Debian GNU/Linux","10","buster",{"glibc","2.28"}},{"Linux","5.10.0-0.deb10.16-amd64"},"x86_64","OpenSSL 1.1.1u 30 May 2023","59.8.0",null,null} | "TLSv1.3" | "TLS_AES_128_GCM_SHA256" |
