-
Notifications
You must be signed in to change notification settings - Fork 20
/
api_query.py
166 lines (143 loc) · 4.74 KB
/
api_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Union, Optional
from flowclient.client import (
FlowclientConnectionError,
Connection,
run_query,
get_status,
get_result_by_query_id,
get_geojson_result_by_query_id,
wait_for_query_to_be_ready,
)
class APIQuery:
    """
    Representation of a FlowKit query.

    Parameters
    ----------
    connection : Connection
        Connection to FlowKit server on which to run this query
    parameters : dict
        Parameters that specify the query

    Attributes
    ----------
    parameters
    connection
    status

    Notes
    -----
    The query ID returned by ``run_query`` is stored on the instance as
    ``_query_id`` only once ``run`` has been called; the absence of that
    attribute is what marks a query as "not running".
    """

    def __init__(self, *, connection: Connection, parameters: dict):
        self._connection = connection
        # Shallow copy: adding/removing/rebinding keys on the caller's dict
        # afterwards does not alter this query's specification.
        self.parameters = dict(parameters)

    def __repr__(self) -> str:
        return f"<{self.__class__.__module__}.{self.__class__.__name__} object, connection={self.connection}, parameters={self.parameters}>"

    def run(self) -> None:
        """
        Set this query running in FlowKit

        The ID returned by ``run_query`` is remembered on this object and
        used by ``status``, ``get_result`` and ``wait_until_ready``.

        Raises
        ------
        FlowclientConnectionError
            if the query cannot be set running
        """
        self._query_id = run_query(
            connection=self.connection, query_spec=self.parameters
        )
        # TODO: Return a future?

    @property
    def connection(self) -> Connection:
        """
        Connection that is used for running this query.

        Returns
        -------
        Connection
            Connection to FlowKit API
        """
        return self._connection

    @property
    def status(self) -> str:
        """
        Status of this query.

        Returns
        -------
        str
            "not_running" if ``run`` has not been called on this object;
            otherwise whatever ``get_status`` reports for the stored query
            ID, documented as one of:
            - "queued"
            - "executing"
            - "completed"
        """
        if not hasattr(self, "_query_id"):
            return "not_running"
        return get_status(connection=self.connection, query_id=self._query_id)

    def get_result(
        self,
        format: str = "pandas",
        poll_interval: int = 1,
        disable_progress: Optional[bool] = None,
    ) -> Union["pandas.DataFrame", dict]:
        """
        Get the result of this query, as a pandas DataFrame or GeoJSON dict.

        If the query has not been set running from this object yet, it is run
        first. If fetching the result raises ``FileNotFoundError``, the query
        is re-run and the fetch is retried once.

        Parameters
        ----------
        format : str, default 'pandas'
            Result format. One of {'pandas', 'geojson'}
        poll_interval : int, default 1
            Number of seconds to wait between checks for the query being ready
        disable_progress : bool, default None
            Set to True to disable progress bar display entirely, None to disable on
            non-TTY, or False to always enable

        Returns
        -------
        pandas.DataFrame or dict
            Query result

        Raises
        ------
        ValueError
            if ``format`` is not one of the supported formats
        """
        if format == "pandas":
            result_getter = get_result_by_query_id
        elif format == "geojson":
            result_getter = get_geojson_result_by_query_id
        else:
            raise ValueError(
                f"Invalid format: '{format}'. Expected one of {{'pandas', 'geojson'}}."
            )

        def fetch():
            # Reads self._query_id at call time, so it picks up the ID set by
            # a run() issued between attempts.
            return result_getter(
                connection=self.connection,
                query_id=self._query_id,
                poll_interval=poll_interval,
                disable_progress=disable_progress,
            )

        # TODO: Cache result internally?
        if not hasattr(self, "_query_id"):
            # Checked explicitly rather than by catching AttributeError, so
            # that an AttributeError raised inside the result getter surfaces
            # instead of silently re-submitting the query.
            # TODO: Warn before running?
            self.run()
            return fetch()
        try:
            return fetch()
        except FileNotFoundError:
            # NOTE(review): presumably raised when the server does not know
            # the stored query ID - confirm against flowclient.
            # TODO: Warn before running?
            self.run()
            return fetch()

    def wait_until_ready(
        self, poll_interval: int = 1, disable_progress: Optional[bool] = None
    ) -> None:
        """
        Wait until this query has finished running.

        Parameters
        ----------
        poll_interval : int, default 1
            Number of seconds to wait between checks for the query being ready
        disable_progress : bool, default None
            Set to True to disable progress bar display entirely, None to disable on
            non-TTY, or False to always enable

        Raises
        ------
        FileNotFoundError
            if the query has not been set running from this object
        FlowclientConnectionError
            if the query has errored (raised by the underlying wait call,
            per the original documentation)
        """
        if not hasattr(self, "_query_id"):
            raise FileNotFoundError("Query is not running.")
        wait_for_query_to_be_ready(
            connection=self.connection,
            query_id=self._query_id,
            poll_interval=poll_interval,
            disable_progress=disable_progress,
        )