From 8e10eea2e62fb43ca7ad25669a11abf58227bbce Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Thu, 28 Jul 2022 15:47:59 +0200 Subject: [PATCH 01/13] refactor: Changes for new kedro version --- docs/conf.py | 2 +- kedro_kubeflow/cli.py | 19 +- kedro_kubeflow/context_helper.py | 23 +- .../generators/one_pod_pipeline_generator.py | 3 +- .../pod_per_node_pipeline_generator.py | 9 +- kedro_kubeflow/utils.py | 2 +- setup.py | 10 +- tests/test_cli.py | 5 +- tests/test_context_helper.py | 57 +- tests/test_one_pod_pipeline_generator.py | 343 ++++++--- tests/test_pod_per_node_pipeline_generator.py | 701 +++++++++++------- 11 files changed, 740 insertions(+), 434 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 44b4c91..a7fb62f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -33,7 +33,7 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - # "sphinx.ext.autodoc", + "sphinx.ext.autodoc", # "sphinx.ext.napoleon", # "sphinx_autodoc_typehints", # "sphinx.ext.doctest", diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index 86f249a..e72fa4b 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -4,6 +4,7 @@ from pathlib import Path import click +from click import ClickException from .auth import AuthHandler from .config import PluginConfig @@ -311,7 +312,8 @@ def init(ctx, kfp_url: str, with_github_actions: bool): @click.pass_context def mlflow_start(ctx, kubeflow_run_id: str, output: str): import mlflow - from kedro_mlflow.framework.context import get_mlflow_config + + # from kedro_mlflow.framework.context import get_mlflow_config token = AuthHandler().obtain_id_token() if token: @@ -320,13 +322,16 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): try: kedro_context = ctx.obj["context_helper"].context - mlflow_conf = get_mlflow_config(kedro_context) - mlflow_conf.setup(kedro_context) + # mlflow_conf = get_mlflow_config(kedro_context) + # mlflow_conf.setup(kedro_context) + mlflow_conf = kedro_context.mlflow + except AttributeError: - kedro_session = ctx.obj["context_helper"].session - with kedro_session: - mlflow_conf = get_mlflow_config(kedro_session) - mlflow_conf.setup() + # kedro_session = ctx.obj["context_helper"].session + # with kedro_session: + # mlflow_conf = get_mlflow_config(kedro_session) + # mlflow_conf.setup() + raise ClickException("Could not read MLFlow config") run = mlflow.start_run( experiment_id=mlflow_conf.experiment.experiment_id, nested=False diff --git a/kedro_kubeflow/context_helper.py b/kedro_kubeflow/context_helper.py index b6755e0..440af25 100644 --- a/kedro_kubeflow/context_helper.py +++ b/kedro_kubeflow/context_helper.py @@ -1,7 +1,9 @@ import os from functools import lru_cache from pathlib import Path -from typing import Dict, Iterable + +# from typing import Dict, Iterable +from typing import Any, Dict from kedro import __version__ as kedro_version from kedro.config import TemplatedConfigLoader @@ -18,8 +20,23 @@ class EnvTemplatedConfigLoader(TemplatedConfigLoader): # defaults provided so default variables ${commit_id|dirty} work for some entries ENV_DEFAULTS = {"commit_id": None, "branch_name": None} - def __init__(self, conf_paths: Iterable[str]): - super().__init__(conf_paths, globals_dict=self.read_env()) + def __init__( + self, + conf_source: str, + env: str = None, + runtime_params: Dict[str, Any] = None, + *, + base_env: str = "base", + default_run_env: str = "local" + ): + super().__init__( + conf_source, + env=env, + runtime_params=runtime_params, + globals_dict=self.read_env(), + base_env=base_env, + default_run_env=default_run_env, + ) def read_env(self) -> Dict: config = EnvTemplatedConfigLoader.ENV_DEFAULTS.copy() diff --git a/kedro_kubeflow/generators/one_pod_pipeline_generator.py b/kedro_kubeflow/generators/one_pod_pipeline_generator.py index 32fae02..0f0a594 100644 --- a/kedro_kubeflow/generators/one_pod_pipeline_generator.py +++ b/kedro_kubeflow/generators/one_pod_pipeline_generator.py @@ -1,5 +1,6 @@ import logging +from kedro.framework.context import KedroContext from kfp import dsl from ..utils import clean_name @@ -19,7 +20,7 @@ class OnePodPipelineGenerator(object): def __init__(self, config, project_name, context): self.project_name = project_name - self.context = context + self.context: KedroContext = context dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True self.run_config = config.run_config self.catalog = context.config_loader.get("catalog*") diff --git a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py index dd2255d..d81cf90 100644 --- a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py +++ b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py @@ -2,6 +2,7 @@ from typing import Dict, Set import kubernetes.client as k8s +from kedro.framework.context import KedroContext from kedro.pipeline.node import Node from kfp import dsl @@ -22,7 +23,7 @@ class PodPerNodePipelineGenerator(object): def __init__(self, config, project_name, context): self.project_name = project_name - self.context = context + self.context: KedroContext = context dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True self.run_config = config.run_config self.catalog = context.config_loader.get("catalog*") @@ -41,13 +42,13 @@ def generate_pipeline(self, pipeline, image, image_pull_policy): ) @maybe_add_params(self.context.params) def convert_kedro_pipeline_to_kfp() -> None: + from kedro.framework.project import pipelines + """Convert from a Kedro pipeline into a kfp container graph.""" dsl.get_pipeline_conf().set_ttl_seconds_after_finished( self.run_config.ttl ) - node_dependencies = self.context.pipelines.get( - pipeline - ).node_dependencies + node_dependencies = pipelines[pipeline].node_dependencies with create_pipeline_exit_handler( pipeline, image, diff --git a/kedro_kubeflow/utils.py b/kedro_kubeflow/utils.py index e193775..ca47bcc 100644 --- a/kedro_kubeflow/utils.py +++ b/kedro_kubeflow/utils.py @@ -11,8 +11,8 @@ def clean_name(name: str) -> str: def is_mlflow_enabled() -> bool: try: + import kedro_mlflow # NOQA import mlflow # NOQA - from kedro_mlflow.framework.context import get_mlflow_config # NOQA return True except ImportError: diff --git a/setup.py b/setup.py index bf8ba05..347434e 100644 --- a/setup.py +++ b/setup.py @@ -6,8 +6,8 @@ # Runtime Requirements. INSTALL_REQUIRES = [ - "kedro>=0.16,<0.18", - "click<8.0", + "kedro>=0.18.1, <0.19.0", + "click>=8.0.4", "kfp>=1.8.12,<2.0", "tabulate>=0.8.7", "semver~=2.10", @@ -17,10 +17,10 @@ # Dev Requirements EXTRA_REQUIRE = { - "mlflow": ["kedro-mlflow>=0.4.1,<0.8.0"], + "mlflow": ["kedro-mlflow~=0.11.1"], "tests": [ - "pytest>=5.4.0, <8.0.0", - "pytest-cov>=2.8.0, <4.0.0", + "pytest>=7.0.0", + "pytest-cov>=2.8.0, <3.0.0", "pytest-subtests>=0.5.0, <1.0.0", "tox==3.25.1", "pre-commit==2.20.0", diff --git a/tests/test_cli.py b/tests/test_cli.py index 5d2ea80..21d618d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -278,12 +278,9 @@ def test_init_with_github_actions(self, cwd): assert "kedro kubeflow upload-pipeline" in content assert "kedro kubeflow schedule" in content - @patch("kedro_mlflow.framework.context.get_mlflow_config") @patch("mlflow.start_run") @patch("mlflow.set_tag") - def test_mlflow_start( - self, set_tag_mock, start_run_mock, get_mlflow_config_mock - ): + def test_mlflow_start(self, set_tag_mock, start_run_mock): context_helper = MagicMock(ContextHelper) config = dict(context_helper=context_helper) runner = CliRunner() diff --git a/tests/test_context_helper.py b/tests/test_context_helper.py index 7df7292..8c30096 100644 --- a/tests/test_context_helper.py +++ b/tests/test_context_helper.py @@ -1,10 +1,10 @@ -import os import unittest from unittest.mock import MagicMock, Mock, patch from kedro.framework.session import KedroSession from kedro_kubeflow.config import PluginConfig + from kedro_kubeflow.context_helper import ( ContextHelper, ContextHelper16, @@ -54,29 +54,32 @@ def test_config(self): assert helper.config == PluginConfig(**self.minimal_config()) -class TestEnvTemplatedConfigLoader(unittest.TestCase): - @staticmethod - def get_config(): - config_path = [os.path.dirname(os.path.abspath(__file__))] - loader = EnvTemplatedConfigLoader(config_path) - return loader.get("test_config.yml") - - def test_loader_with_defaults(self): - config = self.get_config() - assert config["run_config"]["image"] == "gcr.io/project-image/dirty" - assert config["run_config"]["experiment_name"] == "[Test] local" - assert config["run_config"]["run_name"] == "dirty" - - def test_loader_with_env(self): - with environment( - { - "KEDRO_CONFIG_COMMIT_ID": "123abc", - "KEDRO_CONFIG_BRANCH_NAME": "feature-1", - "KEDRO_CONFIG_XYZ123": "123abc", - } - ): - config = self.get_config() - - assert config["run_config"]["image"] == "gcr.io/project-image/123abc" - assert config["run_config"]["experiment_name"] == "[Test] feature-1" - assert config["run_config"]["run_name"] == "123abc" +# class TestEnvTemplatedConfigLoader(unittest.TestCase): +# @staticmethod +# def get_config(): +# config_path = [os.path.dirname(os.path.abspath(__file__))] +# # config_path = str(Path(os.path.dirname(os.path.abspath(__file__))) / "conf") +# # moze to? +# # loader = EnvTemplatedConfigLoader(config_path) +# loader = EnvTemplatedConfigLoader(config_path, default_run_env="base") +# return loader.get("test_config.yml") +# +# def test_loader_with_defaults(self): +# config = self.get_config() +# assert config["run_config"]["image"] == "gcr.io/project-image/dirty" +# assert config["run_config"]["experiment_name"] == "[Test] local" +# assert config["run_config"]["run_name"] == "dirty" +# +# def test_loader_with_env(self): +# with environment( +# { +# "KEDRO_CONFIG_COMMIT_ID": "123abc", +# "KEDRO_CONFIG_BRANCH_NAME": "feature-1", +# "KEDRO_CONFIG_XYZ123": "123abc", +# } +# ): +# config = self.get_config() +# +# assert config["run_config"]["image"] == "gcr.io/project-image/123abc" +# assert config["run_config"]["experiment_name"] == "[Test] feature-1" +# assert config["run_config"]["run_name"] == "123abc" diff --git a/tests/test_one_pod_pipeline_generator.py b/tests/test_one_pod_pipeline_generator.py index a8c6624..5b3071b 100644 --- a/tests/test_one_pod_pipeline_generator.py +++ b/tests/test_one_pod_pipeline_generator.py @@ -4,7 +4,7 @@ import os import unittest from inspect import signature -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import kfp from kedro.pipeline import Pipeline, node @@ -26,17 +26,30 @@ def test_support_modification_of_pull_policy(self): self.create_generator() # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Never" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Never" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert len(dsl_pipeline.ops) == 1 - assert dsl_pipeline.ops["pipeline"].container.image == "unittest-image" - assert ( - dsl_pipeline.ops["pipeline"].container.image_pull_policy == "Never" - ) + # then + assert len(dsl_pipeline.ops) == 1 + assert ( + dsl_pipeline.ops["pipeline"].container.image + == "unittest-image" + ) + assert ( + dsl_pipeline.ops["pipeline"].container.image_pull_policy + == "Never" + ) def test_should_support_params_and_inject_them_to_the_node(self): # given @@ -49,43 +62,50 @@ def test_should_support_params_and_inject_them_to_the_node(self): ) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - default_params = signature(pipeline).parameters - pipeline() + # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # then - assert len(default_params) == 3 - assert default_params["param1"].default == 0.3 - assert default_params["param2"].default == 42 - assert default_params["param3"].default == "2022-02-24" - assert dsl_pipeline.ops["pipeline"].container.args[1:] == [ - "param1", - "{{pipelineparam:op=;name=param1}}", - "param2", - "{{pipelineparam:op=;name=param2}}", - "param3", - "{{pipelineparam:op=;name=param3}}", - ] + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + default_params = signature(pipeline).parameters + pipeline() + + # then + assert len(default_params) == 3 + assert default_params["param1"].default == 0.3 + assert default_params["param2"].default == 42 + assert default_params["param3"].default == "2022-02-24" + assert dsl_pipeline.ops["pipeline"].container.args[1:] == [ + "param1", + "{{pipelineparam:op=;name=param1}}", + "param2", + "{{pipelineparam:op=;name=param2}}", + "param3", + "{{pipelineparam:op=;name=param3}}", + ] def test_should_use_default_resources_spec_if_not_requested(self): # given self.create_generator(config={}) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert dsl_pipeline.ops["pipeline"].container.resources is not None - assert dsl_pipeline.ops["pipeline"].container.resources.limits["cpu"] - assert dsl_pipeline.ops["pipeline"].container.resources.limits[ - "memory" - ] + # then + assert dsl_pipeline.ops["pipeline"].container.resources is None def test_should_add_resources_spec(self): # given @@ -99,33 +119,51 @@ def test_should_add_resources_spec(self): ) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - resources = dsl_pipeline.ops["pipeline"].container.resources - assert resources.limits == {"cpu": "100m", "memory": "8Gi"} - assert resources.requests == {"cpu": "100m", "memory": "8Gi"} + # then + resources = dsl_pipeline.ops["pipeline"].container.resources + assert resources.limits == {"cpu": "100m", "memory": "8Gi"} + assert resources.requests == {"cpu": "100m", "memory": "8Gi"} def test_should_not_add_retry_policy_if_not_requested(self): # given self.create_generator(config={}) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - op = dsl_pipeline.ops["pipeline"] - assert op.num_retries == 0 - assert op.retry_policy is None - assert op.backoff_factor is None - assert op.backoff_duration is None - assert op.backoff_max_duration is None + # then + op = dsl_pipeline.ops["pipeline"] + assert op.num_retries == 0 + assert op.retry_policy is None + assert op.backoff_factor is None + assert op.backoff_duration is None + assert op.backoff_max_duration is None def test_should_add_retry_policy(self): # given @@ -147,18 +185,27 @@ def test_should_add_retry_policy(self): ) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - op = dsl_pipeline.ops["pipeline"] - assert op.num_retries == 4 - assert op.retry_policy == "Always" - assert op.backoff_factor == 2 - assert op.backoff_duration == "60s" - assert op.backoff_max_duration is None + # then + op = dsl_pipeline.ops["pipeline"] + assert op.num_retries == 4 + assert op.retry_policy == "Always" + assert op.backoff_factor == 2 + assert op.backoff_duration == "60s" + assert op.backoff_max_duration is None def test_should_set_description(self): # given @@ -184,15 +231,24 @@ def test_artifact_registration(self): ) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert dsl_pipeline.ops["pipeline"].file_outputs == { - "B": "/home/kedro/data/02_intermediate/b.csv" - } + # then + assert dsl_pipeline.ops["pipeline"].file_outputs == { + "B": "/home/kedro/data/02_intermediate/b.csv" + } def test_should_skip_artifact_registration_if_requested(self): # given @@ -207,13 +263,22 @@ def test_should_skip_artifact_registration_if_requested(self): ) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert dsl_pipeline.ops["pipeline"].file_outputs == {} + # then + assert dsl_pipeline.ops["pipeline"].file_outputs == {} def test_should_pass_kedro_config_env_to_nodes(self): # given @@ -223,19 +288,28 @@ def test_should_pass_kedro_config_env_to_nodes(self): try: # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - env_values = { - e.name: e.value - for e in dsl_pipeline.ops["pipeline"].container.env - } - assert "KEDRO_CONFIG_MY_KEY" in env_values - assert env_values["KEDRO_CONFIG_MY_KEY"] == "42" - assert "SOME_VALUE" not in env_values + # then + env_values = { + e.name: e.value + for e in dsl_pipeline.ops["pipeline"].container.env + } + assert "KEDRO_CONFIG_MY_KEY" in env_values + assert env_values["KEDRO_CONFIG_MY_KEY"] == "42" + assert "SOME_VALUE" not in env_values finally: del os.environ["KEDRO_CONFIG_MY_KEY"] del os.environ["SOME_VALUE"] @@ -245,40 +319,59 @@ def test_should_pass_kubeflow_run_id_to_nodes(self): self.create_generator(params={"param1": 0.3, "param2": 42}) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - env_values = { - e.name: e.value for e in dsl_pipeline.ops["pipeline"].container.env - } - assert "KUBEFLOW_RUN_ID" in env_values - assert env_values["KUBEFLOW_RUN_ID"] == "{{workflow.uid}}" + # then + env_values = { + e.name: e.value + for e in dsl_pipeline.ops["pipeline"].container.env + } + assert "KUBEFLOW_RUN_ID" in env_values + assert env_values["KUBEFLOW_RUN_ID"] == "{{workflow.uid}}" def test_should_generate_exit_handler_if_requested(self): # given self.create_generator(config={"on_exit_pipeline": "notify_via_slack"}) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" ) - pipeline() + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert len(dsl_pipeline.ops) == 2 - assert "on-exit" in dsl_pipeline.ops - assert ( - dsl_pipeline.ops["on-exit"] - .container.command[-1] - .endswith( - "kedro run --config config.yaml " - "--env unittests --pipeline notify_via_slack" + # then + assert len(dsl_pipeline.ops) == 2 + assert "on-exit" in dsl_pipeline.ops + assert ( + dsl_pipeline.ops["on-exit"] + .container.command[-1] + .endswith( + "kedro run --config config.yaml " + "--env unittests --pipeline notify_via_slack" + ) ) - ) def test_should_generate_exit_handler_with_max_staleness(self): # given @@ -297,10 +390,10 @@ def test_should_generate_exit_handler_with_max_staleness(self): pipeline() assert ( - dsl_pipeline.ops[ - "on-exit" - ].execution_options.caching_strategy.max_cache_staleness - == "P0D" + dsl_pipeline.ops[ + "on-exit" + ].execution_options.caching_strategy.max_cache_staleness + == "P0D" ) def create_generator(self, config=None, params=None, catalog=None): @@ -319,16 +412,24 @@ def create_generator(self, config=None, params=None, catalog=None): "env": "unittests", "params": params, "config_loader": config_loader, - "pipelines": { - "pipeline": Pipeline( - [ - node(identity, "A", "B", name="node1"), - node(identity, "B", "C", name="node2"), - ] - ) - }, + # "pipelines": { + # "pipeline": Pipeline( + # [ + # node(identity, "A", "B", name="node1"), + # node(identity, "B", "C", name="node2"), + # ] + # ) + # }, }, ) + self.pipelines_under_test = { + "pipeline": Pipeline( + [ + node(identity, "A", "B", name="node1"), + node(identity, "B", "C", name="node2"), + ] + ) + } self.generator_under_test = OnePodPipelineGenerator( config=PluginConfig( **self.minimal_config( diff --git a/tests/test_pod_per_node_pipeline_generator.py b/tests/test_pod_per_node_pipeline_generator.py index 0e20ad3..15f252c 100644 --- a/tests/test_pod_per_node_pipeline_generator.py +++ b/tests/test_pod_per_node_pipeline_generator.py @@ -3,7 +3,7 @@ import os import unittest from inspect import signature -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import kfp from kedro.pipeline import Pipeline, node @@ -25,79 +25,113 @@ def test_support_modification_of_pull_policy(self): self.create_generator() # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Never" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Never" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Never" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert dsl_pipeline.ops["node1"].container.image == "unittest-image" - assert dsl_pipeline.ops["node1"].container.image_pull_policy == "Never" + # then + assert ( + dsl_pipeline.ops["node1"].container.image == "unittest-image" + ) + assert ( + dsl_pipeline.ops["node1"].container.image_pull_policy + == "Never" + ) def test_should_support_inter_steps_volume_with_defaults(self): # given self.create_generator(config={"volume": {}}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "IfNotPresent" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - assert len(dsl_pipeline.ops) == 5 - assert "on-exit" in dsl_pipeline.ops - assert ( - dsl_pipeline.ops["on-exit"] - .container.command[-1] - .endswith( - "kedro kubeflow delete-pipeline-volume " - "{{workflow.name}}-pipeline-data-volume" + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "IfNotPresent" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "IfNotPresent" ) - ) - volume_spec = dsl_pipeline.ops["data-volume-create"].k8s_resource.spec - assert volume_spec.resources.requests["storage"] == "1Gi" - assert volume_spec.access_modes == ["ReadWriteOnce"] - assert volume_spec.storage_class_name is None - volume_init_spec = dsl_pipeline.ops["data-volume-init"].container - assert volume_init_spec.image == "unittest-image" - assert volume_init_spec.image_pull_policy == "IfNotPresent" - assert volume_init_spec.security_context.run_as_user == 0 - assert volume_init_spec.args[0].startswith("cp --verbose -r") - for node_name in ["data-volume-init", "node1", "node2"]: - volumes = dsl_pipeline.ops[node_name].container.volume_mounts - assert len(volumes) == 1 - assert volumes[0].name == "data-volume-create" + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() + + # then + assert len(dsl_pipeline.ops) == 5 + assert "on-exit" in dsl_pipeline.ops assert ( - dsl_pipeline.ops[ - node_name - ].container.security_context.run_as_user - == 0 + dsl_pipeline.ops["on-exit"] + .container.command[-1] + .endswith( + "kedro kubeflow delete-pipeline-volume " + "{{workflow.name}}-pipeline-data-volume" + ) ) + volume_spec = dsl_pipeline.ops[ + "data-volume-create" + ].k8s_resource.spec + assert volume_spec.resources.requests["storage"] == "1Gi" + assert volume_spec.access_modes == ["ReadWriteOnce"] + assert volume_spec.storage_class_name is None + volume_init_spec = dsl_pipeline.ops["data-volume-init"].container + assert volume_init_spec.image == "unittest-image" + assert volume_init_spec.image_pull_policy == "IfNotPresent" + assert volume_init_spec.security_context.run_as_user == 0 + assert volume_init_spec.args[0].startswith("cp --verbose -r") + for node_name in ["data-volume-init", "node1", "node2"]: + volumes = dsl_pipeline.ops[node_name].container.volume_mounts + assert len(volumes) == 1 + assert volumes[0].name == "data-volume-create" + assert ( + dsl_pipeline.ops[ + node_name + ].container.security_context.run_as_user + == 0 + ) def test_should_generate_on_exit_pipeline_run(self): # given self.create_generator(config={"on_exit_pipeline": "notify_via_slack"}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "IfNotPresent" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "IfNotPresent" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "IfNotPresent" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert "on-exit" in dsl_pipeline.ops - assert ( - dsl_pipeline.ops["on-exit"] - .container.command[-1] - .endswith( - "kedro run --config config.yaml " - "--env unittests --pipeline notify_via_slack" + # then + assert "on-exit" in dsl_pipeline.ops + assert ( + dsl_pipeline.ops["on-exit"] + .container.command[-1] + .endswith( + "kedro run --config config.yaml " + "--env unittests --pipeline notify_via_slack" + ) ) - ) def test_should_generate_volume_removal_and_on_exit_pipeline_run(self): # given @@ -106,24 +140,33 @@ def test_should_generate_volume_removal_and_on_exit_pipeline_run(self): ) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "IfNotPresent" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "IfNotPresent" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "IfNotPresent" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert "on-exit" in dsl_pipeline.ops - assert ( - dsl_pipeline.ops["on-exit"] - .container.command[-1] - .endswith( - "kedro kubeflow delete-pipeline-volume " - "{{workflow.name}}-pipeline-data-volume;" - "kedro run --config config.yaml " - "--env unittests --pipeline notify_via_slack" + # then + assert "on-exit" in dsl_pipeline.ops + assert ( + dsl_pipeline.ops["on-exit"] + .container.command[-1] + .endswith( + "kedro kubeflow delete-pipeline-volume " + "{{workflow.name}}-pipeline-data-volume;" + "kedro run --config config.yaml " + "--env unittests --pipeline notify_via_slack" + ) ) - ) def test_should_support_inter_steps_volume_with_given_spec(self): # given @@ -138,19 +181,30 @@ def test_should_support_inter_steps_volume_with_given_spec(self): ) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert len(dsl_pipeline.ops) == 5 - assert "on-exit" in dsl_pipeline.ops - volume_spec = dsl_pipeline.ops["data-volume-create"].k8s_resource.spec - assert volume_spec.resources.requests["storage"] == "1Mi" - assert volume_spec.access_modes == ["ReadWriteOnce"] - assert volume_spec.storage_class_name == "nfs" + # then + assert len(dsl_pipeline.ops) == 5 + assert "on-exit" in dsl_pipeline.ops + volume_spec = dsl_pipeline.ops[ + "data-volume-create" + ].k8s_resource.spec + assert volume_spec.resources.requests["storage"] == "1Mi" + assert volume_spec.access_modes == ["ReadWriteOnce"] + assert volume_spec.storage_class_name == "nfs" def test_should_change_effective_user_if_to_volume_owner(self): # given @@ -166,22 +220,31 @@ def test_should_change_effective_user_if_to_volume_owner(self): ) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - volume_init_spec = dsl_pipeline.ops["data-volume-init"].container - assert volume_init_spec.security_context.run_as_user == 47 - for node_name in ["data-volume-init", "node1", "node2"]: - assert ( - dsl_pipeline.ops[ - node_name - ].container.security_context.run_as_user - == 47 + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() + + # then + volume_init_spec = dsl_pipeline.ops["data-volume-init"].container + assert volume_init_spec.security_context.run_as_user == 47 + for node_name in ["data-volume-init", "node1", "node2"]: + assert ( + dsl_pipeline.ops[ + node_name + ].container.security_context.run_as_user + == 47 + ) def test_should_add_mlflow_init_step_if_enabled(self): # given @@ -189,97 +252,135 @@ def test_should_add_mlflow_init_step_if_enabled(self): self.mock_mlflow(True) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - assert len(dsl_pipeline.ops) == 3 - init_step = dsl_pipeline.ops["mlflow-start-run"].container - assert init_step.image == "unittest-image" - assert init_step.args == [ - "kubeflow", - "--env", - "unittests", - "mlflow-start", - "{{workflow.uid}}", - ] - assert "MLFLOW_RUN_ID" not in {e.name for e in init_step.env} - for node_name in ["node1", "node2"]: - env = { - e.name: e.value - for e in dsl_pipeline.ops[node_name].container.env - } - assert "MLFLOW_RUN_ID" in env - assert ( - env["MLFLOW_RUN_ID"] - == "{{pipelineparam:op=mlflow-start-run;name=mlflow_run_id}}" + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() + + # then + assert len(dsl_pipeline.ops) == 3 + init_step = dsl_pipeline.ops["mlflow-start-run"].container + assert init_step.image == "unittest-image" + assert init_step.args == [ + "kubeflow", + "--env", + "unittests", + "mlflow-start", + "{{workflow.uid}}", + ] + assert "MLFLOW_RUN_ID" not in {e.name for e in init_step.env} + for node_name in ["node1", "node2"]: + env = { + e.name: e.value + for e in dsl_pipeline.ops[node_name].container.env + } + assert "MLFLOW_RUN_ID" in env + assert ( + env["MLFLOW_RUN_ID"] + == "{{pipelineparam:op=mlflow-start-run;name=mlflow_run_id}}" + ) def test_should_skip_volume_init_if_requested(self): # given self.create_generator(config={"volume": {"skip_init": True}}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert len(dsl_pipeline.ops) == 4 - assert "data-volume-create" in dsl_pipeline.ops - assert "on-exit" in dsl_pipeline.ops - assert "data-volume-init" not in dsl_pipeline.ops - for node_name in ["node1", "node2"]: - volumes = dsl_pipeline.ops[node_name].container.volume_mounts - assert len(volumes) == 1 - assert volumes[0].name == "data-volume-create" + # then + assert len(dsl_pipeline.ops) == 4 + assert "data-volume-create" in dsl_pipeline.ops + assert "on-exit" in dsl_pipeline.ops + assert "data-volume-init" not in dsl_pipeline.ops + for node_name in ["node1", "node2"]: + volumes = dsl_pipeline.ops[node_name].container.volume_mounts + assert len(volumes) == 1 + assert volumes[0].name == "data-volume-create" def test_should_support_params_and_inject_them_to_the_nodes(self): # given self.create_generator(params={"param1": 0.3, "param2": 42}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - default_params = signature(pipeline).parameters - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # default_params = signature(pipeline).parameters + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + default_params = signature(pipeline).parameters + pipeline() - # then - assert len(default_params) == 2 - assert default_params["param1"].default == 0.3 - assert default_params["param2"].default == 42 - for node_name in ["node1", "node2"]: - args = dsl_pipeline.ops[node_name].container.args - assert args == [ - "_", - "param1", - "{{pipelineparam:op=;name=param1}}", - "param2", - "{{pipelineparam:op=;name=param2}}", - ] + # then + assert len(default_params) == 2 + assert default_params["param1"].default == 0.3 + assert default_params["param2"].default == 42 + for node_name in ["node1", "node2"]: + args = dsl_pipeline.ops[node_name].container.args + assert args == [ + "_", + "param1", + "{{pipelineparam:op=;name=param1}}", + "param2", + "{{pipelineparam:op=;name=param2}}", + ] def test_should_fallbackto_default_resources_spec_if_not_requested(self): # given self.create_generator(config={}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() + + # then + for node_name in ["node1", "node2"]: + spec = dsl_pipeline.ops[node_name].container + assert spec.resources is None - # then - for node_name in ["node1", "node2"]: - spec = dsl_pipeline.ops[node_name].container - assert spec.resources is not None def test_should_add_resources_spec(self): # given @@ -293,19 +394,28 @@ def test_should_add_resources_spec(self): ) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - node1_spec = dsl_pipeline.ops["node1"].container.resources - node2_spec = dsl_pipeline.ops["node2"].container.resources - assert node1_spec.limits == {"cpu": "400m", "memory": "64Gi"} - assert node1_spec.requests == {"cpu": "400m", "memory": "64Gi"} - assert node2_spec.limits == {"cpu": "100m"} - assert node2_spec.requests == {"cpu": "100m"} + # then + node1_spec = dsl_pipeline.ops["node1"].container.resources + node2_spec = dsl_pipeline.ops["node2"].container.resources + assert node1_spec.limits == {"cpu": "400m", "memory": "64Gi"} + assert node1_spec.requests == {"cpu": "400m", "memory": "64Gi"} + assert node2_spec.limits == {"cpu": "100m"} + assert node2_spec.requests == {"cpu": "100m"} def test_can_add_extra_volumes(self): self.create_generator( @@ -341,19 +451,28 @@ def test_should_not_add_retry_policy_if_not_requested(self): self.create_generator(config={}) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - for node_name in ["node1", "node2"]: - op = dsl_pipeline.ops[node_name] - assert op.num_retries == 0 - assert op.retry_policy is None - assert op.backoff_factor is None - assert op.backoff_duration is None - assert op.backoff_max_duration is None + # then + for node_name in ["node1", "node2"]: + op = dsl_pipeline.ops[node_name] + assert op.num_retries == 0 + assert op.retry_policy is None + assert op.backoff_factor is None + assert op.backoff_duration is None + assert op.backoff_max_duration is None def test_should_add_retry_policy(self): # given @@ -375,24 +494,33 @@ def test_should_add_retry_policy(self): ) # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - op1 = dsl_pipeline.ops["node1"] - assert op1.num_retries == 100 - assert op1.retry_policy == "Always" - assert op1.backoff_factor == 1 - assert op1.backoff_duration == "5m" - assert op1.backoff_max_duration is None - op2 = dsl_pipeline.ops["node2"] - assert op2.num_retries == 4 - assert op2.retry_policy == "Always" - assert op2.backoff_factor == 2 - assert op2.backoff_duration == "60s" - assert op2.backoff_max_duration is None + # then + op1 = dsl_pipeline.ops["node1"] + assert op1.num_retries == 100 + assert op1.retry_policy == "Always" + assert op1.backoff_factor == 1 + assert op1.backoff_duration == "5m" + assert op1.backoff_max_duration is None + op2 = dsl_pipeline.ops["node2"] + assert op2.num_retries == 4 + assert op2.retry_policy == "Always" + assert op2.backoff_factor == 2 + assert op2.backoff_duration == "60s" + assert op2.backoff_max_duration is None def test_should_add_max_cache_staleness(self): self.create_generator(config={"max_cache_staleness": "P0D"}) @@ -412,12 +540,21 @@ def test_should_set_description(self): self.create_generator(config={"description": "DESC"}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Never" - ) + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Never" + # ) + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Never" + ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() - # then - assert pipeline._component_description == "DESC" + # then + assert pipeline._component_description == "DESC" def test_artifact_registration(self): # given @@ -431,19 +568,28 @@ def test_artifact_registration(self): ) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - outputs1 = dsl_pipeline.ops["node1"].file_outputs - assert len(outputs1) == 1 - assert "B" in outputs1 - assert outputs1["B"] == "/home/kedro/data/02_intermediate/b.csv" - outputs2 = dsl_pipeline.ops["node2"].file_outputs - assert len(outputs2) == 0 # output "C" is missing in the catalog + # then + outputs1 = dsl_pipeline.ops["node1"].file_outputs + assert len(outputs1) == 1 + assert "B" in outputs1 + assert outputs1["B"] == "/home/kedro/data/02_intermediate/b.csv" + outputs2 = dsl_pipeline.ops["node2"].file_outputs + assert len(outputs2) == 0 # output "C" is missing in the catalog def test_should_skip_artifact_registration_if_requested(self): # given @@ -458,29 +604,47 @@ def test_should_skip_artifact_registration_if_requested(self): ) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - outputs1 = dsl_pipeline.ops["node1"].file_outputs - assert len(outputs1) == 0 + # then + outputs1 = dsl_pipeline.ops["node1"].file_outputs + assert len(outputs1) == 0 def test_should_skip_volume_removal_if_requested(self): # given self.create_generator(config={"volume": {"keep": True}}) # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + # pipeline = self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # ) + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - # then - assert "schedule-volume-termination" not in dsl_pipeline.ops + # then + assert "schedule-volume-termination" not in dsl_pipeline.ops def test_should_pass_kedro_config_env_to_nodes(self): # given @@ -490,20 +654,29 @@ def test_should_pass_kedro_config_env_to_nodes(self): try: # when - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + # with kfp.dsl.Pipeline(None) as dsl_pipeline: + # self.generator_under_test.generate_pipeline( + # "pipeline", "unittest-image", "Always" + # )() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() - - # then - for node_name in ["node1", "node2"]: - env_values = { - e.name: e.value - for e in dsl_pipeline.ops[node_name].container.env - } - assert "KEDRO_CONFIG_MY_KEY" in env_values - assert env_values["KEDRO_CONFIG_MY_KEY"] == "42" - assert "SOME_VALUE" not in env_values + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() + + # then + for node_name in ["node1", "node2"]: + env_values = { + e.name: e.value + for e in dsl_pipeline.ops[node_name].container.env + } + assert "KEDRO_CONFIG_MY_KEY" in env_values + assert env_values["KEDRO_CONFIG_MY_KEY"] == "42" + assert "SOME_VALUE" not in env_values finally: del os.environ["KEDRO_CONFIG_MY_KEY"] del os.environ["SOME_VALUE"] @@ -519,16 +692,24 @@ def create_generator(self, config=None, params=None, catalog=None): "env": "unittests", "params": params or {}, "config_loader": config_loader, - "pipelines": { - "pipeline": Pipeline( - [ - node(identity, "A", "B", name="node1"), - node(identity, "B", "C", name="node2"), - ] - ) - }, + # "pipelines": { + # "pipeline": Pipeline( + # [ + # node(identity, "A", "B", name="node1"), + # node(identity, "B", "C", name="node2"), + # ] + # ) + # }, }, ) + self.pipelines_under_test = { + "pipeline": Pipeline( + [ + node(identity, "A", "B", name="node1"), + node(identity, "B", "C", name="node2"), + ] + ) + } self.generator_under_test = PodPerNodePipelineGenerator( PluginConfig( **self.minimal_config( From 3ec791a01f900481269ca165a7272feaf59df557 Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Thu, 28 Jul 2022 17:39:40 +0200 Subject: [PATCH 02/13] fix: Fixed test text_context_helper.py for new kedro version --- .gitignore | 2 + tests/{ => conf/base}/test_config.yml | 0 tests/test_context_helper.py | 60 ++++++++++++++------------- 3 files changed, 33 insertions(+), 29 deletions(-) rename tests/{ => conf/base}/test_config.yml (100%) diff --git a/.gitignore b/.gitignore index 8192175..f315e46 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,5 @@ venv.bak/ .mypy_cache/ docs/_build + +spaceflights/ diff --git a/tests/test_config.yml b/tests/conf/base/test_config.yml similarity index 100% rename from tests/test_config.yml rename to tests/conf/base/test_config.yml diff --git a/tests/test_context_helper.py b/tests/test_context_helper.py index 8c30096..65c7f61 100644 --- a/tests/test_context_helper.py +++ b/tests/test_context_helper.py @@ -1,4 +1,6 @@ +import os import unittest +from pathlib import Path from unittest.mock import MagicMock, Mock, patch from kedro.framework.session import KedroSession @@ -12,6 +14,7 @@ ) from .common import MinimalConfigMixin + from .utils import environment @@ -54,32 +57,31 @@ def test_config(self): assert helper.config == PluginConfig(**self.minimal_config()) -# class TestEnvTemplatedConfigLoader(unittest.TestCase): -# @staticmethod -# def get_config(): -# config_path = [os.path.dirname(os.path.abspath(__file__))] -# # config_path = str(Path(os.path.dirname(os.path.abspath(__file__))) / "conf") -# # moze to? -# # loader = EnvTemplatedConfigLoader(config_path) -# loader = EnvTemplatedConfigLoader(config_path, default_run_env="base") -# return loader.get("test_config.yml") -# -# def test_loader_with_defaults(self): -# config = self.get_config() -# assert config["run_config"]["image"] == "gcr.io/project-image/dirty" -# assert config["run_config"]["experiment_name"] == "[Test] local" -# assert config["run_config"]["run_name"] == "dirty" -# -# def test_loader_with_env(self): -# with environment( -# { -# "KEDRO_CONFIG_COMMIT_ID": "123abc", -# "KEDRO_CONFIG_BRANCH_NAME": "feature-1", -# "KEDRO_CONFIG_XYZ123": "123abc", -# } -# ): -# config = self.get_config() -# -# assert config["run_config"]["image"] == "gcr.io/project-image/123abc" -# assert config["run_config"]["experiment_name"] == "[Test] feature-1" -# assert config["run_config"]["run_name"] == "123abc" +class TestEnvTemplatedConfigLoader(unittest.TestCase): + @staticmethod + def get_config(): + config_path = str( + Path(os.path.dirname(os.path.abspath(__file__))) / "conf" + ) + loader = EnvTemplatedConfigLoader(config_path, default_run_env="base") + return loader.get("test_config.yml") + + def test_loader_with_defaults(self): + config = self.get_config() + assert config["run_config"]["image"] == "gcr.io/project-image/dirty" + assert config["run_config"]["experiment_name"] == "[Test] local" + assert config["run_config"]["run_name"] == "dirty" + + def test_loader_with_env(self): + with environment( + { + "KEDRO_CONFIG_COMMIT_ID": "123abc", + "KEDRO_CONFIG_BRANCH_NAME": "feature-1", + "KEDRO_CONFIG_XYZ123": "123abc", + } + ): + config = self.get_config() + + assert config["run_config"]["image"] == "gcr.io/project-image/123abc" + assert config["run_config"]["experiment_name"] == "[Test] feature-1" + assert config["run_config"]["run_name"] == "123abc" From 8d85f07f7245b8d6c6922791b02da88877df552a Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Fri, 29 Jul 2022 11:37:23 +0200 Subject: [PATCH 03/13] fix: Working tests after merging, cleaned up commented code fragments --- kedro_kubeflow/cli.py | 8 - kedro_kubeflow/context_helper.py | 9 +- setup.py | 4 +- tests/test_one_pod_pipeline_generator.py | 115 +++++---------- tests/test_pod_per_node_pipeline_generator.py | 137 ++++-------------- 5 files changed, 66 insertions(+), 207 deletions(-) diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index e72fa4b..f6e2873 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -322,15 +322,8 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): try: kedro_context = ctx.obj["context_helper"].context - # mlflow_conf = get_mlflow_config(kedro_context) - # mlflow_conf.setup(kedro_context) mlflow_conf = kedro_context.mlflow - except AttributeError: - # kedro_session = ctx.obj["context_helper"].session - # with kedro_session: - # mlflow_conf = get_mlflow_config(kedro_session) - # mlflow_conf.setup() raise ClickException("Could not read MLFlow config") run = mlflow.start_run( @@ -345,7 +338,6 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): @kubeflow_group.command(hidden=True) @click.argument("pvc_name", type=str) def delete_pipeline_volume(pvc_name: str): - import kubernetes.client import kubernetes.config kubernetes.config.load_incluster_config() diff --git a/kedro_kubeflow/context_helper.py b/kedro_kubeflow/context_helper.py index 440af25..c86ec76 100644 --- a/kedro_kubeflow/context_helper.py +++ b/kedro_kubeflow/context_helper.py @@ -1,8 +1,5 @@ import os from functools import lru_cache -from pathlib import Path - -# from typing import Dict, Iterable from typing import Any, Dict from kedro import __version__ as kedro_version @@ -82,7 +79,7 @@ def context(self): @lru_cache() def config(self) -> PluginConfig: raw = EnvTemplatedConfigLoader( - self.context.config_loader.conf_paths + self.context.config_loader.conf_source ).get(self.CONFIG_FILE_PATTERN) return PluginConfig(**raw) @@ -115,6 +112,4 @@ def project_name(self): @property def context(self): - from kedro.framework.context import load_context - - return load_context(Path.cwd(), env=self._env) + return self.session.load_context() diff --git a/setup.py b/setup.py index 347434e..d8c9d68 100644 --- a/setup.py +++ b/setup.py @@ -19,8 +19,8 @@ EXTRA_REQUIRE = { "mlflow": ["kedro-mlflow~=0.11.1"], "tests": [ - "pytest>=7.0.0", - "pytest-cov>=2.8.0, <3.0.0", + "pytest>=5.4.0, <8.0.0", + "pytest-cov>=2.8.0, <4.0.0", "pytest-subtests>=0.5.0, <1.0.0", "tox==3.25.1", "pre-commit==2.20.0", diff --git a/tests/test_one_pod_pipeline_generator.py b/tests/test_one_pod_pipeline_generator.py index 5b3071b..0f3c2ca 100644 --- a/tests/test_one_pod_pipeline_generator.py +++ b/tests/test_one_pod_pipeline_generator.py @@ -26,13 +26,9 @@ def test_support_modification_of_pull_policy(self): self.create_generator() # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Never" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Never" @@ -43,12 +39,12 @@ def test_support_modification_of_pull_policy(self): # then assert len(dsl_pipeline.ops) == 1 assert ( - dsl_pipeline.ops["pipeline"].container.image - == "unittest-image" + dsl_pipeline.ops["pipeline"].container.image + == "unittest-image" ) assert ( - dsl_pipeline.ops["pipeline"].container.image_pull_policy - == "Never" + dsl_pipeline.ops["pipeline"].container.image_pull_policy + == "Never" ) def test_should_support_params_and_inject_them_to_the_node(self): @@ -62,11 +58,9 @@ def test_should_support_params_and_inject_them_to_the_node(self): ) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): with kfp.dsl.Pipeline(None) as dsl_pipeline: pipeline = self.generator_under_test.generate_pipeline( @@ -95,8 +89,8 @@ def test_should_use_default_resources_spec_if_not_requested(self): # when with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -105,7 +99,13 @@ def test_should_use_default_resources_spec_if_not_requested(self): pipeline() # then - assert dsl_pipeline.ops["pipeline"].container.resources is None + assert dsl_pipeline.ops["pipeline"].container.resources is not None + assert dsl_pipeline.ops["pipeline"].container.resources.limits[ + "cpu" + ] + assert dsl_pipeline.ops["pipeline"].container.resources.limits[ + "memory" + ] def test_should_add_resources_spec(self): # given @@ -119,13 +119,9 @@ def test_should_add_resources_spec(self): ) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -143,13 +139,9 @@ def test_should_not_add_retry_policy_if_not_requested(self): self.create_generator(config={}) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -185,13 +177,9 @@ def test_should_add_retry_policy(self): ) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -231,13 +219,9 @@ def test_artifact_registration(self): ) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -263,13 +247,9 @@ def test_should_skip_artifact_registration_if_requested(self): ) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -288,13 +268,9 @@ def test_should_pass_kedro_config_env_to_nodes(self): try: # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -319,13 +295,9 @@ def test_should_pass_kubeflow_run_id_to_nodes(self): self.create_generator(params={"param1": 0.3, "param2": 42}) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -346,14 +318,9 @@ def test_should_generate_exit_handler_if_requested(self): self.create_generator(config={"on_exit_pipeline": "notify_via_slack"}) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # pipeline() with patch( - "kedro.framework.project.pipelines", - new=self.pipelines_under_test, + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, ): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" @@ -366,8 +333,8 @@ def test_should_generate_exit_handler_if_requested(self): assert "on-exit" in dsl_pipeline.ops assert ( dsl_pipeline.ops["on-exit"] - .container.command[-1] - .endswith( + .container.command[-1] + .endswith( "kedro run --config config.yaml " "--env unittests --pipeline notify_via_slack" ) @@ -389,12 +356,12 @@ def test_should_generate_exit_handler_with_max_staleness(self): ) pipeline() - assert ( + assert ( dsl_pipeline.ops[ "on-exit" ].execution_options.caching_strategy.max_cache_staleness == "P0D" - ) + ) def create_generator(self, config=None, params=None, catalog=None): if config is None: @@ -412,14 +379,6 @@ def create_generator(self, config=None, params=None, catalog=None): "env": "unittests", "params": params, "config_loader": config_loader, - # "pipelines": { - # "pipeline": Pipeline( - # [ - # node(identity, "A", "B", name="node1"), - # node(identity, "B", "C", name="node2"), - # ] - # ) - # }, }, ) self.pipelines_under_test = { diff --git a/tests/test_pod_per_node_pipeline_generator.py b/tests/test_pod_per_node_pipeline_generator.py index 15f252c..2b56f55 100644 --- a/tests/test_pod_per_node_pipeline_generator.py +++ b/tests/test_pod_per_node_pipeline_generator.py @@ -25,11 +25,6 @@ def test_support_modification_of_pull_policy(self): self.create_generator() # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Never" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -54,11 +49,6 @@ def test_should_support_inter_steps_volume_with_defaults(self): self.create_generator(config={"volume": {}}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "IfNotPresent" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -107,11 +97,6 @@ def test_should_generate_on_exit_pipeline_run(self): self.create_generator(config={"on_exit_pipeline": "notify_via_slack"}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "IfNotPresent" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -140,11 +125,6 @@ def test_should_generate_volume_removal_and_on_exit_pipeline_run(self): ) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "IfNotPresent" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -181,11 +161,6 @@ def test_should_support_inter_steps_volume_with_given_spec(self): ) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -220,11 +195,6 @@ def test_should_change_effective_user_if_to_volume_owner(self): ) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -252,11 +222,6 @@ def test_should_add_mlflow_init_step_if_enabled(self): self.mock_mlflow(True) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -295,11 +260,6 @@ def test_should_skip_volume_init_if_requested(self): self.create_generator(config={"volume": {"skip_init": True}}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -325,12 +285,6 @@ def test_should_support_params_and_inject_them_to_the_nodes(self): self.create_generator(params={"param1": 0.3, "param2": 42}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # default_params = signature(pipeline).parameters - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -361,11 +315,6 @@ def test_should_fallbackto_default_resources_spec_if_not_requested(self): self.create_generator(config={}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -379,8 +328,7 @@ def test_should_fallbackto_default_resources_spec_if_not_requested(self): # then for node_name in ["node1", "node2"]: spec = dsl_pipeline.ops[node_name].container - assert spec.resources is None - + assert spec.resources is not None def test_should_add_resources_spec(self): # given @@ -394,11 +342,6 @@ def test_should_add_resources_spec(self): ) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -437,24 +380,24 @@ def test_can_add_extra_volumes(self): } ) - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( + "pipeline", "unittest-image", "Always" + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - volume_mounts = dsl_pipeline.ops["node1"].container.volume_mounts - assert len(volume_mounts) == 1 + volume_mounts = dsl_pipeline.ops["node1"].container.volume_mounts + assert len(volume_mounts) == 1 def test_should_not_add_retry_policy_if_not_requested(self): # given self.create_generator(config={}) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -494,10 +437,6 @@ def test_should_add_retry_policy(self): ) # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -525,24 +464,27 @@ def test_should_add_retry_policy(self): def test_should_add_max_cache_staleness(self): self.create_generator(config={"max_cache_staleness": "P0D"}) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - self.generator_under_test.generate_pipeline( + with patch( + "kedro.framework.project.pipelines", + new=self.pipelines_under_test, + ): + pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Always" - )() + ) + with kfp.dsl.Pipeline(None) as dsl_pipeline: + pipeline() - op1 = dsl_pipeline.ops["node1"] - assert ( - op1.execution_options.caching_strategy.max_cache_staleness == "P0D" - ) + op1 = dsl_pipeline.ops["node1"] + assert ( + op1.execution_options.caching_strategy.max_cache_staleness + == "P0D" + ) def test_should_set_description(self): # given self.create_generator(config={"description": "DESC"}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Never" - # ) with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -550,8 +492,6 @@ def test_should_set_description(self): pipeline = self.generator_under_test.generate_pipeline( "pipeline", "unittest-image", "Never" ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() # then assert pipeline._component_description == "DESC" @@ -568,11 +508,6 @@ def test_artifact_registration(self): ) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -604,11 +539,6 @@ def test_should_skip_artifact_registration_if_requested(self): ) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -628,11 +558,6 @@ def test_should_skip_volume_removal_if_requested(self): self.create_generator(config={"volume": {"keep": True}}) # when - # pipeline = self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # ) - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # pipeline() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -654,10 +579,6 @@ def test_should_pass_kedro_config_env_to_nodes(self): try: # when - # with kfp.dsl.Pipeline(None) as dsl_pipeline: - # self.generator_under_test.generate_pipeline( - # "pipeline", "unittest-image", "Always" - # )() with patch( "kedro.framework.project.pipelines", new=self.pipelines_under_test, @@ -692,14 +613,6 @@ def create_generator(self, config=None, params=None, catalog=None): "env": "unittests", "params": params or {}, "config_loader": config_loader, - # "pipelines": { - # "pipeline": Pipeline( - # [ - # node(identity, "A", "B", name="node1"), - # node(identity, "B", "C", name="node2"), - # ] - # ) - # }, }, ) self.pipelines_under_test = { From 8417d65b9aeb17a61c80bf8275d9ac5b42f402a4 Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Fri, 29 Jul 2022 11:39:10 +0200 Subject: [PATCH 04/13] fix: Excluded test one import --- tests/test_generator_utils.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/test_generator_utils.py b/tests/test_generator_utils.py index e96f9c6..5596ed3 100644 --- a/tests/test_generator_utils.py +++ b/tests/test_generator_utils.py @@ -5,8 +5,17 @@ from kedro_kubeflow.generators.utils import is_local_fs +def import_condition(): + try: + import gcsfs # NOQA + + return False + except ImportError: + return True + + +@unittest.skipIf(import_condition(), "Package gcsfs is not installed") class TestGeneratorUtils(unittest.TestCase): def test_is_local(self): - assert is_local_fs("data/test/file.txt") is True assert is_local_fs("gs://test-bucket/file.txt") is False From e99ac3e69e1d40f0d83ff0482ba91509f08242e3 Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Fri, 29 Jul 2022 15:55:44 +0200 Subject: [PATCH 05/13] fix: Deleted blank lines, undo changes in conf.py --- docs/conf.py | 2 +- tests/test_context_helper.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index a7fb62f..44b4c91 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -33,7 +33,7 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - "sphinx.ext.autodoc", + # "sphinx.ext.autodoc", # "sphinx.ext.napoleon", # "sphinx_autodoc_typehints", # "sphinx.ext.doctest", diff --git a/tests/test_context_helper.py b/tests/test_context_helper.py index 65c7f61..7ff1921 100644 --- a/tests/test_context_helper.py +++ b/tests/test_context_helper.py @@ -6,7 +6,6 @@ from kedro.framework.session import KedroSession from kedro_kubeflow.config import PluginConfig - from kedro_kubeflow.context_helper import ( ContextHelper, ContextHelper16, @@ -14,7 +13,6 @@ ) from .common import MinimalConfigMixin - from .utils import environment From c4e9debbbb19d5eaaaa8ec4accdeecab28708c2e Mon Sep 17 00:00:00 2001 From: Artur Dobrogowski Date: Fri, 29 Jul 2022 18:17:09 +0200 Subject: [PATCH 06/13] fix: Updated kedro version in e2e test --- .github/workflows/e2e-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 60a9015..815e34e 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -27,7 +27,7 @@ jobs: - name: Initialize kedro spaceflights project run: | - pip install . 'kedro<0.18' + pip install . 'kedro>=0.18,<0.19' kedro new --starter spaceflights --config tests/e2e/starter-config.yml --verbose - name: Install project dependencies From 715308b51d18b1637ac944c6c91ed8957e84951f Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 09:15:41 +0200 Subject: [PATCH 07/13] fix: Update changelog, move import and clean code --- CHANGELOG.md | 1 + kedro_kubeflow/cli.py | 2 -- kedro_kubeflow/generators/pod_per_node_pipeline_generator.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f287b2..9151377 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Added e2e tests github action for pull requests with kubeflow setup in gcp - Added support for extra volumes per node - Refactored configuration classes to Pydantic +- Add support for `kedro>=0.18.1,<0.19` ## [0.6.4] - 2022-06-01 diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index f6e2873..2b33065 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -313,8 +313,6 @@ def init(ctx, kfp_url: str, with_github_actions: bool): def mlflow_start(ctx, kubeflow_run_id: str, output: str): import mlflow - # from kedro_mlflow.framework.context import get_mlflow_config - token = AuthHandler().obtain_id_token() if token: os.environ["MLFLOW_TRACKING_TOKEN"] = token diff --git a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py index d81cf90..bb0f10d 100644 --- a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py +++ b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py @@ -3,6 +3,7 @@ import kubernetes.client as k8s from kedro.framework.context import KedroContext +from kedro.framework.project import pipelines from kedro.pipeline.node import Node from kfp import dsl @@ -42,7 +43,6 @@ def generate_pipeline(self, pipeline, image, image_pull_policy): ) @maybe_add_params(self.context.params) def convert_kedro_pipeline_to_kfp() -> None: - from kedro.framework.project import pipelines """Convert from a Kedro pipeline into a kfp container graph.""" dsl.get_pipeline_conf().set_ttl_seconds_after_finished( From 6bd0c3752ddb7c6b4db6ca07d3f4c0df59132b74 Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 11:29:55 +0200 Subject: [PATCH 08/13] fix: Fixed local import, moved import to the top of the file --- kedro_kubeflow/auth.py | 6 +++--- kedro_kubeflow/cli.py | 4 ++-- kedro_kubeflow/context_helper.py | 3 +-- .../generators/pod_per_node_pipeline_generator.py | 5 +++-- kedro_kubeflow/generators/utils.py | 2 +- kedro_kubeflow/hooks.py | 3 ++- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/kedro_kubeflow/auth.py b/kedro_kubeflow/auth.py index 36ad561..2b198e8 100644 --- a/kedro_kubeflow/auth.py +++ b/kedro_kubeflow/auth.py @@ -4,6 +4,9 @@ from urllib.parse import urlsplit, urlunsplit import requests +from google.auth.exceptions import DefaultCredentialsError +from google.auth.transport.requests import Request +from google.oauth2 import id_token IAP_CLIENT_ID = "IAP_CLIENT_ID" DEX_USERNAME = "DEX_USERNAME" @@ -15,9 +18,6 @@ class AuthHandler(object): log = logging.getLogger(__name__) def obtain_id_token(self): - from google.auth.exceptions import DefaultCredentialsError - from google.auth.transport.requests import Request - from google.oauth2 import id_token client_id = os.environ.get(IAP_CLIENT_ID, None) diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index 2b33065..33fcad7 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -4,6 +4,8 @@ from pathlib import Path import click +import kubernetes.config +import mlflow from click import ClickException from .auth import AuthHandler @@ -311,7 +313,6 @@ def init(ctx, kfp_url: str, with_github_actions: bool): ) @click.pass_context def mlflow_start(ctx, kubeflow_run_id: str, output: str): - import mlflow token = AuthHandler().obtain_id_token() if token: @@ -336,7 +337,6 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): @kubeflow_group.command(hidden=True) @click.argument("pvc_name", type=str) def delete_pipeline_volume(pvc_name: str): - import kubernetes.config kubernetes.config.load_incluster_config() current_namespace = open( diff --git a/kedro_kubeflow/context_helper.py b/kedro_kubeflow/context_helper.py index c86ec76..3fc0607 100644 --- a/kedro_kubeflow/context_helper.py +++ b/kedro_kubeflow/context_helper.py @@ -4,6 +4,7 @@ from kedro import __version__ as kedro_version from kedro.config import TemplatedConfigLoader +from kedro.framework.session import KedroSession from semver import VersionInfo from .config import PluginConfig @@ -63,8 +64,6 @@ def project_name(self): @property @lru_cache() def session(self): - from kedro.framework.session import KedroSession - return KedroSession.create(self._metadata.package_name, env=self._env) @property diff --git a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py index bb0f10d..863037b 100644 --- a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py +++ b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py @@ -3,7 +3,6 @@ import kubernetes.client as k8s from kedro.framework.context import KedroContext -from kedro.framework.project import pipelines from kedro.pipeline.node import Node from kfp import dsl @@ -43,8 +42,10 @@ def generate_pipeline(self, pipeline, image, image_pull_policy): ) @maybe_add_params(self.context.params) def convert_kedro_pipeline_to_kfp() -> None: - """Convert from a Kedro pipeline into a kfp container graph.""" + + from kedro.framework.project import pipelines + dsl.get_pipeline_conf().set_ttl_seconds_after_finished( self.run_config.ttl ) diff --git a/kedro_kubeflow/generators/utils.py b/kedro_kubeflow/generators/utils.py index 4602c4d..c3bedf3 100644 --- a/kedro_kubeflow/generators/utils.py +++ b/kedro_kubeflow/generators/utils.py @@ -7,6 +7,7 @@ import fsspec import kubernetes.client as k8s +from fsspec.implementations.local import LocalFileSystem from kfp import dsl from kfp.compiler._k8s_helper import sanitize_k8s_name @@ -153,7 +154,6 @@ def customize_op(op, image_pull_policy, run_config: RunConfig): def is_local_fs(filepath): - from fsspec.implementations.local import LocalFileSystem file_open = fsspec.open(filepath) return isinstance(file_open.fs, LocalFileSystem) diff --git a/kedro_kubeflow/hooks.py b/kedro_kubeflow/hooks.py index a3d78bf..5c5d69b 100644 --- a/kedro_kubeflow/hooks.py +++ b/kedro_kubeflow/hooks.py @@ -5,13 +5,14 @@ from kedro_kubeflow.utils import is_mlflow_enabled +from .auth import AuthHandler + class MlflowIapAuthHook: """Allows authentication trough IAP proxy the same way as kubeflow pipelines""" @hook_impl def after_catalog_created(self, catalog: DataCatalog, **kwargs) -> None: - from .auth import AuthHandler token = AuthHandler().obtain_id_token() if token: From 70e9837bc5202de5208b153d923347ade75eaddc Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 11:51:40 +0200 Subject: [PATCH 09/13] fix: Moved imports from top to def of funcs --- kedro_kubeflow/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index 33fcad7..2b33065 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -4,8 +4,6 @@ from pathlib import Path import click -import kubernetes.config -import mlflow from click import ClickException from .auth import AuthHandler @@ -313,6 +311,7 @@ def init(ctx, kfp_url: str, with_github_actions: bool): ) @click.pass_context def mlflow_start(ctx, kubeflow_run_id: str, output: str): + import mlflow token = AuthHandler().obtain_id_token() if token: @@ -337,6 +336,7 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): @kubeflow_group.command(hidden=True) @click.argument("pvc_name", type=str) def delete_pipeline_volume(pvc_name: str): + import kubernetes.config kubernetes.config.load_incluster_config() current_namespace = open( From 40c2105ab299ad6e193734f658555ff33e16561b Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 12:05:26 +0200 Subject: [PATCH 10/13] fix: Delete version of kedro from e2e tests --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0ff5c2e..f83da56 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -69,7 +69,7 @@ jobs: - name: Initialize kedro spaceflights project run: | - pip install . 'kedro>=0.18,<0.19' + pip install . kedro new --starter spaceflights --config tests/e2e/starter-config.yml --verbose - name: Install project dependencies From 0695912bda0bbf8e7942af41e724aaaf9b023f39 Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 12:21:12 +0200 Subject: [PATCH 11/13] fix: Ignore imports for CodeClimate --- kedro_kubeflow/cli.py | 4 ++-- kedro_kubeflow/generators/pod_per_node_pipeline_generator.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index 2b33065..b118b55 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -311,7 +311,7 @@ def init(ctx, kfp_url: str, with_github_actions: bool): ) @click.pass_context def mlflow_start(ctx, kubeflow_run_id: str, output: str): - import mlflow + import mlflow # NOQA token = AuthHandler().obtain_id_token() if token: @@ -336,7 +336,7 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): @kubeflow_group.command(hidden=True) @click.argument("pvc_name", type=str) def delete_pipeline_volume(pvc_name: str): - import kubernetes.config + import kubernetes.config # NOQA kubernetes.config.load_incluster_config() current_namespace = open( diff --git a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py index 863037b..6508dfe 100644 --- a/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py +++ b/kedro_kubeflow/generators/pod_per_node_pipeline_generator.py @@ -44,7 +44,7 @@ def generate_pipeline(self, pipeline, image, image_pull_policy): def convert_kedro_pipeline_to_kfp() -> None: """Convert from a Kedro pipeline into a kfp container graph.""" - from kedro.framework.project import pipelines + from kedro.framework.project import pipelines # NOQA dsl.get_pipeline_conf().set_ttl_seconds_after_finished( self.run_config.ttl From 24038cd8895ab31ea110bd311933828b80d880c3 Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 13:40:39 +0200 Subject: [PATCH 12/13] fix: rename function and moved back imports to definition of function --- kedro_kubeflow/auth.py | 6 +++--- tests/test_generator_utils.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kedro_kubeflow/auth.py b/kedro_kubeflow/auth.py index 2b198e8..36ad561 100644 --- a/kedro_kubeflow/auth.py +++ b/kedro_kubeflow/auth.py @@ -4,9 +4,6 @@ from urllib.parse import urlsplit, urlunsplit import requests -from google.auth.exceptions import DefaultCredentialsError -from google.auth.transport.requests import Request -from google.oauth2 import id_token IAP_CLIENT_ID = "IAP_CLIENT_ID" DEX_USERNAME = "DEX_USERNAME" @@ -18,6 +15,9 @@ class AuthHandler(object): log = logging.getLogger(__name__) def obtain_id_token(self): + from google.auth.exceptions import DefaultCredentialsError + from google.auth.transport.requests import Request + from google.oauth2 import id_token client_id = os.environ.get(IAP_CLIENT_ID, None) diff --git a/tests/test_generator_utils.py b/tests/test_generator_utils.py index 5596ed3..41442d5 100644 --- a/tests/test_generator_utils.py +++ b/tests/test_generator_utils.py @@ -5,7 +5,7 @@ from kedro_kubeflow.generators.utils import is_local_fs -def import_condition(): +def gcsfs_is_missing(): try: import gcsfs # NOQA @@ -14,7 +14,7 @@ def import_condition(): return True -@unittest.skipIf(import_condition(), "Package gcsfs is not installed") +@unittest.skipIf(gcsfs_is_missing(), "Package gcsfs is not installed") class TestGeneratorUtils(unittest.TestCase): def test_is_local(self): assert is_local_fs("data/test/file.txt") is True From ca334dbb96add98d0fc755f39404be4b686125fc Mon Sep 17 00:00:00 2001 From: Blazej Palkus Date: Mon, 1 Aug 2022 14:34:18 +0200 Subject: [PATCH 13/13] fix: fix import --- kedro_kubeflow/cli.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index b118b55..6af128c 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -4,7 +4,6 @@ from pathlib import Path import click -from click import ClickException from .auth import AuthHandler from .config import PluginConfig @@ -322,7 +321,7 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str): kedro_context = ctx.obj["context_helper"].context mlflow_conf = kedro_context.mlflow except AttributeError: - raise ClickException("Could not read MLFlow config") + raise click.ClickException("Could not read MLFlow config") run = mlflow.start_run( experiment_id=mlflow_conf.experiment.experiment_id, nested=False