Catalog entries with factory patterns are not recognized by PipelineML #516

Closed
sebastiandro opened this issue Jan 16, 2024 · 2 comments · Fixed by #519
Labels
bug Something isn't working

Comments

sebastiandro (Contributor) commented Jan 16, 2024

Description

Hello there! First off, I want to thank you for this great plugin :)

The problem

I'm using modular pipelines and dataset factories. When using pipeline_ml_factory to deploy pipelines to MLflow, I've encountered issues with kedro-mlflow not recognising catalogue entries that rely on factory patterns.

Use case example:

Let's say we have a model pipeline that we want to run on different datasets. To differentiate the datasets, we use namespaces.

If we have the following setup:

model_pipeline = pipeline(
    [
        node(
            func=train,
            inputs="data",
            outputs="model",
            tags=["training"]
        ),
        node(
            func=infer,
            inputs=["model", "model_input"],
            outputs="prediction",
            tags=["inference"]
        ),
    ]
)

model_pipeline_a = pipeline(
    pipe=model_pipeline,
    namespace="dataset_a",
)

model_pipeline_b = pipeline(
    pipe=model_pipeline,
    namespace="dataset_b",
)

Our catalogue contains the following entries to make sure we are persisting the data and model separated by namespace:

{namespace}.data:
  type: pandas.CSVDataset
  filepath: data/01_raw/{namespace}/data.csv

{namespace}.model:
  type: pickle.PickleDataset
  filepath: data/02_models/{namespace}/model.pkl
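As a side note for readers unfamiliar with dataset factories: the {namespace} placeholder is matched against concrete dataset names and the captured values are substituted into the config template at resolution time. A toy, regex-based sketch of that idea (purely illustrative, not Kedro's actual implementation):

```python
import re


def resolve_factory_entry(pattern: str, config: dict, ds_name: str):
    """Toy factory resolution: match ds_name against a pattern such as
    "{namespace}.data" and substitute the captured values into the config
    template. Illustrative only, not Kedro's real code."""
    # Turn "{namespace}.data" into the regex "^(?P<namespace>.+)\.data$"
    regex = (
        "^"
        + re.sub(r"\{(\w+)\}", r"(?P<\g<1>>.+)", pattern.replace(".", r"\."))
        + "$"
    )
    match = re.match(regex, ds_name)
    if match is None:
        return None  # ds_name does not use this pattern
    groups = match.groupdict()
    # Fill the placeholders in every string value of the config template
    return {
        key: value.format(**groups) if isinstance(value, str) else value
        for key, value in config.items()
    }
```

With the catalogue above, resolving "dataset_a.data" against "{namespace}.data" yields a filepath of data/01_raw/dataset_a/data.csv.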

If we set up the PipelineML instance for pipeline a:

deploy_pipeline_a = pipeline_ml_factory(
        training=model_pipeline_a.only_nodes_with_tags("training"),
        inference=model_pipeline_a.only_nodes_with_tags("inference"),
        input_name="dataset_a.model_input",
        log_model_kwargs=dict(
            artifact_path="artifacts",
            conda_env={
                "python": "3.10",
                "dependencies": [f"my_project=={PROJECT_VERSION}"],
            },
            signature=None,
        ),
    )

When running the pipeline, kedro-mlflow does not recognise dataset_a.model as an artefact that should be uploaded. Instead, it throws an error:

kedro_mlflow.mlflow.kedro_pipeline_model.KedroPipelineModelError: The provided catalog must contains 'dataset_a.model' dataset since it is the input of the pipeline.
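The failure boils down to a membership test: the inference pipeline's inputs must already be present in the catalog object the model receives, and pattern-backed entries are not. A hypothetical simplification of that check (function and variable names below are illustrative, not kedro-mlflow's actual code):

```python
def check_inference_inputs(catalog_entries: set, pipeline_inputs: set):
    """Illustrative stand-in for the catalog check that raises the error
    above: every pipeline input must already be a catalog entry."""
    for ds_name in sorted(pipeline_inputs):
        if ds_name not in catalog_entries:
            raise ValueError(
                f"The provided catalog must contains '{ds_name}' dataset "
                "since it is the input of the pipeline."
            )


# The hook builds the catalog from explicitly declared entries only,
# so the pattern-resolved name "dataset_a.model" is missing:
explicit_entries = {"dataset_a.model_input"}
pipeline_inputs = {"dataset_a.model_input", "dataset_a.model"}
```

Calling check_inference_inputs(explicit_entries, pipeline_inputs) then raises for "dataset_a.model", mirroring the error message above.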

It works if I add the full name to the catalogue:

dataset_a.model:
  type: pickle.PickleDataset
  filepath: data/02_models/dataset_a/model.pkl

But then I lose the benefits of the nice naming patterns :)

Context

This is important since I am working on a project where we re-use the model pipelines across many datasets. We separate the datasets using namespaces. Hence, it would greatly help to use the naming patterns to reduce the work every time a new dataset is added.

Possible Implementation

This line raises the exception:
https://github.com/Galileo-Galilei/kedro-mlflow/blob/master/kedro_mlflow/mlflow/kedro_pipeline_model.py#L119

It turns out that the DataCatalog passed to the KedroPipelineModel constructor in the after_pipeline_run hook does not contain catalogue entries resolved from factory patterns:
https://github.com/Galileo-Galilei/kedro-mlflow/blob/e0033c5072c929a4c26cfaeaf61fcedf93d36522/kedro_mlflow/framework/hooks/mlflow_hook.py#L353C1-L366

My workaround for now is to update the DataCatalog to resolve the factory patterns before sending it to KedroPipelineModel:

I borrowed this approach from the kedro catalog resolve command and added helper functions to mlflow_hook.py:

# Note: these helpers rely on private DataCatalog APIs and require
# `import copy` at the top of mlflow_hook.py
def _trim_filepath(self, project_path: str, file_path: str) -> str:
    # Strip the project path prefix so filepaths stay relative
    return file_path.replace(project_path, "", 1)


def _resolve_catalog(
    self, context: KedroContext, catalog: DataCatalog, pipeline: Pipeline
) -> DataCatalog:
    catalog_config = context.config_loader["catalog"]
    # Keep every entry that is declared explicitly (i.e. not a factory pattern)
    explicit_datasets = {
        ds_name: ds_config
        for ds_name, ds_config in catalog_config.items()
        if not catalog._is_pattern(ds_name)
    }

    datasets = pipeline.datasets()
    for ds_name in datasets:
        is_param = ds_name.startswith("params:") or ds_name == "parameters"
        if ds_name in explicit_datasets or is_param:
            continue

        matched_pattern = catalog._match_pattern(
            catalog._dataset_patterns, ds_name
        )
        if matched_pattern:
            ds_config_copy = copy.deepcopy(
                catalog._dataset_patterns[matched_pattern]
            )

            ds_config = catalog._resolve_config(
                ds_name, matched_pattern, ds_config_copy
            )

            ds_config["filepath"] = self._trim_filepath(
                str(context.project_path) + "/", ds_config["filepath"]
            )
            explicit_datasets[ds_name] = ds_config

    return DataCatalog.from_config(explicit_datasets)

And use it like this in the after_pipeline_run hook:

...
resolved_catalog = self._resolve_catalog(self.context, catalog, pipeline)

with TemporaryDirectory() as tmp_dir:
    # This will be removed at the end of the context manager,
    # but we need to log in mlflow before moving the folder
    kedro_pipeline_model = KedroPipelineModel(
        pipeline=pipeline.inference,
        catalog=resolved_catalog,
        input_name=pipeline.input_name,
        **pipeline.kpm_kwargs,
    )

...

Could this be a potential solution to the problem? Or is there a simpler way that I have totally missed? :)

Galileo-Galilei (Owner) commented Jan 16, 2024

Glad to see you're using the pipeline_ml_factory; it's an underestimated feature of the plugin that is not very well known, I guess ;)

Good catch, I've seen a bunch of issues like this since the release of dataset factories. I think we can just call dataset.exists() on each dataset to force the catalog to materialise them, which should make your code much simpler. Something like this:

takikadiri/kedro-boot@ac758fc
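The idea can be illustrated with a toy catalog that, like Kedro's, only materialises pattern-matched entries when a dataset is first touched (the class and resolution logic below are illustrative stand-ins, not Kedro code):

```python
import re


class ToyLazyCatalog:
    """Toy stand-in for DataCatalog: factory-pattern entries are only
    materialised when a dataset is first accessed."""

    def __init__(self, patterns: dict):
        self._patterns = patterns  # e.g. {"{namespace}.model": {...}}
        self._datasets = {}        # explicit / already-materialised entries

    def _get_dataset(self, name: str) -> dict:
        if name not in self._datasets:
            for pattern, config in self._patterns.items():
                regex = (
                    "^"
                    + re.sub(r"\{(\w+)\}", r"(?P<\g<1>>.+)",
                             pattern.replace(".", r"\."))
                    + "$"
                )
                match = re.match(regex, name)
                if match:
                    groups = match.groupdict()
                    # Materialise the entry by filling the template
                    self._datasets[name] = {
                        k: v.format(**groups) if isinstance(v, str) else v
                        for k, v in config.items()
                    }
                    break
            else:
                raise KeyError(name)
        return self._datasets[name]

    def exists(self, name: str) -> bool:
        # Touching the dataset materialises it as a side effect
        return self._get_dataset(name) is not None


catalog = ToyLazyCatalog(
    {"{namespace}.model": {"filepath": "data/02_models/{namespace}/model.pkl"}}
)
assert "dataset_a.model" not in catalog._datasets  # the failing membership test
catalog.exists("dataset_a.model")                  # the suggested fix: touch it
assert "dataset_a.model" in catalog._datasets      # now the check would pass
```

Looping over the pipeline's datasets and calling exists() on each before handing the catalog to KedroPipelineModel would therefore sidestep the resolution machinery entirely.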

I'll try to push a fix soon, but PR are welcome!

Galileo-Galilei added the bug label Jan 16, 2024
sebastiandro (Contributor, Author) commented

Thank you for that suggestion, @Galileo-Galilei; that seemed to do the trick! I opened a PR: #519, but unfortunately I missed linking it to this issue and setting you as a reviewer 🙈 I can't seem to edit it now that it's open.
