Skip to content

Commit

Permalink
Merge pull request #97 from getindata/release-0.5.1
Browse files Browse the repository at this point in the history
Release 0.5.1
  • Loading branch information
Mariusz Strzelecki committed Jan 28, 2022
2 parents c464564 + 979f95a commit 75f2090
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 23 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.5.1] - 2022-01-28

- Possibility to run custom Kedro pipeline as on-exit-handler

## [0.5.0] - 2022-01-27

- Kedro parameters of complex types (lists and dicts) are now supported
Expand Down Expand Up @@ -108,7 +112,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.5.0...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.5.1...HEAD

[0.5.1]: https://github.com/getindata/kedro-kubeflow/compare/0.5.0...0.5.1

[0.5.0]: https://github.com/getindata/kedro-kubeflow/compare/0.4.8...0.5.0

Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.5.0"
version = "0.5.1"
18 changes: 18 additions & 0 deletions kedro_kubeflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,21 @@ def mlflow_start(ctx, kubeflow_run_id: str, output: str):
with open(output, "w") as f:
f.write(run.info.run_id)
click.echo(f"Started run: {run.info.run_id}")


@kubeflow_group.command(hidden=True)
@click.argument("pvc_name", type=str)
def delete_pipeline_volume(pvc_name: str):
    """Delete the inter-step data PVC created for a pipeline run.

    Hidden command: it is not meant to be called by users directly, but is
    invoked from inside the cluster by the pipeline's on-exit handler to
    clean up the shared data volume after the workflow finishes.

    :param pvc_name: name of the PersistentVolumeClaim to remove
    """
    # Imported lazily so the plugin does not require the kubernetes client
    # unless this cleanup command is actually invoked.
    import kubernetes.client
    import kubernetes.config

    # Runs inside a pod, so use the in-cluster service-account configuration.
    kubernetes.config.load_incluster_config()

    # Resolve the pod's own namespace from the mounted service-account file.
    # Use a context manager so the file handle is closed deterministically
    # (the original `open(...).read()` leaked the handle).
    with open(
        "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    ) as namespace_file:
        current_namespace = namespace_file.read()

    kubernetes.client.CoreV1Api().delete_namespaced_persistent_volume_claim(
        pvc_name,
        current_namespace,
    )
    click.echo(f"Volume removed: {pvc_name}")
8 changes: 8 additions & 0 deletions kedro_kubeflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
# volume after pipeline finishes) [in seconds]. Default: 1 week
ttl: 604800
# What Kedro pipeline should be run as the last step regardless of the
# pipeline status. Used to send notifications or raise alerts
# on_exit_pipeline: notify_via_slack
# This sets the caching option for pipeline using
# execution_options.caching_strategy.max_cache_staleness
# See https://en.wikipedia.org/wiki/ISO_8601 in section 'Duration'
Expand Down Expand Up @@ -248,6 +252,10 @@ def max_cache_staleness(self):
def ttl(self):
return int(self._get_or_default("ttl", 3600 * 24 * 7))

@property
def on_exit_pipeline(self):
return self._get_or_default("on_exit_pipeline", None)

@property
def vertex_ai_networking(self):
return VertexAiNetworkingConfig(
Expand Down
55 changes: 39 additions & 16 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def convert_kedro_pipeline_to_kfp() -> None:
node_dependencies = self.context.pipelines.get(
pipeline
).node_dependencies
with self._create_pipeline_exit_handler(pipeline):
with self._create_pipeline_exit_handler(
pipeline, image, image_pull_policy
):
kfp_ops = self._build_kfp_ops(
pipeline, node_dependencies, image, image_pull_policy
)
Expand All @@ -59,37 +61,58 @@ def convert_kedro_pipeline_to_kfp() -> None:

return convert_kedro_pipeline_to_kfp

def _create_pipeline_exit_handler(self, pipeline):
def _create_pipeline_exit_handler(
self, pipeline, image, image_pull_policy
):
enable_volume_cleaning = (
self.run_config.volume is not None
and not self.run_config.volume.keep
)

if not enable_volume_cleaning:
if not enable_volume_cleaning and not self.run_config.on_exit_pipeline:
return contextlib.nullcontext()

exit_container_op = dsl.ContainerOp(
name="schedule-volume-termination",
image="gcr.io/cloud-builders/kubectl",
command=[
"kubectl",
"delete",
"pvc",
commands = []

if enable_volume_cleaning:
commands.append(
"kedro kubeflow delete-pipeline-volume "
"{{workflow.name}}-"
+ sanitize_k8s_name(f"{pipeline}-data-volume"),
"--wait=false",
"--ignore-not-found",
"--output",
"name",
+ sanitize_k8s_name(f"{pipeline}-data-volume")
)

if self.run_config.on_exit_pipeline:
commands.append(
"kedro run "
"--config config.yaml "
f"--env {self.context.env} "
f"--pipeline {self.run_config.on_exit_pipeline}"
)

exit_container_op = dsl.ContainerOp(
name="on-exit",
image=image,
command=create_command_using_params_dumper(";".join(commands)),
arguments=create_arguments_from_parameters(
self.context.params.keys()
)
+ [
"status",
"{{workflow.status}}",
"failures",
"{{workflow.failures}}",
],
container_kwargs={"env": create_container_environment()},
)

if self.run_config.max_cache_staleness not in [None, ""]:
exit_container_op.execution_options.caching_strategy.max_cache_staleness = (
self.run_config.max_cache_staleness
)

return dsl.ExitHandler(exit_container_op)
return dsl.ExitHandler(
self._customize_op(exit_container_op, image_pull_policy)
)

def _build_kfp_ops(
self,
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.5.0
current_version = 0.5.1

[bumpversion:file:setup.py]

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

setup(
name="kedro-kubeflow",
version="0.5.0",
version="0.5.1",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
19 changes: 19 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import unittest
import unittest.mock as um
from collections import namedtuple
from pathlib import Path
from tempfile import TemporaryDirectory
Expand All @@ -9,6 +10,7 @@

from kedro_kubeflow.cli import (
compile,
delete_pipeline_volume,
init,
list_pipelines,
mlflow_start,
Expand Down Expand Up @@ -237,3 +239,20 @@ def test_mlflow_start(
assert f.read() == "MLFLOW_RUN_ID"

set_tag_mock.assert_called_with("kubeflow_run_id", "KUBEFLOW_RUN_ID")

# Verify that the hidden `delete-pipeline-volume` CLI command deletes the
# named PVC in the namespace read from the mounted service-account file.
@patch("kubernetes.client")
@patch("kubernetes.config")
def test_delete_pipeline_volume(self, k8s_config_mock, k8s_client_mock):
# Fake the in-cluster namespace file so the test needs no real cluster:
# any open() call during the command returns "unittest-namespace".
with um.patch(
"builtins.open", um.mock_open(read_data="unittest-namespace")
):
runner = CliRunner()
result = runner.invoke(
delete_pipeline_volume,
["workflow-name"],
)
# The command should succeed and call the K8s API with the PVC name
# from the CLI argument and the namespace from the mocked file.
assert result.exit_code == 0
core_api = k8s_client_mock.CoreV1Api()
core_api.delete_namespaced_persistent_volume_claim.assert_called_with(
"workflow-name", "unittest-namespace"
)
62 changes: 59 additions & 3 deletions tests/test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,15 @@ def test_should_support_inter_steps_volume_with_defaults(self):

# then
assert len(dsl_pipeline.ops) == 5
assert "schedule-volume-termination" in dsl_pipeline.ops
assert "on-exit" in dsl_pipeline.ops
assert (
dsl_pipeline.ops["on-exit"]
.container.command[-1]
.endswith(
"kedro kubeflow delete-pipeline-volume "
"{{workflow.name}}-pipeline-data-volume"
)
)
volume_spec = dsl_pipeline.ops["data-volume-create"].k8s_resource.spec
assert volume_spec.resources.requests["storage"] == "1Gi"
assert volume_spec.access_modes == ["ReadWriteOnce"]
Expand All @@ -68,6 +76,54 @@ def test_should_support_inter_steps_volume_with_defaults(self):
== 0
)

def test_should_generate_on_exit_pipeline_run(self):
# given
self.create_generator(config={"on_exit_pipeline": "notify_via_slack"})

# when
pipeline = self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "IfNotPresent"
)
with kfp.dsl.Pipeline(None) as dsl_pipeline:
pipeline()

# then
assert "on-exit" in dsl_pipeline.ops
assert (
dsl_pipeline.ops["on-exit"]
.container.command[-1]
.endswith(
"kedro run --config config.yaml "
"--env unittests --pipeline notify_via_slack"
)
)

def test_should_generate_volume_removal_and_on_exit_pipeline_run(self):
# given
self.create_generator(
config={"volume": {}, "on_exit_pipeline": "notify_via_slack"}
)

# when
pipeline = self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "IfNotPresent"
)
with kfp.dsl.Pipeline(None) as dsl_pipeline:
pipeline()

# then
assert "on-exit" in dsl_pipeline.ops
assert (
dsl_pipeline.ops["on-exit"]
.container.command[-1]
.endswith(
"kedro kubeflow delete-pipeline-volume "
"{{workflow.name}}-pipeline-data-volume;"
"kedro run --config config.yaml "
"--env unittests --pipeline notify_via_slack"
)
)

def test_should_support_inter_steps_volume_with_given_spec(self):
# given
self.create_generator(
Expand All @@ -89,7 +145,7 @@ def test_should_support_inter_steps_volume_with_given_spec(self):

# then
assert len(dsl_pipeline.ops) == 5
assert "schedule-volume-termination" in dsl_pipeline.ops
assert "on-exit" in dsl_pipeline.ops
volume_spec = dsl_pipeline.ops["data-volume-create"].k8s_resource.spec
assert volume_spec.resources.requests["storage"] == "1Mi"
assert volume_spec.access_modes == ["ReadWriteOnce"]
Expand Down Expand Up @@ -174,7 +230,7 @@ def test_should_skip_volume_init_if_requested(self):
# then
assert len(dsl_pipeline.ops) == 4
assert "data-volume-create" in dsl_pipeline.ops
assert "schedule-volume-termination" in dsl_pipeline.ops
assert "on-exit" in dsl_pipeline.ops
assert "data-volume-init" not in dsl_pipeline.ops
for node_name in ["node1", "node2"]:
volumes = dsl_pipeline.ops[node_name].container.volume_mounts
Expand Down

0 comments on commit 75f2090

Please sign in to comment.