Release 0.7.4 #227
Merged: 9 commits, Feb 27, 2023
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.7.3
+current_version = 0.7.4

 [bumpversion:file:pyproject.toml]
2 changes: 1 addition & 1 deletion .copier-answers.yml
@@ -8,7 +8,7 @@ description: Kedro plugin with Kubeflow Pipelines support
 docs_url: https://kedro-kubeflow.readthedocs.io/
 full_name: Kedro Kubeflow Plugin
 github_url: https://github.com/getindata/kedro-kubeflow
-initial_version: 0.7.3
+initial_version: 0.7.4
 keywords:
 - kedro-plugin
 - kubeflow
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -40,7 +40,7 @@ jobs:

       - name: Test with tox
         run: |
-          pip install tox-pip-version tox-gh-actions
+          pip install tox-pip-version tox-gh-actions "tox<4.0.0"
           tox -v

       - name: Store coverage reports
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/pycqa/isort
-    rev: 5.10.1
+    rev: 5.12.0
     hooks:
       - id: isort
        args: ["--profile", "black", "--line-length=79"]
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,11 @@

 ## [Unreleased]

+## [0.7.4] - 2023-02-27
+
+- Removed field validation from resources configuration field - now it can take any custom parameters such as "nvidia.com/gpu": 1
+- Added support for kedro namespaces for parameters
+
 ## [0.7.3] - 2022-09-23

 - Fixed plugin config provider so it respects environment provided by the user
@@ -167,7 +172,9 @@
 - Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
 - Shortcut to open UI for pipelines using `kedro kubeflow ui`

-[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.7.3...HEAD
+[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.7.4...HEAD
+
+[0.7.4]: https://github.com/getindata/kedro-kubeflow/compare/0.7.3...0.7.4

 [0.7.3]: https://github.com/getindata/kedro-kubeflow/compare/0.7.2...0.7.3
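The first 0.7.4 entry is easier to see with a concrete value. Below is a minimal sketch of a resources mapping that the relaxed validation now accepts; the node name `gpu_node` is hypothetical, and the `__default__` key comes from the validator shown in the config.py diff further down:

```python
# Hypothetical resources mapping for the plugin's run config. The old
# pydantic ResourceConfig allowed only cpu/memory; custom Kubernetes
# extended-resource keys such as "nvidia.com/gpu" now pass through as-is.
resources = {
    "__default__": {"cpu": "500m", "memory": "1024Mi"},
    "gpu_node": {"nvidia.com/gpu": "1"},
}
```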
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
@@ -1 +1 @@
-__version__ = "0.7.3"
+__version__ = "0.7.4"
20 changes: 15 additions & 5 deletions kedro_kubeflow/config.py
@@ -178,9 +178,13 @@ def __getitem__(self, key):
     )


-class ResourceConfig(BaseModel):
-    cpu: Optional[str]
-    memory: Optional[str]
+class ResourceConfig(dict):
+    def __getitem__(self, key):
+        defaults: dict = super().get("__default__")
+        this: dict = super().get(key, {})
+        updated_defaults = defaults.copy()
+        updated_defaults.update(this)
+        return updated_defaults


 class TolerationConfig(BaseModel):
@@ -286,9 +290,15 @@ def _create_default_dict_with(

     @validator("resources", always=True)
     def _validate_resources(cls, value):
-        return RunConfig._create_default_dict_with(
-            value, ResourceConfig(cpu="500m", memory="1024Mi")
+        default = ResourceConfig(
+            {"__default__": {"cpu": "500m", "memory": "1024Mi"}}
         )
+        if isinstance(value, dict):
+            default.update(value)
+        elif value is not None:
+            logger.error(f"Unknown type for resource config {type(value)}")
+            raise TypeError(f"Unknown type for resource config {type(value)}")
+        return default

     @validator("retry_policy", always=True)
     def _validate_retry_policy(cls, value):
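To make the new lookup semantics concrete, here is a small sketch against the `ResourceConfig` defined above; the node names are hypothetical, and the behaviour follows directly from `__getitem__`:

```python
resources = ResourceConfig(
    {
        "__default__": {"cpu": "500m", "memory": "1024Mi"},
        # Per-node overrides may now carry arbitrary resource keys.
        "train_node": {"memory": "8Gi", "nvidia.com/gpu": "1"},
    }
)

# __getitem__ copies __default__ and overlays the node-specific entry,
# so keys the node does not override fall back to the defaults...
assert resources["train_node"] == {
    "cpu": "500m",
    "memory": "8Gi",
    "nvidia.com/gpu": "1",
}

# ...and a node with no entry of its own resolves to the defaults alone.
assert resources["other_node"] == {"cpu": "500m", "memory": "1024Mi"}
```

Because the lookup now returns a plain dict, `customize_op` in generators/utils.py (below) can drop the old `.dict(exclude_none=True)` call and pass the mapping directly to `k8s.V1ResourceRequirements` as both limits and requests.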
14 changes: 9 additions & 5 deletions kedro_kubeflow/generators/one_pod_pipeline_generator.py
@@ -12,6 +12,7 @@
     customize_op,
     is_local_fs,
     maybe_add_params,
+    merge_namespaced_params_to_dict,
 )


@@ -26,8 +27,10 @@ def __init__(self, config, project_name, context):
         self.catalog = context.config_loader.get("catalog*")

     def generate_pipeline(self, pipeline, image, image_pull_policy):
+        merged_params = merge_namespaced_params_to_dict(self.context.params)
+
         @dsl.pipeline(self.project_name, self.run_config.description)
-        @maybe_add_params(self.context.params)
+        @maybe_add_params(merged_params)
         def convert_kedro_pipeline_to_kfp() -> None:
             dsl.get_pipeline_conf().set_ttl_seconds_after_finished(
                 self.run_config.ttl
@@ -39,13 +42,16 @@ def convert_kedro_pipeline_to_kfp() -> None:
                 self.run_config,
                 self.context,
             ):
-                self._build_kfp_op(pipeline, image, image_pull_policy)
+                self._build_kfp_op(
+                    pipeline, merged_params, image, image_pull_policy
+                )

         return convert_kedro_pipeline_to_kfp

     def _build_kfp_op(
         self,
         pipeline,
+        params,
         image,
         image_pull_policy,
     ) -> dsl.ContainerOp:
@@ -59,9 +65,7 @@ def _build_kfp_op(
                 f"--pipeline {pipeline} "
                 f"--config config.yaml"
             ),
-            arguments=create_arguments_from_parameters(
-                self.context.params.keys()
-            ),
+            arguments=create_arguments_from_parameters(params.keys()),
             container_kwargs={"env": create_container_environment()},
             file_outputs={
                 output: f"/home/kedro/{self.catalog[output]['filepath']}"
16 changes: 11 additions & 5 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
@@ -15,6 +15,7 @@
     customize_op,
     is_local_fs,
     maybe_add_params,
+    merge_namespaced_params_to_dict,
 )


@@ -36,11 +37,13 @@ def configure_max_cache_staleness(self, kfp_ops):
         )

     def generate_pipeline(self, pipeline, image, image_pull_policy):
+        merged_params = merge_namespaced_params_to_dict(self.context.params)
+
         @dsl.pipeline(
             name=self.project_name,
             description=self.run_config.description,
         )
-        @maybe_add_params(self.context.params)
+        @maybe_add_params(merged_params)
         def convert_kedro_pipeline_to_kfp() -> None:
             """Convert from a Kedro pipeline into a kfp container graph."""

@@ -58,7 +61,11 @@ def convert_kedro_pipeline_to_kfp() -> None:
                 self.context,
             ):
                 kfp_ops = self._build_kfp_ops(
-                    pipeline, node_dependencies, image, image_pull_policy
+                    pipeline,
+                    merged_params,
+                    node_dependencies,
+                    image,
+                    image_pull_policy,
                 )

             self.configure_max_cache_staleness(kfp_ops)
@@ -71,6 +78,7 @@ def convert_kedro_pipeline_to_kfp() -> None:
     def _build_kfp_ops(
         self,
         pipeline,
+        params,
         node_dependencies: Dict[Node, Set[Node]],
         image,
         image_pull_policy,
@@ -129,9 +137,7 @@ def _build_kfp_ops(
                 f"--node {node.name} "
                 f"--config config.yaml"
             ),
-            arguments=create_arguments_from_parameters(
-                self.context.params.keys()
-            ),
+            arguments=create_arguments_from_parameters(params.keys()),
             pvolumes=node_volumes,
             container_kwargs={"env": nodes_env},
             file_outputs={
47 changes: 45 additions & 2 deletions kedro_kubeflow/generators/utils.py
@@ -2,7 +2,7 @@
 import itertools
 import json
 import os
-from functools import wraps
+from functools import reduce, wraps
 from inspect import Parameter, signature

 import fsspec
@@ -40,6 +40,49 @@ def wrapper(*args, **kwargs):
     return decorator


+def merge_namespaced_params_to_dict(kedro_parameters):
+    def dict_merge(dct, merge_dct):
+        """Recursive dict merge. Inspired by :meth:``dict.update()``; instead
+        of updating only top-level keys, dict_merge recurses down into dicts
+        nested to an arbitrary depth, updating keys. The ``merge_dct`` is
+        merged into ``dct``.
+        :param dct: dict onto which the merge is executed
+        :param merge_dct: dict merged into ``dct``
+        :return: the merged ``dct``
+        """
+        for k, v in merge_dct.items():
+            if (
+                k in dct
+                and isinstance(dct[k], dict)
+                and isinstance(merge_dct[k], dict)
+            ):  # noqa
+                dict_merge(dct[k], merge_dct[k])
+            else:
+                dct[k] = merge_dct[k]
+
+        return dct
+
+    normalized_params = []
+    for key, param in kedro_parameters.items():
+        if "." in key:
+            nested_keys = key.split(".")
+            top_key = nested_keys.pop(0)
+            bottom_key = nested_keys.pop()
+            inner_dict = {bottom_key: param}
+            for nested_key in nested_keys[-1:]:
+                inner_dict = {nested_key: inner_dict}

+            inner_dict = {top_key: inner_dict}
+            normalized_params.append(inner_dict)
+        else:
+            normalized_params.append({key: param})
+
+    if normalized_params:
+        return reduce(dict_merge, normalized_params)
+    else:
+        return dict()
+
+
 def create_container_environment():
     env_vars = [
         k8s.V1EnvVar(
@@ -131,7 +174,7 @@ def customize_op(op, image_pull_policy, run_config: RunConfig):
             k8s.V1SecurityContext(run_as_user=run_config.volume.owner)
         )

-    resources = run_config.resources[op.name].dict(exclude_none=True)
+    resources = run_config.resources[op.name]
     op.container.resources = k8s.V1ResourceRequirements(
         limits=resources,
         requests=resources,
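A hedged usage sketch of the new helper; the parameter names are invented, and the dotted keys stay within the two- and three-segment shapes that the splitting loop above reconstructs:

```python
# Kedro exposes namespaced parameters as dotted keys like these.
params = {
    "data_science.active_modelling_pipeline.model_options": {"test_size": 0.2},
    "data_science.candidate_modelling_pipeline.model_options": {"test_size": 0.3},
    "global_param": 42,
}

merged = merge_namespaced_params_to_dict(params)

# Dotted keys are rebuilt into nested dicts, and sibling namespaces are
# deep-merged under their shared top-level key rather than overwritten.
assert merged == {
    "data_science": {
        "active_modelling_pipeline": {"model_options": {"test_size": 0.2}},
        "candidate_modelling_pipeline": {"model_options": {"test_size": 0.3}},
    },
    "global_param": 42,
}
```

Both generators then hand the merged mapping's keys to `create_arguments_from_parameters`, so namespaced parameters reach the Kubeflow ops the same way top-level ones always have.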
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "kedro-kubeflow"
-version = "0.7.3"
+version = "0.7.4"
 description = "Kedro plugin with Kubeflow Pipelines support"
 readme = "README.md"
 authors = ['Mateusz Pytel <mateusz.pytel@getindata.com>', 'Mariusz Strzelecki <mariusz.strzelecki@getindata.com>']
2 changes: 1 addition & 1 deletion sonar-project.properties
@@ -6,7 +6,7 @@ sonar.tests=tests/
 sonar.python.coverage.reportPaths=coverage.xml
 sonar.python.version=3.9

-sonar.projectVersion=0.7.3
+sonar.projectVersion=0.7.4
 sonar.projectDescription=Kedro plugin with Kubeflow Pipelines support
 sonar.links.homepage=https://kedro-kubeflow.readthedocs.io/
 sonar.links.ci=https://github.com/getindata/kedro-kubeflow/actions
50 changes: 12 additions & 38 deletions tests/e2e/catalog.yml
@@ -20,64 +20,38 @@ model_input_table:
   filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/03_primary/model_input_table.csv
   layer: primary

-data_processing.preprocessed_companies:
+preprocessed_companies:
   type: pandas.CSVDataSet
   filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/02_intermediate/preprocessed_companies.csv
   layer: intermediate

-data_processing.preprocessed_shuttles:
+preprocessed_shuttles:
   type: pandas.CSVDataSet
   filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/02_intermediate/preprocessed_shuttles.csv
   layer: intermediate

-data_science.active_modelling_pipeline.X_train:
+X_train:
   type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/X_train.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/X_train.pickle
   layer: model_input

-data_science.active_modelling_pipeline.y_train:
+y_train:
   type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/y_train.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/y_train.pickle
   layer: model_input

-data_science.active_modelling_pipeline.X_test:
+X_test:
   type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/X_test.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/X_test.pickle
   layer: model_input

-data_science.active_modelling_pipeline.y_test:
+y_test:
   type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/05_model_input/y_test.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/05_model_input/y_test.pickle
   layer: model_input

-data_science.active_modelling_pipeline.regressor:
+regressor:
   type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/active_modelling_pipeline/06_models/regressor.pickle
-  versioned: true
-  layer: models
-
-data_science.candidate_modelling_pipeline.X_train:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/X_train.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.y_train:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/y_train.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.X_test:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/X_test.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.y_test:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/05_model_input/y_test.pickle
-  layer: model_input
-
-data_science.candidate_modelling_pipeline.regressor:
-  type: pickle.PickleDataSet
-  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/candidate_modelling_pipeline/06_models/regressor.pickle
+  filepath: gs://gid-ml-ops-sandbox-plugin-tests/kube_${run_id}/06_models/regressor.pickle
   versioned: true
   layer: models