Skip to content

Commit

Permalink
Merge pull request #69 from getindata/release-0.4.2
Browse files Browse the repository at this point in the history
Release 0.4.2
  • Loading branch information
Mariusz Strzelecki committed Aug 19, 2021
2 parents f8f05ac + dd3b693 commit 1dba38f
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 11 deletions.
5 changes: 3 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
repos:
- repo: https://github.com/pre-commit/mirrors-isort
rev: v4.3.21
- repo: https://github.com/pycqa/isort
rev: 5.5.4
hooks:
- id: isort
args: ["--profile", "black", "--line-length=79"]
- repo: https://github.com/psf/black
rev: stable
hooks:
Expand Down
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.4.2] - 2021-08-19

- Improved Vertex scheduling: removal of stale schedules

## [0.4.1] - 2021-08-18

- Passing Kedro environment and pipeline name in Vertex nodes
Expand Down Expand Up @@ -75,7 +79,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.1...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.2...HEAD

[0.4.2]: https://github.com/getindata/kedro-kubeflow/compare/0.4.1...0.4.2

[0.4.1]: https://github.com/getindata/kedro-kubeflow/compare/0.4.0...0.4.1

Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.1"
version = "0.4.2"
31 changes: 30 additions & 1 deletion kedro_kubeflow/vertex_ai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
Vertex AI Pipelines specific client, based on AIPlatformClient.
"""

import json
import logging
import os
from tempfile import NamedTemporaryFile

from google.cloud.scheduler_v1.services.cloud_scheduler import (
CloudSchedulerClient,
)
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from tabulate import tabulate
Expand All @@ -25,6 +29,10 @@ def __init__(self, config, project_name, context):
self.api_client = AIPlatformClient(
project_id=config.project_id, region=config.region
)
self.cloud_scheduler_client = CloudSchedulerClient()
self.location = (
f"projects/{config.project_id}/locations/{config.region}"
)
self.run_config = config.run_config

def list_pipelines(self):
Expand Down Expand Up @@ -116,6 +124,24 @@ def upload(self, pipeline, image, image_pull_policy="IfNotPresent"):
"""
raise NotImplementedError("Upload is not supported for VertexAI")

def _cleanup_old_schedule(self, pipeline_name):
"""
Removes old jobs scheduled for given pipeline name
"""
for job in self.cloud_scheduler_client.list_jobs(parent=self.location):
if "jobs/pipeline_pipeline" not in job.name:
continue

job_pipeline_name = json.loads(job.http_target.body)[
"pipelineSpec"
]["pipelineInfo"]["name"]
if job_pipeline_name == pipeline_name:
self.log.info(
"Found existing schedule for the pipeline at %s, deleting...",
job.schedule,
)
self.cloud_scheduler_client.delete_job(name=job.name)

def schedule(
self,
pipeline,
Expand All @@ -133,6 +159,7 @@ def schedule(
:param image_pull_policy:
:return:
"""
self._cleanup_old_schedule(self.generator.get_pipeline_name())
with NamedTemporaryFile(
mode="rt", prefix="kedro-kubeflow", suffix=".json"
) as spec_output:
Expand All @@ -144,8 +171,10 @@ def schedule(
)
self.api_client.create_schedule_from_job_spec(
job_spec_path=spec_output.name,
time_zone="Etc/UTC",
schedule=cron_expression,
parameter_values={},
pipeline_root=f"gs://{self.run_config.root}",
enable_caching=False,
)

self.log.info("Pipeline scheduled to %s", cron_expression)
8 changes: 7 additions & 1 deletion kedro_kubeflow/vertex_ai/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def __init__(self, config, project_name, context):
self.run_config = config.run_config
self.catalog = context.config_loader.get("catalog*")

def get_pipeline_name(self):
"""
Returns Vertex-compatible pipeline name
"""
return self.project_name.lower().replace(" ", "-")

def generate_pipeline(self, pipeline, image, image_pull_policy, token):
"""
This method return @dsl.pipeline annotated function that contains
Expand All @@ -58,7 +64,7 @@ def set_dependencies(node, dependencies, kfp_ops):
kfp_ops[name].after(kfp_ops[dependency_name])

@dsl.pipeline(
name=self.project_name.lower().replace(" ", "-"),
name=self.get_pipeline_name(),
description=self.run_config.description,
)
def convert_kedro_pipeline_to_kfp() -> None:
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.1
current_version = 0.4.2

[bumpversion:file:setup.py]

Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
"recommonmark==0.7.1",
"sphinx_rtd_theme==0.5.1",
],
"vertexai": [
"google-cloud-scheduler>=2.3.2",
],
}

setup(
name="kedro-kubeflow",
version="0.4.1",
version="0.4.2",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
88 changes: 85 additions & 3 deletions tests/test_vertex_ai_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,25 @@
import unittest
from unittest.mock import MagicMock, patch

from kedro_kubeflow.config import PluginConfig
from kedro_kubeflow.utils import strip_margin
from kedro_kubeflow.vertex_ai.client import VertexAIPipelinesClient


class TestKubeflowClient(unittest.TestCase):
def create_client(self):
return VertexAIPipelinesClient(MagicMock(), MagicMock(), MagicMock())
@patch("kedro_kubeflow.vertex_ai.client.CloudSchedulerClient")
def create_client(self, cloud_scheduler_client_mock):
self.cloud_scheduler_client_mock = (
cloud_scheduler_client_mock.return_value
)
config = PluginConfig(
{
"project_id": "PROJECT_ID",
"region": "REGION",
"run_config": {"image": "IMAGE", "root": "BUCKET/PREFIX"},
}
)
return VertexAIPipelinesClient(config, MagicMock(), MagicMock())

def test_compile(self):
with patch(
Expand Down Expand Up @@ -101,7 +113,77 @@ def test_should_schedule_pipeline(self):

client_under_test = self.create_client()
client_under_test.schedule(
MagicMock("pipeline"), "image", "0 0 12 * *", "test-run"
MagicMock("pipeline"), None, None, "0 0 12 * *"
)

ai_client.create_schedule_from_job_spec.assert_called_once()
args, kwargs = ai_client.create_schedule_from_job_spec.call_args
assert kwargs["time_zone"] == "Etc/UTC"
assert kwargs["enable_caching"] is False
assert kwargs["schedule"] == "0 0 12 * *"
assert kwargs["pipeline_root"] == "gs://BUCKET/PREFIX"

def test_should_remove_old_schedule(self):
def mock_job(job_name, pipeline_name=None):
if pipeline_name:
body = (
'{"pipelineSpec": {"pipelineInfo": {"name": "'
+ pipeline_name
+ '"}}}'
)
else:
body = ""
return type(
"obj",
(object,),
{
"schedule": "* * * * *",
"name": job_name,
"http_target": type("obj", (object,), {"body": body}),
},
)

with patch(
"kedro_kubeflow.vertex_ai.client.PipelineGenerator"
) as generator, patch(
"kedro_kubeflow.vertex_ai.client.AIPlatformClient"
) as AIPlatformClient, patch(
"kfp.v2.compiler.Compiler"
):
# given
ai_client = AIPlatformClient.return_value
client_under_test = self.create_client()
generator.return_value.get_pipeline_name.return_value = (
"unittest-pipeline"
)
self.cloud_scheduler_client_mock.list_jobs.return_value = [
# not removed (some other job)
mock_job(job_name="some-job"),
# not removed (some other pipeline)
mock_job(
job_name="projects/.../locations/.../jobs/pipeline_pipeline_abc",
pipeline_name="some-other-pipeline",
),
# removed
mock_job(
job_name="projects/.../locations/.../jobs/pipeline_pipeline_def",
pipeline_name="unittest-pipeline",
),
]

# when
client_under_test.schedule(
MagicMock("pipeline"), None, None, "0 0 12 * *"
)

# then
ai_client.create_schedule_from_job_spec.assert_called_once()
self.cloud_scheduler_client_mock.delete_job.assert_called_once()
(
args,
kwargs,
) = self.cloud_scheduler_client_mock.delete_job.call_args
assert (
kwargs["name"]
== "projects/.../locations/.../jobs/pipeline_pipeline_def"
)
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pip_version =
extras =
mlflow
tests
vertexai
commands=
python -m pytest --cov kedro_kubeflow --cov-report xml --cov-report term-missing --ignore=venv

Expand Down

0 comments on commit 1dba38f

Please sign in to comment.