Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use explicit keywords for parameters and variables #1259

Merged
merged 4 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Breaking Changes

- None
- `prefect.Client.graphql()` and `prefect.Client.post()` now use an explicit keyword, not `**kwargs`, for variables or parameters - [#1259](https://github.com/PrefectHQ/prefect/pull/1259)

### Contributors

Expand All @@ -48,7 +48,7 @@ Released July 16, 2019
- Have `execute cloud-flow` CLI immediately set the flow run state to `Failed` if environment fails - [#1122](https://github.com/PrefectHQ/prefect/pull/1122)
- Validate configuration objects on initial load - [#1136](https://github.com/PrefectHQ/prefect/pull/1136)
- Add `auto_generated` property to Tasks for convenient filtering - [#1135](https://github.com/PrefectHQ/prefect/pull/1135)
- Disable dask work-stealing in kubernetes via scheduler config - [#1166](https://github.com/PrefectHQ/prefect/pull/1166)
- Disable dask work-stealing in Kubernetes via scheduler config - [#1166](https://github.com/PrefectHQ/prefect/pull/1166)
- Implement backoff retry settings on Client calls - [#1187](https://github.com/PrefectHQ/prefect/pull/1187)
- Explicitly set Dask keys for a better Dask visualization experience - [#1218](https://github.com/PrefectHQ/prefect/issues/1218)
- Implement a local cache which persists for the duration of a Python session - [#1221](https://github.com/PrefectHQ/prefect/issues/1221)
Expand Down Expand Up @@ -103,7 +103,7 @@ Released May 28, 2019
- Allow for `SlackTask` to pull the Slack webhook URL from a custom named Secret - [#1023](https://github.com/PrefectHQ/prefect/pull/1023)
- Raise informative errors when Docker storage push / pull fails - [#1029](https://github.com/PrefectHQ/prefect/issues/1029)
- Standardized `__repr__`s for various classes, to remove inconsistencies - [#617](https://github.com/PrefectHQ/prefect/issues/617)
- Allow for use of local images in Docekr storage - [#1052](https://github.com/PrefectHQ/prefect/pull/1052)
- Allow for use of local images in Docker storage - [#1052](https://github.com/PrefectHQ/prefect/pull/1052)
- Allow for doc tests and doc generation to run without installing `all_extras` - [#1057](https://github.com/PrefectHQ/prefect/issues/1057)

### Task Library
Expand Down
76 changes: 44 additions & 32 deletions src/prefect/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union

import pendulum
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

Expand All @@ -21,9 +22,8 @@
)

if TYPE_CHECKING:
import requests
from prefect.core import Flow
BuiltIn = Union[bool, dict, list, str, set, tuple]
JSONLike = Union[bool, dict, list, str, int, float, None]

# type definitions for GraphQL results

Expand Down Expand Up @@ -93,7 +93,11 @@ def local_token_path(self) -> str:
# Utilities

def get(
self, path: str, server: str = None, headers: dict = None, **params: BuiltIn
self,
path: str,
server: str = None,
headers: dict = None,
params: Dict[str, JSONLike] = None,
) -> dict:
"""
Convenience function for calling the Prefect API with token auth and GET request
Expand All @@ -104,7 +108,7 @@ def get(
- server (str, optional): the server to send the GET request to;
defaults to `self.graphql_server`
- headers (dict, optional): Headers to pass with the request
- **params (dict): GET parameters
- params (dict): GET parameters

Returns:
- dict: Dictionary representation of the request made
Expand All @@ -118,7 +122,11 @@ def get(
return {}

def post(
self, path: str, server: str = None, headers: dict = None, **params: BuiltIn
self,
path: str,
server: str = None,
headers: dict = None,
params: Dict[str, JSONLike] = None,
) -> dict:
"""
Convenience function for calling the Prefect API with token auth and POST request
Expand All @@ -129,7 +137,7 @@ def post(
- server (str, optional): the server to send the POST request to;
defaults to `self.graphql_server`
- headers(dict): headers to pass with the request
- **params (dict): POST parameters
- params (dict): POST parameters

Returns:
- dict: Dictionary representation of the request made
Expand All @@ -146,8 +154,8 @@ def graphql(
self,
query: Any,
raise_on_error: bool = True,
headers: dict = None,
**variables: Union[bool, dict, str, int]
headers: Dict[str, str] = None,
variables: Dict[str, JSONLike] = None,
) -> GraphQLResult:
"""
Convenience function for running queries against the Prefect GraphQL API
Expand All @@ -159,7 +167,7 @@ def graphql(
returns any `errors`.
- headers (dict): any additional headers that should be passed as part of the
request
- **variables (kwarg): Variables to be filled into a query with the key being
- variables (dict): Variables to be filled into a query with the key being
equivalent to the variables that are accepted by the query

Returns:
Expand All @@ -170,10 +178,9 @@ def graphql(
"""
result = self.post(
path="",
query=parse_graphql(query),
variables=json.dumps(variables),
server=self.graphql_server,
headers=headers,
params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
)

if raise_on_error and "errors" in result:
Expand All @@ -185,7 +192,7 @@ def _request(
self,
method: str,
path: str,
params: dict = None,
params: Dict[str, JSONLike] = None,
server: str = None,
headers: dict = None,
) -> "requests.models.Response":
Expand All @@ -208,9 +215,6 @@ def _request(
- ValueError: if a method is specified outside of the accepted GET, POST, DELETE
- requests.HTTPError: if a status code is returned that is not `200` or `401`
"""
# lazy import for performance
import requests

if server is None:
server = self.graphql_server
assert isinstance(server, str) # mypy assert
Expand Down Expand Up @@ -346,10 +350,12 @@ def deploy(
serialized_flow = compress(serialized_flow)
res = self.graphql(
create_mutation,
input=dict(
projectId=project[0].id,
serializedFlow=serialized_flow,
setScheduleActive=set_schedule_active,
variables=dict(
input=dict(
projectId=project[0].id,
serializedFlow=serialized_flow,
setScheduleActive=set_schedule_active,
)
),
) # type: Any

Expand Down Expand Up @@ -379,7 +385,9 @@ def create_project(self, project_name: str) -> str:
}
}

res = self.graphql(project_mutation, input=dict(name=project_name)) # type: Any
res = self.graphql(
project_mutation, variables=dict(input=dict(name=project_name))
) # type: Any

return res.data.createProject.id

Expand Down Expand Up @@ -427,7 +435,7 @@ def create_flow_run(
inputs.update(
scheduledStartTime=scheduled_start_time.isoformat()
) # type: ignore
res = self.graphql(create_mutation, input=inputs)
res = self.graphql(create_mutation, variables=dict(input=inputs))
return res.data.createFlowRun.flow_run.id # type: ignore

def get_flow_run_info(self, flow_run_id: str) -> FlowRunInfoResult:
Expand Down Expand Up @@ -562,7 +570,7 @@ def set_flow_run_state(

serialized_state = state.serialize()

self.graphql(mutation, state=serialized_state) # type: Any
self.graphql(mutation, variables=dict(state=serialized_state)) # type: Any

def get_latest_cached_states(
self, task_id: str, cache_key: Optional[str], created_after: datetime.datetime
Expand Down Expand Up @@ -686,7 +694,7 @@ def set_task_run_state(

serialized_state = state.serialize()

self.graphql(mutation, state=serialized_state) # type: Any
self.graphql(mutation, variables=dict(state=serialized_state)) # type: Any

def set_secret(self, name: str, value: Any) -> None:
"""
Expand All @@ -707,7 +715,9 @@ def set_secret(self, name: str, value: Any) -> None:
}
}

result = self.graphql(mutation, input=dict(name=name, value=value)) # type: Any
result = self.graphql(
mutation, variables=dict(input=dict(name=name, value=value))
) # type: Any

if not result.data.setSecret.success:
raise ValueError("Setting secret failed.")
Expand Down Expand Up @@ -749,14 +759,16 @@ def write_run_log(
timestamp_str = pendulum.instance(timestamp).isoformat()
result = self.graphql(
mutation,
input=dict(
flowRunId=flow_run_id,
taskRunId=task_run_id,
timestamp=timestamp_str,
name=name,
message=message,
level=level,
info=info,
variables=dict(
input=dict(
flowRunId=flow_run_id,
taskRunId=task_run_id,
timestamp=timestamp_str,
name=name,
message=message,
level=level,
info=info,
)
),
) # type: Any

Expand Down
2 changes: 1 addition & 1 deletion src/prefect/client/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ def get(self) -> Optional[Any]:
secretValue(name: $name)
}
""",
name=self.name,
variables=dict(name=self.name),
) # type: Any
return as_nested_dict(result.data.secretValue, dict)