From 0e8df7052f2b659b6fa676c61e9f27864b252a47 Mon Sep 17 00:00:00 2001
From: Artur Dobrogowski
Date: Mon, 21 Nov 2022 16:40:12 +0100
Subject: [PATCH 1/5] feat: Changed resource config to allow custom values

Fixes an error where resource labels for nvidia gpus/tpus were not
propagated to the execution environments properly.
---
 kedro_kubeflow/config.py           | 20 +++++++---
 kedro_kubeflow/generators/utils.py |  8 +++-
 tests/test_config.py               | 63 ++++++++++++++++--------------
 3 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/kedro_kubeflow/config.py b/kedro_kubeflow/config.py
index 534eeee..eb9b36d 100644
--- a/kedro_kubeflow/config.py
+++ b/kedro_kubeflow/config.py
@@ -178,9 +178,13 @@ def __getitem__(self, key):
         )
 
 
-class ResourceConfig(BaseModel):
-    cpu: Optional[str]
-    memory: Optional[str]
+class ResourceConfig(dict):
+    def __getitem__(self, key):
+        defaults: dict = super().__getitem__("__default__")
+        this: dict = super().get(key, {})
+        updated_defaults = defaults.copy()
+        updated_defaults.update(this)
+        return updated_defaults
 
 
 class TolerationConfig(BaseModel):
@@ -286,9 +290,15 @@ def _create_default_dict_with(
 
     @validator("resources", always=True)
     def _validate_resources(cls, value):
-        return RunConfig._create_default_dict_with(
-            value, ResourceConfig(cpu="500m", memory="1024Mi")
+        default = ResourceConfig(
+            {"__default__": {"cpu": "500m", "memory": "1024Mi"}}
         )
+        if isinstance(value, dict):
+            default.update(value)
+        # else:
+        #     # throw some error?
+        #     logger.error(value, "Unknown type")
+        return default
 
     @validator("retry_policy", always=True)
     def _validate_retry_policy(cls, value):
diff --git a/kedro_kubeflow/generators/utils.py b/kedro_kubeflow/generators/utils.py
index c3bedf3..aa0f34a 100644
--- a/kedro_kubeflow/generators/utils.py
+++ b/kedro_kubeflow/generators/utils.py
@@ -130,8 +130,12 @@ def customize_op(op, image_pull_policy, run_config: RunConfig):
         op.container.set_security_context(
             k8s.V1SecurityContext(run_as_user=run_config.volume.owner)
         )
-
-    resources = run_config.resources[op.name].dict(exclude_none=True)
+    resources = run_config.resources.get(
+        op.name, run_config.resources["__default__"]
+    )
+    for k, v in run_config.resources["__default__"].items():
+        if k not in resources:
+            resources[k] = v
     op.container.resources = k8s.V1ResourceRequirements(
         limits=resources,
         requests=resources,
diff --git a/tests/test_config.py b/tests/test_config.py
index bd33360..2a7eada 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -28,27 +28,28 @@ class TestPluginConfig(unittest.TestCase, MinimalConfigMixin):
     def test_plugin_config(self):
         cfg = PluginConfig(**yaml.safe_load(CONFIG_YAML))
-        assert cfg.host == "https://example.com"
-        assert cfg.run_config.image == "gcr.io/project-image/test"
-        assert cfg.run_config.image_pull_policy == "Always"
-        assert cfg.run_config.experiment_name == "Test Experiment"
-        assert cfg.run_config.run_name == "test run"
-        assert cfg.run_config.scheduled_run_name == "scheduled run"
-        assert cfg.run_config.wait_for_completion
-        assert cfg.run_config.volume.storageclass == "default"
-        assert cfg.run_config.volume.size == "3Gi"
-        assert cfg.run_config.volume.keep is True
-        assert cfg.run_config.volume.access_modes == ["ReadWriteOnce"]
-        assert cfg.run_config.resources["node1"] is not None
-        assert cfg.run_config.description == "My awesome pipeline"
-        assert cfg.run_config.ttl == 300
+        self.assertEqual(cfg.host, "https://example.com")
+        self.assertEqual(cfg.run_config.image, "gcr.io/project-image/test")
+        self.assertEqual(cfg.run_config.image_pull_policy, "Always")
+        self.assertEqual(cfg.run_config.experiment_name, "Test Experiment")
+        self.assertEqual(cfg.run_config.run_name, "test run")
+        self.assertEqual(cfg.run_config.scheduled_run_name, "scheduled run")
+        self.assertTrue(cfg.run_config.wait_for_completion)
+        self.assertEqual(cfg.run_config.volume.storageclass, "default")
+        self.assertEqual(cfg.run_config.volume.size, "3Gi")
+        self.assertTrue(cfg.run_config.volume.keep)
+        self.assertEqual(cfg.run_config.volume.access_modes, ["ReadWriteOnce"])
+        self.assertIsNotNone(cfg.run_config.resources["node1"])
+        self.assertIsNotNone(cfg.run_config.resources["__default__"])
+        self.assertEqual(cfg.run_config.description, "My awesome pipeline")
+        self.assertEqual(cfg.run_config.ttl, 300)
 
     def test_defaults(self):
         cfg = PluginConfig(**self.minimal_config())
-        assert cfg.run_config.image_pull_policy == "IfNotPresent"
+        self.assertEqual(cfg.run_config.image_pull_policy, "IfNotPresent")
         assert cfg.run_config.description is None
         SECONDS_IN_ONE_WEEK = 3600 * 24 * 7
-        assert cfg.run_config.ttl == SECONDS_IN_ONE_WEEK
+        self.assertEqual(cfg.run_config.ttl, SECONDS_IN_ONE_WEEK)
         assert cfg.run_config.volume is None
 
     def test_missing_required_config(self):
@@ -61,8 +62,8 @@ def test_resources_default_only(self):
                 {"run_config": {"resources": {"__default__": {"cpu": "100m"}}}}
             )
         )
-        assert cfg.run_config.resources["node2"].cpu == "100m"
-        assert cfg.run_config.resources["node3"].cpu == "100m"
+        self.assertEqual(cfg.run_config.resources["node2"]["cpu"], "100m")
+        self.assertEqual(cfg.run_config.resources["node3"]["cpu"], "100m")
 
     def test_resources_no_default(self):
         cfg = PluginConfig(
@@ -70,10 +71,10 @@ def test_resources_no_default(self):
                 {"run_config": {"resources": {"node2": {"cpu": "100m"}}}}
             )
         )
-        assert cfg.run_config.resources["node2"].cpu == "100m"
+        self.assertEqual(cfg.run_config.resources["node2"]["cpu"], "100m")
         self.assertDictEqual(
-            cfg.run_config.resources["node3"].dict(),
-            cfg.run_config.resources["__default__"].dict(),
+            cfg.run_config.resources["node3"],
+            cfg.run_config.resources["__default__"],
         )
 
     def test_resources_default_and_node_specific(self):
@@ -90,14 +91,14 @@ def test_resources_default_and_node_specific(self):
             )
         )
         self.assertDictEqual(
-            cfg.run_config.resources["node2"].dict(),
+            cfg.run_config.resources["node2"],
             {
                 "cpu": "100m",
                 "memory": "64Mi",
             },
         )
         self.assertDictEqual(
-            cfg.run_config.resources["node3"].dict(),
+            cfg.run_config.resources["node3"],
             {
                 "cpu": "200m",
                 "memory": "64Mi",
@@ -149,13 +150,15 @@ def test_tolerations_no_default(self):
             cfg.run_config.tolerations["node2"][0].dict(), toleration_config[0]
         )
 
-        assert (
+        self.assertEqual(
             isinstance(cfg.run_config.tolerations["node2"], list)
-            and len(cfg.run_config.tolerations["node2"]) == 1
+            and len(cfg.run_config.tolerations["node2"]),
+            1,
        )
-        assert (
+        self.assertEqual(
             isinstance(cfg.run_config.tolerations["node3"], list)
-            and len(cfg.run_config.tolerations["node3"]) == 0
+            and len(cfg.run_config.tolerations["node3"]),
+            0,
         )
 
     def test_tolerations_default_and_node_specific(self):
@@ -206,8 +209,8 @@ def test_reuse_run_name_for_scheduled_run_name(self):
         cfg = PluginConfig(
             **self.minimal_config({"run_config": {"run_name": "some run"}})
         )
-        assert cfg.run_config.run_name == "some run"
-        assert cfg.run_config.scheduled_run_name == "some run"
+        self.assertEqual(cfg.run_config.run_name, "some run")
+        self.assertEqual(cfg.run_config.scheduled_run_name, "some run")
 
     def test_retry_policy_default_and_node_specific(self):
         cfg = PluginConfig(
@@ -275,4 +278,4 @@ def test_retry_policy_no_default(self):
             },
         )
 
-        assert cfg.run_config.retry_policy["node2"] is None
+        self.assertIsNone(cfg.run_config.retry_policy["node2"])

From 5c21fd7a11e7ded43252909734093a5a748096d7 Mon Sep 17 00:00:00 2001
From: Artur Dobrogowski
Date: Tue, 22 Nov 2022 15:19:31 +0100
Subject: [PATCH 2/5] tests: Added tests for custom gpu tags
---
 tests/test_config.py                          |  23 +++
 tests/test_pod_per_node_pipeline_generator.py | 139 +++++++++++-------
 2 files changed, 105 insertions(+), 57 deletions(-)

diff --git a/tests/test_config.py b/tests/test_config.py
index 2a7eada..d66e2cf 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -65,6 +65,29 @@ def test_resources_default_only(self):
         self.assertEqual(cfg.run_config.resources["node2"]["cpu"], "100m")
         self.assertEqual(cfg.run_config.resources["node3"]["cpu"], "100m")
 
+    def test_resources_gpu_label(self):
+        cfg = PluginConfig(
+            **self.minimal_config(
+                {
+                    "run_config": {
+                        "resources": {
+                            "__default__": {
+                                "cpu": "100m",
+                                "nvidia.com/gpu": "1",
+                                "nvidia.com/tpu": "1",
+                            }
+                        }
+                    }
+                }
+            )
+        )
+        self.assertEqual(
+            cfg.run_config.resources["__default__"]["nvidia.com/gpu"], "1"
+        )
+        self.assertEqual(
+            cfg.run_config.resources["node3"]["nvidia.com/tpu"], "1"
+        )
+
     def test_resources_no_default(self):
         cfg = PluginConfig(
             **self.minimal_config(
diff --git a/tests/test_pod_per_node_pipeline_generator.py b/tests/test_pod_per_node_pipeline_generator.py
index 2b56f55..7bb6d01 100644
--- a/tests/test_pod_per_node_pipeline_generator.py
+++ b/tests/test_pod_per_node_pipeline_generator.py
@@ -36,8 +36,8 @@ def test_support_modification_of_pull_policy(self):
         pipeline()
 
         # then
-        assert (
-            dsl_pipeline.ops["node1"].container.image == "unittest-image"
+        self.assertEqual(
+            dsl_pipeline.ops["node1"].container.image, "unittest-image"
         )
         assert (
             dsl_pipeline.ops["node1"].container.image_pull_policy
@@ -60,7 +60,7 @@ def test_should_support_inter_steps_volume_with_defaults(self):
         pipeline()
 
         # then
-        assert len(dsl_pipeline.ops) == 5
+        self.assertEqual(len(dsl_pipeline.ops), 6)
         assert "on-exit" in dsl_pipeline.ops
         assert (
             dsl_pipeline.ops["on-exit"]
@@ -73,18 +73,20 @@ def test_should_support_inter_steps_volume_with_defaults(self):
         volume_spec = dsl_pipeline.ops[
             "data-volume-create"
         ].k8s_resource.spec
-        assert volume_spec.resources.requests["storage"] == "1Gi"
-        assert volume_spec.access_modes == ["ReadWriteOnce"]
+        self.assertEqual(volume_spec.resources.requests["storage"], "1Gi")
+        self.assertEqual(volume_spec.access_modes, ["ReadWriteOnce"])
         assert volume_spec.storage_class_name is None
         volume_init_spec = dsl_pipeline.ops["data-volume-init"].container
-        assert volume_init_spec.image == "unittest-image"
-        assert volume_init_spec.image_pull_policy == "IfNotPresent"
-        assert volume_init_spec.security_context.run_as_user == 0
+        self.assertEqual(volume_init_spec.image, "unittest-image")
+        self.assertEqual(
+            volume_init_spec.image_pull_policy, "IfNotPresent"
+        )
+        self.assertEqual(volume_init_spec.security_context.run_as_user, 0)
         assert volume_init_spec.args[0].startswith("cp --verbose -r")
         for node_name in ["data-volume-init", "node1", "node2"]:
             volumes = dsl_pipeline.ops[node_name].container.volume_mounts
-            assert len(volumes) == 1
-            assert volumes[0].name == "data-volume-create"
+            self.assertEqual(len(volumes), 1)
+            self.assertEqual(volumes[0].name, "data-volume-create")
             assert (
                 dsl_pipeline.ops[
                     node_name
@@ -172,14 +174,14 @@ def test_should_support_inter_steps_volume_with_given_spec(self):
         pipeline()
 
         # then
-        assert len(dsl_pipeline.ops) == 5
+        self.assertEqual(len(dsl_pipeline.ops), 6)
         assert "on-exit" in dsl_pipeline.ops
         volume_spec = dsl_pipeline.ops[
             "data-volume-create"
         ].k8s_resource.spec
-        assert volume_spec.resources.requests["storage"] == "1Mi"
-        assert volume_spec.access_modes == ["ReadWriteOnce"]
-        assert volume_spec.storage_class_name == "nfs"
+        self.assertEqual(volume_spec.resources.requests["storage"], "1Mi")
+        self.assertEqual(volume_spec.access_modes, ["ReadWriteOnce"])
+        self.assertEqual(volume_spec.storage_class_name, "nfs")
 
     def test_should_change_effective_user_if_to_volume_owner(self):
         # given
@@ -207,7 +209,7 @@ def test_should_change_effective_user_if_to_volume_owner(self):
 
         # then
         volume_init_spec = dsl_pipeline.ops["data-volume-init"].container
-        assert volume_init_spec.security_context.run_as_user == 47
+        self.assertEqual(volume_init_spec.security_context.run_as_user, 47)
         for node_name in ["data-volume-init", "node1", "node2"]:
             assert (
                 dsl_pipeline.ops[
@@ -233,16 +235,19 @@ def test_should_add_mlflow_init_step_if_enabled(self):
         pipeline()
 
         # then
-        assert len(dsl_pipeline.ops) == 3
+        self.assertEqual(len(dsl_pipeline.ops), 4)
         init_step = dsl_pipeline.ops["mlflow-start-run"].container
-        assert init_step.image == "unittest-image"
-        assert init_step.args == [
-            "kubeflow",
-            "--env",
-            "unittests",
-            "mlflow-start",
-            "{{workflow.uid}}",
-        ]
+        self.assertEqual(init_step.image, "unittest-image")
+        self.assertEqual(
+            init_step.args,
+            [
+                "kubeflow",
+                "--env",
+                "unittests",
+                "mlflow-start",
+                "{{workflow.uid}}",
+            ],
+        )
         assert "MLFLOW_RUN_ID" not in {e.name for e in init_step.env}
         for node_name in ["node1", "node2"]:
             env = {
@@ -271,14 +276,14 @@ def test_should_skip_volume_init_if_requested(self):
         pipeline()
 
         # then
-        assert len(dsl_pipeline.ops) == 4
+        self.assertEqual(len(dsl_pipeline.ops), 5)
         assert "data-volume-create" in dsl_pipeline.ops
         assert "on-exit" in dsl_pipeline.ops
         assert "data-volume-init" not in dsl_pipeline.ops
         for node_name in ["node1", "node2"]:
             volumes = dsl_pipeline.ops[node_name].container.volume_mounts
-            assert len(volumes) == 1
-            assert volumes[0].name == "data-volume-create"
+            self.assertEqual(len(volumes), 1)
+            self.assertEqual(volumes[0].name, "data-volume-create")
 
     def test_should_support_params_and_inject_them_to_the_nodes(self):
         # given
@@ -297,18 +302,21 @@ def test_should_support_params_and_inject_them_to_the_nodes(self):
         pipeline()
 
         # then
-        assert len(default_params) == 2
-        assert default_params["param1"].default == 0.3
-        assert default_params["param2"].default == 42
+        self.assertEqual(len(default_params), 2)
+        self.assertEqual(default_params["param1"].default, 0.3)
+        self.assertEqual(default_params["param2"].default, 42)
         for node_name in ["node1", "node2"]:
             args = dsl_pipeline.ops[node_name].container.args
-            assert args == [
-                "_",
-                "param1",
-                "{{pipelineparam:op=;name=param1}}",
-                "param2",
-                "{{pipelineparam:op=;name=param2}}",
-            ]
+            self.assertEqual(
+                args,
+                [
+                    "_",
+                    "param1",
+                    "{{pipelineparam:op=;name=param1}}",
+                    "param2",
+                    "{{pipelineparam:op=;name=param2}}",
+                ],
+            )
 
     def test_should_fallbackto_default_resources_spec_if_not_requested(self):
         # given
@@ -337,6 +345,7 @@ def test_should_add_resources_spec(self):
                     "resources": {
                         "__default__": {"cpu": "100m"},
                         "node1": {"cpu": "400m", "memory": "64Gi"},
+                        "node3": {"memory": "32Gi", "nvidia.com/gpu": "1"},
                     }
                 }
             )
@@ -355,10 +364,21 @@ def test_should_add_resources_spec(self):
         # then
         node1_spec = dsl_pipeline.ops["node1"].container.resources
dsl_pipeline.ops["node2"].container.resources - assert node1_spec.limits == {"cpu": "400m", "memory": "64Gi"} - assert node1_spec.requests == {"cpu": "400m", "memory": "64Gi"} - assert node2_spec.limits == {"cpu": "100m"} - assert node2_spec.requests == {"cpu": "100m"} + node3_spec = dsl_pipeline.ops["node3"].container.resources + for spec, result in zip( + (node1_spec, node2_spec, node3_spec), + ( + {"cpu": "400m", "memory": "64Gi"}, + {"cpu": "100m"}, + {"cpu": "100m", "memory": "32Gi", "nvidia.com/gpu": "1"}, + ), + ): + for precise_spec in (spec.limits, spec.requests): + self.assertDictEqual(precise_spec, result) + # self.assertEqual(node1_spec.limits , {"cpu": "400m", "memory": "64Gi"}) + # self.assertEqual(node1_spec.requests , {"cpu": "400m", "memory": "64Gi"}) + # self.assertEqual(node2_spec.limits , {"cpu": "100m"}) + # self.assertEqual(node2_spec.requests , {"cpu": "100m"}) def test_can_add_extra_volumes(self): self.create_generator( @@ -391,7 +411,7 @@ def test_can_add_extra_volumes(self): pipeline() volume_mounts = dsl_pipeline.ops["node1"].container.volume_mounts - assert len(volume_mounts) == 1 + self.assertEqual(len(volume_mounts), 1) def test_should_not_add_retry_policy_if_not_requested(self): # given @@ -411,7 +431,7 @@ def test_should_not_add_retry_policy_if_not_requested(self): # then for node_name in ["node1", "node2"]: op = dsl_pipeline.ops[node_name] - assert op.num_retries == 0 + self.assertEqual(op.num_retries, 0) assert op.retry_policy is None assert op.backoff_factor is None assert op.backoff_duration is None @@ -449,16 +469,16 @@ def test_should_add_retry_policy(self): # then op1 = dsl_pipeline.ops["node1"] - assert op1.num_retries == 100 - assert op1.retry_policy == "Always" - assert op1.backoff_factor == 1 - assert op1.backoff_duration == "5m" + self.assertEqual(op1.num_retries, 100) + self.assertEqual(op1.retry_policy, "Always") + self.assertEqual(op1.backoff_factor, 1) + self.assertEqual(op1.backoff_duration, "5m") assert op1.backoff_max_duration is None op2 = dsl_pipeline.ops["node2"] - assert op2.num_retries == 4 - assert op2.retry_policy == "Always" - assert op2.backoff_factor == 2 - assert op2.backoff_duration == "60s" + self.assertEqual(op2.num_retries, 4) + self.assertEqual(op2.retry_policy, "Always") + self.assertEqual(op2.backoff_factor, 2) + self.assertEqual(op2.backoff_duration, "60s") assert op2.backoff_max_duration is None def test_should_add_max_cache_staleness(self): @@ -494,7 +514,7 @@ def test_should_set_description(self): ) # then - assert pipeline._component_description == "DESC" + self.assertEqual(pipeline._component_description, "DESC") def test_artifact_registration(self): # given @@ -520,11 +540,15 @@ def test_artifact_registration(self): # then outputs1 = dsl_pipeline.ops["node1"].file_outputs - assert len(outputs1) == 1 + self.assertEqual(len(outputs1), 1) assert "B" in outputs1 - assert outputs1["B"] == "/home/kedro/data/02_intermediate/b.csv" + self.assertEqual( + outputs1["B"], "/home/kedro/data/02_intermediate/b.csv" + ) outputs2 = dsl_pipeline.ops["node2"].file_outputs - assert len(outputs2) == 0 # output "C" is missing in the catalog + self.assertEqual( + len(outputs2), 0 + ) # output "C" is missing in the catalog) def test_should_skip_artifact_registration_if_requested(self): # given @@ -551,7 +575,7 @@ def test_should_skip_artifact_registration_if_requested(self): # then outputs1 = dsl_pipeline.ops["node1"].file_outputs - assert len(outputs1) == 0 + self.assertEqual(len(outputs1), 0) def 
     def test_should_skip_volume_removal_if_requested(self):
         # given
@@ -596,7 +620,7 @@ def test_should_pass_kedro_config_env_to_nodes(self):
                     for e in dsl_pipeline.ops[node_name].container.env
                 }
                 assert "KEDRO_CONFIG_MY_KEY" in env_values
-                assert env_values["KEDRO_CONFIG_MY_KEY"] == "42"
+                self.assertEqual(env_values["KEDRO_CONFIG_MY_KEY"], "42")
                 assert "SOME_VALUE" not in env_values
         finally:
             del os.environ["KEDRO_CONFIG_MY_KEY"]
@@ -620,6 +644,7 @@ def create_generator(self, config=None, params=None, catalog=None):
                 [
                     node(identity, "A", "B", name="node1"),
                     node(identity, "B", "C", name="node2"),
+                    node(identity, "B", "D", name="node3"),
                 ]
             )
         }

From a42bc957565b998f5ba62e45a81482de35f3b748 Mon Sep 17 00:00:00 2001
From: Artur Dobrogowski
Date: Tue, 22 Nov 2022 15:21:48 +0100
Subject: [PATCH 3/5] changelog
---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8353ef4..277c3fb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## [Unreleased]
 
+- Removed field validation from the resources configuration field - it now accepts arbitrary custom parameters such as "nvidia.com/gpu": 1
+
 ## [0.7.3] - 2022-09-23
 
 - Fixed plugin config provider so it respects environment provided by the user

From 6f2514d0e04cdfce78ce5c644a46709943cb5a3c Mon Sep 17 00:00:00 2001
From: Artur Dobrogowski
Date: Wed, 23 Nov 2022 11:22:04 +0100
Subject: [PATCH 4/5] cleanup
---
 kedro_kubeflow/config.py           | 7 +++----
 kedro_kubeflow/generators/utils.py | 8 ++------
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/kedro_kubeflow/config.py b/kedro_kubeflow/config.py
index eb9b36d..215c6c5 100644
--- a/kedro_kubeflow/config.py
+++ b/kedro_kubeflow/config.py
@@ -180,7 +180,7 @@ class ResourceConfig(dict):
     def __getitem__(self, key):
-        defaults: dict = super().__getitem__("__default__")
+        defaults: dict = super().get("__default__")
         this: dict = super().get(key, {})
         updated_defaults = defaults.copy()
         updated_defaults.update(this)
         return updated_defaults
 
@@ -295,9 +295,8 @@ def _validate_resources(cls, value):
         )
         if isinstance(value, dict):
             default.update(value)
-        # else:
-        #     # throw some error?
-        #     logger.error(value, "Unknown type")
+        elif value is not None:
+            logger.error(f"Unknown type for resource config {type(value)}")
         return default
 
     @validator("retry_policy", always=True)
diff --git a/kedro_kubeflow/generators/utils.py b/kedro_kubeflow/generators/utils.py
index aa0f34a..a4e4e48 100644
--- a/kedro_kubeflow/generators/utils.py
+++ b/kedro_kubeflow/generators/utils.py
@@ -130,12 +130,8 @@ def customize_op(op, image_pull_policy, run_config: RunConfig):
         op.container.set_security_context(
             k8s.V1SecurityContext(run_as_user=run_config.volume.owner)
         )
-    resources = run_config.resources.get(
-        op.name, run_config.resources["__default__"]
-    )
-    for k, v in run_config.resources["__default__"].items():
-        if k not in resources:
-            resources[k] = v
+
+    resources = run_config.resources[op.name]
     op.container.resources = k8s.V1ResourceRequirements(
         limits=resources,
         requests=resources,

From 17e80b27b9811a69de0667f422a7c3cfd502f6b3 Mon Sep 17 00:00:00 2001
From: Artur Dobrogowski
Date: Wed, 23 Nov 2022 11:24:58 +0100
Subject: [PATCH 5/5] refactor: raise an exception in addition to logging the
 error
---
 kedro_kubeflow/config.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kedro_kubeflow/config.py b/kedro_kubeflow/config.py
index 215c6c5..612f3a3 100644
--- a/kedro_kubeflow/config.py
+++ b/kedro_kubeflow/config.py
@@ -297,6 +297,7 @@ def _validate_resources(cls, value):
             default.update(value)
         elif value is not None:
             logger.error(f"Unknown type for resource config {type(value)}")
+            raise TypeError(f"Unknown type for resource config {type(value)}")
         return default
 
     @validator("retry_policy", always=True)
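
Note on the resulting behavior (a standalone sketch, not part of the patches
above): after PATCH 4, ResourceConfig is a plain dict subclass whose
__getitem__ overlays a node's entry on top of the "__default__" entry, so
custom resource keys such as "nvidia.com/gpu" pass through to
k8s.V1ResourceRequirements untouched. The snippet below reproduces that merge
logic in isolation; the node names and values are illustrative only, and it
assumes a "__default__" entry is present (which the _validate_resources
validator always seeds).

    # Standalone sketch of the merge semantics introduced by this series;
    # mirrors ResourceConfig from kedro_kubeflow/config.py, illustrative values.
    class ResourceConfig(dict):
        def __getitem__(self, key):
            defaults: dict = super().get("__default__")
            this: dict = super().get(key, {})
            updated_defaults = defaults.copy()
            updated_defaults.update(this)
            return updated_defaults

    resources = ResourceConfig(
        {
            "__default__": {"cpu": "500m", "memory": "1024Mi"},
            "node1": {"cpu": "400m", "nvidia.com/gpu": "1"},
        }
    )

    # Node-specific values override the defaults; custom keys are preserved.
    assert resources["node1"] == {
        "cpu": "400m",
        "memory": "1024Mi",
        "nvidia.com/gpu": "1",
    }
    # Nodes without an entry fall back to "__default__" unchanged.
    assert resources["node2"] == {"cpu": "500m", "memory": "1024Mi"}

One caveat visible in the final state: super().get("__default__") returns None
when no "__default__" entry exists, so defaults.copy() would raise
AttributeError. This never surfaces through the validator, which always seeds
a "__default__" before merging user-provided values.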