# First Steps with Kedro on Databricks

<img src="static/kedro-horizontal-color-on-light.png" alt="Kedro" width=400 />

First, let's install `uv` (a super fast `pip` replacement written in Rust) and our dependencies:

In [None]:
%pip install uv

In [None]:
!uv pip install -r requirements.txt

In [None]:
import sys
sys.version

'3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]'

## An example with the NYC Taxi dataset

The `samples` catalog in Unity Catalog contains an example dataset of NYC taxi trips:

In [None]:
%sql
USE CATALOG samples;
USE SCHEMA nyctaxi;
SELECT * FROM trips LIMIT 5;

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip
2016-02-14T16:52:13Z,2016-02-14T17:16:04Z,4.94,19.0,10282,10171
2016-02-04T18:44:19Z,2016-02-04T18:46:00Z,0.28,3.5,10110,10110
2016-02-17T17:13:57Z,2016-02-17T17:17:55Z,0.7,5.0,10103,10023
2016-02-18T10:36:07Z,2016-02-18T10:41:45Z,0.8,6.0,10022,10017
2016-02-22T14:14:41Z,2016-02-22T14:31:52Z,4.51,17.0,10110,10282


## The `DataCatalog`

Kedro’s [Data Catalog](https://docs.kedro.org/en/latest/data/) is a registry of all data sources available for use by the project. It offers a single place to declare the details of the datasets your project uses. Kedro provides built-in datasets for different file types and file systems, so you don’t have to write any of the logic for reading or writing data.

Kedro offers a range of datasets, including CSV, Excel, Parquet, Feather, HDF5, JSON, Pickle, SQL tables, SQL queries, Spark DataFrames, and more. They are supported with the APIs of pandas, Spark, NetworkX, Matplotlib, YAML, and beyond. Kedro relies on fsspec to read and save data from a variety of data stores, including local file systems, network file systems, cloud object stores, and Hadoop. You can pass arguments to load and save operations, and use versioning and credentials for data access.

To start using the Data Catalog, create a `DataCatalog` instance from a dictionary configuration with `DataCatalog.from_config`, as follows:

In [None]:
import logging

logging.getLogger().setLevel(logging.INFO)

In [None]:
from kedro.io import DataCatalog


In [None]:
catalog = DataCatalog.from_config(
    {
        "nyctaxi_trips": {
            "type": "databricks.ManagedTableDataset",
            "catalog": "samples",
            "database": "nyctaxi",
            "table": "trips",
        }
    }
)


Each entry in the dictionary represents a **dataset**, and each dataset has a **type** as well as some extra properties. Datasets are Python classes that take care of all the I/O needs in Kedro. In this case, we're using `kedro_datasets.databricks.ManagedTableDataset`, which points at the `trips` table of the `nyctaxi` schema in the `samples` catalog; you can read [its full documentation](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-3.0.1/api/kedro_datasets.databricks.ManagedTableDataset.html) online.
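Conceptually, a dataset is an object that knows how to load and save one piece of data, and the catalog is a name-to-dataset registry built from configuration. The following is a minimal, stdlib-only toy model of that idea. `JSONFileDataset` and `ToyCatalog` are hypothetical names, not Kedro classes, and the real `DataCatalog` does much more (credentials, versioning, and so on).

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


class JSONFileDataset:
    """Hypothetical toy dataset that loads and saves a single JSON file."""

    def __init__(self, filepath: str) -> None:
        self._filepath = Path(filepath)

    def load(self) -> Any:
        return json.loads(self._filepath.read_text())

    def save(self, data: Any) -> None:
        self._filepath.write_text(json.dumps(data))


class ToyCatalog:
    """Hypothetical toy registry mapping dataset names to dataset objects."""

    # Maps the "type" string used in the configuration to a dataset class
    DATASET_TYPES = {"JSONFileDataset": JSONFileDataset}

    def __init__(self, datasets: dict[str, Any]) -> None:
        self._datasets = datasets

    @classmethod
    def from_config(cls, config: dict[str, dict[str, Any]]) -> ToyCatalog:
        datasets = {}
        for name, entry in config.items():
            params = dict(entry)  # copy so the caller's config is not mutated
            dataset_class = cls.DATASET_TYPES[params.pop("type")]
            # The remaining keys become constructor arguments of the dataset
            datasets[name] = dataset_class(**params)
        return cls(datasets)

    def list(self) -> list[str]:
        return list(self._datasets)

    def load(self, name: str) -> Any:
        return self._datasets[name].load()

    def save(self, name: str, data: Any) -> None:
        self._datasets[name].save(data)


# Usage: one JSON-backed dataset in a temporary directory
path = Path(tempfile.mkdtemp()) / "trips.json"
toy_catalog = ToyCatalog.from_config(
    {"trips": {"type": "JSONFileDataset", "filepath": str(path)}}
)
toy_catalog.save("trips", [{"trip_distance": 4.94, "fare_amount": 19.0}])
print(toy_catalog.list())  # ['trips']
print(toy_catalog.load("trips"))  # [{'trip_distance': 4.94, 'fare_amount': 19.0}]
```

The `type` key selects the class and every other key becomes a constructor argument, which is the same shape as the `nyctaxi_trips` entry above.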

After the catalog is created, `catalog.list()` will yield a list of the available dataset names, which you can load using the `catalog.load(<dataset_name>)` method:

In [None]:
catalog.list()


['nyctaxi_trips']

In [None]:
nyctaxi_trips = catalog.load("nyctaxi_trips")

INFO:kedro.io.data_catalog:Loading data from nyctaxi_trips (ManagedTableDataset)...


Notice that the resulting object is a regular PySpark `DataFrame`, so the full PySpark API is available on it:

In [None]:
type(nyctaxi_trips)


pyspark.sql.dataframe.DataFrame

In [None]:
nyctaxi_trips.show(n=5)


+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
| 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
| 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
| 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
| 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
+--------------------+---------------------+-------------+-----------+----------+-----------+
only showing top 5 rows



## The `OmegaConfigLoader`

Instead of creating the Data Catalog by hand like this, Kedro usually stores configuration in YAML files. To load them, Kedro offers a [configuration loader](https://docs.kedro.org/en/latest/configuration/configuration_basics.html) called the `OmegaConfigLoader`, based on the [OmegaConf](https://omegaconf.readthedocs.io/) library. It adds several useful features, such as:

- Consolidating different configuration files into one
- Variable interpolation and templating
- [Resolvers](https://omegaconf.readthedocs.io/en/2.3_branch/custom_resolvers.html)
- And [much more](https://docs.kedro.org/en/latest/configuration/advanced_configuration.html)

To start using it, first dump the catalog configuration to a `catalog.yml` file, and then use `OmegaConfigLoader` as follows:

In [None]:
%%writefile catalog.yml
nyctaxi_trips:
  type: databricks.ManagedTableDataset
  catalog: samples
  database: nyctaxi
  table: trips


Overwriting catalog.yml


In [None]:
from kedro.config import OmegaConfigLoader

config_loader = OmegaConfigLoader(
    conf_source=".",  # Directory where configuration files are located
)


In [None]:
catalog_config = config_loader.get("catalog")
catalog_config


{'nyctaxi_trips': {'type': 'databricks.ManagedTableDataset',
  'catalog': 'samples',
  'database': 'nyctaxi',
  'table': 'trips'}}

As you can see, `config_loader.get("catalog")` gets you the same dictionary we crafted by hand earlier.
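Here the loader read a single file, but it can also consolidate several catalog files into one dictionary. That step can be pictured with a stdlib-only toy: each already-parsed file contributes its top-level keys, and a key defined in two files is treated as an error. This is only loosely modeled on how `OmegaConfigLoader` handles duplicate top-level keys within one configuration environment; `merge_config_files` and the file names are hypothetical, and the real loader also parses YAML, matches file patterns, and applies environment overrides.

```python
from typing import Any


def merge_config_files(files: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Toy model: combine the top-level keys of several parsed config files.

    `files` maps a (hypothetical) file name to its already-parsed content.
    """
    merged: dict[str, Any] = {}
    origin: dict[str, str] = {}  # remembers which file defined each key
    for filename, content in files.items():
        for key, value in content.items():
            if key in merged:
                raise ValueError(
                    f"Duplicate key {key!r} found in {origin[key]} and {filename}"
                )
            merged[key] = value
            origin[key] = filename
    return merged


merged = merge_config_files(
    {
        "catalog.yml": {"nyctaxi_trips": {"catalog": "samples", "table": "trips"}},
        "catalog_tpch.yml": {"tpch_orders": {"catalog": "samples", "table": "orders"}},
    }
)
print(sorted(merged))  # ['nyctaxi_trips', 'tpch_orders']
```

Rejecting duplicates, rather than silently letting one file win, makes it obvious when two files accidentally declare the same dataset.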

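However, repeating the same dataset `type` in every entry is tedious and error-prone. Let's declare a variable `_type` inside the YAML file, reference it with OmegaConf's `${...}` interpolation syntax, and load the catalog config again. Keys that start with an underscore are only used for templating, so `_type` will not appear in the resulting configuration: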

In [None]:
%%writefile catalog.yml
_type: databricks.ManagedTableDataset

nyctaxi_trips:
  type: ${_type}
  catalog: samples
  database: nyctaxi
  table: trips

# Adding an extra dataset for demonstration purposes
tpch_orders:
  type: ${_type}
  catalog: samples
  database: tpch
  table: orders


Overwriting catalog.yml


In [None]:
catalog_config = config_loader.get("catalog")
catalog_config


{'nyctaxi_trips': {'type': 'databricks.ManagedTableDataset',
  'catalog': 'samples',
  'database': 'nyctaxi',
  'table': 'trips'},
 'tpch_orders': {'type': 'databricks.ManagedTableDataset',
  'catalog': 'samples',
  'database': 'tpch',
  'table': 'orders'}}
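What the loader did here can be approximated with a small stdlib-only toy: replace each `${name}` reference with the value of the top-level key `name`, then drop the underscore-prefixed template keys. `resolve_catalog` is a hypothetical helper with no cycle detection; OmegaConf's real interpolation is much richer (nested paths such as `${a.b}`, custom resolvers, and more).

```python
import re
from typing import Any

# Matches a reference to a top-level key, e.g. "${_type}"
REFERENCE = re.compile(r"\$\{(\w+)\}")


def resolve_catalog(raw: dict[str, Any]) -> dict[str, Any]:
    """Toy model: resolve ${name} references, then drop "_"-prefixed keys."""

    def resolve(value: Any) -> Any:
        if isinstance(value, dict):
            return {key: resolve(item) for key, item in value.items()}
        if isinstance(value, str):
            whole = REFERENCE.fullmatch(value)
            if whole:
                # A string that is only a reference keeps the referenced value's type
                return resolve(raw[whole.group(1)])
            # Otherwise, splice the referenced values into the string
            return REFERENCE.sub(lambda m: str(resolve(raw[m.group(1)])), value)
        return value

    return {key: resolve(value) for key, value in raw.items() if not key.startswith("_")}


raw_config = {
    "_type": "databricks.ManagedTableDataset",
    "nyctaxi_trips": {
        "type": "${_type}", "catalog": "samples", "database": "nyctaxi", "table": "trips",
    },
    "tpch_orders": {
        "type": "${_type}", "catalog": "samples", "database": "tpch", "table": "orders",
    },
}
resolved = resolve_catalog(raw_config)
print(resolved["tpch_orders"]["type"])  # databricks.ManagedTableDataset
print("_type" in resolved)  # False
```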

In [None]:
catalog = DataCatalog.from_config(catalog_config)

In [None]:
catalog.load("nyctaxi_trips")

INFO:kedro.io.data_catalog:Loading data from nyctaxi_trips (ManagedTableDataset)...


DataFrame[tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, trip_distance: double, fare_amount: double, pickup_zip: int, dropoff_zip: int]

In [None]:
tpch_orders = catalog.load("tpch_orders")
tpch_orders.show(n=5)

INFO:kedro.io.data_catalog:Loading data from tpch_orders (ManagedTableDataset)...


+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+--------------------+
|o_orderkey|o_custkey|o_orderstatus|o_totalprice|o_orderdate|o_orderpriority|        o_clerk|o_shippriority|           o_comment|
+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+--------------------+
|  13710944|   227285|            O|   162169.66| 1995-10-11|       1-URGENT|Clerk#000000432|             0|accounts. ruthles...|
|  13710945|   225010|            O|   252273.67| 1997-09-29|          5-LOW|Clerk#000002337|             0|ironic platelets ...|
|  13710946|   238820|            O|   179947.16| 1997-10-31|         2-HIGH|Clerk#000004135|             0|ole requests. reg...|
|  13710947|   581233|            O|    33843.49| 1995-05-25|         2-HIGH|Clerk#000000138|             0|arefully final pl...|
|  13710948|    10033|            O|    42500.65| 1995-09-04|4-NOT SPECIFIED|Clerk#0000033

## Nodes and pipelines

Now comes the interesting part. Kedro structures computations as Directed Acyclic Graphs (DAGs), which are created by instantiating `Pipeline` objects with a list of `Node`s. By linking the inputs and outputs of each node, Kedro can resolve the dependencies between nodes and sort them topologically to determine the order of execution.
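The dependency resolution described above can be illustrated with Python's standard-library `graphlib`: each node depends on whichever nodes produce its inputs, and a topological sort yields a valid execution order. The node and dataset names are hypothetical, and this is not necessarily how Kedro implements it internally.

```python
from graphlib import TopologicalSorter

# Hypothetical three-node pipeline: node name -> (input datasets, output datasets)
nodes = {
    "report": (["trip_features", "preprocessed_trips"], ["trip_report"]),
    "preprocess": (["nyctaxi_trips"], ["preprocessed_trips"]),
    "features": (["preprocessed_trips"], ["trip_features"]),
}

# Which node produces each dataset
producers = {out: name for name, (_, outs) in nodes.items() for out in outs}

# A node depends on the producers of its inputs; raw inputs such as
# "nyctaxi_trips" have no producer, so they add no dependency
graph = {
    name: {producers[ds] for ds in inputs if ds in producers}
    for name, (inputs, _) in nodes.items()
}

execution_order = list(TopologicalSorter(graph).static_order())
print(execution_order)  # ['preprocess', 'features', 'report']
```

Note that the nodes are declared out of order on purpose: only the dataset names link them, so the declaration order does not matter.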

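Let's start by creating a trivial pipeline with a single node. That node will be a preprocessing function that removes trips with a non-positive `trip_distance` or `fare_amount`, adds a `trip_duration_minutes` column computed from the pickup and dropoff timestamps, and keeps only trips lasting more than 0 and at most 600 minutes.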

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def preprocess_nyc_taxi_data(df: DataFrame) -> DataFrame:
    # Remove trips with no distance or fare amount
    cleaned_df = df.filter((col("trip_distance") > 0) & (col("fare_amount") > 0))

    # Add a new column for trip duration in minutes
    cleaned_df = cleaned_df.withColumn(
        "trip_duration_minutes",
        (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60
    )

    # Filter out trips with unrealistic duration (> 0 and <= 600 minutes)
    cleaned_df = cleaned_df.filter(
        (col("trip_duration_minutes") > 0) & (col("trip_duration_minutes") <= 600)
    )

    return cleaned_df


In [None]:
nyctaxi_trips.show(n=5)


+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
| 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
| 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
| 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
| 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
+--------------------+---------------------+-------------+-----------+----------+-----------+
only showing top 5 rows



In [None]:
preprocess_nyc_taxi_data(nyctaxi_trips).show(n=5)


+--------------------+---------------------+-------------+-----------+----------+-----------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|trip_duration_minutes|
+--------------------+---------------------+-------------+-----------+----------+-----------+---------------------+
| 2016-02-16 22:40:45|  2016-02-16 22:59:25|         5.35|       18.5|     10003|      11238|   18.666666666666668|
| 2016-02-05 16:06:44|  2016-02-05 16:26:03|          6.5|       21.5|     10282|      10001|   19.316666666666666|
| 2016-02-08 07:39:25|  2016-02-08 07:44:14|          0.9|        5.5|     10119|      10003|    4.816666666666666|
| 2016-02-29 22:25:33|  2016-02-29 22:38:09|          3.5|       13.5|     10001|      11222|                 12.6|
| 2016-02-03 17:21:02|  2016-02-03 17:23:24|          0.3|        3.5|     10028|      10028|   2.3666666666666667|
+--------------------+---------------------+-------------+-----------+----------+-----------+---------------------+

Notice that this is a plain Python function that receives a PySpark `DataFrame` and returns another PySpark `DataFrame`; it knows nothing about Kedro.

Now, let's wrap it using the `node` convenience function from Kedro:

In [None]:
from kedro.pipeline import node

n0 = node(
    func=preprocess_nyc_taxi_data,
    inputs="nyctaxi_trips",
    outputs="preprocessed_nyctaxi_trips"
)
n0


Node(preprocess_nyc_taxi_data, 'nyctaxi_trips', 'preprocessed_nyctaxi_trips', None)

Conceptually, a `Node` is a wrapper around a Python function that defines a single step in a pipeline. It has inputs and outputs, which are the names of the Data Catalog datasets that the function will receive and return, respectively. Therefore, you could execute it as follows:

```python
n0.func(
    *[catalog.load(input_dataset) for input_dataset in n0.inputs],
)
```

Let's not do that though; Kedro will take care of it.
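To see the wrapper idea without Spark or Kedro, here is a minimal toy model: a hypothetical `ToyNode` that looks up named inputs, passes them to the function positionally, and files the return value under the output name. Kedro's real `Node` supports much more (dictionary inputs, multiple outputs, tags, namespaces), so treat this only as a mental model.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ToyNode:
    """Hypothetical toy stand-in for a Kedro node."""

    func: Callable[..., Any]
    inputs: list[str]
    output: str

    def run(self, data: dict[str, Any]) -> dict[str, Any]:
        # Look up each named input and pass the values positionally, in order
        result = self.func(*[data[name] for name in self.inputs])
        # Wrap the return value under the node's output name
        return {self.output: result}


def keep_positive(distances: list[float]) -> list[float]:
    return [d for d in distances if d > 0]


toy_node = ToyNode(func=keep_positive, inputs=["distances"], output="positive_distances")
print(toy_node.run({"distances": [4.94, 0.0, 0.28]}))
# {'positive_distances': [4.94, 0.28]}
```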

The next step is to assemble the pipeline. In this case, it will only have 1 node:

In [None]:
from kedro.pipeline import pipeline

pipe = pipeline([n0])
pipe


Pipeline([
Node(preprocess_nyc_taxi_data, 'nyctaxi_trips', 'preprocessed_nyctaxi_trips', None)
])

And finally, you can now execute the pipeline. For the purposes of this tutorial, you can use Kedro's `SequentialRunner` directly:

In [None]:
# Workaround: Restore logging handlers, see https://github.com/kedro-org/kedro/issues/3985

import logging

_old_handlers = logging.getLogger().handlers.copy()

import kedro.runner

logging.getLogger().handlers = _old_handlers


In [None]:
from kedro.runner import SequentialRunner

outputs = SequentialRunner().run(pipe, catalog=catalog)

INFO:kedro.runner.sequential_runner:Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously
INFO:kedro.io.data_catalog:Loading data from nyctaxi_trips (ManagedTableDataset)...
INFO:kedro.pipeline.node:Running node: preprocess_nyc_taxi_data([nyctaxi_trips]) -> [preprocessed_nyctaxi_trips]
INFO:kedro.io.data_catalog:Saving data to preprocessed_nyctaxi_trips (MemoryDataset)...
INFO:kedro.runner.sequential_runner:Completed 1 out of 1 tasks
INFO:kedro.runner.sequential_runner:Pipeline execution completed successfully.
INFO:kedro.io.data_catalog:Loading data from preprocessed_nyctaxi_trips (MemoryDataset)...


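According to its docstring, the `.run(...)` method returns "Any node outputs that cannot be processed by the `DataCatalog`", collected in a dictionary keyed by output name. Since `preprocessed_nyctaxi_trips` is not declared in the Data Catalog, Kedro keeps it in a `MemoryDataset` (see the log above) and returns it in that dictionary: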

In [None]:
outputs.keys()


dict_keys(['preprocessed_nyctaxi_trips'])

In [None]:
outputs["preprocessed_nyctaxi_trips"].show(n=5)


+--------------------+---------------------+-------------+-----------+----------+-----------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|trip_duration_minutes|
+--------------------+---------------------+-------------+-----------+----------+-----------+---------------------+
| 2016-02-16 22:40:45|  2016-02-16 22:59:25|         5.35|       18.5|     10003|      11238|   18.666666666666668|
| 2016-02-05 16:06:44|  2016-02-05 16:26:03|          6.5|       21.5|     10282|      10001|   19.316666666666666|
| 2016-02-08 07:39:25|  2016-02-08 07:44:14|          0.9|        5.5|     10119|      10003|    4.816666666666666|
| 2016-02-29 22:25:33|  2016-02-29 22:38:09|          3.5|       13.5|     10001|      11222|                 12.6|
| 2016-02-03 17:21:02|  2016-02-03 17:23:24|          0.3|        3.5|     10028|      10028|   2.3666666666666667|
+--------------------+---------------------+-------------+-----------+----------+-----------+---------------------+
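The rule for what `.run(...)` returns can be modeled with a small, stdlib-only toy runner. Nodes run in order, outputs declared in the catalog are saved there, everything else stays in memory, and the in-memory outputs that no other node consumes are returned. `run_sequentially` and the plain-dict catalog are hypothetical simplifications of `SequentialRunner` and `DataCatalog`, and the nodes are assumed to be already sorted topologically.

```python
from typing import Any, Callable

# Hypothetical toy node: (function, input dataset names, output dataset name)
ToyNode = tuple[Callable[..., Any], list[str], str]


def run_sequentially(nodes: list[ToyNode], catalog: dict[str, Any]) -> dict[str, Any]:
    """Toy runner; assumes `nodes` is already in a valid execution order."""
    memory: dict[str, Any] = {}  # stands in for MemoryDataset
    for func, inputs, output in nodes:
        # Intermediate results come from memory, everything else from the catalog
        args = [memory[name] if name in memory else catalog[name] for name in inputs]
        result = func(*args)
        if output in catalog:
            catalog[output] = result  # "save" to a declared dataset
        else:
            memory[output] = result
    # Return undeclared outputs that no node consumes
    consumed = {name for _, inputs, _ in nodes for name in inputs}
    return {name: value for name, value in memory.items() if name not in consumed}


toy_catalog = {"trip_distances": [4.94, 0.0, 0.28]}
toy_nodes: list[ToyNode] = [
    (lambda ds: [d for d in ds if d > 0], ["trip_distances"], "positive_distances"),
    (len, ["positive_distances"], "n_positive_trips"),
]
print(run_sequentially(toy_nodes, toy_catalog))  # {'n_positive_trips': 2}
```

`positive_distances` is not returned because the second node consumes it; only the final, undeclared output comes back, just as `preprocessed_nyctaxi_trips` did above.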