-
Notifications
You must be signed in to change notification settings - Fork 4k
/
http.py
333 lines (281 loc) · 14.6 KB
/
http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from requests.auth import AuthBase
from .auth.core import HttpAuthenticator, NoAuth
from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from .rate_limiting import default_backoff_handler, user_defined_backoff_handler
# HTTP methods that may carry a request body. A JSON or data payload is only
# attached to an outgoing request when its method is one of these.
BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH")
class HttpStream(Stream, ABC):
    """
    Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.

    Subclasses must implement `url_base`, `next_page_token`, `path` and `parse_response`. Every other hook
    (`request_params`, `request_headers`, `request_body_data`, `request_body_json`, `request_kwargs`, `should_retry`,
    `backoff_time`, ...) has a default implementation that can be overridden when needed.
    """

    source_defined_cursor = True  # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
    page_size = None  # Use this variable to define page size for API http requests with pagination support

    # TODO: remove legacy HttpAuthenticator authenticator references
    def __init__(self, authenticator: Optional[Union[AuthBase, HttpAuthenticator]] = None):
        """
        :param authenticator: either a `requests` AuthBase instance, which is installed on the underlying session, or a legacy
            HttpAuthenticator, whose auth headers are merged into every request by `read_records`. When omitted (or falsy),
            NoAuth() is used as the legacy authenticator.
        """
        self._session = requests.Session()
        self._authenticator = NoAuth()
        if isinstance(authenticator, AuthBase):
            self._session.auth = authenticator
        elif authenticator:
            self._authenticator = authenticator

    @property
    @abstractmethod
    def url_base(self) -> str:
        """
        :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
        """

    @property
    def http_method(self) -> str:
        """
        Override if needed. See request_body_data/request_body_json if using POST/PUT/PATCH.
        """
        return "GET"

    @property
    def raise_on_http_errors(self) -> bool:
        """
        Override if needed. If set to False, allows opting-out of raising HTTP code exception.
        """
        return True

    @property
    def max_retries(self) -> Optional[int]:
        """
        Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
        """
        return 5

    @property
    def retry_factor(self) -> int:
        """
        Override if needed. Specifies factor for backoff policy.
        """
        return 5

    @property
    def authenticator(self) -> HttpAuthenticator:
        """
        :return: the legacy authenticator whose auth headers are added to each request; this is NoAuth() unless a
            non-AuthBase authenticator was passed to the constructor.
        """
        return self._authenticator

    @abstractmethod
    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Override this method to define a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None (or any falsy value) means there are
            no more pages to read in this response.
        """

    @abstractmethod
    def path(
        self,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """
        Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"

        The returned value is appended to `url_base` as-is to build the request URL.
        """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """
        Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        return {}

    def request_headers(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
        """
        return {}

    def request_body_data(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Union[Mapping, str]]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body of the request with a non-JSON payload.

        If a string is returned, it is sent as is.
        If a dict is returned, it is converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of 'request_body_data' and 'request_body_json' may return a payload for a given request.
        """
        return None

    def request_body_json(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping]:
        """
        Override when creating POST/PUT/PATCH requests to populate the body of the request with a JSON payload.

        Only one of 'request_body_data' and 'request_body_json' may return a payload for a given request.
        """
        return None

    def request_kwargs(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Override to return a mapping of keyword arguments to be used when sending the HTTP request.

        Any option listed in https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from
        this method. Note that these options do not conflict with request-level options such as headers, request params, etc..
        """
        return {}

    @abstractmethod
    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping]:
        """
        Parses the raw response object into an iterable of records. Must be implemented by subclasses.

        :param response: the HTTP response to parse
        :param stream_state: the stream state used to build the request
        :param stream_slice: the stream slice used to build the request
        :param next_page_token: pagination token; note that `read_records` does not pass it, so the default applies there
        :return: An iterable containing the parsed response
        """

    # TODO move all the retry logic to a functor/decorator which is input as an init parameter
    def should_retry(self, response: requests.Response) -> bool:
        """
        Override to set different conditions for backoff based on the response from the server.

        By default, back off on the following HTTP response statuses:
         - 429 (Too Many Requests) indicating rate limiting
         - 500s to handle transient server errors

        Unexpected but transient exceptions (connection timeout, DNS resolution failed, etc..) are retried by default.
        """
        return response.status_code == 429 or 500 <= response.status_code < 600

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.

        This method is called only if should_retry() returns True for the input response.

        :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None
            (or any other falsy value such as 0) defers backoff to the default backoff behavior (e.g using an exponential algorithm).
        """
        return None

    def _create_prepared_request(
        self,
        path: str,
        headers: Optional[Mapping] = None,
        params: Optional[Mapping] = None,
        json: Any = None,
        data: Any = None,
    ) -> requests.PreparedRequest:
        """
        Builds a prepared request for `url_base + path` using this stream's HTTP method and session.

        A body is only attached when the HTTP method is one of BODY_REQUEST_METHODS; for other methods `json` and `data` are ignored.

        :raises RequestBodyException: if both `json` and `data` are truthy for a body-carrying method
        """
        args = {"method": self.http_method, "url": self.url_base + path, "headers": headers, "params": params}

        if self.http_method.upper() in BODY_REQUEST_METHODS:
            if json and data:
                raise RequestBodyException(
                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
                )
            if json:
                args["json"] = json
            elif data:
                args["data"] = data

        return self._session.prepare_request(requests.Request(**args))

    def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
        """
        Sends the request and translates retryable responses into backoff exceptions.

        To trigger a backoff, an exception is raised that is meant to be handled by the backoff decorators applied in
        `_send_request`. For responses where should_retry() is True, the exception type selects the backoff time:
         1. UserDefinedBackoffException carries the (truthy) value returned by backoff_time()
         2. DefaultBackoffException falls back on the decorator's default behavior e.g: exponential backoff

        Responses that are not retryable are checked with raise_for_status() only if raise_on_http_errors is True. Note that
        retryable responses raise a backoff exception regardless of raise_on_http_errors.

        Exceptions raised by the session itself (e.g: timeouts) are not caught here and propagate to the caller.
        """
        response: requests.Response = self._session.send(request, **request_kwargs)

        if self.should_retry(response):
            custom_backoff_time = self.backoff_time(response)
            # Truthiness check on purpose: None and 0 both mean "use the default backoff policy".
            if custom_backoff_time:
                raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
            raise DefaultBackoffException(request=request, response=response)

        if self.raise_on_http_errors:
            # Raise any HTTP exceptions that happened in case there were unexpected ones
            response.raise_for_status()

        return response

    def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
        """
        Creates backoff wrappers which are responsible for retry logic, then sends the request through them.
        """
        # The backoff package has a max_tries parameter that means the total number of
        # tries before giving up, so if this number is 0 no calls are expected to be made.
        # This class calls it max_REtries, assuming there is at least one attempt
        # followed by some retry attempts; to comply with this logic we add 1 to the
        # expected number of retries.
        max_tries = self.max_retries

        # According to the backoff max_tries docstring:
        #     max_tries: The maximum number of attempts to make before giving
        #         up ...The default value of None means there is no limit to
        #         the number of tries.
        # This implies that if max_tries is explicitly set to None there is no
        # limit to retry attempts, otherwise it is a limited number of tries. But
        # this is not true for the current version of the backoff package (1.8.0): setting
        # max_tries to 0 or a negative number would result in endless retry attempts.
        # Clamp the value to avoid an endless loop unless "no limit" was requested
        # explicitly (i.e. max_retries is None).
        if max_tries is not None:
            max_tries = max(0, max_tries) + 1

        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
        backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
        return backoff_handler(user_backoff_handler)(request, request_kwargs)

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Lazily yields the records of every page of this stream for the given slice and state.

        For each page a request is built from the overridable hooks (headers, path, params, body, kwargs), sent with retry
        handling, and parsed with `parse_response`. Paging stops as soon as `next_page_token` returns a falsy value.

        :param sync_mode: not used by this implementation
        :param cursor_field: not used by this implementation
        :param stream_slice: forwarded to the request-building hooks and to `parse_response`
        :param stream_state: forwarded to the request-building hooks and to `parse_response`; None is replaced by {}
        """
        stream_state = stream_state or {}
        next_page_token = None

        while True:
            request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
            request = self._create_prepared_request(
                path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                # Auth headers come last so they overwrite overlapping user-defined headers.
                headers=dict(request_headers, **self.authenticator.get_auth_header()),
                params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
                data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
            )
            request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

            response = self._send_request(request, request_kwargs)
            yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

            next_page_token = self.next_page_token(response)
            if not next_page_token:
                break