/
client.py
2217 lines (1889 loc) · 79.3 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import datetime
import json
import os
import random
import re
import time
import uuid
import warnings
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
NamedTuple,
Optional,
Union,
cast,
Mapping,
)
from urllib.parse import urljoin, urlparse
# if simplejson is installed, `requests` defaults to using it instead of json
# this allows the client to gracefully handle either json or simplejson
try:
from simplejson.errors import JSONDecodeError
except ImportError:
from json.decoder import JSONDecodeError
import pendulum
import toml
from slugify import slugify
import prefect
from prefect.run_configs import RunConfig
from prefect.exceptions import (
AuthorizationError,
ClientError,
VersionLockMismatchSignal,
)
from prefect.utilities.graphql import (
EnumValue,
GraphQLResult,
compress,
parse_graphql,
with_args,
format_graphql_request_error,
)
from prefect.utilities.logging import create_diagnostic_logger
if TYPE_CHECKING:
from prefect.core import Flow
import requests
JSONLike = Union[bool, dict, list, str, int, float, None]
# type definitions for GraphQL results
class TaskRunInfoResult(NamedTuple):
# TODO: Deprecate this result in favor of `prefect.backend.TaskRun`
id: str
task_id: str
task_slug: str
version: int
state: "prefect.engine.state.State"
class ProjectInfo(NamedTuple):
id: str
name: str
class FlowRunInfoResult(NamedTuple):
# TODO: Deprecate this result in favor of `prefect.backend.FlowRun`
id: str
name: str
flow_id: str
project: ProjectInfo
parameters: Dict[str, Any]
context: Dict[str, Any]
version: int
scheduled_start_time: datetime.datetime
state: "prefect.engine.state.State"
task_runs: List[TaskRunInfoResult]
class Client:
"""
Client for communication with Prefect Cloud
If the arguments aren't specified the client initialization first checks the prefect
configuration and if the server is not set there it checks the current context. The
token will only be present in the current context.
Args:
- api_server (str, optional): the URL to send all GraphQL requests to; if not
provided, will be pulled from the current backend's `api` config
variable
- api_key (str, optional): a Prefect Cloud API key. If not provided, loaded
from `config.cloud.api_key` or from the on disk cache from the
`prefect auth` CLI
- tenant_id (str, optional): the Prefect tenant to use. If not provided, loaded
from `config.cloud.tenant_id` or the on disk cache from the
`prefect auth` CLI
- api_token (str, optional): a Prefect Cloud API token, taken from
`config.cloud.auth_token` if not provided. If this token is USER-scoped, it may
be used to log in to any tenant that the user is a member of. In that case,
ephemeral JWTs will be loaded as necessary. Otherwise, the API token itself
will be used as authorization. DEPRECATED; use `api_key` instead.
"""
def __init__(
self,
api_server: str = None,
api_key: str = None,
tenant_id: str = None,
api_token: str = None,
):
self._access_token = None
self._refresh_token = None
self._access_token_expires_at = pendulum.now()
self._attached_headers = {} # type: Dict[str, str]
self.logger = create_diagnostic_logger("Diagnostics")
# Hard-code the auth filepath location
self._auth_file = Path(prefect.context.config.home_dir).absolute() / "auth.toml"
# Note the default is `cloud.api` which is `cloud.endpoint` or `server.endpoint`
# depending on the value of the `backend` key
# This must be set before `load_auth_from_disk()` can be called but if no API
# key is found this will default to a different value for backwards compat
self.api_server = api_server or prefect.context.config.cloud.api
# Load the API key
cached_auth = self.load_auth_from_disk()
self.api_key: Optional[str] = (
api_key
or prefect.context.config.cloud.get("api_key")
or cached_auth.get("api_key")
)
# Load the tenant id
self._tenant_id: Optional[str] = (
tenant_id
or prefect.context.config.cloud.get("tenant_id")
or cached_auth.get("tenant_id")
)
# If not set at this point, when `Client.tenant_id` is accessed the default
# tenant will be loaded and used for future requests.
# Backwards compatibility for API tokens ---------------------------------------
self._api_token = api_token or prefect.context.config.cloud.get("auth_token")
if (
not self.api_key
and not api_server
and prefect.context.config.backend == "cloud"
):
# The default value for the `api_server` changed for API keys but we want
# to load API tokens from the correct backwards-compatible location on disk
self.api_server = prefect.config.cloud.graphql
if (
not self.api_key
and not self._api_token
and prefect.config.backend == "cloud"
):
# If not using an API key and a token has not been passed or set in the
# config, attempt to load an API token from disk
self._init_tenant()
if self._api_token and not self.api_key:
warnings.warn(
"Client was created with an API token configured for authentication. "
"API tokens are deprecated, please use API keys instead."
)
# Warn if using both a token and API key, but only if they have different values
# because we pass the api key as an api token in some places for backwards
# compatibility
if self._api_token and self.api_key and self._api_token != self.api_key:
warnings.warn(
"Found both an API token and an API key. API tokens have been "
"deprecated and it will be ignored in favor of the API key."
+ (
# If they did not pass one explicitly, we can tell them how to fix
# this in the config
" Remove the token from the config at `prefect.config.auth_token`"
if not api_token
else ""
)
)
# API key authentication -----------------------------------------------------------
def _get_auth_tenant(self) -> str:
"""
Get the current tenant associated with the API key being used. If the client has
a specific tenant id set, this will verify that the given tenant id is
compatible with the API key because the tenant will be attached to the request.
"""
if not prefect.config.backend == "cloud":
raise ValueError(
"Authentication is only supported for Prefect Cloud. "
"Your backend is set to {prefect.config.backend!r}"
)
if not self.api_key:
raise ValueError("You have not set an API key for authentication.")
response = self.graphql({"query": {"auth_info": "tenant_id"}})
tenant_id = response.get("data", {}).get("auth_info", {}).get("tenant_id", "")
if tenant_id == "":
raise ClientError(
"Unexpected response from the API while querying for the default "
f"tenant: {response}"
)
elif tenant_id is None:
# If the backend returns a `None` value tenant id, it indicates that an API
# token was passed in as an API key
raise AuthorizationError(
"An API token was used as an API key. There is no tenant associated "
"with API tokens. Use an API key for authentication."
)
return tenant_id
def _get_default_server_tenant(self) -> str:
if prefect.config.backend == "server":
response = self.graphql({"query": {"tenant": {"id"}}})
tenants = response.get("data", {}).get("tenant", None)
if tenants is None:
raise ClientError(
f"Unexpected response from the API while querying for tenants: {response}"
)
if not tenants: # The user has not created a tenant yet
raise ClientError(
"Your Prefect Server instance has no tenants. "
"Create a tenant with `prefect server create-tenant`"
)
return tenants[0].id
elif prefect.config.backend == "cloud":
raise ValueError(
"Default tenants are determined by authentication in Prefect Cloud. "
"See `_get_auth_tenant` instead."
)
else:
raise ValueError("Unknown backend {prefect.config.backend!r}")
def load_auth_from_disk(self) -> Dict[str, str]:
"""
Get the stashed `api_key` and `tenant_id` for the current `api_server` from the
disk cache if it exists. If it does not, an empty dict is returned.
WARNING: This will not mutate the `Client`, you must use the returned dict
to set `api_key` and `tenant_id`. This is
"""
if not self._auth_file.exists():
return {}
return toml.loads(self._auth_file.read_text()).get(self._api_server_slug, {})
def save_auth_to_disk(self) -> None:
"""
Write the current auth information to the disk cache under a header for the
current `api_server`
"""
# Load the current contents of the entire file
contents = (
toml.loads(self._auth_file.read_text()) if self._auth_file.exists() else {}
)
# Update the data for this API server
contents[self._api_server_slug] = {
"api_key": self.api_key,
"tenant_id": self._tenant_id,
}
# Update the file, including a comment blurb
self._auth_file.parent.mkdir(parents=True, exist_ok=True)
self._auth_file.write_text(
"# This file is auto-generated and should not be manually edited\n"
"# Update the Prefect config or use the CLI to login instead\n\n"
+ toml.dumps(contents)
)
@property
def _api_server_slug(self) -> str:
"""
A slugified version of the API server's network location, used for loading
and saving settings for different API servers.
This should only be relevant for the 'cloud' backend since the 'server' backend
does not save authentication.
This should remain stable or users will not be able to load credentials
"""
# Parse the server to drop the protocol
netloc = urlparse(self.api_server).netloc
# Then return the slugified contents, falling back to the full server if it
# could not be parsed into a net location
return slugify(netloc or self.api_server, regex_pattern=r"[^-\.a-z0-9]+")
@property
def tenant_id(self) -> str:
"""
Retrieve the current tenant id the client is interacting with.
If it is has not been explicitly set, the default tenant id will be retrieved
"""
if prefect.config.backend == "cloud":
if self._api_token and not self.api_key:
# Backwards compatibility for API tokens
if not self._tenant_id:
self._init_tenant()
# Should be set by `_init_tenant()` but we will not guarantee it
return self._tenant_id # type: ignore
if not self._tenant_id:
self._tenant_id = self._get_auth_tenant()
elif prefect.config.backend == "server":
if not self._tenant_id:
self._tenant_id = self._get_default_server_tenant()
if not self._tenant_id:
raise ClientError(
"A tenant could not be determined. Please use `prefect auth status` "
"to get information about your authentication and file an issue."
)
return self._tenant_id
@tenant_id.setter
def tenant_id(self, tenant_id: str) -> None:
self._tenant_id = tenant_id
# ----------------------------------------------------------------------------------
def create_tenant(self, name: str, slug: str = None) -> str:
"""
Creates a new tenant.
Note this route only works when run against Prefect Server.
Args:
- name (str): the name of the tenant to create
- slug (str, optional): the slug of the tenant to create; defaults to name
Returns:
- str: the ID of the newly created tenant, or the ID of the currently active tenant
Raises:
- ValueError: if run against Prefect Cloud
"""
if prefect.config.backend != "server":
msg = "To create a tenant with Prefect Cloud, please signup at https://cloud.prefect.io/"
raise ValueError(msg)
if slug is None:
slug = slugify(name)
tenant_info = self.graphql(
{
"mutation($input: create_tenant_input!)": {
"create_tenant(input: $input)": {"id"}
}
},
variables=dict(input=dict(name=name, slug=slug)),
)
return tenant_info.data.create_tenant.id
# -------------------------------------------------------------------------
# Utilities
def get(
self,
path: str,
server: str = None,
headers: dict = None,
params: Dict[str, JSONLike] = None,
token: str = None,
retry_on_api_error: bool = True,
) -> dict:
"""
Convenience function for calling the Prefect API with token auth and GET request
Args:
- path (str): the path of the API url. For example, to GET
http://prefect-server/v1/auth/login, path would be 'auth/login'.
- server (str, optional): the server to send the GET request to;
defaults to `self.api_server`
- headers (dict, optional): Headers to pass with the request
- params (dict): GET parameters
- token (str): an auth token. If not supplied, the `client.access_token` is used.
- retry_on_api_error (bool): whether the operation should be retried if the API returns
an API_ERROR code
Returns:
- dict: Dictionary representation of the request made
"""
response = self._request(
method="GET",
path=path,
params=params,
server=server,
headers=headers,
token=token,
retry_on_api_error=retry_on_api_error,
)
if response.text:
return response.json()
else:
return {}
def post(
self,
path: str,
server: str = None,
headers: dict = None,
params: Dict[str, JSONLike] = None,
token: str = None,
retry_on_api_error: bool = True,
) -> dict:
"""
Convenience function for calling the Prefect API with token auth and POST request
Args:
- path (str): the path of the API url. For example, to POST
http://prefect-server/v1/auth/login, path would be 'auth/login'.
- server (str, optional): the server to send the POST request to;
defaults to `self.api_server`
- headers(dict): headers to pass with the request
- params (dict): POST parameters
- token (str): an auth token. If not supplied, the `client.access_token` is used.
- retry_on_api_error (bool): whether the operation should be retried if the API returns
an API_ERROR code
Returns:
- dict: Dictionary representation of the request made
"""
response = self._request(
method="POST",
path=path,
params=params,
server=server,
headers=headers,
token=token,
retry_on_api_error=retry_on_api_error,
)
if response.text:
return response.json()
else:
return {}
def _init_tenant(self) -> None:
"""
Init the tenant to contact the server.
If your backend is set to cloud the tenant will be read from: $HOME/.prefect/settings.toml.
For the server backend it will try to retrieve the default tenant. If the server is
protected with auth like BasicAuth do not forget to `attach_headers` before any call.
DEPRECATED.
- API keys no longer need to log in and out of a tenant
- The tenant is now set at __init__ or in the `tenant_id` property
"""
if prefect.config.backend == "cloud":
# if no api token was passed, attempt to load state from local storage
settings = self._load_local_settings()
if not self._api_token:
self._api_token = settings.get("api_token")
if self._api_token:
self._tenant_id = settings.get("active_tenant_id")
# Must refer to private variable since the property calls this function
if self._tenant_id:
try:
self.login_to_tenant(tenant_id=self._tenant_id)
except AuthorizationError:
# Either the token is invalid _or_ it is not USER scoped. Try
# pulling the correct tenant id from the API
try:
result = self.graphql({"query": {"tenant": {"id"}}})
tenants = result["data"]["tenant"]
# TENANT or RUNNER scoped tokens should have a single tenant
if len(tenants) != 1:
raise ValueError(
"Failed to authorize with Prefect Cloud. "
f"Could not log in to tenant {self._tenant_id!r}. "
f"Found available tenants: {tenants}"
)
self._tenant_id = tenants[0].id
except AuthorizationError:
# On failure, we've just been given an invalid token and should
# delete the auth information from disk
self.logout_from_tenant()
# This code should now be superceded by the `tenant_id` property but will remain
# here for backwards compat until API tokens are removed entirely
else:
tenant_info = self.graphql({"query": {"tenant": {"id"}}})
if tenant_info.data.tenant:
self._tenant_id = tenant_info.data.tenant[0].id
def graphql(
self,
query: Any,
raise_on_error: bool = True,
headers: Dict[str, str] = None,
variables: Mapping[str, JSONLike] = None,
token: str = None,
retry_on_api_error: bool = True,
) -> GraphQLResult:
"""
Convenience function for running queries against the Prefect GraphQL API
Args:
- query (Any): A representation of a graphql query to be executed. It will be
parsed by prefect.utilities.graphql.parse_graphql().
- raise_on_error (bool): if True, a `ClientError` will be raised if the GraphQL
returns any `errors`.
- headers (dict): any additional headers that should be passed as part of the
request
- variables (dict): Variables to be filled into a query with the key being
equivalent to the variables that are accepted by the query
- token (str): an auth token. If not supplied, the `client.access_token` is used.
- retry_on_api_error (bool): whether the operation should be retried if the API returns
an API_ERROR code
Returns:
- dict: Data returned from the GraphQL query
Raises:
- ClientError if there are errors raised by the GraphQL mutation
"""
result = self.post(
path="",
server=self.api_server,
headers=headers,
params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
token=token,
retry_on_api_error=retry_on_api_error,
)
# TODO: It looks like this code is never reached because errors are raised
# in self._send_request by default
if raise_on_error and "errors" in result:
if "UNAUTHENTICATED" in str(result["errors"]):
raise AuthorizationError(result["errors"])
elif "Malformed Authorization header" in str(result["errors"]):
raise AuthorizationError(result["errors"])
elif (
result["errors"][0].get("extensions", {}).get("code")
== "VERSION_LOCKING_ERROR"
):
raise VersionLockMismatchSignal(result["errors"])
raise ClientError(result["errors"])
else:
return GraphQLResult(result) # type: ignore
def _send_request(
self,
session: "requests.Session",
method: str,
url: str,
params: Dict[str, JSONLike] = None,
headers: dict = None,
rate_limit_counter: int = 1,
) -> "requests.models.Response":
import requests
if prefect.context.config.cloud.get("diagnostics") is True:
self.logger.debug(f"Preparing request to {url}")
clean_headers = {
head: re.sub("Bearer .*", "Bearer XXXX", val)
for head, val in headers.items() # type: ignore
}
self.logger.debug(f"Headers: {clean_headers}")
self.logger.debug(f"Request: {params}")
start_time = time.time()
if method == "GET":
response = session.get(
url,
headers=headers,
params=params,
timeout=prefect.context.config.cloud.request_timeout,
)
elif method == "POST":
response = session.post(
url,
headers=headers,
json=params,
timeout=prefect.context.config.cloud.request_timeout,
)
elif method == "DELETE":
response = session.delete(
url,
headers=headers,
timeout=prefect.context.config.cloud.request_timeout,
)
else:
raise ValueError("Invalid method: {}".format(method))
if prefect.context.config.cloud.get("diagnostics") is True:
end_time = time.time()
self.logger.debug(f"Response: {response.json()}")
self.logger.debug(
f"Request duration: {round(end_time - start_time, 4)} seconds"
)
# custom logic when encountering an API rate limit:
# each time we encounter a rate limit, we sleep for
# 3 minutes + random amount, where the random amount
# is uniformly sampled from (0, 10 * 2 ** rate_limit_counter)
# up to (0, 640), at which point an error is raised if the limit
# is still being hit
rate_limited = response.status_code == 429
if rate_limited and rate_limit_counter <= 6:
jitter = random.random() * 10 * (2 ** rate_limit_counter)
naptime = 3 * 60 + jitter # 180 second sleep + increasing jitter
self.logger.debug(f"Rate limit encountered; sleeping for {naptime}s...")
time.sleep(naptime)
response = self._send_request(
session=session,
method=method,
url=url,
params=params,
headers=headers,
rate_limit_counter=rate_limit_counter + 1,
)
# Check if request returned a successful status
try:
response.raise_for_status()
except requests.HTTPError as exc:
if response.status_code == 400 and params and "query" in params:
# Create a custom-formatted err message for graphql errors which always
# return a 400 status code and have "query" in the parameter dict
try:
graphql_msg = format_graphql_request_error(response)
except Exception:
# Fallback to a general message
graphql_msg = (
"This is likely caused by a poorly formatted GraphQL query or "
"mutation but the response could not be parsed for more details"
)
raise ClientError(f"{exc}\n{graphql_msg}") from exc
# Server-side and non-graphql errors will be raised without modification
raise
return response
def _request(
self,
method: str,
path: str,
params: Dict[str, JSONLike] = None,
server: str = None,
headers: dict = None,
token: str = None,
retry_on_api_error: bool = True,
) -> "requests.models.Response":
"""
Runs any specified request (GET, POST, DELETE) against the server
Args:
- method (str): The type of request to be made (GET, POST, DELETE)
- path (str): Path of the API URL
- params (dict, optional): Parameters used for the request
- server (str, optional): The server to make requests against, base API
server is used if not specified
- headers (dict, optional): Headers to pass with the request
- token (str): an auth token. If not supplied, the `client.access_token` is used.
- retry_on_api_error (bool): whether the operation should be retried if the API returns
an API_ERROR code
Returns:
- requests.models.Response: The response returned from the request
Raises:
- ClientError: if the client token is not in the context (due to not being logged in)
- ValueError: if a method is specified outside of the accepted GET, POST, DELETE
- requests.HTTPError: if a status code is returned that is not `200` or `401`
"""
if server is None:
server = self.api_server
assert isinstance(server, str) # mypy assert
if token is None:
token = self.get_auth_token()
# 'import requests' is expensive time-wise, we should do this just-in-time to keep
# the 'import prefect' time low
import requests
url = urljoin(server, path.lstrip("/")).rstrip("/")
params = params or {}
headers = headers or {}
if token:
headers["Authorization"] = "Bearer {}".format(token)
if self.api_key and self._tenant_id:
# Attach a tenant id to the headers if using an API key since it can be
# used accross tenants. API tokens cannot and do not need this header.
headers["X-PREFECT-TENANT-ID"] = self._tenant_id
headers["X-PREFECT-CORE-VERSION"] = str(prefect.__version__)
if self._attached_headers:
headers.update(self._attached_headers)
session = requests.Session()
retry_total = 6 if prefect.config.backend == "cloud" else 1
retries = requests.packages.urllib3.util.retry.Retry(
total=retry_total,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
method_whitelist=["DELETE", "GET", "POST"],
)
session.mount("https://", requests.adapters.HTTPAdapter(max_retries=retries))
response = self._send_request(
session=session, method=method, url=url, params=params, headers=headers
)
# parse the response
try:
json_resp = response.json()
except JSONDecodeError as exc:
if prefect.config.backend == "cloud" and "Authorization" not in headers:
raise AuthorizationError(
"Malformed response received from Cloud - please ensure that you "
"are authenticated. See `prefect auth login --help`."
) from exc
else:
raise ClientError("Malformed response received from API.") from exc
# check if there was an API_ERROR code in the response
if "API_ERROR" in str(json_resp.get("errors")) and retry_on_api_error:
success, retry_count = False, 0
# retry up to six times
while success is False and retry_count < 6:
response = self._send_request(
session=session,
method=method,
url=url,
params=params,
headers=headers,
)
if "API_ERROR" in str(response.json().get("errors")):
retry_count += 1
time.sleep(0.25 * (2 ** (retry_count - 1)))
else:
success = True
return response
def attach_headers(self, headers: dict) -> None:
"""
Set headers to be attached to this Client
Args:
- headers (dict): A dictionary of headers to attach to this client. These headers
get added on to the existing dictionary of headers.
"""
self._attached_headers.update(headers)
# API Token Authentication ---------------------------------------------------------
# This is all deprecated and slated for removal in 0.16.0 when API token support is
# dropped
@property
def _api_token_settings_path(self) -> Path:
"""
Returns the local settings directory corresponding to the current API servers
when using an API token
DEPRECATED: API keys have replaced API tokens. API keys are stored in a new
location. See `_auth_file`.
"""
path = "{home}/client/{server}".format(
home=prefect.context.config.home_dir,
server=slugify(self.api_server, regex_pattern=r"[^-\.a-z0-9]+"),
)
return Path(os.path.expanduser(path)) / "settings.toml"
@property
def active_tenant_id(self) -> Optional[str]:
"""
DEPRECATED: This retains an old property used by API tokens. `tenant_id` is the
new implementation.
"""
return self.tenant_id
def _save_local_settings(self, settings: dict) -> None:
"""
Writes settings to local storage
DEPRECATED: API keys have replaced API tokens. API keys are stored in a new
location. See `save_auth_to_disk`
"""
self._api_token_settings_path.parent.mkdir(exist_ok=True, parents=True)
with self._api_token_settings_path.open("w+") as f:
toml.dump(settings, f)
def _load_local_settings(self) -> dict:
"""
Loads settings from local storage
DEPRECATED: API keys have replaced API tokens. API keys are stored in a new
location. See `load_auth_from_disk`
"""
if self._api_token_settings_path.exists():
with self._api_token_settings_path.open("r") as f:
return toml.load(f) # type: ignore
return {}
def save_api_token(self) -> None:
"""
Saves the API token in local storage.
DEPRECATED: API keys have replaced API tokens. API keys are stored in a new
location. See `save_auth_to_disk`
"""
settings = self._load_local_settings()
settings["api_token"] = self._api_token
self._save_local_settings(settings)
def get_auth_token(self) -> str:
"""
Returns an auth token:
- if there's an API key, return that immediately
- if no explicit access token is stored, returns the api token
- if there is an access token:
- if there's a refresh token and the access token expires in the next 30 seconds,
then we refresh the access token and store the result
- return the access token
DEPRECATED: API keys have replaced API tokens. We no longer need this refresh
logic for API keys.
Returns:
- str: the access token
"""
if self.api_key:
return self.api_key
if not self._access_token:
return self._api_token # type: ignore
expiration = self._access_token_expires_at or pendulum.now()
if self._refresh_token and pendulum.now().add(seconds=30) > expiration:
self._refresh_access_token()
return self._access_token
def get_available_tenants(self) -> List[Dict]:
"""
Returns a list of available tenants.
NOTE: this should only be called by users who have provided a USER-scoped API token.
Returns:
- List[Dict]: a list of dictionaries containing the id, slug, and name of
available tenants
"""
result = self.graphql(
{"query": {"tenant(order_by: {slug: asc})": {"id", "slug", "name"}}},
# API keys can see all available tenants. If not using an API key, we can't
# use the access token which is scoped to a single tenant
token=self.api_key or self._api_token,
)
return result.data.tenant # type: ignore
def login_to_tenant(self, tenant_slug: str = None, tenant_id: str = None) -> bool:
"""
Log in to a specific tenant
If using an API key, the client tenant will be updated but will not be saved to
disk without an explicit call.
If using an API token, it must be USER-scoped API token. The client tenant will
be updated and the new tenant will be saved to disk for future clients.
Args:
- tenant_slug (str): the tenant's slug
- tenant_id (str): the tenant's id
Returns:
- bool: True if the login was successful
Raises:
- ValueError: if at least one of `tenant_slug` or `tenant_id` isn't provided
- ValueError: if the `tenant_id` is not a valid UUID
- ValueError: if no matching tenants are found
"""
# Validate the given tenant id -------------------------------------------------
if tenant_slug is None and tenant_id is None:
raise ValueError(
"At least one of `tenant_slug` or `tenant_id` must be provided."
)
elif tenant_id:
# TODO: Consider removing this check. This would be caught by GraphQL
try:
uuid.UUID(tenant_id)
except ValueError as exc:
raise ValueError("The `tenant_id` must be a valid UUID.") from exc
tenant = self.graphql(
{
"query($slug: String, $id: uuid)": {
"tenant(where: {slug: { _eq: $slug }, id: { _eq: $id } })": {"id"}
}
},
variables=dict(slug=tenant_slug, id=tenant_id),
# API keys can see all available tenants. If not using an API key, we can't
# use the access token which is scoped to a single tenant
token=self.api_key or self._api_token,
)
if not tenant.data.tenant:
raise ValueError("No matching tenant found.")
# We may have been given just the slug so set the id
tenant_id = tenant.data.tenant[0].id
# Update the tenant the client is using ----------------------------------------
self._tenant_id = tenant_id
# Backwards compatibility for API tokens ---------------------------------------
# - Get a new access token for the tenant
# - Save it to disk
if not self.api_key and prefect.config.backend == "cloud":
payload = self.graphql(
{
"mutation($input: switch_tenant_input!)": {
"switch_tenant(input: $input)": {
"access_token",
"expires_at",
"refresh_token",
}
}
},
variables=dict(input=dict(tenant_id=tenant_id)),
# Use the API token to switch tenants
token=self._api_token,
)
self._access_token = payload.data.switch_tenant.access_token
self._access_token_expires_at = cast(
pendulum.DateTime, pendulum.parse(payload.data.switch_tenant.expires_at)
)
self._refresh_token = payload.data.switch_tenant.refresh_token
# Save the tenant setting to disk
settings = self._load_local_settings()
settings["active_tenant_id"] = self.tenant_id
self._save_local_settings(settings)
return True
def logout_from_tenant(self) -> None:
"""
DEPRECATED: API keys have replaced API tokens.
Logout can be accomplished for API keys with:
```
client = Client()
client.api_key = ""
client._tenant_id = ""
client.save_auth_to_disk()
```
"""
self._access_token = None
self._refresh_token = None
self._tenant_id = None
# remove the tenant setting
settings = self._load_local_settings()
settings["active_tenant_id"] = None
self._save_local_settings(settings)
def _refresh_access_token(self) -> bool: