Skip to content

Commit

Permalink
Merge pull request #1582 from PrefectHQ/cli_params
Browse files Browse the repository at this point in the history
Add Parameter File Option to `run cloud` CLI command
  • Loading branch information
joshmeek committed Oct 2, 2019
2 parents f91fff6 + 9c0497a commit 6f2c2df
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Replace `DotDict` with `box.Box` - [#1518](https://github.com/PrefectHQ/prefect/pull/1518)
- Store `cached_inputs` on Failed states and call their result handlers if they were provided - [#1557](https://github.com/PrefectHQ/prefect/pull/1557)
- `raise_on_exception` no longer raises for Prefect Signals, as these are typically intentional / for control flow - [#1562](https://github.com/PrefectHQ/prefect/pull/1562)
- `run cloud` CLI command takes in optional `--parameters` as a file path pointing to a JSON file - [#1582](https://github.com/PrefectHQ/prefect/pull/1582)
- Always consider `Constant` tasks successful and unpack them immediately instead of submitting them for execution - [#1527](https://github.com/PrefectHQ/prefect/issues/1527)

### Task Library
Expand Down
50 changes: 43 additions & 7 deletions src/prefect/cli/run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import time

import click
Expand Down Expand Up @@ -45,6 +46,16 @@ def run():
hidden=True,
)
@click.option("--version", "-v", type=int, help="A flow version to run.", hidden=True)
@click.option(
"--parameters-file",
"-pf",
help="A parameters JSON file.",
hidden=True,
type=click.Path(exists=True),
)
@click.option(
"--parameters-string", "--ps", help="A parameters JSON string.", hidden=True
)
@click.option(
"--watch",
"-w",
Expand All @@ -55,17 +66,29 @@ def run():
@click.option(
"--logs", "-l", is_flag=True, help="Live logs of the flow run.", hidden=True
)
def cloud(name, project, version, watch, logs):
def cloud(name, project, version, parameters_file, parameters_string, watch, logs):
"""
Run a deployed flow in Prefect Cloud.
\b
Options:
--name, -n TEXT The name of a flow to run [required]
--project, -p TEXT The name of a project that contains the flow [required]
--version, -v INTEGER A flow version to run
--watch, -w Watch current state of the flow run, stream output to stdout
--logs, -l Get logs of the flow run, stream output to stdout
--name, -n TEXT The name of a flow to run [required]
--project, -p TEXT The name of a project that contains the flow [required]
--version, -v INTEGER A flow version to run
--parameters-file, -pf FILE PATH A filepath of a JSON file containing parameters
--parameters-string, -ps TEXT A string of JSON parameters
--watch, -w Watch current state of the flow run, stream output to stdout
--logs, -l Get logs of the flow run, stream output to stdout
\b
If both `--parameters-file` and `--parameters-string` are provided then the values passed
in through the string will override the values provided from the file.
\b
e.g.
File contains: {"a": 1, "b": 2}
String: '{"a": 3}'
Parameters passed to the flow run: {"a": 3, "b": 2}
"""

if watch and logs:
Expand Down Expand Up @@ -107,7 +130,20 @@ def cloud(name, project, version, watch, logs):
click.secho("{} not found".format(name), fg="red")
return

flow_run_id = client.create_flow_run(flow_id=flow_id)
# Load parameters from file if provided
file_params = {}
if parameters_file:
with open(parameters_file) as params_file:
file_params = json.load(params_file)

# Load parameters from string if provided
string_params = {}
if parameters_string:
string_params = json.loads(parameters_string)

flow_run_id = client.create_flow_run(
flow_id=flow_id, parameters={**file_params, **string_params}
)
click.echo("Flow Run ID: {}".format(flow_run_id))

if watch:
Expand Down
171 changes: 162 additions & 9 deletions tests/cli/test_run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import json
import os
import sys
import tempfile
from unittest.mock import MagicMock

import click
Expand Down Expand Up @@ -38,7 +41,7 @@ def test_run_cloud(monkeypatch):
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

create_flow_run = MagicMock(resurn_value="id")
create_flow_run = MagicMock(return_value="id")
monkeypatch.setattr(
"prefect.client.Client.create_flow_run", MagicMock(return_value=create_flow_run)
)
Expand All @@ -65,9 +68,6 @@ def test_run_cloud(monkeypatch):
assert post.call_args[1]["json"]["query"].split() == query.split()


@pytest.mark.skipif(
sys.version_info < (3, 6), reason="3.5 does not preserve dictionary order"
)
def test_run_cloud_watch(monkeypatch):
post = MagicMock(
return_value=MagicMock(
Expand All @@ -90,7 +90,7 @@ def test_run_cloud_watch(monkeypatch):
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

create_flow_run = MagicMock(resurn_value="id")
create_flow_run = MagicMock(return_value="id")
monkeypatch.setattr(
"prefect.client.Client.create_flow_run", MagicMock(return_value=create_flow_run)
)
Expand Down Expand Up @@ -118,9 +118,6 @@ def test_run_cloud_watch(monkeypatch):
assert post.called


@pytest.mark.skipif(
sys.version_info < (3, 6), reason="3.5 does not preserve dictionary order"
)
def test_run_cloud_logs(monkeypatch):
post = MagicMock(
return_value=MagicMock(
Expand Down Expand Up @@ -149,7 +146,7 @@ def test_run_cloud_logs(monkeypatch):
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

create_flow_run = MagicMock(resurn_value="id")
create_flow_run = MagicMock(return_value="id")
monkeypatch.setattr(
"prefect.client.Client.create_flow_run", MagicMock(return_value=create_flow_run)
)
Expand Down Expand Up @@ -195,3 +192,159 @@ def test_run_cloud_fails(monkeypatch):
)
assert result.exit_code == 0
assert "flow not found" in result.output


def test_run_cloud_no_param_file(monkeypatch):
with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
runner = CliRunner()
result = runner.invoke(
run,
[
"cloud",
"--name",
"flow",
"--project",
"project",
"--version",
"2",
"--parameters-file",
"no_file.json",
],
)
assert result.exit_code == 2
assert (
'Invalid value for "--parameters-file" / "-pf": Path "no_file.json" does not exist.'
in result.output
)


def test_run_cloud_param_file(monkeypatch):
post = MagicMock(
return_value=MagicMock(
json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}])))
)
)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

mock_client = MagicMock()
mock_client.create_flow_run.return_value = "id"
monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client))

with tempfile.TemporaryDirectory() as directory:
file_path = os.path.join(directory, "file.json")
with open(file_path, "w") as tmp:
json.dump({"test": 42}, tmp)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
runner = CliRunner()
result = runner.invoke(
run,
[
"cloud",
"--name",
"flow",
"--project",
"project",
"--version",
"2",
"--parameters-file",
file_path,
],
)
assert result.exit_code == 0
assert "Flow Run ID" in result.output
assert mock_client.create_flow_run.called
assert mock_client.create_flow_run.call_args[1]["parameters"] == {
"test": 42
}


def test_run_cloud_param_string(monkeypatch):
post = MagicMock(
return_value=MagicMock(
json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}])))
)
)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

mock_client = MagicMock()
mock_client.create_flow_run.return_value = "id"
monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client))

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
runner = CliRunner()
result = runner.invoke(
run,
[
"cloud",
"--name",
"flow",
"--project",
"project",
"--version",
"2",
"--parameters-string",
'{"test": 42}',
],
)
assert result.exit_code == 0
assert "Flow Run ID" in result.output
assert mock_client.create_flow_run.called
assert mock_client.create_flow_run.call_args[1]["parameters"] == {"test": 42}


def test_run_cloud_param_string_overwrites(monkeypatch):
post = MagicMock(
return_value=MagicMock(
json=MagicMock(return_value=dict(data=dict(flow=[{"id": "flow"}])))
)
)
session = MagicMock()
session.return_value.post = post
monkeypatch.setattr("requests.Session", session)

mock_client = MagicMock()
mock_client.create_flow_run.return_value = "id"
monkeypatch.setattr("prefect.cli.run.Client", MagicMock(return_value=mock_client))

with tempfile.TemporaryDirectory() as directory:
file_path = os.path.join(directory, "file.json")
with open(file_path, "w") as tmp:
json.dump({"test": 42}, tmp)

with set_temporary_config(
{"cloud.graphql": "http://my-cloud.foo", "cloud.auth_token": "secret_token"}
):
runner = CliRunner()
result = runner.invoke(
run,
[
"cloud",
"--name",
"flow",
"--project",
"project",
"--version",
"2",
"--parameters-file",
file_path,
"--parameters-string",
'{"test": 43}',
],
)
assert result.exit_code == 0
assert "Flow Run ID" in result.output
assert mock_client.create_flow_run.called
assert mock_client.create_flow_run.call_args[1]["parameters"] == {
"test": 43
}

0 comments on commit 6f2c2df

Please sign in to comment.