# First steps with Kedro

<img src="static/kedro-horizontal-color-on-light.png" width="400" alt="Kedro">

**Goal**: Create a classifier that predicts whether a flight will be delayed or not, using the [nycflights13 data](https://github.com/hadley/nycflights13).

To see the end result, run:

```
$ cd demo/delay-prediction
$ kedro viz run
```

<img src="static/kedro-final-pipeline.png" width="600" alt="Kedro final pipeline">

In [None]:
import ibis

ibis.options.interactive = True

## The `DataCatalog`

Kedro’s [Data Catalog](https://docs.kedro.org/en/latest/data/) is a registry of all data sources available for use by the project. It offers a separate place to declare details of the datasets your projects use. Kedro provides built-in datasets for different file types and file systems so you don’t have to write any of the logic for reading or writing data.

Kedro offers a range of datasets, including CSV, Excel, Parquet, Feather, HDF5, JSON, Pickle, SQL Tables, SQL Queries, Spark DataFrames, and more. These datasets build on the APIs of pandas, Spark, NetworkX, Matplotlib, YAML, and others. The Data Catalog relies on fsspec to read and save data from a variety of data stores, including local file systems, network file systems, cloud object stores, and Hadoop. You can pass arguments to load and save operations, and use versioning and credentials for data access.

To start using the Data Catalog, build a `DataCatalog` from a dictionary configuration with `DataCatalog.from_config`, as follows:

In [None]:
from kedro.io import DataCatalog

In [None]:
catalog = DataCatalog.from_config(
    {
        "flights": {
            "type": "ibis.TableDataset",
            "table_name": "flights",
            "connection": {
                "backend": "duckdb",
                "database": "nycflights13.ddb",
                "read_only": True,
            },
        }
    }
)

Each entry in the dictionary represents a **dataset**, and each dataset has a **type** as well as some extra properties. Datasets are Python classes that take care of all the I/O needs in Kedro. In this case, we're using `kedro_datasets.ibis.TableDataset`; you can read [its full documentation](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-3.0.1/api/kedro_datasets.ibis.TableDataset.html) online.

After the catalog is created, `catalog.list()` will yield a list of the available dataset names, which you can load using the `catalog.load(<dataset_name>)` method:

In [None]:
catalog.list()

In [None]:
flights = catalog.load("flights")

Notice that the resulting object is the exact same Ibis table we were using in the previous tutorial!

In [None]:
type(flights)

In [None]:
flights

## The `OmegaConfigLoader`

Instead of creating the Data Catalog by hand like this, Kedro usually stores configuration in YAML files. To load them, Kedro offers a [configuration loader](https://docs.kedro.org/en/latest/configuration/configuration_basics.html) called the `OmegaConfigLoader`, based on the [OmegaConf](https://omegaconf.readthedocs.io/) library. This adds several interesting features, such as:

- Consolidating different configuration files into one
- Substitution, templating
- [Resolvers](https://omegaconf.readthedocs.io/en/2.3_branch/custom_resolvers.html)
- And [much more](https://docs.kedro.org/en/latest/configuration/advanced_configuration.html)

To start using it, first dump the catalog configuration to a `catalog.yml` file, and then use `OmegaConfigLoader` as follows:

In [None]:
%%writefile catalog.yml
flights:
  type: ibis.TableDataset
  table_name: flights
  connection:
    backend: duckdb
    database: nycflights13.ddb
    read_only: true

In [None]:
from kedro.config import OmegaConfigLoader

config_loader = OmegaConfigLoader(
    conf_source=".",  # Directory where configuration files are located
    config_patterns={"catalog": ["catalog.yml"]},  # For simplicity for this demo
)

In [None]:
catalog_config = config_loader.get("catalog")
catalog_config

As you can see, `config_loader.get("catalog")` gets you the same dictionary we crafted by hand earlier.

However, hardcoding the database path like that is an invitation to trouble. Let's declare a variable `_root` inside the YAML file using OmegaConf interpolation syntax and load the catalog config again:

In [None]:
%%writefile catalog.yml
_root: /workspaces/kedro-ibis-tutorial

flights:
  type: ibis.TableDataset
  table_name: flights
  connection:
    backend: duckdb
    database: "${_root}/nycflights13.ddb"
    read_only: true

In [None]:
catalog_config = config_loader.get("catalog")
catalog_config

In [None]:
catalog = DataCatalog.from_config(catalog_config)

In [None]:
catalog.load("flights")
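To make the `${_root}` reference in `catalog.yml` less magical, here is a toy sketch of variable interpolation written with the standard library only. It is not how OmegaConf is implemented, and the names `resolve`, `raw`, and `toy_catalog_config` are hypothetical. It also assumes that underscore-prefixed top-level keys serve only as template variables and are dropped from the final catalog configuration, which is how the output above appears to behave.

```python
import re

# Matches placeholders such as ${_root} (toy version: top-level names only).
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def resolve(value, root):
    """Recursively replace ${name} placeholders with root[name]."""
    if isinstance(value, dict):
        return {key: resolve(item, root) for key, item in value.items()}
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda match: str(root[match.group(1)]), value)
    return value  # booleans, numbers, etc. are left untouched


raw = {
    "_root": "/workspaces/kedro-ibis-tutorial",
    "flights": {
        "type": "ibis.TableDataset",
        "table_name": "flights",
        "connection": {
            "backend": "duckdb",
            "database": "${_root}/nycflights13.ddb",
            "read_only": True,
        },
    },
}

resolved = resolve(raw, raw)

# Assumption: keys starting with "_" are template-only and not treated as datasets.
toy_catalog_config = {
    key: value for key, value in resolved.items() if not key.startswith("_")
}

print(toy_catalog_config["flights"]["connection"]["database"])
```

In the real project you would keep relying on `OmegaConfigLoader`, which also supports nested references, resolvers, and merging several files.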

## Nodes and pipelines

Now comes the interesting part. Kedro structures computation as directed acyclic graphs (DAGs), which are created by instantiating `Pipeline` objects with a list of `Node`s. By linking the inputs and outputs of each node, Kedro can perform a topological sort and determine the order in which the nodes must run.
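The idea of ordering nodes by matching dataset names can be sketched with Python's standard `graphlib` module. This is only an illustration of the principle, not Kedro's implementation; the `nodes` dictionary is hypothetical and `train_model` is a made-up step.

```python
from graphlib import TopologicalSorter

# Hypothetical node descriptions: node name -> (input datasets, output datasets).
# They are deliberately listed out of order.
nodes = {
    "train_model": (["model_input_table"], ["model"]),
    "create_model_input_table": (
        ["preprocessed_flights", "weather"],
        ["model_input_table"],
    ),
    "preprocess_flights": (["flights"], ["preprocessed_flights"]),
}

# Map every dataset name to the node that produces it.
producers = {
    output: name for name, (_, outputs) in nodes.items() for output in outputs
}

# A node depends on the nodes producing its inputs. Raw inputs such as
# "flights" and "weather" have no producer and add no dependency.
dependencies = {
    name: {producers[i] for i in inputs if i in producers}
    for name, (inputs, _) in nodes.items()
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each node here depends on the previous one, only one valid order exists, and it starts with `preprocess_flights` regardless of how the dictionary is written.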

Let's start by creating a trivial pipeline with a single node: a preprocessing function that manipulates the `dep_time`, `arr_delay`, and `air_time` columns.

In [None]:
def preprocess_flights(table):
    return table.mutate(
        dep_time=(
            table.dep_time.lpad(4, "0").substr(0, 2)
            + ":"
            + table.dep_time.substr(-2, 2)
            + ":00"
        ).try_cast("time"),
        arr_delay=table.arr_delay.try_cast(int),
        air_time=table.air_time.try_cast(int),
    )

In [None]:
flights.select("year", "month", "day", "dep_time")

In [None]:
preprocess_flights(flights).select("year", "month", "day", "dep_time")

Notice that this is a plain Python function, receiving an Ibis table and returning another Ibis table.

Now, let's wrap it using the `node` convenience function from Kedro:

In [None]:
from kedro.pipeline import node

n0 = node(func=preprocess_flights, inputs="flights", outputs="preprocessed_flights")
n0

Conceptually, a `Node` is a wrapper around a Python function that defines a single step in a pipeline. It has inputs and outputs, which are the names of the Data Catalog datasets that the function will receive and return, respectively. Therefore, you could execute it as follows:

```python
n0.func(
    *[catalog.load(input_dataset) for input_dataset in n0.inputs],
)
```

Let's not do that, though: Kedro will take care of it.

The next step is to assemble the pipeline. In this case, it will have only one node:

In [None]:
from kedro.pipeline import pipeline

pipe = pipeline([n0])
pipe

And finally, you can now execute the pipeline. For the purposes of this tutorial, you can use Kedro's `SequentialRunner` directly:

In [None]:
from kedro.runner import SequentialRunner

outputs = SequentialRunner().run(pipe, catalog=catalog)

The `.run(...)` method returns a dictionary containing "Any node outputs that cannot be processed by the `DataCatalog`". Since `preprocessed_flights` is not declared in the Data Catalog, it appears in that dictionary:

In [None]:
outputs.keys()

In [None]:
outputs["preprocessed_flights"]
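To build intuition for why `preprocessed_flights` ends up in the returned dictionary, here is a rough sketch of what a sequential runner does. It uses plain dictionaries and lists instead of Kedro and Ibis objects, and every name in it (`run_sequentially`, `preprocess_toy_flights`, `toy_catalog`, `toy_nodes`) is hypothetical. It assumes that the runner returns only those outputs that are neither declared in the catalog nor consumed by another node.

```python
def run_sequentially(nodes, catalog):
    """Run (func, input_names, output_name) triples in the given order."""
    data = dict(catalog)  # working copy: dataset name -> loaded data
    for func, input_names, output_name in nodes:
        # Load the inputs by name, call the function, store the result by name.
        data[output_name] = func(*(data[name] for name in input_names))

    consumed = {name for _, input_names, _ in nodes for name in input_names}
    # Assumption: only outputs that are not in the catalog and that no other
    # node consumes are handed back to the caller.
    return {
        output_name: data[output_name]
        for _, _, output_name in nodes
        if output_name not in catalog and output_name not in consumed
    }


def preprocess_toy_flights(rows):
    """Toy stand-in for the real node: cast arr_delay from str to int."""
    return [{**row, "arr_delay": int(row["arr_delay"])} for row in rows]


toy_catalog = {
    "flights": [
        {"flight": 1545, "arr_delay": "11"},
        {"flight": 1714, "arr_delay": "20"},
    ]
}
toy_nodes = [(preprocess_toy_flights, ["flights"], "preprocessed_flights")]

toy_outputs = run_sequentially(toy_nodes, toy_catalog)
print(toy_outputs)
```

If `preprocessed_flights` were declared in the catalog, this sketch would return an empty dictionary, since the result would belong to the catalog instead.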

## Exercises

### Exercise 1

Complete the `catalog.yml` so that `weather` is included as well.

_Extra points_ if you factor the connection details out into a variable.

In [None]:
%load solutions/catalog.yml


### Exercise 2

Complete the data processing pipeline by defining a `create_model_input_table` function:

```python
import ibis.expr.types as ir  # provides the `ir.Table` type used in the annotation


def create_model_input_table(flights, weather) -> ir.Table:
    ...
```

(see the `join` explanation in the Ibis notebook)

and then recreate the pipeline so that it has two nodes.

_Extra points_ if your node drops the null values of the resulting table and selects only a subset of the columns.

In [None]:
%load solutions/nb03_ex01.py