-
Notifications
You must be signed in to change notification settings - Fork 63
/
api_context.py
65 lines (53 loc) · 2.44 KB
/
api_context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from __future__ import absolute_import
import time
from warnings import warn
from carto.auth import APIKeyAuthClient
from carto.exceptions import CartoRateLimitException
from carto.sql import BatchSQLClient, CopySQLClient, SQLClient
from ... import __version__
from .base_context import BaseContext
DEFAULT_RETRY_TIMES = 3
class APIContext(BaseContext):
def __init__(self, credentials):
self.auth_client = APIKeyAuthClient(
base_url=credentials.base_url,
api_key=credentials.api_key,
session=credentials.session,
client_id='cartoframes_{}'.format(__version__),
user_agent='cartoframes_{}'.format(__version__)
)
self.sql_client = SQLClient(self.auth_client)
self.copy_client = CopySQLClient(self.auth_client)
self.batch_sql_client = BatchSQLClient(self.auth_client)
def download(self, query, retry_times=DEFAULT_RETRY_TIMES):
try:
return self.copy_client.copyto_stream(query.strip())
except CartoRateLimitException as err:
if retry_times > 0:
retry_times -= 1
warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
time.sleep(err.retry_after)
warn('Retrying...')
return self.download(query.strip(), retry_times=retry_times)
else:
warn(('Read call was rate-limited. '
'This usually happens when there are multiple queries being read at the same time.'))
raise err
def upload(self, query, data):
return self.copy_client.copyfrom(query.strip(), data)
def execute_query(self, query, parse_json=True, do_post=True, format=None, **request_args):
return self.sql_client.send(query.strip(), parse_json, do_post, format, **request_args)
def execute_long_running_query(self, query):
return self.batch_sql_client.create_and_wait_for_completion(query.strip())
def get_schema(self):
"""Get user schema from current credentials"""
query = 'select current_schema()'
result = self.execute_query(query, do_post=False)
return result['rows'][0]['current_schema']
def exists(self, query):
exists_query = 'SELECT EXISTS({})'.format(query)
try:
self.execute_query(exists_query, do_post=False)
return True, ''
except Exception as e:
return False, str(e)