# 3. Data and ML pipelines with Kedro

## The `DataCatalog`

Kedro’s [Data Catalog](https://docs.kedro.org/en/latest/data/) is a registry of all data sources available for use by the project. It gives you a separate place to declare the details of the datasets your project uses. Kedro provides built-in datasets for different file types and file systems, so you don’t have to write any of the logic for reading or writing data yourself.

Kedro offers a range of datasets, including CSV, Excel, Parquet, Feather, HDF5, JSON, Pickle, SQL tables, SQL queries, Spark DataFrames, and more. They are built on top of libraries such as pandas, Spark, NetworkX, Matplotlib, YAML, and others. Kedro relies on fsspec to read and save data from a variety of data stores, including local file systems, network file systems, cloud object stores, and Hadoop. You can also pass arguments to the underlying load and save operations, and use versioning and credentials for data access.

To start using the Data Catalog, create a `DataCatalog` from a dictionary configuration using the `DataCatalog.from_config` method:

In [1]:
from kedro.io import DataCatalog

In [2]:
# Using the Parquet file for now for simplicity
catalog = DataCatalog.from_config(
    {
        "reddit_submissions_raw": {
            "type": "polars.EagerPolarsDataset",
            "file_format": "parquet",
            "filepath": "submissions.pq",
        }
    },
)

Each entry in the dictionary represents a **dataset**, and each dataset has a **type** as well as some extra properties. Datasets are Python classes that take care of all the I/O needs in Kedro. In this case, we're using `kedro_datasets.polars.EagerPolarsDataset` (the `polars.EagerPolarsDataset` shorthand in the configuration resolves to it); you can read [its full documentation](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-3.0.1/api/kedro_datasets.polars.EagerPolarsDataset.html) online.

After the catalog is created, `catalog.list()` will yield a list of the available dataset names, which you can load using the `catalog.load(<dataset_name>)` method:

In [3]:
catalog.list()

['reddit_submissions_raw']

In [4]:
df = catalog.load("reddit_submissions_raw")

Notice that the resulting object is a regular Polars `DataFrame`, just like the one we were using previously!

In [5]:
type(df)

polars.dataframe.frame.DataFrame

In [6]:
df.head()

| title | author_name | creation_datetime | subreddit_name | num_comments | sfw | score | upvote_ratio | is_self | permalink | selftext | flair_text |
|---|---|---|---|---|---|---|---|---|---|---|---|
| str | str | datetime[μs, UTC] | str | i64 | bool | i64 | f64 | bool | str | str | str |
| "WIBTA if I got back at my bfs … | "Gioyxp3ez3z11" | 2024-07-08 05:16:00 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "WIBTA  So my Bf 20s was in a b… |  |
| "AITA for getting snarky with m… | "nointroductionssosa" | 2024-07-08 05:15:57 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Hi all. It’s a long one. I app… |  |
| "AITA for going on a holiday wh… | "Many-Yogurt2479" | 2024-07-08 05:14:38 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Me and a group of three friend… |  |
| "AITAH For arguing with family … | "PaleontologistJust39" | 2024-07-08 05:14:05 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Hello, I'm 19M and my mother h… |  |
| "AITA for trying to convince my… | "Throw-whyevenstart" | 2024-07-08 05:11:09 UTC | "r/AmItheAsshole" | 4 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "My (38f) husband (40M) recentl… |  |


## The `OmegaConfigLoader`

Instead of creating the Data Catalog by hand like this, Kedro projects usually store their configuration in YAML files. To load them, Kedro offers a [configuration loader](https://docs.kedro.org/en/latest/configuration/configuration_basics.html) called `OmegaConfigLoader`, based on the [OmegaConf](https://omegaconf.readthedocs.io/) library. It adds several useful features, such as:

- Consolidating different configuration files into one
- Substitution, templating
- [Resolvers](https://omegaconf.readthedocs.io/en/2.3_branch/custom_resolvers.html)
- And [much more](https://docs.kedro.org/en/latest/configuration/advanced_configuration.html)

To start using it, first dump the catalog configuration to a `catalog.yml` file, and then use `OmegaConfigLoader` as follows:

In [7]:
%%writefile catalog.yml
reddit_submissions_raw:
  type: polars.EagerPolarsDataset
  file_format: parquet
  filepath: submissions.pq

Writing catalog.yml


In [8]:
from kedro.config import OmegaConfigLoader

config_loader = OmegaConfigLoader(
    conf_source=".",  # Directory where configuration files are located
    config_patterns={"catalog": ["catalog.yml"]},  # For simplicity for this notebook
)

In [9]:
catalog_config = config_loader.get("catalog")
catalog_config

{'reddit_submissions_raw': {'type': 'polars.EagerPolarsDataset',
  'file_format': 'parquet',
  'filepath': 'submissions.pq'}}

In [10]:
catalog = DataCatalog.from_config(catalog_config)

In [11]:
catalog.load("reddit_submissions_raw").head(1)

| title | author_name | creation_datetime | subreddit_name | num_comments | sfw | score | upvote_ratio | is_self | permalink | selftext | flair_text |
|---|---|---|---|---|---|---|---|---|---|---|---|
| str | str | datetime[μs, UTC] | str | i64 | bool | i64 | f64 | bool | str | str | str |
| "WIBTA if I got back at my bfs … | "Gioyxp3ez3z11" | 2024-07-08 05:16:00 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "WIBTA  So my Bf 20s was in a b… |  |
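As an illustration of the templating feature listed above, a `catalog.yml` could factor shared settings out into a key that starts with an underscore. According to the Kedro documentation, the `OmegaConfigLoader` treats such catalog keys as template values: they can be referenced with `${...}` interpolation but do not appear as datasets in the resulting configuration. The `_polars_parquet` name is hypothetical, and this fragment is a sketch rather than a tested configuration:

```yaml
# Hypothetical template block: underscore-prefixed keys are only used for interpolation
_polars_parquet:
  type: polars.EagerPolarsDataset
  file_format: parquet

reddit_submissions_raw:
  type: ${_polars_parquet.type}
  file_format: ${_polars_parquet.file_format}
  filepath: submissions.pq
```

Loaded with the same `config_loader` as above, this should produce the same `catalog_config` dictionary, because the `_polars_parquet` entry is dropped after interpolation. With a single dataset the gain is small, but the pattern pays off once several datasets share the same settings.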


## Nodes and pipelines

Now comes the interesting part. Kedro structures computation as Directed Acyclic Graphs (DAGs), which are created by instantiating `Pipeline` objects with a list of `Node`s. Because each node declares its inputs and outputs by name, Kedro can link nodes together through the datasets they share, perform a topological sort, and work out the order in which they must run.
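To make the idea of a topological sort concrete, here is a Kedro-free sketch using Python's standard `graphlib` module. It describes the three nodes built later in this section as plain lists of input and output dataset names (a simplification, not Kedro's internal representation) and derives a valid execution order from them:

```python
from graphlib import TopologicalSorter

# Simplified description of the pipeline built below:
# node name -> (input dataset names, output dataset names)
nodes = {
    "enrich_submissions": (
        ["reddit_submissions_raw"],
        ["reddit_submissions_enriched"],
    ),
    "sentiment_analysis_by_sentences": (
        ["reddit_submissions_raw"],
        ["reddit_sentiment_by_sentences"],
    ),
    "create_model_input_table": (
        ["reddit_submissions_enriched", "reddit_sentiment_by_sentences"],
        ["reddit_model_input"],
    ),
}

# Map every dataset to the node that produces it
producers = {out: name for name, (_, outs) in nodes.items() for out in outs}

# A node depends on the nodes producing its inputs; datasets with no producer
# (like the raw submissions) are free inputs that must come from the catalog
dependencies = {
    name: {producers[ds] for ds in ins if ds in producers}
    for name, (ins, _) in nodes.items()
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

The two nodes that only read the raw data do not depend on each other, so they can come in either order, but `create_model_input_table` always comes last because it consumes both of their outputs.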

Let's start by creating a simple pipeline with three nodes. First, define the plain Python functions that the nodes will wrap.

In [12]:
import polars as pl


def enrich_submissions(df: pl.DataFrame) -> pl.DataFrame:
    # Two types of posts: AITA and WIBTA https://www.reddit.com/r/AmItheAsshole/wiki/howtopost/
    enriched_df = (
        df.with_columns(
            pl.col("title").str.extract(r"^(AITA|WIBTA)", 1).alias("post_type"),
            pl.col("selftext").str.len_chars().alias("text_length"),
        )
    )
    return enriched_df

In [13]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer


def sentiment_analysis_by_sentences(df: pl.DataFrame) -> pl.DataFrame:
    sia = SentimentIntensityAnalyzer()
    sentences = (
        df.with_columns(
            pl.col("selftext").str.split(".").list.eval(pl.element().str.strip_chars()).alias("sentences")
        )
        .select(pl.col("permalink", "sentences"))
        .explode("sentences")
        .with_columns(
            pl.col("sentences").map_elements(
                lambda s: sia.polarity_scores(s),
                return_dtype=pl.Struct({"neg": pl.Float64, "neu": pl.Float64, "pos": pl.Float64, "compound": pl.Float64}),
            ).alias("sentiment_scores"),
        )
    )
    return sentences

In [14]:

def create_model_input_table(df: pl.DataFrame, sentences: pl.DataFrame) -> pl.DataFrame:
    return df.join(
        (
            sentences
            .group_by("permalink")
            .agg(
                pl.col("sentiment_scores").struct.field("compound").mean().alias("compound_sentiment"),
            )
        ),
        on="permalink",
        how="left",
    )

In [15]:
enrich_submissions(df).head(n=1)

| title | author_name | creation_datetime | subreddit_name | num_comments | sfw | score | upvote_ratio | is_self | permalink | selftext | flair_text | post_type | text_length |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| str | str | datetime[μs, UTC] | str | i64 | bool | i64 | f64 | bool | str | str | str | str | u32 |
| "WIBTA if I got back at my bfs … | "Gioyxp3ez3z11" | 2024-07-08 05:16:00 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "WIBTA  So my Bf 20s was in a b… |  | "WIBTA" | 1373 |


In [16]:
sentiment_analysis_by_sentences(df).head(1)

| permalink | sentences | sentiment_scores |
|---|---|---|
| str | str | struct[4] |
| "/r/AmItheAsshole/comments/1dy0… | "WIBTA  So my Bf 20s was in a b… | {0.0,1.0,0.0,0.0} |


Notice that these are plain Python functions that receive Polars DataFrames and return Polars DataFrames; nothing in them depends on Kedro.

Now, let's wrap them using the `node` convenience function from Kedro:

In [17]:
from kedro.pipeline import node

n0 = node(
    func=enrich_submissions,
    inputs="reddit_submissions_raw",
    outputs="reddit_submissions_enriched",
)
n0

Node(enrich_submissions, 'reddit_submissions_raw', 'reddit_submissions_enriched', None)

Conceptually, a `Node` is a wrapper around a Python function that defines a single step in a pipeline. Its inputs and outputs are the names of the Data Catalog datasets that the function receives and returns, respectively. Therefore, you could execute it by hand as follows:

```python
# Load each input dataset from the catalog and pass them to the wrapped function
result = n0.func(
    *[catalog.load(input_dataset) for input_dataset in n0.inputs],
)
```

Let's not do that though; Kedro will take care of it.
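For intuition about what "taking care of it" involves, here is a toy, Kedro-free sketch of a sequential runner. It is a simplification under stated assumptions, not Kedro's implementation: nodes are hypothetical `(function, input_names, output_name)` tuples assumed to be listed in dependency order already, the catalog is a plain dict, and outputs missing from the catalog are kept in memory, loosely mirroring the `MemoryDataset`s Kedro uses for undeclared datasets.

```python
from typing import Any, Callable

# Hypothetical node representation: (function, input dataset names, output dataset name)
ToyNode = tuple[Callable[..., Any], list[str], str]


def toy_sequential_run(nodes: list[ToyNode], catalog: dict[str, Any]) -> dict[str, Any]:
    """Run nodes in the given (assumed topological) order.

    Returns the outputs that are neither registered in the catalog nor
    consumed by another node.
    """
    memory: dict[str, Any] = {}  # stand-in for in-memory intermediate datasets

    def load(name: str) -> Any:
        return catalog[name] if name in catalog else memory[name]

    for func, input_names, output_name in nodes:
        # "Load" every input, then call the wrapped function with them
        result = func(*[load(name) for name in input_names])
        if output_name in catalog:
            catalog[output_name] = result  # "save" to a registered dataset
        else:
            memory[output_name] = result

    consumed = {name for _, input_names, _ in nodes for name in input_names}
    return {name: data for name, data in memory.items() if name not in consumed}


# Usage with toy data shaped like the Reddit pipeline
toy_catalog = {"raw": ["short post", "a much longer post"]}
toy_outputs = toy_sequential_run(
    [
        (lambda posts: [len(p) for p in posts], ["raw"], "enriched"),
        (lambda posts: [p.count(" ") for p in posts], ["raw"], "spaces"),
        (lambda lengths, spaces: list(zip(lengths, spaces)), ["enriched", "spaces"], "model_input"),
    ],
    toy_catalog,
)
print(toy_outputs)  # {'model_input': [(10, 1), (18, 3)]}
```

Only `model_input` comes back: the two intermediate results are consumed by the last node, and nothing was saved into `toy_catalog` because none of the outputs are registered there.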

The next step is to assemble the pipeline:

In [18]:
from kedro.pipeline import pipeline


pipe = pipeline([
    node(
        func=enrich_submissions,
        inputs="reddit_submissions_raw",
        outputs="reddit_submissions_enriched",
    ),
    node(
        func=sentiment_analysis_by_sentences,
        inputs="reddit_submissions_raw",
        outputs="reddit_sentiment_by_sentences",
    ),
    node(
        func=create_model_input_table,
        inputs=["reddit_submissions_enriched", "reddit_sentiment_by_sentences"],
        outputs="reddit_model_input",
    ),
])
pipe

Pipeline([
Node(enrich_submissions, 'reddit_submissions_raw', 'reddit_submissions_enriched', None),
Node(sentiment_analysis_by_sentences, 'reddit_submissions_raw', 'reddit_sentiment_by_sentences', None),
Node(create_model_input_table, ['reddit_submissions_enriched', 'reddit_sentiment_by_sentences'], 'reddit_model_input', None)
])

Finally, you can execute the pipeline. For the purposes of this tutorial, you can use Kedro's `SequentialRunner` directly:

In [19]:
import logging

logging.basicConfig(level=logging.INFO)

# Workaround: Restore logging handlers, see https://github.com/kedro-org/kedro/issues/3985
_old_handlers = logging.getLogger().handlers.copy()

import kedro.runner

logging.getLogger().handlers = _old_handlers

In [20]:
from kedro.runner import SequentialRunner

outputs = SequentialRunner().run(pipe, catalog=catalog)

INFO:kedro.runner.sequential_runner:Using synchronous mode for loading and saving data. Use the --async flag for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously
INFO:kedro.io.data_catalog:Loading data from [dark_orange]reddit_submissions_raw[/dark_orange] (EagerPolarsDataset)...
INFO:kedro.pipeline.node:Running node: enrich_submissions([reddit_submissions_raw]) -> [reddit_submissions_enriched]
INFO:kedro.io.data_catalog:Saving data to [dark_orange]reddit_submissions_enriched[/dark_orange] (MemoryDataset)...
INFO:kedro.runner.sequential_runner:Completed 1 out of 3 tasks
INFO:kedro.io.data_catalog:Loading data from [dark_orange]reddit_submissions_raw[/dark_orange] (EagerPolarsDataset)...
INFO:kedro.pipeline.node:Running node: sentiment_analysis_by_sentences([reddit_submissions_raw]) -> [reddit_sentiment_by_sentences]
INFO:kedro.io.data_catalog:Saving data to [dark_orange]reddit_sentiment_by_sentences[/dark

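According to its documentation, the `.run(...)` method returns "any node outputs that cannot be processed by the `DataCatalog`", that is, pipeline outputs that are not registered in it. Intermediate datasets such as `reddit_submissions_enriched` are consumed by later nodes and are not returned, but `reddit_model_input` is a final output that is not declared in the Data Catalog, so it's right there in the dictionary: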

In [21]:
outputs.keys()

dict_keys(['reddit_model_input'])

In [22]:
outputs["reddit_model_input"].head(5)

| title | author_name | creation_datetime | subreddit_name | num_comments | sfw | score | upvote_ratio | is_self | permalink | selftext | flair_text | post_type | text_length | compound_sentiment |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| str | str | datetime[μs, UTC] | str | i64 | bool | i64 | f64 | bool | str | str | str | str | u32 | f64 |
| "WIBTA if I got back at my bfs … | "Gioyxp3ez3z11" | 2024-07-08 05:16:00 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "WIBTA  So my Bf 20s was in a b… |  | "WIBTA" | 1373 | -0.124925 |
| "AITA for getting snarky with m… | "nointroductionssosa" | 2024-07-08 05:15:57 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Hi all. It’s a long one. I app… |  | "AITA" | 2717 | 0.130606 |
| "AITA for going on a holiday wh… | "Many-Yogurt2479" | 2024-07-08 05:14:38 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Me and a group of three friend… |  | "AITA" | 1772 | 0.117583 |
| "AITAH For arguing with family … | "PaleontologistJust39" | 2024-07-08 05:14:05 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Hello, I'm 19M and my mother h… |  | "AITA" | 736 | -0.100257 |
| "AITA for trying to convince my… | "Throw-whyevenstart" | 2024-07-08 05:11:09 UTC | "r/AmItheAsshole" | 4 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "My (38f) husband (40M) recentl… |  | "AITA" | 1461 | -0.016927 |


## Exercise 1

1. Create an `exclude_social_features` function that receives the raw data as a Polars `DataFrame` and returns another `DataFrame` without the `"score"`, `"num_comments"`, and `"upvote_ratio"` columns.
2. Rewrite the pipeline so that this function is in the first node. _Hint: You will have to introduce another intermediate dataset_.

In [23]:
%load solutions/ex01_kedro_pipeline.py

## Appendix: Custom datasets

Sometimes none of the datasets in `kedro_datasets` suits our needs. In those cases, we need to write our own.

Have a look at `reddit_a_predictor.datasets.PolarsDeltaDataset` to see how a simple, custom Kedro dataset works.
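To give a rough idea of the responsibilities such a class has, here is a toy, Kedro-free sketch of a dataset that stores a single JSON file on the local file system. It only mimics the shape of a Kedro dataset: a real one would subclass `kedro.io.AbstractDataset` and implement its loading, saving and `_describe` methods (older Kedro releases name the first two `_load` and `_save`, newer ones `load` and `save`, so check the documentation for your version). The `ToyJSONDataset` class is hypothetical:

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


class ToyJSONDataset:
    """Hypothetical dataset that loads and saves a single local JSON file."""

    def __init__(self, filepath: str, save_args: dict[str, Any] | None = None) -> None:
        self._filepath = Path(filepath)
        # Extra keyword arguments forwarded to json.dump (e.g. {"indent": 2})
        self._save_args = save_args or {}

    def load(self) -> Any:
        with self._filepath.open(encoding="utf-8") as f:
            return json.load(f)

    def save(self, data: Any) -> None:
        self._filepath.parent.mkdir(parents=True, exist_ok=True)
        with self._filepath.open("w", encoding="utf-8") as f:
            json.dump(data, f, **self._save_args)

    def exists(self) -> bool:
        return self._filepath.exists()

    def _describe(self) -> dict[str, Any]:
        # A summary of the configuration, similar in spirit to Kedro's _describe()
        return {"filepath": str(self._filepath), "save_args": self._save_args}


# Usage: write to and read back from a temporary directory
tmp_dir = tempfile.mkdtemp()
toy_ds = ToyJSONDataset(f"{tmp_dir}/social_features.json", save_args={"indent": 2})
toy_ds.save({"score": 1, "num_comments": 2, "upvote_ratio": 1.0})
print(toy_ds.load())  # {'score': 1, 'num_comments': 2, 'upvote_ratio': 1.0}
```

Kedro datasets follow the same pattern: the constructor receives the entries from the catalog configuration (`filepath`, `save_args`, and so on), and the catalog calls the loading and saving methods for you.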

In [24]:
import os

In [25]:
%load_ext dotenv

In [26]:
%dotenv

In [27]:
minio_endpoint_url = os.environ["MINIO_ENDPOINT_URL"]
minio_access_key = os.environ["MINIO_KEY"]
minio_secret_id = os.environ["MINIO_SECRET"]

In [28]:
catalog = DataCatalog.from_config(
    {
        "reddit_submissions_raw": {
            "type": "reddit_a_predictor.datasets.PolarsDeltaDataset",
            "filepath": "s3://reddit-submissions/submissions-raw",
            "credentials": "minio_object_store",
            "storage_options": {
                "AWS_ALLOW_HTTP": "true",
                "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
                "AWS_EC2_METADATA_DISABLED": "true",
            }
        }
    },
    # Credentials are treated separately
    credentials={
        "minio_object_store": {
            "AWS_ENDPOINT_URL": minio_endpoint_url,
            "AWS_ACCESS_KEY_ID": minio_access_key,
            "AWS_SECRET_ACCESS_KEY": minio_secret_id,
        }
    }
)
catalog.load("reddit_submissions_raw").head(1)

INFO:kedro.io.data_catalog:Loading data from [dark_orange]reddit_submissions_raw[/dark_orange] (PolarsDeltaDataset)...


| title | author_name | creation_datetime | subreddit_name | num_comments | sfw | score | upvote_ratio | is_self | permalink | selftext | flair_text |
|---|---|---|---|---|---|---|---|---|---|---|---|
| str | str | datetime[μs, UTC] | str | i64 | bool | i64 | f64 | bool | str | str | str |
| "AITAH For arguing with family … | "PaleontologistJust39" | 2024-07-08 05:14:05 UTC | "r/AmItheAsshole" | 2 | True | 1 | 1.0 | True | "/r/AmItheAsshole/comments/1dy0… | "Hello, I'm 19M and my mother h… |  |
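Note how the dataset entry only refers to its credentials by name (`"minio_object_store"`), while the secrets themselves are passed separately through the `credentials` argument. When the catalog is built, Kedro looks up that name and hands the matching credentials to the dataset. A toy, Kedro-free sketch of that substitution (the `resolve_credentials` helper is hypothetical, not part of Kedro's API):

```python
from typing import Any


def resolve_credentials(
    dataset_config: dict[str, Any], credentials: dict[str, dict[str, str]]
) -> dict[str, Any]:
    """Hypothetical helper: swap a credentials *name* for the matching credentials dict."""
    resolved = dict(dataset_config)  # shallow copy, the input config is left untouched
    name = resolved.get("credentials")
    if isinstance(name, str):
        if name not in credentials:
            raise KeyError(f"Unable to find credentials '{name}'")
        resolved["credentials"] = credentials[name]
    return resolved


raw_config = {
    "type": "reddit_a_predictor.datasets.PolarsDeltaDataset",
    "filepath": "s3://reddit-submissions/submissions-raw",
    "credentials": "minio_object_store",
}
# Placeholder values; in the notebook they come from environment variables
example_credentials = {
    "minio_object_store": {
        "AWS_ACCESS_KEY_ID": "example-key",
        "AWS_SECRET_ACCESS_KEY": "example-secret",
    }
}
resolved_config = resolve_credentials(raw_config, example_credentials)
print(resolved_config["credentials"]["AWS_ACCESS_KEY_ID"])  # example-key
```

Keeping the two apart means the catalog configuration can be committed to version control while the secrets live elsewhere; in a full Kedro project they typically go in `conf/local/credentials.yml`, which is excluded from version control by default.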
