Skip to content

Commit

Permalink
Merge pull request #67 from getindata/release-0.4.1
Browse files Browse the repository at this point in the history
Release 0.4.1
  • Loading branch information
Mariusz Strzelecki committed Aug 18, 2021
2 parents eeffe7c + 25f7d2a commit f8f05ac
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 23 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## [Unreleased]

## [0.4.1] - 2021-08-18

- Passing Kedro environment and pipeline name in Vertex nodes
- Setting artifact type based on catalog layer in Vertex pipeline
- Added `pipeline` param to `schedule` in Vertex

## [0.4.0] - 2021-08-11

- Support for kedro-mlflow>=0.7
Expand Down Expand Up @@ -69,7 +75,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.0...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.1...HEAD

[0.4.1]: https://github.com/getindata/kedro-kubeflow/compare/0.4.0...0.4.1

[0.4.0]: https://github.com/getindata/kedro-kubeflow/compare/0.3.1...0.4.0

Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.0"
version = "0.4.1"
16 changes: 14 additions & 2 deletions kedro_kubeflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ def upload_pipeline(ctx, image, pipeline) -> None:


@kubeflow_group.command()
@click.option(
"-p",
"--pipeline",
"pipeline",
type=str,
help="Name of pipeline to run",
default="__default__",
)
@click.option(
"-c",
"--cron-expression",
Expand All @@ -183,15 +191,19 @@ def upload_pipeline(ctx, image, pipeline) -> None:
)
@click.pass_context
def schedule(
ctx, experiment_namespace: str, experiment_name: str, cron_expression: str
ctx,
pipeline: str,
experiment_namespace: str,
experiment_name: str,
cron_expression: str,
):
"""Schedules recurring execution of latest version of the pipeline"""
context_helper = ctx.obj["context_helper"]
config = context_helper.config.run_config
experiment = experiment_name if experiment_name else config.experiment_name

context_helper.kfp_client.schedule(
experiment, experiment_namespace, cron_expression
pipeline, experiment, experiment_namespace, cron_expression
)


Expand Down
4 changes: 3 additions & 1 deletion kedro_kubeflow/kfpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ def _ensure_experiment_exists(self, experiment_name, experiment_namespace):

return experiment.id

def schedule(self, experiment_name, experiment_namespace, cron_expression):
def schedule(
self, pipeline, experiment_name, experiment_namespace, cron_expression
):
experiment_id = self._ensure_experiment_exists(
experiment_name, experiment_namespace
)
Expand Down
11 changes: 7 additions & 4 deletions kedro_kubeflow/vertex_ai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def list_pipelines(self):
"""
pipelines = self.api_client.list_jobs()["pipelineJobs"]
return tabulate(
map(lambda x: [x["displayName"], x["name"]], pipelines),
map(lambda x: [x.get("displayName"), x["name"]], pipelines),
headers=["Name", "ID"],
)

Expand All @@ -46,6 +46,7 @@ def run_once(
run_name,
wait=False,
image_pull_policy="IfNotPresent",
experiment_namespace=None,
):
"""
Runs the pipeline in Vertex AI Pipelines
Expand Down Expand Up @@ -118,14 +119,16 @@ def upload(self, pipeline, image, image_pull_policy="IfNotPresent"):
def schedule(
self,
pipeline,
image,
experiment_name,
experiment_namespace,
cron_expression,
image_pull_policy="IfNotPresent",
):
"""
Schedule pipeline to Vertex AI with given cron expression
:param pipeline:
:param image:
:param experiment_name:
:param experiment_namespace:
:param cron_expression:
:param image_pull_policy:
:return:
Expand All @@ -135,7 +138,7 @@ def schedule(
) as spec_output:
self.compile(
pipeline,
image,
self.run_config.image,
output=spec_output.name,
image_pull_policy=image_pull_policy,
)
Expand Down
23 changes: 19 additions & 4 deletions kedro_kubeflow/vertex_ai/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ def convert_kedro_pipeline_to_kfp() -> None:
node_dependencies = self.context.pipelines.get(
pipeline
).node_dependencies
kfp_ops = self._build_kfp_ops(node_dependencies, image, token)
kfp_ops = self._build_kfp_ops(
node_dependencies, image, pipeline, token
)
for node, dependencies in node_dependencies.items():
set_dependencies(node, dependencies, kfp_ops)

Expand Down Expand Up @@ -93,11 +95,12 @@ def _create_mlflow_op(self, image, tracking_token) -> dsl.ContainerOp:
"`dirname {{$.outputs.parameters['output'].output_file}}`",
"&&",
"MLFLOW_TRACKING_TOKEN={{$.inputs.parameters['mlflow_tracking_token']}} "
"kedro kubeflow mlflow-start "
f"kedro kubeflow -e {self.context.env} mlflow-start "
"--output {{$.outputs.parameters['output'].output_file}} "
+ self.run_config.run_name,
]
)

spec = ComponentSpec(
name="mlflow-start-run",
inputs=[InputSpec("mlflow_tracking_token", "String")],
Expand Down Expand Up @@ -132,6 +135,7 @@ def _build_kfp_ops(
self,
node_dependencies: Dict[Node, Set[Node]],
image,
pipeline,
tracking_token=None,
) -> Dict[str, dsl.ContainerOp]:
"""Build kfp container graph from Kedro node dependencies."""
Expand Down Expand Up @@ -162,8 +166,13 @@ def _build_kfp_ops(
else []
)

kedro_command = (
f'kedro run {params_parameter} --node "{node.name}"'
kedro_command = " ".join(
[
f"kedro run -e {self.context.env}",
f"--pipeline {pipeline}",
f"{params_parameter}",
f'--node "{node.name}"',
]
)
node_command = " ".join(
[
Expand Down Expand Up @@ -229,6 +238,12 @@ def _get_data_path(self):
f"{self.run_config.experiment_name}/{self.run_config.run_name}/data"
)

def _get_mlruns_path(self):
return (
f"{self.run_config.root}/"
f"{self.run_config.experiment_name}/{self.run_config.run_name}/mlruns"
)

def _setup_volume_op(self, image):
command = " ".join(
[
Expand Down
12 changes: 11 additions & 1 deletion kedro_kubeflow/vertex_ai/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ def is_file_path_input(input_data):
return input_params, input_specs


def get_output_type(output, catalog):
    """Map a Kedro catalog entry to a Vertex AI artifact type.

    Entries whose ``layer`` is ``"models"`` are registered as ``Model``
    artifacts; every other entry (including those with no layer set) is
    registered as a plain ``Dataset``.

    :param output: name of the output dataset; must be a key of ``catalog``
    :param catalog: mapping of dataset name to its catalog configuration dict
    :return: ``"Model"`` or ``"Dataset"``
    """
    layer = catalog[output].get("layer")
    return "Model" if layer == "models" else "Dataset"


def generate_outputs(node: Node, catalog):
"""
Generates outputs for a particular kedro node
Expand All @@ -66,7 +75,8 @@ def generate_outputs(node: Node, catalog):
and ":/" not in catalog[o]["filepath"]
}
output_specs = [
structures.OutputSpec(o, "Dataset") for o in data_mapping.keys()
structures.OutputSpec(o, get_output_type(o, catalog))
for o in data_mapping.keys()
]
output_copy_commands = " ".join(
[
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.0
current_version = 0.4.1

[bumpversion:file:setup.py]

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

# Dev Requirements
EXTRA_REQUIRE = {
"mlflow": ["kedro-mlflow>=0.4.1,<0.5"],
"mlflow": ["kedro-mlflow>=0.4.1"],
"tests": [
"pytest>=5.4.0, <7.0.0",
"pytest-cov>=2.8.0, <3.0.0",
Expand All @@ -33,7 +33,7 @@

setup(
name="kedro-kubeflow",
version="0.4.0",
version="0.4.1",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
6 changes: 4 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,14 @@ def test_schedule(self):
runner = CliRunner()

result = runner.invoke(
schedule, ["-c", "* * *", "-x", "test_experiment"], obj=config
schedule,
["-c", "* * *", "-x", "test_experiment", "-p", "my-pipeline"],
obj=config,
)

assert result.exit_code == 0
context_helper.kfp_client.schedule.assert_called_with(
"test_experiment", None, "* * *"
"my-pipeline", "test_experiment", None, "* * *"
)

@patch.object(Path, "cwd")
Expand Down
4 changes: 4 additions & 0 deletions tests/test_kfpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def test_should_schedule_pipeline(self):

# when
self.client_under_test.schedule(
pipeline=None,
experiment_name="EXPERIMENT",
cron_expression="0 * * * * *",
experiment_namespace=None,
Expand Down Expand Up @@ -228,6 +229,7 @@ def test_should_schedule_pipeline_and_create_experiment_if_needed(self):

# when
self.client_under_test.schedule(
pipeline=None,
experiment_name="EXPERIMENT",
cron_expression="0 * * * * *",
experiment_namespace=None,
Expand Down Expand Up @@ -258,6 +260,7 @@ def test_should_disable_old_runs_before_schedule(self):

# when
self.client_under_test.schedule(
pipeline=None,
experiment_name="EXPERIMENT",
cron_expression="0 * * * * *",
experiment_namespace=None,
Expand Down Expand Up @@ -352,4 +355,5 @@ def setUp(self):
self.create_client({})

def tearDown(self):
__builtins__["__import__"] = self.realimport
os.environ["IAP_CLIENT_ID"] = ""
9 changes: 7 additions & 2 deletions tests/test_vertex_ai_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def test_should_list_pipelines(self):
"name": "projects/29350373243/locations/"
"europe-west4/pipelineJobs/run2",
},
{
"name": "projects/123/locations/"
"europe-west4/pipelineJobs/no-display-name",
},
]
}

Expand All @@ -79,9 +83,10 @@ def test_should_list_pipelines(self):

expected_output = """
|Name ID
|------ -------------------------------------------------------------
|------ ----------------------------------------------------------------
|run1 projects/29350373243/locations/europe-west4/pipelineJobs/run1
|run2 projects/29350373243/locations/europe-west4/pipelineJobs/run2"""
|run2 projects/29350373243/locations/europe-west4/pipelineJobs/run2
| projects/123/locations/europe-west4/pipelineJobs/no-display-name"""
assert tabulation == strip_margin(expected_output)

def test_should_schedule_pipeline(self):
Expand Down
47 changes: 45 additions & 2 deletions tests/test_vertex_ai_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ def test_artifact_registration(self):
"B": {
"type": "pandas.CSVDataSet",
"filepath": "data/02_intermediate/b.csv",
}
},
"C": {
"type": "pickle.PickleDataSet",
"filepath": "data/06_models/model.pkl",
"layer": "models",
},
}
)

Expand All @@ -136,7 +141,7 @@ def test_artifact_registration(self):
name="B", op_name="node1", param_type="Dataset"
)
outputs2 = dsl_pipeline.ops["node2"].outputs
assert len(outputs2) == 0 # output "C" is missing in the catalog
assert outputs2["C"].param_type == "Model"

def test_should_skip_volume_removal_if_requested(self):
# given
Expand All @@ -152,6 +157,43 @@ def test_should_skip_volume_removal_if_requested(self):
# then
assert "schedule-volume-termination" not in dsl_pipeline.ops

def test_should_add_env_and_pipeline_in_the_invocations(self):
# given
self.create_generator()
self.mock_mlflow(True)

# when
pipeline = self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN"
)
with kfp.dsl.Pipeline(None) as dsl_pipeline:
pipeline()

# then
assert (
"kedro kubeflow -e unittests mlflow-start"
in dsl_pipeline.ops["mlflow-start-run"].container.args[0]
)
assert (
'kedro run -e unittests --pipeline pipeline --node "node1"'
in dsl_pipeline.ops["node1"].container.args[0]
)

def mock_mlflow(self, enabled=False):
def fakeimport(name, *args, **kw):
if not enabled and name == "mlflow":
raise ImportError
return self.realimport(name, *args, **kw)

__builtins__["__import__"] = fakeimport

def setUp(self):
self.realimport = __builtins__["__import__"]
self.mock_mlflow(False)

def tearDown(self):
__builtins__["__import__"] = self.realimport

def create_generator(self, config={}, params={}, catalog={}):
project_name = "my-awesome-project"
config_loader = MagicMock()
Expand All @@ -160,6 +202,7 @@ def create_generator(self, config={}, params={}, catalog={}):
"obj",
(object,),
{
"env": "unittests",
"params": params,
"config_loader": config_loader,
"pipelines": {
Expand Down

0 comments on commit f8f05ac

Please sign in to comment.