/
commcare_hq_client.py
239 lines (191 loc) · 8.15 KB
/
commcare_hq_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
from __future__ import unicode_literals, print_function, absolute_import, division, generators, nested_scopes
import copy
import logging
from collections import OrderedDict
import backoff
import requests
from requests.auth import AuthBase
from requests.auth import HTTPDigestAuth
# Values accepted for the ``auth_mode`` argument of ``CommCareHqClient``.
# 'password' makes the client authenticate with HTTP digest auth.
AUTH_MODE_PASSWORD = 'password'
# 'apikey' makes the client send an ``Authorization: apikey <user>:<key>`` header.
AUTH_MODE_APIKEY = 'apikey'
try:
from urllib.request import urlopen
from urllib.parse import urlparse, urlencode, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
from urllib import urlopen, urlencode
import commcare_export
from commcare_export.repeatable_iterator import RepeatableIterator
# Module-level logger shared by the retry handlers and both client classes.
logger = logging.getLogger(__name__)
# Default API version for ``CommCareHqClient``; it is interpolated into the
# API path as ``.../api/v<version>``.
LATEST_KNOWN_VERSION='0.5'
# ``CommCareHqClient.iterate`` raises ``ResourceRepeatException`` once it has
# seen this many consecutive requests whose params equal the previous ones.
RESOURCE_REPEAT_LIMIT=10
def on_backoff(details):
    """Backoff handler: log that a request failed and a retry is pending."""
    _log_backoff(details, action_message='Waiting for retry.')
def on_giveup(details):
    """Backoff handler: log that a request failed and retrying has stopped."""
    _log_backoff(details, action_message='Giving up.')
def _log_backoff(details, action_message):
    """
    Log a warning describing a failed request attempt.

    :param details: mapping supplied by the retry decorator; must contain
        ``tries`` and ``elapsed`` (seconds, formatted to one decimal place)
    :param action_message: sentence appended to the warning, saying what
        happens next
    """
    # Format from a copy: ``details`` belongs to the caller, so do not leave
    # an extra ``__suffix`` key behind in it.
    fields = dict(details, __suffix=action_message)
    logger.warning("Request failed after {tries} attempts ({elapsed:.1f}s). {__suffix}".format(**fields))
def is_client_error(ex):
    """
    Decide whether a failed request should NOT be retried.

    Used as the ``giveup`` predicate of the retry decorator on
    ``CommCareHqClient.get``. The exception is always logged at INFO level.

    :param ex: the exception raised by the request
    :return: True when the exception carries a response with a 4xx status
        other than 429; False otherwise (including when there is no response)
    """
    logger.info(str(ex))

    response = getattr(ex, 'response', None)
    if response is None:
        return False

    status = response.status_code
    if status == 429:
        # "Too Many Requests": HQ wants us to back off
        return False
    return 400 <= status < 500
class ResourceRepeatException(Exception):
    """Raised when the same resource is requested repeatedly with identical parameters."""

    def __init__(self, message):
        """Keep ``message`` on the instance; it is what ``str()`` returns."""
        super(ResourceRepeatException, self).__init__(message)
        self.message = message

    def __str__(self):
        """Return the stored message."""
        return self.message
class CommCareHqClient(object):
    """
    A connection to CommCareHQ for a particular version, project, and user.
    """

    def __init__(self, url, project, username, password,
                 auth_mode=AUTH_MODE_PASSWORD, version=LATEST_KNOWN_VERSION, checkpoint_manager=None):
        """
        Store the connection settings and build the auth handler.

        :param url: base URL; ``api_url`` appends ``/a/<project>/api/v<version>``
        :param project: project name used in the API path
        :param username: user name handed to the auth handler
        :param password: password, or the API key when ``auth_mode`` is
            ``AUTH_MODE_APIKEY``
        :param auth_mode: ``AUTH_MODE_PASSWORD`` or ``AUTH_MODE_APIKEY``
        :param version: API version string used in the API path
        :param checkpoint_manager: accepted but not used by the constructor;
            pass the manager to ``iterate`` instead
        :raises ValueError: if ``auth_mode`` is not a known mode
        """
        self.version = version
        self.url = url
        self.project = project
        self.__auth = self._get_auth(username, password, auth_mode)
        # The HTTP session is created lazily by the ``session`` property.
        self.__session = None

    def _get_auth(self, username, password, mode):
        """
        Build the auth handler for the given mode.

        :return: ``HTTPDigestAuth`` for password mode, ``ApiKeyAuth`` for
            API-key mode
        :raises ValueError: if ``mode`` is neither known mode
        """
        if mode == AUTH_MODE_PASSWORD:
            return HTTPDigestAuth(username, password)
        elif mode == AUTH_MODE_APIKEY:
            return ApiKeyAuth(username, password)
        else:
            # ValueError is still an Exception, so callers that caught the
            # previous bare Exception keep working.
            raise ValueError('Unknown auth mode: %s' % mode)

    @property
    def session(self):
        """The ``requests.Session`` used for all calls, created on first access."""
        if self.__session is None:
            self.__session = requests.Session()
            self.__session.headers.update({
                'User-Agent': 'commcare-export/%s' % commcare_export.__version__
            })
        return self.__session

    @session.setter
    def session(self, session):
        """Used for overriding the session in unit tests"""
        self.__session = session

    @property
    def api_url(self):
        """Root URL of the API for this client's project and version."""
        return '%s/a/%s/api/v%s' % (self.url, self.project, self.version)

    @backoff.on_exception(
        backoff.expo, requests.exceptions.RequestException,
        max_time=300, giveup=is_client_error,
        on_backoff=on_backoff, on_giveup=on_giveup
    )
    def get(self, resource, params=None):
        """
        Gets the named resource.

        Currently a bit of a vulnerable stub that works
        for this particular use case in the hands of a trusted user; would likely
        want this to work like (or via) slumber.

        Requests that raise ``requests.exceptions.RequestException`` are
        retried with exponential backoff for up to 300 seconds, unless
        ``is_client_error`` says to give up.

        :param resource: resource name appended to ``api_url``
        :param params: optional query parameters
        :return: the decoded JSON body of the response
        :raises requests.exceptions.RequestException: when the request fails
            and is not (or no longer) retried
        """
        logger.debug("Fetching '%s' batch: %s", resource, params)
        resource_url = '%s/%s/' % (self.api_url, resource)
        response = self.session.get(resource_url, params=params, auth=self.__auth, timeout=60)
        response.raise_for_status()
        return response.json()

    def iterate(self, resource, paginator, params=None, checkpoint_manager=None):
        """
        Assumes the endpoint is a list endpoint, and iterates over it
        making a lot of assumptions that it is like a tastypie endpoint.

        Each batch is expected to be a dict with ``meta`` (holding
        ``total_count`` and ``next``) and ``objects`` (dicts with an ``id``).

        :param resource: resource name passed to ``get``
        :param paginator: object providing ``next_page_params_from_batch``
        :param params: initial query parameters (copied, never mutated)
        :param checkpoint_manager: optional manager forwarded to ``checkpoint``
        :return: a ``RepeatableIterator`` wrapping the batch generator
        :raises ResourceRepeatException: (while iterating) when the same
            params are requested ``RESOURCE_REPEAT_LIMIT`` times in a row
        """
        params = dict(params or {})

        # The arguments are bound as defaults so that every fresh call of the
        # generator function starts again from the initial params.
        def iterate_resource(resource=resource, params=params):
            more_to_fetch = True
            last_batch_ids = set()
            total_count = None
            fetched = 0
            repeat_counter = 0
            last_params = None

            while more_to_fetch:
                # Guard against a paginator that keeps returning the same page.
                if params == last_params:
                    repeat_counter += 1
                else:
                    repeat_counter = 0
                if repeat_counter >= RESOURCE_REPEAT_LIMIT:
                    raise ResourceRepeatException("Requested resource '{}' {} times with same parameters".format(resource, repeat_counter))

                batch = self.get(resource, params)
                last_params = copy.copy(params)

                # (Re)read the total whenever it is unknown or already reached;
                # the running count restarts with it.
                if not total_count or total_count == 'unknown' or fetched >= total_count:
                    total_count = int(batch['meta']['total_count']) if batch['meta']['total_count'] else 'unknown'
                    fetched = 0

                fetched += len(batch['objects'])
                logger.debug('Received %s of %s', fetched, total_count)

                if not batch['objects']:
                    more_to_fetch = False
                else:
                    # Skip objects already yielded from the previous batch.
                    for obj in batch['objects']:
                        if obj['id'] not in last_batch_ids:
                            yield obj

                    if batch['meta']['next']:
                        last_batch_ids = {obj['id'] for obj in batch['objects']}
                        params = paginator.next_page_params_from_batch(batch)
                        if not params:
                            more_to_fetch = False
                    else:
                        more_to_fetch = False

                    # Runs only after the consumer has taken every object of
                    # this batch, because the yields above come first.
                    self.checkpoint(checkpoint_manager, paginator, batch, not more_to_fetch)

        return RepeatableIterator(iterate_resource)

    def checkpoint(self, checkpoint_manager, paginator, batch, is_final):
        """
        Record a checkpoint for ``batch`` when paginating by date.

        Does nothing unless ``paginator`` is a ``DatePaginator``. A warning is
        logged when the paginator yields no date for the batch.

        :param checkpoint_manager: object providing ``set_checkpoint``, or
            None to skip recording
        :param paginator: the paginator driving the iteration
        :param batch: the batch that was just consumed
        :param is_final: True when no further batches will be fetched
        """
        # Imported here rather than at module level; presumably to avoid a
        # circular import (TODO confirm).
        from commcare_export.commcare_minilinq import DatePaginator

        if isinstance(paginator, DatePaginator):
            since_date = paginator.get_since_date(batch)
            if not since_date:
                logger.warning('Failed to get a checkpoint date from a batch of data.')
            elif checkpoint_manager is not None:
                # ``iterate`` defaults the manager to None; without this guard
                # that default crashed with AttributeError.
                checkpoint_manager.set_checkpoint(since_date, is_final)
class MockCommCareHqClient(object):
    """
    An in-memory mock of the hq client, instantiated
    with a simple mapping of resource and params to results.

    Since dictionaries are not hashable, the mapping is
    written as a pair of tuples, handled appropriately
    internally.

    MockCommCareHqClient({
        'forms': [
            (
                {'_search': 'test1'},
                [
                    ... objects ...
                ]
            ),
        ]
    })
    """

    def __init__(self, mock_data):
        """
        Index ``mock_data`` by resource name and encoded params.

        :param mock_data: mapping of resource name to a list of
            ``(params_dict, result)`` pairs
        """
        self.mock_data = {}
        for resource, resource_results in mock_data.items():
            self.mock_data[resource] = {
                self._params_key(params): result
                for params, result in resource_results
            }

    @staticmethod
    def _params_key(params):
        """Encode a params dict (or None) as a key-sorted query string."""
        return urlencode(OrderedDict(sorted((params or {}).items())))

    def iterate(self, resource, paginator, params=None, checkpoint_manager=None):
        """
        Return the canned result registered for ``resource`` and ``params``.

        ``paginator`` and ``checkpoint_manager`` are accepted to match the
        real client and are ignored. ``params=None`` is treated as empty
        params.

        :raises KeyError: if no result was registered for the combination
        """
        logger.debug('Mock client call to resource "%s" with params "%s"', resource, params)
        return self.mock_data[resource][self._params_key(params)]

    def get(self, resource):
        """
        Return the objects registered under params ``{'get': True}`` wrapped
        in a single-page list response, or None when that entry is empty.

        :raises KeyError: if no such entry was registered for ``resource``
        """
        logger.debug('Mock client call to get resource "%s"', resource)
        objects = self.mock_data[resource][self._params_key({'get': True})]
        if not objects:
            return None
        return {'meta': {'limit': len(objects), 'next': None,
                         'offset': 0, 'previous': None,
                         'total_count': len(objects)},
                'objects': objects}
class ApiKeyAuth(AuthBase):
    """Auth handler that sends the credentials in an ``apikey`` Authorization header."""

    def __init__(self, username, apikey):
        self.username = username
        self.apikey = apikey

    def __call__(self, r):
        """Set the Authorization header on request ``r`` and return it."""
        header_value = 'apikey %s:%s' % (self.username, self.apikey)
        r.headers['Authorization'] = header_value
        return r

    def __eq__(self, other):
        """Equal when ``other`` exposes the same ``username`` and ``apikey``."""
        same_user = self.username == getattr(other, 'username', None)
        same_key = self.apikey == getattr(other, 'apikey', None)
        return bool(same_user and same_key)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        credentials = (self.username, self.apikey)
        return hash(credentials)