This repository has been archived by the owner on Apr 22, 2024. It is now read-only.
/
__init__.py
313 lines (280 loc) · 12.3 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# Copyright (C) 2019 by eHealth Africa : http://www.eHealthAfrica.org
#
# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from oauthlib import oauth2
from time import sleep
from urllib.parse import urlparse
# monkey patch so that bulk insertion works
from .patches import patched__marshal_object, patched__unmarshal_object
import bravado_core
bravado_core.marshal._marshal_object = patched__marshal_object # noqa
bravado_core.unmarshal._unmarshal_object = patched__unmarshal_object # noqa
import bravado
from bravado.client import (
SwaggerClient,
ResourceDecorator,
CallableOperation,
construct_request
)
from bravado.config import bravado_config_from_config_dict
from bravado.swagger_model import Loader
from .exceptions import AetherAPIException
from .basic_auth import BasicRealmClient
from .oidc import OauthClient
from .logger import LOG
_SPEC_URL = '{}/v1/schema/?format=openapi'
class Client(SwaggerClient):
AUTH_METHODS = ['oauth', 'basic']
def __init__(
self,
url,
user=None,
pw=None,
offline_token=None,
log_level='ERROR',
config=None,
realm=None,
# if using a gateway from a non-standard location
keycloak_url=None,
auth_type='oauth',
# used to specify gateway endpoint ({realm}/{endpoint_name})
endpoint_name='kernel',
):
if auth_type not in Client.AUTH_METHODS:
raise ValueError(f'allowed auth_types are {Client.AUTH_METHODS}')
log_level = logging.getLevelName(log_level)
LOG.setLevel(log_level)
# Our Swagger spec is apparently somewhat problematic
# we default to no validation.
config = config or {
'validate_swagger_spec': False,
'validate_requests': False,
'validate_responses': False
}
url_info = urlparse(url)
server = f'{url_info.scheme}://{url_info.netloc}'
if auth_type == 'basic':
LOG.debug(f'Using basic auth on {server}')
http_client = BasicRealmClient()
http_client.set_realm_basic_auth(
host=url_info.netloc,
username=user,
password=pw,
realm=realm,
)
loader = Loader(http_client, request_headers=None)
try:
spec_url = _SPEC_URL.format(url)
LOG.debug(f'Loading schema from: {spec_url}')
spec_dict = loader.load_spec(spec_url)
except bravado.exception.HTTPForbidden as forb:
LOG.error('Could not authenticate with provided credentials')
raise forb
except (
bravado.exception.HTTPBadGateway,
bravado.exception.BravadoConnectionError
) as bgwe:
LOG.error('Server Unavailable')
raise bgwe
else:
LOG.debug(f'getting OIDC session on realm {realm}')
http_client = OauthClient()
http_client.set_oauth(
url_info.netloc,
keycloak_url or f'{server}/auth', realm,
user, pw, offline_token, endpoint_name)
spec_url = _SPEC_URL.format(f'{server}/{realm}/{endpoint_name}')
spec_dict = http_client.authenticator.get_spec(spec_url)
# We take this from the from_url class method of SwaggerClient
# Apply bravado config defaults
bravado_config = bravado_config_from_config_dict(config)
# remove bravado configs from config dict
for key in set(bravado_config._fields).intersection(set(config)):
del config[key]
# set bravado config object
config['bravado'] = bravado_config
swagger_spec = bravado_core.spec.Spec.from_dict(
spec_dict, spec_url, http_client, config)
self.__also_return_response = True
self.swagger_spec = swagger_spec
super(Client, self).__init__(
swagger_spec, also_return_response=self.__also_return_response)
def _get_resource(self, item):
# We override this method to use our AetherDecorator class
resource = self.swagger_spec.resources.get(item)
if not resource:
raise AttributeError(
'Resource {0} not found. Available resources: {1}'
.format(item, ', '.join(dir(self))))
# Wrap bravado-core's Resource and Operation objects in order to
# execute a service call via the http_client.
# Replaces with AetherSpecific handler
return AetherDecorator(
resource,
self.__also_return_response,
self.swagger_spec
)
def __getitem__(self, name):
return getattr(self, name)
# useful for debugging issues with outgoing requests. Only called when ll == DEBUG
def show_request(operation, *args, **kwargs):
request_options = kwargs.pop('_request_options', {})
request_params = construct_request(
operation, request_options, **kwargs)
return([kwargs, request_params])
'''
Some arguments don't properly display in the swagger specification so we have
to add them at runtime for the client to support them. This includes all payload
filters like payload__name=John. Normally payload__name wouldn't be found in the
spec and an error would be produced.
'''
def mockParam(name, op, swagger_spec):
param_spec = {'name': name, 'in': 'query',
'description': '', 'required': False, 'type': 'string'}
return bravado_core.param.Param(swagger_spec, op, param_spec)
class AetherDecorator(ResourceDecorator):
def __init__(self, resource, also_return_response=True, swagger_spec=None):
self.name = resource.name
# The only way to be able to form coherent exceptions is to catch these
# common types and wrap them in our own, exposing the status and error
# feeback from the API.
self.handled_exceptions = [
bravado.exception.HTTPBadRequest,
bravado.exception.HTTPBadGateway,
bravado.exception.HTTPNotFound,
bravado.exception.HTTPForbidden,
oauth2.rfc6749.errors.InvalidGrantError
]
# Errors in connection worthy of a retry
self.retry_exceptions = [
bravado.exception.BravadoTimeoutError,
bravado.exception.BravadoConnectionError
]
self.swagger_spec = swagger_spec
super(AetherDecorator, self).__init__(
resource, also_return_response)
def __getitem__(self, name):
return getattr(self, name)
def __getattr__(self, name):
fn = CallableOperation(
getattr(self.resource, self._get_full_name(name)),
self.also_return_response)
# It was annoying to constantly call .response().result to get to the most
# valuable data. Also errors were being swallowed by the inner workings of
# Bravado. Wrapping the returned function handles this.
def resultant_function(*args, **kwargs):
# try:
future = fn(*args, **kwargs)
# On debug, show outgoing requests
LOG.debug(show_request(
getattr(self.resource, self._get_full_name(name)),
*args,
**kwargs
))
# This is an attempt to fix an error that only occurs in travis where kernel
# connections are dropped in transit or by kernel.
dropped_retries = 5
for x in range(dropped_retries):
try:
# We just want to give the exception right back, but maintain
# access to the response object so that we can grab the error.
# When the exception is caught and handled normally, this is impossible.
# Hence the lambda returning the exception itself when an exception occurs.
response = future.response(
timeout=10,
fallback_result=lambda x: x,
exceptions_to_catch=tuple(self.handled_exceptions)
)
break
except tuple(self.retry_exceptions) as err:
LOG.debug('error %s in connection to client' % (err))
if x == dropped_retries - 1:
LOG.error('failed after %s connections to %s' %
(x, future.operation.operation_id))
raise err
LOG.debug('dropped connection %s to %s, retry' %
(x, future.operation.operation_id))
sleep(.25 + (.25 * x))
result = response.result
# If the result is an exception, we expose it's parts along with
# content from the request response and raise it
if any([isinstance(result, i) for i in self.handled_exceptions]):
details = {
'operation': future.operation.operation_id,
'response': str(result)
}
http_response = response.incoming_response
assert isinstance(http_response, bravado_core.response.IncomingResponse)
details['status_code'] = http_response.status_code
try:
details['response'] = http_response.json()
except Exception:
# JSON is unavailable, so we just use the original exception text.
pass
raise AetherAPIException(**details)
return result
return resultant_function
def _get_full_name(self, name):
# Allows us to use for example 'entities.create' instead of 'entities.entities_create'
return '%s_%s' % (self.name, name)
def _verify_param(self, name, param_name):
operation = getattr(self.resource, self._get_full_name(name))
# allow searching for arbitrary fields within the payload
if param_name.startswith('payload__') or param_name == 'many':
# add it to the allowed list of parameters
operation.params[param_name] = mockParam(param_name, operation, self.swagger_spec)
return True
if param_name not in operation.params:
raise ValueError('%s has no parameter %s' % (name, param_name))
return True
def _verify_params(self, name, params):
return all([self._verify_param(name, i) for i in params])
def __iter__(self):
# show available rpc calls
return iter([i.lstrip('%s_' % self.name) for i in self.__dir__()])
def paginated(self, remote_function, start_page=1, ordering='modified', **kwargs):
fn = getattr(self, remote_function)
params = dict(kwargs)
self._verify_params(remote_function, params.keys())
page = start_page
params['page'] = page
params['ordering'] = ordering
_next = True
while _next:
params['page'] = page
result = fn(**params)
_next = result.get('next')
page += 1
results = result.get('results')
# if not results: # WARNING: this is true with empty lists []
if results is None:
raise StopIteration
for item in results:
yield item
def count(self, remote_function, **kwargs):
fn = getattr(self, remote_function)
params = dict(kwargs)
self._verify_params(remote_function, params.keys())
result = fn(**params)
return result.get('count')
def first(self, remote_function, **kwargs):
fn = getattr(self, remote_function)
params = dict(kwargs)
self._verify_params(remote_function, params.keys())
result = fn(**params)
return result.get('results', [])[0]