From e9e0e96f5ce47fb8926fad8cb4aca7ffd5aebee0 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 1 Oct 2019 14:03:33 -0400 Subject: [PATCH 1/4] Allow for parameters from file for run cloud CLI command --- src/prefect/cli/run.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/prefect/cli/run.py b/src/prefect/cli/run.py index a4ef00dc30ca..5d991a6c260f 100644 --- a/src/prefect/cli/run.py +++ b/src/prefect/cli/run.py @@ -1,3 +1,4 @@ +import json import time import click @@ -45,6 +46,12 @@ def run(): hidden=True, ) @click.option("--version", "-v", type=int, help="A flow version to run.", hidden=True) +@click.option( + "--parameters", + help="A parameters JSON file.", + hidden=True, + type=click.Path(exists=True), +) @click.option( "--watch", "-w", @@ -55,17 +62,18 @@ def run(): @click.option( "--logs", "-l", is_flag=True, help="Live logs of the flow run.", hidden=True ) -def cloud(name, project, version, watch, logs): +def cloud(name, project, version, parameters, watch, logs): """ Run a deployed flow in Prefect Cloud. \b Options: - --name, -n TEXT The name of a flow to run [required] - --project, -p TEXT The name of a project that contains the flow [required] - --version, -v INTEGER A flow version to run - --watch, -w Watch current state of the flow run, stream output to stdout - --logs, -l Get logs of the flow run, stream output to stdout + --name, -n TEXT The name of a flow to run [required] + --project, -p TEXT The name of a project that contains the flow [required] + --version, -v INTEGER A flow version to run + --parameters FILE PATH A filepath of a JSON file containing parameters + --watch, -w Watch current state of the flow run, stream output to stdout + --logs, -l Get logs of the flow run, stream output to stdout """ if watch and logs: @@ -107,7 +115,11 @@ def cloud(name, project, version, watch, logs): click.secho("{} not found".format(name), fg="red") return - flow_run_id = client.create_flow_run(flow_id=flow_id) + loaded_params = None + if parameters: + loaded_params = json.load(open(parameters)) + + flow_run_id = client.create_flow_run(flow_id=flow_id, parameters=loaded_params) click.echo("Flow Run ID: {}".format(flow_run_id)) if watch: From 2587cc2735f5b81541926b1c4ea78ec6a44b1e16 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 1 Oct 2019 14:20:56 -0400 Subject: [PATCH 2/4] Add tests for run cloud parameters file arg --- tests/cli/test_run.py | 86 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 77 insertions(+), 9 deletions(-) diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py index 2211d95daea0..66c9761c452a 100644 --- a/tests/cli/test_run.py +++ b/tests/cli/test_run.py @@ -1,4 +1,7 @@ +import json +import os import sys +import tempfile from unittest.mock import MagicMock import click @@ -38,7 +41,7 @@ def test_run_cloud(monkeypatch): session.return_value.post = post monkeypatch.setattr("requests.Session", session) - create_flow_run = MagicMock(resurn_value="id") + create_flow_run = MagicMock(return_value="id") monkeypatch.setattr( "prefect.client.Client.create_flow_run", MagicMock(return_value=create_flow_run) ) @@ -65,9 +68,6 @@ def test_run_cloud(monkeypatch): assert post.call_args[1]["json"]["query"].split() == query.split() -@pytest.mark.skipif( - sys.version_info < (3, 6), reason="3.5 does not preserve dictionary order" -) def test_run_cloud_watch(monkeypatch): post = MagicMock( return_value=MagicMock( @@ -90,7 +90,7 @@ def test_run_cloud_watch(monkeypatch): session.return_value.post = post monkeypatch.setattr("requests.Session", session) - create_flow_run = MagicMock(resurn_value="id") + create_flow_run = MagicMock(return_value="id") monkeypatch.setattr( "prefect.client.Client.create_flow_run", MagicMock(return_value=create_flow_run) ) @@ -118,9 +118,6 @@ def test_run_cloud_watch(monkeypatch): assert post.called -@pytest.mark.skipif( - sys.version_info < (3, 6), reason="3.5 does not preserve dictionary order" -) def test_run_cloud_logs(monkeypatch): post = MagicMock( return_value=MagicMock( @@ -149,7 +146,7 @@ def test_run_cloud_logs(monkeypatch): session.return_value.post = post monkeypatch.setattr("requests.Session", session) - create_flow_run = MagicMock(resurn_value="id") + create_flow_run = MagicMock(return_value="id") monkeypatch.setattr( "prefect.client.Client.create_flow_run", MagicMock(return_value=create_flow_run) ) @@ -195,3 +192,74 @@ def test_run_cloud_fails(monkeypatch): ) assert result.exit_code == 0 assert "flow not found" in result.output + + +def test_run_cloud_no_param_file(monkeypatch): + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + runner = CliRunner() + result = runner.invoke( + run, + [ + "cloud", + "--name", + "flow", + "--project", + "project", + "--version", + "2", + "--parameters", + "no_file.json", + ], + ) + assert result.exit_code == 2 + assert ( + 'Invalid value for "--parameters": Path "no_file.json" does not exist.' + in result.output + ) + + +def test_run_cloud_param_file(monkeypatch): + post = MagicMock( + return_value=MagicMock( + json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}]))) + ) + ) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) + + mock_client = MagicMock() + mock_client.create_flow_run.return_value = "id" + monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client)) + + with tempfile.TemporaryDirectory() as directory: + file_path = os.path.join(directory, "file.json") + with open(file_path, "w") as tmp: + json.dump({"test": 42}, tmp) + + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + runner = CliRunner() + result = runner.invoke( + run, + [ + "cloud", + "--name", + "flow", + "--project", + "project", + "--version", + "2", + "--parameters", + file_path, + ], + ) + assert result.exit_code == 0 + assert "Flow Run ID" in result.output + assert mock_client.create_flow_run.called + assert mock_client.create_flow_run.call_args[1]["parameters"] == { + "test": 42 + } From 7bce1705707dd13ac964ef3573babe9372f2ea31 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Tue, 1 Oct 2019 14:25:25 -0400 Subject: [PATCH 3/4] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 03de01eb5ca2..1135593a0818 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ - Replace `DotDict` with `box.Box` - [#1518](https://github.com/PrefectHQ/prefect/pull/1518) - Store `cached_inputs` on Failed states and call their result handlers if they were provided - [#1557](https://github.com/PrefectHQ/prefect/pull/1557) - `raise_on_exception` no longer raises for Prefect Signals, as these are typically intentional / for control flow - [#1562](https://github.com/PrefectHQ/prefect/pull/1562) +- `run cloud` CLI command takes in optional `--parameters` as a file path pointing to a JSON file - [#1582](https://github.com/PrefectHQ/prefect/pull/1582) ### Task Library From b5a29d2960aa2914a9a05ab8b583dab56ea7997a Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Wed, 2 Oct 2019 11:11:36 -0400 Subject: [PATCH 4/4] Separate out paramaters from file and from string in run CLI command --- src/prefect/cli/run.py | 48 ++++++++++++++++------ tests/cli/test_run.py | 91 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 124 insertions(+), 15 deletions(-) diff --git a/src/prefect/cli/run.py b/src/prefect/cli/run.py index 5d991a6c260f..1979f9fcf42a 100644 --- a/src/prefect/cli/run.py +++ b/src/prefect/cli/run.py @@ -47,11 +47,15 @@ def run(): ) @click.option("--version", "-v", type=int, help="A flow version to run.", hidden=True) @click.option( - "--parameters", + "--parameters-file", + "-pf", help="A parameters JSON file.", hidden=True, type=click.Path(exists=True), ) +@click.option( + "--parameters-string", "--ps", help="A parameters JSON string.", hidden=True +) @click.option( "--watch", "-w", @@ -62,18 +66,29 @@ def run(): @click.option( "--logs", "-l", is_flag=True, help="Live logs of the flow run.", hidden=True ) -def cloud(name, project, version, parameters, watch, logs): +def cloud(name, project, version, parameters_file, parameters_string, watch, logs): """ Run a deployed flow in Prefect Cloud. \b Options: - --name, -n TEXT The name of a flow to run [required] - --project, -p TEXT The name of a project that contains the flow [required] - --version, -v INTEGER A flow version to run - --parameters FILE PATH A filepath of a JSON file containing parameters - --watch, -w Watch current state of the flow run, stream output to stdout - --logs, -l Get logs of the flow run, stream output to stdout + --name, -n TEXT The name of a flow to run [required] + --project, -p TEXT The name of a project that contains the flow [required] + --version, -v INTEGER A flow version to run + --parameters-file, -pf FILE PATH A filepath of a JSON file containing parameters + --parameters-string, -ps TEXT A string of JSON parameters + --watch, -w Watch current state of the flow run, stream output to stdout + --logs, -l Get logs of the flow run, stream output to stdout + + \b + If both `--parameters-file` and `--parameters-string` are provided then the values passed + in through the string will override the values provided from the file. + + \b + e.g. + File contains: {"a": 1, "b": 2} + String: '{"a": 3}' + Parameters passed to the flow run: {"a": 3, "b": 2} """ if watch and logs: @@ -115,11 +130,20 @@ def cloud(name, project, version, parameters, watch, logs): click.secho("{} not found".format(name), fg="red") return - loaded_params = None - if parameters: - loaded_params = json.load(open(parameters)) + # Load parameters from file if provided + file_params = {} + if parameters_file: + with open(parameters_file) as params_file: + file_params = json.load(params_file) + + # Load parameters from string if provided + string_params = {} + if parameters_string: + string_params = json.loads(parameters_string) - flow_run_id = client.create_flow_run(flow_id=flow_id, parameters=loaded_params) + flow_run_id = client.create_flow_run( + flow_id=flow_id, parameters={**file_params, **string_params} + ) click.echo("Flow Run ID: {}".format(flow_run_id)) if watch: diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py index 66c9761c452a..fb01869e587c 100644 --- a/tests/cli/test_run.py +++ b/tests/cli/test_run.py @@ -209,13 +209,13 @@ def test_run_cloud_no_param_file(monkeypatch): "project", "--version", "2", - "--parameters", + "--parameters-file", "no_file.json", ], ) assert result.exit_code == 2 assert ( - 'Invalid value for "--parameters": Path "no_file.json" does not exist.' + 'Invalid value for "--parameters-file" / "-pf": Path "no_file.json" does not exist.' in result.output ) @@ -253,7 +253,7 @@ def test_run_cloud_param_file(monkeypatch): "project", "--version", "2", - "--parameters", + "--parameters-file", file_path, ], ) @@ -263,3 +263,88 @@ def test_run_cloud_param_file(monkeypatch): assert mock_client.create_flow_run.call_args[1]["parameters"] == { "test": 42 } + + +def test_run_cloud_param_string(monkeypatch): + post = MagicMock( + return_value=MagicMock( + json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}]))) + ) + ) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) + + mock_client = MagicMock() + mock_client.create_flow_run.return_value = "id" + monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client)) + + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + runner = CliRunner() + result = runner.invoke( + run, + [ + "cloud", + "--name", + "flow", + "--project", + "project", + "--version", + "2", + "--parameters-string", + '{"test": 42}', + ], + ) + assert result.exit_code == 0 + assert "Flow Run ID" in result.output + assert mock_client.create_flow_run.called + assert mock_client.create_flow_run.call_args[1]["parameters"] == {"test": 42} + + +def test_run_cloud_param_string_overwrites(monkeypatch): + post = MagicMock( + return_value=MagicMock( + json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}]))) + ) + ) + session = MagicMock() + session.return_value.post = post + monkeypatch.setattr("requests.Session", session) + + mock_client = MagicMock() + mock_client.create_flow_run.return_value = "id" + monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client)) + + with tempfile.TemporaryDirectory() as directory: + file_path = os.path.join(directory, "file.json") + with open(file_path, "w") as tmp: + json.dump({"test": 42}, tmp) + + with set_temporary_config( + {"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"} + ): + runner = CliRunner() + result = runner.invoke( + run, + [ + "cloud", + "--name", + "flow", + "--project", + "project", + "--version", + "2", + "--parameters-file", + file_path, + "--parameters-string", + '{"test": 43}', + ], + ) + assert result.exit_code == 0 + assert "Flow Run ID" in result.output + assert mock_client.create_flow_run.called + assert mock_client.create_flow_run.call_args[1]["parameters"] == { + "test": 43 + }