-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alex/configurable retrier #14330
Alex/configurable retrier #14330
Changes from all commits
114daea
6d47a6d
dbff403
24f54db
e3f8d50
cbe4e57
ca44a97
3d3f321
4ea9a2b
2f09e79
67f174d
096b54e
db01257
f097ee2
b40eb69
eae5bb5
e1a59aa
f534b20
22a2296
5ade34d
eef3a2b
16c33a5
355fdc4
2b4da45
e58b977
94b0ab1
d9d9004
6296403
a97d730
814088a
7972c20
f94be2c
d50f832
892270c
57ed90e
fb88553
51fc8eb
efc40b5
e33e004
fc41d54
e2ef317
b8faa8d
6f96b26
d864b01
d209dd6
04fca0d
0301e9a
0367f0f
994d125
f3a0af4
2140c28
77a0325
ecbfcf3
f507f91
b510b17
d03c809
6471b15
b8b3c3a
71fa551
e0da35d
f7bc427
0fca96c
04fc03e
dda7b9c
a35c9bc
9862b4b
ad1cce6
15c3f4e
eca2f45
5a756bb
9b289cb
fff16ed
918e31a
82fb6e8
98e17eb
9ab1008
2a6c9d2
8a72ac0
73279d7
181fb74
104ec1b
cf17cae
08a3663
764eca0
34f3a77
5cde974
573dd90
efdca4a
45df5d9
9bf3808
db47529
9e5fddc
7aae31d
6656af3
1a49c95
d30aa5b
a525afe
64750ec
f3fc604
2d2ef71
8b422c7
73b47b1
f97cfee
18e562a
2d1312e
a0ab86b
d4c7db1
32e34dc
187858f
bbc8cf5
4647033
a201c3f
198c1d2
0dbe4a9
d996baa
47a56d8
22521bf
6b77952
598c36a
72dc105
2d291b6
7673b52
7bdc243
ee7de21
58d154a
a00c6a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,17 +6,21 @@ | |
|
||
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream | ||
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker | ||
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime | ||
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream | ||
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder | ||
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder | ||
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector | ||
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor | ||
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector | ||
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter | ||
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester | ||
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination | ||
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator | ||
from airbyte_cdk.sources.declarative.requesters.requester import Requester | ||
from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier | ||
from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier | ||
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever | ||
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever | ||
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema | ||
|
@@ -25,17 +29,22 @@ | |
from airbyte_cdk.sources.declarative.states.state import State | ||
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice | ||
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer | ||
from airbyte_cdk.sources.streams.core import Stream | ||
|
||
DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = { | ||
Requester: HttpRequester, | ||
Retriever: SimpleRetriever, | ||
SchemaLoader: JsonSchema, | ||
HttpSelector: RecordSelector, | ||
ConnectionChecker: CheckStream, | ||
Retrier: DefaultRetrier, | ||
ErrorHandler: DefaultErrorHandler, | ||
Decoder: JsonDecoder, | ||
JelloExtractor: JelloExtractor, | ||
State: DictState, | ||
StreamSlicer: SingleSlice, | ||
Paginator: NoPagination, | ||
HttpResponseFilter: HttpResponseFilter, | ||
Stream: DeclarativeStream, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no more |
||
MinMaxDatetime: MinMaxDatetime, | ||
InterpolatedString: InterpolatedString, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,21 @@ def preprocess(self, value, evaluated_config, path): | |
elif isinstance(value, dict): | ||
return self.preprocess_dict(value, evaluated_config, path) | ||
elif type(value) == list: | ||
evaluated_list = [self.preprocess(v, evaluated_config, path) for v in value] | ||
evaluated_list = [ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we were not correctly adding a list's elements to the evaluated_config |
||
# pass in elem's path instead of the list's path | ||
self.preprocess(v, evaluated_config, self._get_path_for_list_item(path, index)) | ||
for index, v in enumerate(value) | ||
] | ||
# Add the list's element to the evaluated config so they can be referenced | ||
for index, elem in enumerate(evaluated_list): | ||
evaluated_config[self._get_path_for_list_item(path, index)] = elem | ||
return evaluated_list | ||
else: | ||
return value | ||
|
||
def _get_path_for_list_item(self, path, index): | ||
# An elem's path is {path_to_list}[{index}] | ||
if len(path) > 1: | ||
return path[:-1], f"{path[-1]}[{index}]" | ||
else: | ||
return (f"{path[-1]}[{index}]",) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from typing import Optional | ||
|
||
import requests | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy | ||
|
||
|
||
class ConstantBackoffStrategy(BackoffStrategy): | ||
""" | ||
Backoff strategy with a constant backoff interval | ||
""" | ||
|
||
def __init__(self, backoff_time_in_seconds: float): | ||
""" | ||
:param backoff_time_in_seconds: time to backoff before retrying a retryable request | ||
""" | ||
self._backoff_time_in_seconds = backoff_time_in_seconds | ||
|
||
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: | ||
return self._backoff_time_in_seconds |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from typing import Optional | ||
|
||
import requests | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy | ||
|
||
|
||
class ExponentialBackoffStrategy(BackoffStrategy): | ||
""" | ||
Backoff strategy with an exponential backoff interval | ||
""" | ||
|
||
def __init__(self, factor: float = 5): | ||
""" | ||
:param factor: multiplicative factor | ||
""" | ||
self._factor = factor | ||
|
||
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: | ||
return self._factor * 2**attempt_count |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import numbers | ||
from re import Pattern | ||
from typing import Optional | ||
|
||
import requests | ||
|
||
|
||
def get_numeric_value_from_header(response: requests.Response, header: str, regex: Optional[Pattern]) -> Optional[float]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add unit tests for this guy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
""" | ||
Extract a header value from the response as a float | ||
:param response: response the extract header value from | ||
:param header: Header to extract | ||
:param regex: optional regex to apply on the header to obtain the value | ||
:return: header value as float if it's a number. None otherwise | ||
""" | ||
header_value = response.headers.get(header, None) | ||
if not header_value: | ||
return None | ||
if isinstance(header_value, str): | ||
if regex: | ||
match = regex.match(header_value) | ||
if match: | ||
header_value = match.group() | ||
return _as_float(header_value) | ||
elif isinstance(header_value, numbers.Number): | ||
return float(header_value) | ||
else: | ||
return None | ||
|
||
|
||
def _as_float(s: str) -> Optional[float]: | ||
try: | ||
return float(s) | ||
except ValueError: | ||
return None |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import re | ||
from typing import Optional | ||
|
||
import requests | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy | ||
|
||
|
||
class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy): | ||
""" | ||
Extract wait time from http header | ||
""" | ||
|
||
def __init__(self, header: str, regex: Optional[str] = None): | ||
""" | ||
:param header: header to read wait time from | ||
:param regex: optional regex to apply on the header to extract its value | ||
""" | ||
self._header = header | ||
self._regex = re.compile(regex) if regex else None | ||
|
||
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: | ||
header_value = get_numeric_value_from_header(response, self._header, self._regex) | ||
return header_value |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import numbers | ||
import re | ||
import time | ||
from typing import Optional | ||
|
||
import requests | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header | ||
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy | ||
|
||
|
||
class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy): | ||
""" | ||
Extract time at which we can retry the request from response header | ||
and wait for the difference between now and that time | ||
""" | ||
|
||
def __init__(self, header: str, min_wait: Optional[float] = None, regex: Optional[str] = None): | ||
""" | ||
|
||
:param header: header to read wait time from | ||
:param min_wait: minimum time to wait for safety | ||
:param regex: optional regex to apply on the header to extract its value | ||
""" | ||
self._header = header | ||
self._min_wait = min_wait | ||
self._regex = re.compile(regex) if regex else None | ||
|
||
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: | ||
now = time.time() | ||
wait_until = get_numeric_value_from_header(response, self._header, self._regex) | ||
if wait_until is None or not wait_until: | ||
return self._min_wait | ||
if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(wait_until, numbers.Number): | ||
wait_time = float(wait_until) - now | ||
else: | ||
return self._min_wait | ||
if self._min_wait: | ||
return max(wait_time, self._min_wait) | ||
elif wait_time < 0: | ||
return None | ||
return wait_time |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from abc import abstractmethod | ||
from typing import Optional | ||
|
||
import requests | ||
|
||
|
||
class BackoffStrategy: | ||
@abstractmethod | ||
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: | ||
""" | ||
Return time to wait before retrying the request. | ||
:param response: response received for the request to retry | ||
:param attempt_count: number of attempts to submit the request | ||
:return: time to wait in seconds | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we'll be able to get rid of "compile-time" interpolation" once we get the interpolated authenticators in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean compile-time interpolation? I think i don't understand the subtlety here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm referring to the strings evaluated by jinja, eg "{{ 1 + 1 }}"
they can either be evaluated when parsing the yaml file and instantiating the objects, or at runtime on every request/response.
we generally want the strings to be evaluated at runtime because they can depend on the requests/response/records/etc, but right now we also need to try evaluating them during the parsing step because the authenticators use normal strings, not InterpolatedString.
That won't be the case once #13993 is merged and we'll be able to delete these lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!