In [None]:

# Define a minimal data interface (Kedro provides more complete ones, such as "CSVDataSet")

import pandas as pd
from pathlib import Path


class SimpleCSVLocalDataSet:
    """Minimal local CSV data interface exposing ``_load`` / ``_save`` hooks.

    It only wraps ``pd.read_csv`` and ``DataFrame.to_csv``.

    Args:
        filepath: Path of the CSV file to read from and write to.
        load_args: Extra keyword arguments forwarded to ``pd.read_csv``.
            Defaults to no extra arguments.
        save_args: Extra keyword arguments forwarded to ``DataFrame.to_csv``.
            Defaults to ``{"index": False}``. Passing a dict replaces that
            default entirely, so include ``"index": False`` yourself if the
            index should still be dropped.
    """

    def __init__(self, filepath, load_args=None, save_args=None):
        self._filepath = filepath
        # Use None sentinels and copy into fresh dicts so instances never
        # share a mutable default (or alias a caller-owned mapping).
        self._load_args = dict(load_args) if load_args is not None else {}
        self._save_args = (
            dict(save_args) if save_args is not None else {"index": False}
        )

    def _load(self) -> pd.DataFrame:
        """Read the CSV at ``filepath`` using the configured ``load_args``."""
        return pd.read_csv(self._filepath, **self._load_args)

    def _save(self, data: pd.DataFrame) -> None:
        """Write ``data`` to ``filepath``, creating parent directories first."""
        save_path = Path(self._filepath)
        # Create missing parent directories so the write cannot fail on them.
        save_path.parent.mkdir(parents=True, exist_ok=True)
        data.to_csv(str(save_path), **self._save_args)


# Define a task

def do_some_transformation(df):
    """Placeholder task that hands its input straight back.

    Replace the body with real processing. At present the very same object
    that was passed in is returned; no copy is made.
    """
    result = df
    return result


# Configure the data interfaces (with Kedro, these can be declared in catalog.yml instead)

input_dataset = SimpleCSVLocalDataSet(
    filepath="data/input.csv",
    # `float_precision` takes a string; "high" selects pandas' high-precision
    # float converter (a bare `high` would be an undefined name).
    load_args={"float_precision": "high"},
    # Only used if input_dataset._save is called; this cell only loads it.
    save_args={"float_format": "%.16e"},
)
output_dataset = SimpleCSVLocalDataSet(filepath="data/output.csv")
save_output_flag = True


# Run the tasks (with Kedro these can be wired up as a pipeline; PipelineX lets you declare it in parameters.yml)
df_001 = input_dataset._load()
df_002 = do_some_transformation(df_001)
if save_output_flag:
    output_dataset._save(df_002)


In [None]:
from kedro.pipeline import Pipeline, node

from my_module import processing_task_1, processing_task_2, processing_task_3

def create_pipeline(**kwargs):
    """Assemble the three-step processing pipeline.

    Data flow:
      input_data_1 -> processing_task_1 -> intermediate_data_1, intermediate_data_2
      intermediate_data_1 -> processing_task_2 -> trivial_intermediate_data
      trivial_intermediate_data -> processing_task_3 -> output_data

    No node in this pipeline consumes ``intermediate_data_2``.
    ``kwargs`` is accepted but not used here.
    """
    first_step = node(
        func=processing_task_1,
        inputs="input_data_1",
        outputs=["intermediate_data_1", "intermediate_data_2"],
    )
    second_step = node(
        func=processing_task_2,
        inputs="intermediate_data_1",
        outputs="trivial_intermediate_data",
    )
    third_step = node(
        func=processing_task_3,
        inputs="trivial_intermediate_data",
        outputs="output_data",
    )
    steps = [first_step, second_step, third_step]
    return Pipeline(steps)