Merge pull request #227 from getindata/release-0.7.4
Release 0.7.4
szczeles committed Feb 27, 2023
2 parents 4cb366c + e87bcf3 commit d3cff89
Showing 17 changed files with 445 additions and 146 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.7.3
+current_version = 0.7.4

[bumpversion:file:pyproject.toml]

2 changes: 1 addition & 1 deletion .copier-answers.yml
@@ -8,7 +8,7 @@ description: Kedro plugin with Kubeflow Pipelines support
docs_url: https://kedro-kubeflow.readthedocs.io/
full_name: Kedro Kubeflow Plugin
github_url: https://github.com/getindata/kedro-kubeflow
-initial_version: 0.7.3
+initial_version: 0.7.4
keywords:
- kedro-plugin
- kubeflow
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -40,7 +40,7 @@ jobs:
      - name: Test with tox
        run: |
-          pip install tox-pip-version tox-gh-actions
+          pip install tox-pip-version tox-gh-actions "tox<4.0.0"
          tox -v
      - name: Store coverage reports
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
  - repo: https://github.com/pycqa/isort
-    rev: 5.10.1
+    rev: 5.12.0
    hooks:
      - id: isort
        args: ["--profile", "black", "--line-length=79"]
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,11 @@

## [Unreleased]

+## [0.7.4] - 2023-02-27
+
+- Removed field validation from resources configuration field - now it can take any custom parameters such as "nvidia.com/gpu":1
+- Added support for kedro namespaces for parameters
+
## [0.7.3] - 2022-09-23

- Fixed plugin config provider so it respects environment provided by the user
@@ -167,7 +172,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

-[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.7.3...HEAD
+[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.7.4...HEAD

+[0.7.4]: https://github.com/getindata/kedro-kubeflow/compare/0.7.3...0.7.4

[0.7.3]: https://github.com/getindata/kedro-kubeflow/compare/0.7.2...0.7.3

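With the relaxed resources behavior from this release, the resources section of the plugin config can mix the built-in cpu/memory keys with arbitrary Kubernetes resource names. A minimal illustrative sketch, assuming the usual kubeflow.yml layout with a top-level run_config key; the node name train_node is hypothetical:

```yaml
run_config:
  resources:
    __default__:         # defaults merged into every node's resources
      cpu: 500m
      memory: 1024Mi
    train_node:          # hypothetical node name
      nvidia.com/gpu: 1  # custom key, accepted now that strict field validation is gone
```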
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
@@ -1 +1 @@
__version__ = "0.7.3"
__version__ = "0.7.4"
20 changes: 15 additions & 5 deletions kedro_kubeflow/config.py
@@ -178,9 +178,13 @@ def __getitem__(self, key):
)


-class ResourceConfig(BaseModel):
-    cpu: Optional[str]
-    memory: Optional[str]
+class ResourceConfig(dict):
+    def __getitem__(self, key):
+        defaults: dict = super().get("__default__")
+        this: dict = super().get(key, {})
+        updated_defaults = defaults.copy()
+        updated_defaults.update(this)
+        return updated_defaults


class TolerationConfig(BaseModel):
@@ -286,9 +290,15 @@ def _create_default_dict_with(

@validator("resources", always=True)
def _validate_resources(cls, value):
return RunConfig._create_default_dict_with(
value, ResourceConfig(cpu="500m", memory="1024Mi")
default = ResourceConfig(
{"__default__": {"cpu": "500m", "memory": "1024Mi"}}
)
if isinstance(value, dict):
default.update(value)
elif value is not None:
logger.error(f"Unknown type for resource config {type(value)}")
raise TypeError(f"Unknown type for resource config {type(value)}")
return default

@validator("retry_policy", always=True)
def _validate_retry_policy(cls, value):
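The dict-based ResourceConfig above replaces the former pydantic model: a per-node lookup overlays the node's entries on the __default__ entry instead of validating against a fixed cpu/memory schema. A quick sketch of the resulting behavior (node names are hypothetical):

```python
from kedro_kubeflow.config import ResourceConfig

resources = ResourceConfig(
    {
        "__default__": {"cpu": "500m", "memory": "1024Mi"},
        "train_node": {"nvidia.com/gpu": "1"},  # hypothetical node name
    }
)

# Node-specific keys are overlaid on the defaults...
assert resources["train_node"] == {
    "cpu": "500m",
    "memory": "1024Mi",
    "nvidia.com/gpu": "1",
}

# ...and nodes without an explicit entry get the defaults alone.
assert resources["any_other_node"] == {"cpu": "500m", "memory": "1024Mi"}
```

As the utils.py change further down shows, customize_op now passes this merged dict straight to k8s.V1ResourceRequirements as both limits and requests, without the former .dict(exclude_none=True) step.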
14 changes: 9 additions & 5 deletions kedro_kubeflow/generators/one_pod_pipeline_generator.py
@@ -12,6 +12,7 @@
    customize_op,
    is_local_fs,
    maybe_add_params,
+    merge_namespaced_params_to_dict,
)


@@ -26,8 +27,10 @@ def __init__(self, config, project_name, context):
        self.catalog = context.config_loader.get("catalog*")

    def generate_pipeline(self, pipeline, image, image_pull_policy):
+        merged_params = merge_namespaced_params_to_dict(self.context.params)
+
        @dsl.pipeline(self.project_name, self.run_config.description)
-        @maybe_add_params(self.context.params)
+        @maybe_add_params(merged_params)
        def convert_kedro_pipeline_to_kfp() -> None:
            dsl.get_pipeline_conf().set_ttl_seconds_after_finished(
                self.run_config.ttl
@@ -39,13 +42,16 @@ def convert_kedro_pipeline_to_kfp() -> None:
                self.run_config,
                self.context,
            ):
-                self._build_kfp_op(pipeline, image, image_pull_policy)
+                self._build_kfp_op(
+                    pipeline, merged_params, image, image_pull_policy
+                )

        return convert_kedro_pipeline_to_kfp

    def _build_kfp_op(
        self,
        pipeline,
+        params,
        image,
        image_pull_policy,
    ) -> dsl.ContainerOp:
@@ -59,9 +65,7 @@ def _build_kfp_op(
f"--pipeline {pipeline} "
f"--config config.yaml"
),
arguments=create_arguments_from_parameters(
self.context.params.keys()
),
arguments=create_arguments_from_parameters(params.keys()),
container_kwargs={"env": create_container_environment()},
file_outputs={
output: f"/home/kedro/{self.catalog[output]['filepath']}"
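The generator now derives the kfp pipeline parameters from the merged dict rather than from raw self.context.params, so a namespaced parameter surfaces as a single top-level argument instead of a dotted key. A small sketch (the parameter name is hypothetical):

```python
from kedro_kubeflow.generators.utils import merge_namespaced_params_to_dict

raw_params = {"data_science.test_size": 0.2}  # hypothetical namespaced parameter
merged = merge_namespaced_params_to_dict(raw_params)

print(list(raw_params.keys()))  # ['data_science.test_size'] - dotted key
print(list(merged.keys()))      # ['data_science'] - what _build_kfp_op now receives
```

The pod-per-node generator below applies the same change to _build_kfp_ops.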
16 changes: 11 additions & 5 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
@@ -15,6 +15,7 @@
    customize_op,
    is_local_fs,
    maybe_add_params,
+    merge_namespaced_params_to_dict,
)


@@ -36,11 +37,13 @@ def configure_max_cache_staleness(self, kfp_ops):
        )

    def generate_pipeline(self, pipeline, image, image_pull_policy):
+        merged_params = merge_namespaced_params_to_dict(self.context.params)
+
        @dsl.pipeline(
            name=self.project_name,
            description=self.run_config.description,
        )
-        @maybe_add_params(self.context.params)
+        @maybe_add_params(merged_params)
        def convert_kedro_pipeline_to_kfp() -> None:
            """Convert from a Kedro pipeline into a kfp container graph."""

@@ -58,7 +61,11 @@ def convert_kedro_pipeline_to_kfp() -> None:
                self.context,
            ):
                kfp_ops = self._build_kfp_ops(
-                    pipeline, node_dependencies, image, image_pull_policy
+                    pipeline,
+                    merged_params,
+                    node_dependencies,
+                    image,
+                    image_pull_policy,
                )

            self.configure_max_cache_staleness(kfp_ops)
@@ -71,6 +78,7 @@ def convert_kedro_pipeline_to_kfp() -> None:
    def _build_kfp_ops(
        self,
        pipeline,
+        params,
        node_dependencies: Dict[Node, Set[Node]],
        image,
        image_pull_policy,
@@ -129,9 +137,7 @@ def _build_kfp_ops(
f"--node {node.name} "
f"--config config.yaml"
),
arguments=create_arguments_from_parameters(
self.context.params.keys()
),
arguments=create_arguments_from_parameters(params.keys()),
pvolumes=node_volumes,
container_kwargs={"env": nodes_env},
file_outputs={
47 changes: 45 additions & 2 deletions kedro_kubeflow/generators/utils.py
@@ -2,7 +2,7 @@
import itertools
import json
import os
-from functools import wraps
+from functools import reduce, wraps
from inspect import Parameter, signature

import fsspec
@@ -40,6 +40,49 @@ def wrapper(*args, **kwargs):
    return decorator


+def merge_namespaced_params_to_dict(kedro_parameters):
+    def dict_merge(dct, merge_dct):
+        """Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
+        updating only top-level keys, dict_merge recurses down into dicts nested
+        to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
+        ``dct``.
+        :param dct: dict onto which the merge is executed
+        :param merge_dct: dct merged into dct
+        :return: None
+        """
+        for k, v in merge_dct.items():
+            if (
+                k in dct
+                and isinstance(dct[k], dict)
+                and isinstance(merge_dct[k], dict)
+            ):  # noqa
+                dict_merge(dct[k], merge_dct[k])
+            else:
+                dct[k] = merge_dct[k]
+
+        return dct
+
+    normalized_params = []
+    for key, param in kedro_parameters.items():
+        if "." in key:
+            nested_keys = key.split(".")
+            top_key = nested_keys.pop(0)
+            bottom_key = nested_keys.pop()
+            inner_dict = {bottom_key: param}
+            for nested_key in nested_keys[-1:]:
+                inner_dict = {nested_key: inner_dict}
+
+            inner_dict = {top_key: inner_dict}
+            normalized_params.append(inner_dict)
+        else:
+            normalized_params.append({key: param})
+
+    if normalized_params:
+        return reduce(dict_merge, normalized_params)
+    else:
+        return dict()


def create_container_environment():
    env_vars = [
        k8s.V1EnvVar(
@@ -131,7 +174,7 @@ def customize_op(op, image_pull_policy, run_config: RunConfig):
            k8s.V1SecurityContext(run_as_user=run_config.volume.owner)
        )

-    resources = run_config.resources[op.name].dict(exclude_none=True)
+    resources = run_config.resources[op.name]
    op.container.resources = k8s.V1ResourceRequirements(
        limits=resources,
        requests=resources,
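To make the new helper concrete, here is an illustrative run of merge_namespaced_params_to_dict; the parameter names are made up, and the expected output follows directly from the code above:

```python
from kedro_kubeflow.generators.utils import merge_namespaced_params_to_dict

kedro_params = {
    "learning_rate": 0.01,
    "data_science.active_modelling_pipeline.model_options": {"max_depth": 5},
    "data_science.active_modelling_pipeline.test_size": 0.2,
}

merged = merge_namespaced_params_to_dict(kedro_params)

# Dotted keys are rebuilt as nested dicts and recursively merged;
# plain keys stay at the top level.
assert merged == {
    "learning_rate": 0.01,
    "data_science": {
        "active_modelling_pipeline": {
            "model_options": {"max_depth": 5},
            "test_size": 0.2,
        }
    },
}
```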
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "kedro-kubeflow"
version = "0.7.3"
version = "0.7.4"
description = "Kedro plugin with Kubeflow Pipelines support"
readme = "README.md"
authors = ['Mateusz Pytel <mateusz.pytel@getindata.com>', 'Mariusz Strzelecki <mariusz.strzelecki@getindata.com>']
2 changes: 1 addition & 1 deletion sonar-project.properties
@@ -6,7 +6,7 @@ sonar.tests=tests/
sonar.python.coverage.reportPaths=coverage.xml
sonar.python.version=3.9

-sonar.projectVersion=0.7.3
+sonar.projectVersion=0.7.4
sonar.projectDescription=Kedro plugin with Kubeflow Pipelines support
sonar.links.homepage=https://kedro-kubeflow.readthedocs.io/
sonar.links.ci=https://github.com/getindata/kedro-kubeflow/actions
50 changes: 12 additions & 38 deletions tests/e2e/catalog.yml
@@ -20,64 +20,38 @@ model_input_table:
  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/03_primary/model_input_table.csv
  layer: primary

-data_processing.preprocessed_companies:
+preprocessed_companies:
  type: pandas.CSVDataSet
  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/02_intermediate/preprocessed_companies.csv
  layer: intermediate

-data_processing.preprocessed_shuttles:
+preprocessed_shuttles:
  type: pandas.CSVDataSet
  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/02_intermediate/preprocessed_shuttles.csv
  layer: intermediate

-data_science.active_modelling_pipeline.X_train:
+X_train:
  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/X_train.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/X_train.pickle
  layer: model_input

-data_science.active_modelling_pipeline.y_train:
+y_train:
  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/y_train.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/y_train.pickle
  layer: model_input

-data_science.active_modelling_pipeline.X_test:
+X_test:
  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/X_test.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/X_test.pickle
  layer: model_input

-data_science.active_modelling_pipeline.y_test:
+y_test:
  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/y_test.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/y_test.pickle
  layer: model_input

-data_science.active_modelling_pipeline.regressor:
+regressor:
  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/06_models/regressor.pickle
-  versioned: true
-  layer: models
-
-data_science.candidate_modelling_pipeline.X_train:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/X_train.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.y_train:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/y_train.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.X_test:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/X_test.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.y_test:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/y_test.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.regressor:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/06_models/regressor.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/06_models/regressor.pickle
  versioned: true
  layer: models
