From 09aa685aadbb6dc45f66b2c87d4c13af9b88e6b2 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 14 Jul 2022 08:24:37 -0700 Subject: [PATCH] Alex/configurable retrier (#14330) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * checkout files from test branch * read_incremental works * reset to master * remove dead code * comment * fix * Add test * comments * utc * format * small fix * Add test with rfc3339 * remove unused param * fix test * configurable state checkpointing * update test * start working on retrier * retry predicate * return response status * look in error message * cleanup test * constant backoff strategy * chain backoff strategy * chain retrier * Add to class types registry * extract backoff time from header * wait until * update * split file * parse_records * classmethod * delete dead code * comment * comment * comments * fix * test for instantiating chain retrier * fix parsing * cleanup * fix * reset * never raise on http error * remove print * comment * comment * comment * comment * remove prints * add declarative stream to registry * Delete dead code * Add docstrings * quick fix * exponential backoff * fix test * fix * delete unused properties * fix * missing unit tests * uppercase * docstrings * rename to success * compare full request instead of just url * renmae module * rename test file * rename interface * rename default retrier * rename to compositeerrorhandler * fix missing renames * move action to filter * str -> minmaxdatetime * small fixes * plural * add example * handle header variations * also fix wait time from * allow using a regex to extract the value * group() * docstring * add docs * update comment * docstrings * update comment * Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py Co-authored-by: Sherif A. 
Nada * version: Update Parquet library to latest release (#14502) The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields which appears to be addressed in the `1.12.3` release of the library in commit https://github.com/apache/parquet-mr/commit/c72862b61399ff516e968fbd02885e573d4be81c * merge * 🎉 Source Github: improve schema for stream `pull_request_commits` added "null" (#14613) Signed-off-by: Sergey Chvalyuk * Docs: Fixed broken links (#14622) * fixing broken links * more broken links * source-hubspot: change mentioning of Mailchimp into HubSpot doc (#14620) * Helm Chart: Add external temporal option (#14597) * conflict env configmap and chart lock * reverting lock * add eof lines and documentation on values yaml * conflict json file * rollback json * solve conflict * correct minio with new version Co-authored-by: Guy Feldman * 🎉 Add YAML format to source-file reader (#14588) * Add yaml reader * Update docs * Bumpversion of connector * bump docs * Update pyarrow dependency * Upgrade pandas dependency * auto-bump connector version Co-authored-by: Octavia Squidington III * :tada: Source Okta: add GroupMembers stream (#14380) * add Group_Members stream to okta source - Group_Members returns a list of users, the same schema of Users stream. - Create a shared schema users, and both group_members and users schema use it as a reference. 
- Add Group_Members stream to source connector * add tests and fix logs schema - fix the test error: None is not one of enums though the enum type includes both string and null, it comes from json schema validator https://github.com/python-jsonschema/jsonschema/blob/ddb87afad8f5d5c40600b5ede0ab96e4d4bdf7d3/jsonschema/_validators.py#L279-L285 - change group_members to use id as the cursor field since `filter` is not supported in the query string - fix the abnormal state test on logs stream, when since is abnormally large, until has to be defined as an equal or a larger value - remove logs stream from full sync test, because 2 full syncs always have a gap -- at least a new log about users or groups api. * last polish before submitting the PR - bump docker version - update changelog - add the right abnormal value for logs stream - correct the sample catalog * address comments: - improve comments for until parameter under the logs stream - add use_cache on groupMembers * add use_cache to Group_Members * change configured_catalog to test * auto-bump connector version Co-authored-by: marcosmarxm Co-authored-by: Octavia Squidington III * split test files * renames * missing unit test * add missing unit tests * rename * assert isinstance * start extracting to their own files * use final instead of classmethod * assert we retry 429 errors * Add log * replace asserts with valueexceptions * delete superfluous print statement * fix factory so we don't need to union everything with strings * get class_name from type * remove from class types registry * process error handlers one at a time * sort * delete print statement * comment * comment * format * delete unused file Co-authored-by: Sherif A. 
Nada Co-authored-by: Tobias Macey Co-authored-by: Serhii Chvaliuk Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com> Co-authored-by: Bas Beelen Co-authored-by: Marcos Marx Co-authored-by: Guy Feldman Co-authored-by: Christophe Duong Co-authored-by: Octavia Squidington III Co-authored-by: Yiyang Li Co-authored-by: marcosmarxm --- .../sources/declarative/create_partial.py | 2 +- .../declarative/extractors/record_filter.py | 4 +- .../parsers/class_types_registry.py | 22 ++- .../default_implementation_registry.py | 15 +- .../sources/declarative/parsers/factory.py | 32 +++- .../declarative/parsers/yaml_parser.py | 16 +- .../requesters/error_handlers/__init__.py | 3 + .../backoff_strategies/__init__.py | 3 + .../constant_backoff_strategy.py | 23 +++ .../exponential_backoff_strategy.py | 23 +++ .../backoff_strategies/header_helper.py | 39 +++++ .../wait_time_from_header_backoff_strategy.py | 28 ++++ ...until_time_from_header_backoff_strategy.py | 45 +++++ .../error_handlers/backoff_strategy.py | 19 +++ .../error_handlers/composite_error_handler.py | 55 ++++++ .../error_handlers/default_error_handler.py | 137 +++++++++++++++ .../error_handlers/error_handler.py | 28 ++++ .../error_handlers/http_response_filter.py | 55 ++++++ .../error_handlers/response_action.py | 16 ++ .../error_handlers/response_status.py | 50 ++++++ .../declarative/requesters/http_requester.py | 42 ++--- .../requesters/paginators/offset_paginator.py | 2 +- .../declarative/requesters/requester.py | 36 +--- .../requesters/retriers/__init__.py | 3 - .../requesters/retriers/default_retrier.py | 28 ---- .../requesters/retriers/retrier.py | 28 ---- .../retrievers/simple_retriever.py | 41 +++-- .../sources/declarative/schema/json_schema.py | 13 +- .../stream_slicers/datetime_stream_slicer.py | 10 +- .../airbyte_cdk/sources/streams/http/http.py | 3 +- .../declarative/parsers/test_yaml_parser.py | 12 ++ .../requesters/error_handlers/__init__.py | 3 + .../backoff_strategies/__init__.py 
| 3 + .../test_constant_backoff.py | 24 +++ .../test_exponential_backoff.py | 24 +++ .../backoff_strategies/test_header_helper.py | 38 +++++ .../test_wait_time_from_header.py | 31 ++++ .../test_wait_until_time_from_header.py | 36 ++++ .../test_composite_error_handler.py | 110 ++++++++++++ .../test_default_error_handler.py | 157 ++++++++++++++++++ .../error_handlers/test_response_status.py | 32 ++++ .../requesters/test_http_requester.py | 19 +-- .../retrievers/test_simple_retriever.py | 92 ++++++++-- .../test_datetime_stream_slicer.py | 1 + .../sources/declarative/test_factory.py | 84 ++++++++-- 45 files changed, 1274 insertions(+), 213 deletions(-) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/__init__.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/header_helper.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py 
create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_action.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/__init__.py delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/default_retrier.py delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/retrier.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/__init__.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_header_helper.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py create mode 100644 
airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_response_status.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py index 238c57294cbf58..25d45f0389e222 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/create_partial.py @@ -41,7 +41,7 @@ def newfunc(*fargs, **fkeywords): # interpolate the parameters interpolated_keywords = InterpolatedMapping(fully_created, interpolation).eval(config, **{"options": options}) - interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v is not None} + interpolated_keywords = {k: v for k, v in interpolated_keywords.items() if v} all_keywords.update(interpolated_keywords) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 8351bd2c03ef70..22a3862b2a5f38 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -5,11 +5,11 @@ from typing import Any, List, Mapping from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean -from airbyte_cdk.sources.declarative.types import Record +from airbyte_cdk.sources.declarative.types import Config, Record class RecordFilter: - def __init__(self, config, condition: str = None): + def __init__(self, config: Config, condition: str = None): self._config = config self._filter_interpolator = InterpolatedBoolean(condition) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 90a25a16ab6ef1..6865d1831459c0 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -5,19 +5,39 @@ from typing import Mapping, Type from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import ( + ExponentialBackoffStrategy, +) +from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler +from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester from airbyte_cdk.sources.declarative.requesters.paginators.interpolated_paginator import InterpolatedPaginator from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator from airbyte_cdk.sources.declarative.requesters.paginators.offset_paginator import OffsetPaginator +from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer from airbyte_cdk.sources.declarative.transformations import RemoveFields from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator CLASS_TYPES_REGISTRY: Mapping[str, Type] = { + "CartesianProductStreamSlicer": CartesianProductStreamSlicer, + 
"CompositeErrorHandler": CompositeErrorHandler, + "ConstantBackoffStrategy": ConstantBackoffStrategy, "DatetimeStreamSlicer": DatetimeStreamSlicer, + "DeclarativeStream": DeclarativeStream, + "DefaultErrorHandler": DefaultErrorHandler, + "ExponentialBackoffStrategy": ExponentialBackoffStrategy, + "HttpRequester": HttpRequester, "InterpolatedPaginator": InterpolatedPaginator, + "JelloExtractor": JelloExtractor, + "ListStreamSlicer": ListStreamSlicer, "MinMaxDatetime": MinMaxDatetime, "NextPageUrlPaginator": NextPageUrlPaginator, "OffsetPaginator": OffsetPaginator, - "TokenAuthenticator": TokenAuthenticator, "RemoveFields": RemoveFields, + "TokenAuthenticator": TokenAuthenticator, } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py index acb1ab0eff51bb..7f2e53a3349242 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py @@ -6,17 +6,21 @@ from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from 
airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.requester import Requester -from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier -from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema @@ -25,6 +29,7 @@ from airbyte_cdk.sources.declarative.states.state import State from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.streams.core import Stream DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = { Requester: HttpRequester, @@ -32,10 +37,14 @@ SchemaLoader: JsonSchema, HttpSelector: RecordSelector, ConnectionChecker: CheckStream, - Retrier: DefaultRetrier, + ErrorHandler: DefaultErrorHandler, Decoder: JsonDecoder, JelloExtractor: JelloExtractor, State: DictState, StreamSlicer: SingleSlice, Paginator: NoPagination, + HttpResponseFilter: HttpResponseFilter, + Stream: DeclarativeStream, + MinMaxDatetime: MinMaxDatetime, + InterpolatedString: InterpolatedString, } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 1ca2eef42d3c63..50dd5c26ba9c50 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -27,7 +27,12 @@ def create_component(self, component_definition: Mapping[str, Any], config: Conf :return: the object to create """ kwargs = copy.deepcopy(component_definition) - class_name = kwargs.pop("class_name") + if "class_name" in kwargs: + class_name = kwargs.pop("class_name") + elif "type" in kwargs: + class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")] + else: + raise ValueError(f"Failed to create component because it has no class_name or type. Definition: {component_definition}") return self.build(class_name, config, **kwargs) def build(self, class_or_class_name: Union[str, Type], config, **kwargs): @@ -92,7 +97,13 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class): for sub in definition ] else: - return definition + expected_type = self.get_default_type(key, parent_class) + if expected_type and not isinstance(definition, expected_type): + # call __init__(definition) if definition is not a dict and is not of the expected type + # for instance, to turn a string into an InterpolatedString + return expected_type(definition) + else: + return definition @staticmethod def is_object_definition_with_class_name(definition): @@ -106,13 +117,16 @@ def is_object_definition_with_type(definition): def get_default_type(parameter_name, parent_class): type_hints = get_type_hints(parent_class.__init__) interface = type_hints.get(parameter_name) - origin = get_origin(interface) - if origin == Union: - # Handling Optional, which are implement as a Union[T, None] - # the interface we're looking for being the first type argument - args = get_args(interface) - interface = args[0] - + while True: + origin = get_origin(interface) + if origin: + # Unnest types until we reach the raw 
type + # List[T] -> T + # Optional[List[T]] -> T + args = get_args(interface) + interface = args[0] + else: + break expected_type = DEFAULT_IMPLEMENTATIONS_REGISTRY.get(interface) return expected_type diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py index 27213faa25a027..e404378c969a39 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py @@ -93,7 +93,21 @@ def preprocess(self, value, evaluated_config, path): elif isinstance(value, dict): return self.preprocess_dict(value, evaluated_config, path) elif type(value) == list: - evaluated_list = [self.preprocess(v, evaluated_config, path) for v in value] + evaluated_list = [ + # pass in elem's path instead of the list's path + self.preprocess(v, evaluated_config, self._get_path_for_list_item(path, index)) + for index, v in enumerate(value) + ] + # Add the list's element to the evaluated config so they can be referenced + for index, elem in enumerate(evaluated_list): + evaluated_config[self._get_path_for_list_item(path, index)] = elem return evaluated_list else: return value + + def _get_path_for_list_item(self, path, index): + # An elem's path is {path_to_list}[{index}] + if len(path) > 1: + return path[:-1], f"{path[-1]}[{index}]" + else: + return (f"{path[-1]}[{index}]",) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/__init__.py new file mode 100644 index 00000000000000..1100c1c58cf510 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py new file mode 100644 index 00000000000000..1100c1c58cf510 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py new file mode 100644 index 00000000000000..d100dff0d2e4d2 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/constant_backoff_strategy.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from typing import Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy + + +class ConstantBackoffStrategy(BackoffStrategy): + """ + Backoff strategy with a constant backoff interval + """ + + def __init__(self, backoff_time_in_seconds: float): + """ + :param backoff_time_in_seconds: time to backoff before retrying a retryable request + """ + self._backoff_time_in_seconds = backoff_time_in_seconds + + def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: + return self._backoff_time_in_seconds diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py new file mode 100644 index 00000000000000..71c24ff3ea525b --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/exponential_backoff_strategy.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from typing import Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy + + +class ExponentialBackoffStrategy(BackoffStrategy): + """ + Backoff strategy with an exponential backoff interval + """ + + def __init__(self, factor: float = 5): + """ + :param factor: multiplicative factor + """ + self._factor = factor + + def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: + return self._factor * 2**attempt_count diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/header_helper.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/header_helper.py new file mode 100644 index 00000000000000..f3a17b73885093 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/header_helper.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import numbers +from re import Pattern +from typing import Optional + +import requests + + +def get_numeric_value_from_header(response: requests.Response, header: str, regex: Optional[Pattern]) -> Optional[float]: + """ + Extract a header value from the response as a float + :param response: response to extract the header value from + :param header: Header to extract + :param regex: optional regex to apply on the header to obtain the value + :return: header value as float if it's a number. 
None otherwise + """ + header_value = response.headers.get(header, None) + if not header_value: + return None + if isinstance(header_value, str): + if regex: + match = regex.match(header_value) + if match: + header_value = match.group() + return _as_float(header_value) + elif isinstance(header_value, numbers.Number): + return float(header_value) + else: + return None + + +def _as_float(s: str) -> Optional[float]: + try: + return float(s) + except ValueError: + return None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py new file mode 100644 index 00000000000000..7a30053554c667 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_time_from_header_backoff_strategy.py @@ -0,0 +1,28 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import re +from typing import Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy + + +class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy): + """ + Extract wait time from http header + """ + + def __init__(self, header: str, regex: Optional[str] = None): + """ + :param header: header to read wait time from + :param regex: optional regex to apply on the header to extract its value + """ + self._header = header + self._regex = re.compile(regex) if regex else None + + def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: + header_value = get_numeric_value_from_header(response, self._header, self._regex) + return header_value diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py new file mode 100644 index 00000000000000..a406bd5d058327 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategies/wait_until_time_from_header_backoff_strategy.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import numbers +import re +import time +from typing import Optional + +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy + + +class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy): + """ + Extract time at which we can retry the request from response header + and wait for the difference between now and that time + """ + + def __init__(self, header: str, min_wait: Optional[float] = None, regex: Optional[str] = None): + """ + + :param header: header to read wait time from + :param min_wait: minimum time to wait for safety + :param regex: optional regex to apply on the header to extract its value + """ + self._header = header + self._min_wait = min_wait + self._regex = re.compile(regex) if regex else None + + def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: + now = time.time() + wait_until = get_numeric_value_from_header(response, self._header, self._regex) + if wait_until is None or not wait_until: + return self._min_wait + if (isinstance(wait_until, str) and wait_until.isnumeric()) or isinstance(wait_until, numbers.Number): + wait_time = float(wait_until) - now + else: + return self._min_wait + if self._min_wait: + return max(wait_time, self._min_wait) + elif wait_time < 0: + return None + return wait_time diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py new file mode 100644 index 00000000000000..0d4313a7519b07 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/backoff_strategy.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from abc import abstractmethod +from typing import Optional + +import requests + + +class BackoffStrategy: + @abstractmethod + def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]: + """ + Return time to wait before retrying the request. + :param response: response received for the request to retry + :param attempt_count: number of attempts to submit the request + :return: time to wait in seconds + """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py new file mode 100644 index 00000000000000..c6b966b3d704c4 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import List, Union + +import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus + + +class CompositeErrorHandler(ErrorHandler): + """ + Sample config chaining 2 different retriers: + error_handler: + type: "CompositeErrorHandler" + error_handlers: + - response_filters: + - predicate: "{{ 'codase' in decoded_response }}" + action: RETRY + backoff_strategies: + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 5 + - response_filters: + - http_codes: [ 403 ] + action: RETRY + backoff_strategies: + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 10 + """ + + def __init__(self, error_handlers: List[ErrorHandler]): + """ + + :param error_handlers: list of error 
handlers + """ + self._error_handlers = error_handlers + if not self._error_handlers: + raise ValueError("CompositeErrorHandler expects at least 1 underlying error handler") + + @property + def max_retries(self) -> Union[int, None]: + return self._error_handlers[0].max_retries + + def should_retry(self, response: requests.Response) -> ResponseStatus: + should_retry = None + for retrier in self._error_handlers: + should_retry = retrier.should_retry(response) + if should_retry.action == ResponseAction.SUCCESS: + return response_status.SUCCESS + if should_retry == response_status.IGNORE or should_retry.action == ResponseAction.RETRY: + return should_retry + return should_retry diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py new file mode 100644 index 00000000000000..fb8614797847a5 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py @@ -0,0 +1,137 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from typing import List, MutableMapping, Optional, Union + +import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import ( + ExponentialBackoffStrategy, +) +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy +from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus + + +class DefaultErrorHandler(ErrorHandler): + """ + Default error handler. + By default, the handler will only retry server errors (HTTP 5XX) and too many requests (HTTP 429) with exponential backoff. + + If the response is successful, then return SUCCESS + Otherwise, iterate over the response_filters. + If any of the filter match the response, then return the appropriate status. + If the match is RETRY, then iterate sequentially over the backoff_strategies and return the first non-None backoff time. + + Sample configs: + + 1. retry 10 times + ` + error_handler: + max_retries: 10 + ` + 2. backoff for 5 seconds + ` + error_handler: + backoff_strategies: + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 5 + ` + 3. retry on HTTP 404 + ` + error_handler: + response_filters: + - http_codes: [ 404 ] + action: RETRY + ` + 4. ignore HTTP 404 + ` + error_handler: + - http_codes: [ 404 ] + action: IGNORE + ` + 5. retry if error message contains `retrythisrequest!` substring + ` + error_handler: + response_filters: + - error_message_contain: "retrythisrequest!" + action: IGNORE + ` + 6. 
retry if 'code' is a field present in the response body + ` + error_handler: + response_filters: + - predicate: "{{ 'code' in decoded_response }}" + action: IGNORE + ` + + 7. ignore 429 and retry on 404 + ` + error_handler: + - http_codes: [ 429 ] + action: IGNORE + - http_codes: [ 404 ] + action: RETRY + ` + """ + + DEFAULT_BACKOFF_STRATEGY = ExponentialBackoffStrategy + + def __init__( + self, + response_filters: Optional[List[HttpResponseFilter]] = None, + max_retries: Optional[int] = 5, + backoff_strategies: Optional[List[BackoffStrategy]] = None, + ): + """ + :param response_filters: response filters to iterate on + :param max_retries: maximum retry attemps + :param backoff_strategies: list of backoff strategies to use to determine how long to wait before retrying + """ + self._max_retries = max_retries + self._response_filters = response_filters or [] + + if not response_filters: + self._response_filters.append(HttpResponseFilter(ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS)) + self._response_filters.append(HttpResponseFilter(ResponseAction.IGNORE)) + + if backoff_strategies: + self._backoff_strategies = backoff_strategies + else: + self._backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()] + + self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {} + + @property + def max_retries(self) -> Union[int, None]: + return self._max_retries + + def should_retry(self, response: requests.Response) -> ResponseStatus: + request = response.request + if response.ok: + return response_status.SUCCESS + if request not in self._last_request_to_attempt_count: + self._last_request_to_attempt_count = {request: 1} + else: + self._last_request_to_attempt_count[request] += 1 + + for response_filter in self._response_filters: + filter_action = response_filter.matches(response) + if filter_action is not None: + if filter_action == ResponseAction.RETRY: + return ResponseStatus(ResponseAction.RETRY, 
self._backoff_time(response, self._last_request_to_attempt_count[request])) + else: + return ResponseStatus(filter_action) + # Fail if the response matches no filters + return response_status.FAIL + + def _backoff_time(self, response: requests.Response, attempt_count: int) -> Optional[float]: + backoff = None + for backoff_strategies in self._backoff_strategies: + backoff = backoff_strategies.backoff(response, attempt_count) + if backoff: + return backoff + return backoff diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py new file mode 100644 index 00000000000000..de2db2f53981dd --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py @@ -0,0 +1,28 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from typing import Union + +import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus + + +class ErrorHandler(ABC): + @property + @abstractmethod + def max_retries(self) -> Union[int, None]: + """ + Specifies maximum amount of retries for backoff policy. Return None for no limit. + """ + pass + + @abstractmethod + def should_retry(self, response: requests.Response) -> ResponseStatus: + """ + Evaluate response status describing whether a failing request should be retried or ignored. 
+ :param response: response to evaluate + :return: response status + """ + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py new file mode 100644 index 00000000000000..aaa325e1e307ff --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from typing import Optional, Set, Union + +import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction +from airbyte_cdk.sources.streams.http.http import HttpStream + + +class HttpResponseFilter: + TOO_MANY_REQUESTS_ERRORS = {429} + DEFAULT_RETRIABLE_ERRORS = set([x for x in range(500, 600)]).union(TOO_MANY_REQUESTS_ERRORS) + + def __init__( + self, action: Union[ResponseAction, str], *, http_codes: Set[int] = None, error_message_contain: str = None, predicate: str = "" + ): + """ + + :param action: action to execute if a request matches + :param http_codes: http code of matching requests + :param error_message_contain: error substring of matching requests + :param predicate: predicate to apply to determine if a request is matching + """ + if isinstance(action, str): + action = ResponseAction[action] + self._http_codes = http_codes or set() + self._predicate = InterpolatedBoolean(predicate) + self._error_message_contains = error_message_contain + self._action = action + + @property + def action(self): + return self._action + + def matches(self, response: requests.Response) -> Optional[ResponseAction]: + if ( + response.status_code in self._http_codes + or (self._response_matches_predicate(response)) + or (self._response_contains_error_message(response)) + ): + 
return self._action + else: + return None + + def _response_matches_predicate(self, response: requests.Response) -> bool: + return self._predicate and self._predicate.eval(None, decoded_response=response.json()) + + def _response_contains_error_message(self, response: requests.Response) -> bool: + if not self._error_message_contains: + return False + else: + return self._error_message_contains in HttpStream.parse_response_error_message(response) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_action.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_action.py new file mode 100644 index 00000000000000..2a5fd7e00c5ee1 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_action.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from enum import Enum + + +class ResponseAction(Enum): + """ + Response statuses for non retriable responses + """ + + SUCCESS = "SUCCESS" # "Request was successful" + FAIL = "FAIL" # "Request failed unexpectedly" + IGNORE = "IGNORE" # "Request failed but can be ignored" + RETRY = "RETRY" # Request failed and should be retried diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py new file mode 100644 index 00000000000000..e84ca581f1973e --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py @@ -0,0 +1,50 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from typing import Final, Optional, Union + +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction + + +class ResponseStatus: + """ + ResponseAction amended with backoff time if a action is RETRY + """ + + def __init__(self, response_action: Union[ResponseAction, str], retry_in: Optional[float] = None): + """ + :param response_action: response action to execute + :param retry_in: backoff time (if action is RETRY) + """ + if isinstance(response_action, str): + response_action = ResponseAction[response_action] + if retry_in and response_action != ResponseAction.RETRY: + raise ValueError(f"Unexpected backoff time ({retry_in} for non-retryable response action {response_action}") + self._retry_in = retry_in + self._action = response_action + + @property + def action(self): + return self._action + + @property + def retry_in(self) -> Optional[float]: + return self._retry_in + + @classmethod + def retry(cls, retry_in: Optional[float]): + return ResponseStatus(ResponseAction.RETRY, retry_in) + + def __eq__(self, other): + if not other: + return not self + return self.action == other.action and self.retry_in == other.retry_in + + def __hash__(self): + return hash([self.action, self.retry_in]) + + +SUCCESS: Final[ResponseStatus] = ResponseStatus(ResponseAction.SUCCESS) +FAIL: Final[ResponseStatus] = ResponseStatus(ResponseAction.FAIL) +IGNORE: Final[ResponseStatus] = ResponseStatus(ResponseAction.IGNORE) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py index 9c20a5f73f8e78..df22c01439db69 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -2,17 +2,19 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# +from functools import lru_cache from typing import Any, Mapping, MutableMapping, Optional, Union import requests from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester -from airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier -from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier from airbyte_cdk.sources.declarative.types import Config from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth @@ -22,12 +24,12 @@ def __init__( self, *, name: str, - url_base: [str, InterpolatedString], - path: [str, InterpolatedString], + url_base: InterpolatedString, + path: InterpolatedString, http_method: Union[str, HttpMethod] = HttpMethod.GET, request_options_provider: Optional[RequestOptionsProvider] = None, authenticator: HttpAuthenticator = None, - retrier: Optional[Retrier] = None, + error_handler: Optional[ErrorHandler] = None, config: Config, ): if request_options_provider is None: @@ -36,17 +38,13 @@ def __init__( request_options_provider = InterpolatedRequestOptionsProvider(config=config, **request_options_provider) self._name = name self._authenticator = authenticator or NoAuth() - if type(url_base) == str: - url_base = InterpolatedString(url_base) self._url_base = url_base - if type(path) 
== str: - path = InterpolatedString(path) self._path: InterpolatedString = path if type(http_method) == str: http_method = HttpMethod[http_method] self._method = http_method self._request_options_provider = request_options_provider - self._retrier = retrier or DefaultRetrier() + self._error_handler = error_handler or DefaultErrorHandler() self._config = config def get_authenticator(self): @@ -63,24 +61,12 @@ def get_path(self, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str def get_method(self): return self._method - @property - def raise_on_http_errors(self) -> bool: - # TODO this should be declarative - return True - - @property - def max_retries(self) -> Union[int, None]: - return self._retrier.max_retries - - @property - def retry_factor(self) -> float: - return self._retrier.retry_factor - - def should_retry(self, response: requests.Response) -> bool: - return self._retrier.should_retry(response) - - def backoff_time(self, response: requests.Response) -> Optional[float]: - return self._retrier.backoff_time(response) + # use a tiny cache to limit the memory footprint. 
It doesn't have to be large because we mostly + # only care about the status of the last response received + @lru_cache(maxsize=10) + def should_retry(self, response: requests.Response) -> ResponseStatus: + # Cache the result because the HttpStream first checks if we should retry before looking at the backoff time + return self._error_handler.should_retry(response) def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py index 5c808e31deff8c..11ade15b8f33e8 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/offset_paginator.py @@ -17,7 +17,7 @@ def __init__(self, page_size: int, state: Optional[DictState] = None, offset_key self._update_state_with_offset(0) def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]: - if len(last_records) < self._limit: + if not last_records or len(last_records) < self._limit: return None offset = self._get_offset() + self._limit token_map = {self._offsetKey: offset} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py index 7ce5f3aeeb815a..77620d9b6c23f1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py @@ -7,6 +7,7 @@ from typing import Any, Mapping, MutableMapping, Optional, Union import requests +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from requests.auth import 
AuthBase @@ -54,29 +55,8 @@ def request_params( E.g: you might want to define query parameters for paging if next_page_token is not None. """ - @property - @abstractmethod - def raise_on_http_errors(self) -> bool: - """ - If set to False, allows opting-out of raising HTTP code exception. - """ - - @property - @abstractmethod - def max_retries(self) -> Union[int, None]: - """ - Specifies maximum amount of retries for backoff policy. Return None for no limit. - """ - - @property @abstractmethod - def retry_factor(self) -> float: - """ - Specifies factor for backoff policy. - """ - - @abstractmethod - def should_retry(self, response: requests.Response) -> bool: + def should_retry(self, response: requests.Response) -> ResponseStatus: """ Specifies conditions for backoff based on the response from the server. @@ -87,18 +67,6 @@ def should_retry(self, response: requests.Response) -> bool: Unexpected but transient exceptions (connection timeout, DNS resolution failed, etc..) are retried by default. """ - @abstractmethod - def backoff_time(self, response: requests.Response) -> Optional[float]: - """ - Dynamically determine backoff time e.g: by reading the X-Retry-After header. - - This method is called only if should_backoff() returns True for the input request. - - :param response: - :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff - to the default backoff behavior (e.g using an exponential algorithm). 
- """ - @abstractmethod def request_headers( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/__init__.py deleted file mode 100644 index 46b7376756ec6e..00000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/default_retrier.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/default_retrier.py deleted file mode 100644 index e0d2f07c15d3f8..00000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/default_retrier.py +++ /dev/null @@ -1,28 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - -from typing import Optional, Union - -import requests -from airbyte_cdk.sources.declarative.requesters.retriers.retrier import Retrier - - -class DefaultRetrier(Retrier): - def __init__(self, max_retries: Optional[int] = 5, retry_factor: float = 5): - self._max_retries = max_retries - self._retry_factor = retry_factor - - @property - def max_retries(self) -> Union[int, None]: - return self._max_retries - - @property - def retry_factor(self) -> float: - return self._retry_factor - - def should_retry(self, response: requests.Response) -> bool: - return response.status_code == 429 or 500 <= response.status_code < 600 - - def backoff_time(self, response: requests.Response) -> Optional[float]: - return None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/retrier.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/retrier.py deleted file mode 100644 index 2aa42b757a887c..00000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/retriers/retrier.py +++ /dev/null @@ -1,28 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - -from abc import ABC, abstractmethod -from typing import Optional, Union - -import requests - - -class Retrier(ABC): - @property - @abstractmethod - def max_retries(self) -> Union[int, None]: - pass - - @property - @abstractmethod - def retry_factor(self) -> float: - pass - - @abstractmethod - def should_retry(self, response: requests.Response) -> bool: - pass - - @abstractmethod - def backoff_time(self, response: requests.Response) -> Optional[float]: - pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index fd82bac2624a63..ea4332214d3e6c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -7,6 +7,7 @@ import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.requester import Requester @@ -25,7 +26,7 @@ def __init__( primary_key, requester: Requester, record_selector: HttpSelector, - paginator: Paginator = None, + paginator: Optional[Paginator] = None, stream_slicer: Optional[StreamSlicer] = SingleSlice(), state: Optional[State] = None, ): @@ -57,24 +58,8 @@ def http_method(self) -> str: @property def raise_on_http_errors(self) -> bool: - """ - If set to False, allows opting-out of raising HTTP code exception. - """ - return self._requester.raise_on_http_errors - - @property - def max_retries(self) -> Union[int, None]: - """ - Specifies maximum amount of retries for backoff policy. 
Return None for no limit. - """ - return self._requester.max_retries - - @property - def retry_factor(self) -> float: - """ - Specifies factor to multiply the exponentiation by for backoff policy. - """ - return self._requester.retry_factor + # never raise on http_errors because this overrides the error handler logic... + return False def should_retry(self, response: requests.Response) -> bool: """ @@ -86,7 +71,7 @@ def should_retry(self, response: requests.Response) -> bool: Unexpected but transient exceptions (connection timeout, DNS resolution failed, etc..) are retried by default. """ - return self._requester.should_retry(response) + return self._requester.should_retry(response).action == ResponseAction.RETRY def backoff_time(self, response: requests.Response) -> Optional[float]: """ @@ -98,7 +83,11 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g using an exponential algorithm). """ - return self._requester.backoff_time(response) + should_retry = self._requester.should_retry(response) + if should_retry.action != ResponseAction.RETRY: + raise ValueError(f"backoff_time can only be applied on retriable response action. 
Got {should_retry.action}") + assert should_retry.action == ResponseAction.RETRY + return should_retry.retry_in def request_headers( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None @@ -197,6 +186,16 @@ def parse_response( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> Iterable[Mapping]: + # if fail -> raise exception + # if ignore -> ignore response and return no records + # else -> delegate to record selector + response_status = self._requester.should_retry(response) + if response_status.action == ResponseAction.FAIL: + response.raise_for_status() + elif response_status.action == ResponseAction.IGNORE: + self.logger.info(f"Ignoring response for failed request with error message {HttpStream.parse_response_error_message(response)}") + return [] + # Warning: use self.state instead of the stream_state passed as argument! self._last_response = response records = self._record_selector.select_records( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py index 21dd9ba2153ca5..10e5321ec650a6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/schema/json_schema.py @@ -3,20 +3,23 @@ # import json -from typing import Any, Mapping, Union +from typing import Any, Mapping from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader class JsonSchema(SchemaLoader): - def __init__(self, file_path: Union[str, InterpolatedString], config, **kwargs): - if type(file_path) == str: - file_path = InterpolatedString(file_path) + def __init__(self, file_path: InterpolatedString, name: str, config, **kwargs): self._file_path = file_path self._config = config self._kwargs 
= kwargs + self._name = name def get_json_schema(self) -> Mapping[str, Any]: - with open(self._file_path.eval(self._config, **self._kwargs), "r") as f: + json_schema_path = self._get_json_filepath() + with open(json_schema_path, "r") as f: return json.loads(f.read()) + + def _get_json_filepath(self): + return self._file_path.eval(self._config, name=self._name, **self._kwargs) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 28f37bfa96dfac..876b9293f5c183 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -4,7 +4,7 @@ import datetime import re -from typing import Any, Iterable, Mapping, Union +from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime @@ -32,17 +32,13 @@ def __init__( start_datetime: MinMaxDatetime, end_datetime: MinMaxDatetime, step: str, - cursor_value: Union[InterpolatedString, str], + cursor_value: InterpolatedString, datetime_format: str, config: Config, - lookback_window: Union[InterpolatedString, str] = None, + lookback_window: Optional[InterpolatedString] = None, ): self._timezone = datetime.timezone.utc self._interpolation = JinjaInterpolation() - if isinstance(cursor_value, str): - cursor_value = InterpolatedString(cursor_value) - if isinstance(lookback_window, str): - lookback_window = InterpolatedString(lookback_window) self._datetime_format = datetime_format self._start_datetime = start_datetime self._end_datetime = end_datetime diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 1570c9a25a752a..8bcba22f1573a4 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -338,7 +338,8 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor) return backoff_handler(user_backoff_handler)(request, request_kwargs) - def parse_response_error_message(self, response: requests.Response) -> Optional[str]: + @classmethod + def parse_response_error_message(cls, response: requests.Response) -> Optional[str]: """ Parses the raw response object from a failed request into a user-friendly error message. By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently. diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py index 82f1ad98c6c96d..64620d35cd96b5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_yaml_parser.py @@ -130,3 +130,15 @@ def test_collision(): assert config["reference_to_nested_path"] == "uh oh" assert config["example"]["nested"]["more_nested"]["value"] == "found it!" assert config["reference_to_nested_nested_value"] == "found it!" 
+ + +def test_list(): + content = """ + list: + - "A" + - "B" + elem_ref: "*ref(list[0])" + """ + config = parser.parse(content) + elem_ref = config["elem_ref"] + assert elem_ref == "A" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/__init__.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/__init__.py new file mode 100644 index 00000000000000..1100c1c58cf510 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py new file mode 100644 index 00000000000000..1100c1c58cf510 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py new file mode 100644 index 00000000000000..8ee5af1e57ff64 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_constant_backoff.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from unittest.mock import MagicMock + +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy + +BACKOFF_TIME = 10 + + +@pytest.mark.parametrize( + "test_name, attempt_count, expected_backoff_time", + [ + ("test_exponential_backoff", 1, BACKOFF_TIME), + ("test_exponential_backoff", 2, BACKOFF_TIME), + ], +) +def test_exponential_backoff(test_name, attempt_count, expected_backoff_time): + response_mock = MagicMock() + backoff_strategy = ConstantBackoffStrategy(backoff_time_in_seconds=BACKOFF_TIME) + backoff = backoff_strategy.backoff(response_mock, attempt_count) + assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py new file mode 100644 index 00000000000000..71518b09980d45 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_exponential_backoff.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from unittest.mock import MagicMock + +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import ( + ExponentialBackoffStrategy, +) + + +@pytest.mark.parametrize( + "test_name, attempt_count, expected_backoff_time", + [ + ("test_exponential_backoff", 1, 10), + ("test_exponential_backoff", 2, 20), + ], +) +def test_exponential_backoff(test_name, attempt_count, expected_backoff_time): + response_mock = MagicMock() + backoff_strategy = ExponentialBackoffStrategy(factor=5) + backoff = backoff_strategy.backoff(response_mock, attempt_count) + assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_header_helper.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_header_helper.py new file mode 100644 index 00000000000000..8142322118916f --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_header_helper.py @@ -0,0 +1,38 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import re +from unittest.mock import MagicMock + +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header + + +@pytest.mark.parametrize( + "test_name, headers, requested_header, regex, expected_value", + [ + ("test_get_numeric_value_from_header", {"header": 1}, "header", None, 1), + ("test_get_numeric_value_float_from_header", {"header": 1.2}, "header", None, 1.2), + ("test_get_numeric_value_from_string_value", {"header": "10.9"}, "header", None, 10.9), + ("test_get_numeric_value_from_non_numeric", {"header": "60,120"}, "header", None, None), + ("test_get_numeric_value_from_missing_header", {"header": 1}, "notheader", None, None), + ("test_get_numeric_value_with_regex", {"header": "61,60"}, "header", re.compile(r"([-+]?\d+)"), 61), # noqa + ("test_get_numeric_value_with_regex_no_header", {"header": "61,60"}, "notheader", re.compile(r"([-+]?\d+)"), None), # noqa + ("test_get_numeric_value_with_regex_not_matching", {"header": "abc61,60"}, "header", re.compile(r"([-+]?\d+)"), None), # noqa + ], +) +def test_get_numeric_value_from_header(test_name, headers, requested_header, regex, expected_value): + response_mock = create_response(headers=headers) + numeric_value = get_numeric_value_from_header(response_mock, requested_header, regex) + assert numeric_value == expected_value + + +def create_response(headers=None, json_body=None): + url = "https://airbyte.io" + + response_mock = MagicMock() + response_mock.url = url + response_mock.headers = headers or {} + response_mock.json.return_value = json_body or {} + return response_mock diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py new file mode 100644 index 00000000000000..915a13762d282f --- /dev/null +++
b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_time_from_header_backoff_strategy import ( + WaitTimeFromHeaderBackoffStrategy, +) + +SOME_BACKOFF_TIME = 60 + + +@pytest.mark.parametrize( + "test_name, header, header_value, regex, expected_backoff_time", + [ + ("test_wait_time_from_header", "wait_time", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME), + ("test_wait_time_from_header_string", "wait_time", "60", None, SOME_BACKOFF_TIME), + ("test_wait_time_from_header_not_a_number", "wait_time", "61,60", None, None), + ("test_wait_time_from_header_with_regex", "wait_time", "61,60", r"([-+]?\d+)", 61), # noqa + ("test_wait_time_from_header_with_regex_no_match", "wait_time", "...", r"[-+]?\d+", None), # noqa + ("test_wait_time_from_header", "absent_header", None, None, None), + ], +) +def test_wait_time_from_header(test_name, header, header_value, regex, expected_backoff_time): + response_mock = MagicMock() + response_mock.headers = {"wait_time": header_value} + backoff_strategy = WaitTimeFromHeaderBackoffStrategy(header, regex) + backoff = backoff_strategy.backoff(response_mock, 1) + assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py new file mode 100644 index 00000000000000..713ee4c4896fe1 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_until_time_from_header.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+# + + +from unittest.mock import MagicMock, patch + +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_until_time_from_header_backoff_strategy import ( + WaitUntilTimeFromHeaderBackoffStrategy, +) + +SOME_BACKOFF_TIME = 60 + + +@pytest.mark.parametrize( + "test_name, header, wait_until, min_wait, regex, expected_backoff_time", + [ + ("test_wait_until_time_from_header", "wait_until", 1600000060.0, None, None, 60), + ("test_wait_until_negative_time", "wait_until", 1500000000.0, None, None, None), + ("test_wait_until_time_less_than_min", "wait_until", 1600000060.0, 120, None, 120), + ("test_wait_until_no_header", "absent_header", 1600000000.0, None, None, None), + ("test_wait_until_time_from_header_not_numeric", "wait_until", "1600000000,1600000000", None, None, None), + ("test_wait_until_time_from_header_is_numeric", "wait_until", "1600000060", None, None, 60), + ("test_wait_until_time_from_header_with_regex", "wait_until", "1600000060,60", None, r"[-+]?\d+", 60), # noqa + ("test_wait_until_time_from_header_with_regex_no_match", "wait_until", "...", None, r"[-+]?\d+", None), # noqa + ("test_wait_until_no_header_with_min", "absent_header", "1600000000.0", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME), + ], +) +@patch("time.time", return_value=1600000000.0) +def test_wait_untiltime_from_header(time_mock, test_name, header, wait_until, min_wait, regex, expected_backoff_time): + response_mock = MagicMock() + response_mock.headers = {"wait_until": wait_until} + backoff_strategy = WaitUntilTimeFromHeaderBackoffStrategy(header, min_wait, regex) + backoff = backoff_strategy.backoff(response_mock, 1) + assert backoff == expected_backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py new file mode 100644 index 00000000000000..74fa5a30dc0208 ---
/dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py @@ -0,0 +1,110 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import ResponseStatus + +SOME_BACKOFF_TIME = 60 + + +@pytest.mark.parametrize( + "test_name, first_handler_behavior, second_handler_behavior, expected_behavior", + [ + ( + "test_chain_retrier_ok_ok", + response_status.SUCCESS, + response_status.SUCCESS, + response_status.SUCCESS, + ), + ( + "test_chain_retrier_ignore_fail", + response_status.IGNORE, + response_status.FAIL, + response_status.IGNORE, + ), + ( + "test_chain_retrier_fail_ignore", + response_status.FAIL, + response_status.IGNORE, + response_status.IGNORE, + ), + ( + "test_chain_retrier_ignore_retry", + response_status.IGNORE, + ResponseStatus.retry(SOME_BACKOFF_TIME), + response_status.IGNORE, + ), + ( + "test_chain_retrier_retry_ignore", + ResponseStatus.retry(SOME_BACKOFF_TIME), + response_status.IGNORE, + ResponseStatus.retry(SOME_BACKOFF_TIME), + ), + ( + "test_chain_retrier_retry_fail", + ResponseStatus.retry(SOME_BACKOFF_TIME), + response_status.FAIL, + ResponseStatus.retry(SOME_BACKOFF_TIME), + ), + ( + "test_chain_retrier_fail_retry", + response_status.FAIL, + ResponseStatus.retry(SOME_BACKOFF_TIME), + ResponseStatus.retry(SOME_BACKOFF_TIME), + ), + ( + "test_chain_retrier_ignore_ok", + response_status.IGNORE, + response_status.SUCCESS, + response_status.IGNORE, + ), + ( + "test_chain_retrier_ok_ignore", + response_status.SUCCESS, + response_status.IGNORE, + response_status.SUCCESS, + ), + ( + "test_chain_retrier_ok_retry", + 
response_status.SUCCESS, + ResponseStatus.retry(SOME_BACKOFF_TIME), + response_status.SUCCESS, + ), + ( + "test_chain_retrier_retry_ok", + ResponseStatus.retry(SOME_BACKOFF_TIME), + response_status.SUCCESS, + ResponseStatus.retry(SOME_BACKOFF_TIME), + ), + ( + "test_chain_retrier_return_first_retry", + ResponseStatus.retry(SOME_BACKOFF_TIME), + ResponseStatus.retry(2 * SOME_BACKOFF_TIME), + ResponseStatus.retry(SOME_BACKOFF_TIME), + ), + ], +) +def test_composite_error_handler(test_name, first_handler_behavior, second_handler_behavior, expected_behavior): + first_error_handler = MagicMock() + first_error_handler.should_retry.return_value = first_handler_behavior + second_error_handler = MagicMock() + second_error_handler.should_retry.return_value = second_handler_behavior + second_error_handler.should_retry.return_value = second_handler_behavior + retriers = [first_error_handler, second_error_handler] + retrier = CompositeErrorHandler(retriers) + response_mock = MagicMock() + response_mock.ok = first_handler_behavior == response_status.SUCCESS or second_handler_behavior == response_status.SUCCESS + assert retrier.should_retry(response_mock) == expected_behavior + + +def test_composite_error_handler_no_handlers(): + try: + CompositeErrorHandler([]) + assert False + except ValueError: + pass diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py new file mode 100644 index 00000000000000..e15d945e3542cf --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py @@ -0,0 +1,157 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +from http import HTTPStatus +from unittest.mock import MagicMock + +import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy +from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import ( + DefaultErrorHandler, + HttpResponseFilter, + ResponseStatus, +) +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction + +SOME_BACKOFF_TIME = 60 + + +@pytest.mark.parametrize( + "test_name, http_code, retry_response_filter, ignore_response_filter, response_headers, should_retry, backoff_strategy", + [ + ("test_bad_gateway", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None), + ("test_too_many_requests", HTTPStatus.TOO_MANY_REQUESTS, None, None, {}, ResponseStatus.retry(10), None), + ( + "test_bad_gateway_constant_retry", + HTTPStatus.BAD_GATEWAY, + None, + None, + {}, + ResponseStatus.retry(SOME_BACKOFF_TIME), + [ConstantBackoffStrategy(SOME_BACKOFF_TIME)], + ), + ("test_exponential_backoff", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None), + ( + "test_bad_gateway_exponential_backoff_explicit_parameter", + HTTPStatus.BAD_GATEWAY, + None, + None, + {}, + ResponseStatus.retry(10), + [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()], + ), + ("test_chain_backoff_strategy", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None), + ( + "test_bad_gateway_chain_backoff", + HTTPStatus.BAD_GATEWAY, + None, + None, + {}, + ResponseStatus.retry(10), + [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(), ConstantBackoffStrategy(SOME_BACKOFF_TIME)], + ), + ("test_200", HTTPStatus.OK, None, None, {}, response_status.SUCCESS, None), + ("test_3XX", HTTPStatus.PERMANENT_REDIRECT, None, None, {}, response_status.SUCCESS, None), + ("test_403", HTTPStatus.FORBIDDEN, 
None, None, {}, response_status.FAIL, None), + ( + "test_403_ignore_error_message", + HTTPStatus.FORBIDDEN, + None, + HttpResponseFilter(action=ResponseAction.IGNORE, error_message_contain="found"), + {}, + response_status.IGNORE, + None, + ), + ( + "test_403_dont_ignore_error_message", + HTTPStatus.FORBIDDEN, + None, + HttpResponseFilter(action=ResponseAction.IGNORE, error_message_contain="not_found"), + {}, + response_status.FAIL, + None, + ), + ("test_429", HTTPStatus.TOO_MANY_REQUESTS, None, None, {}, ResponseStatus.retry(10), None), + ( + "test_ignore_403", + HTTPStatus.FORBIDDEN, + None, + HttpResponseFilter(action=ResponseAction.IGNORE, http_codes={HTTPStatus.FORBIDDEN}), + {}, + response_status.IGNORE, + None, + ), + ( + "test_403_with_predicate", + HTTPStatus.FORBIDDEN, + HttpResponseFilter(action=ResponseAction.RETRY, predicate="{{ 'code' in decoded_response }}"), + None, + {}, + ResponseStatus.retry(10), + None, + ), + ( + "test_403_with_predicate", + HTTPStatus.FORBIDDEN, + HttpResponseFilter(action=ResponseAction.RETRY, predicate="{{ 'some_absent_field' in decoded_response }}"), + None, + {}, + response_status.FAIL, + None, + ), + ( + "test_retry_403", + HTTPStatus.FORBIDDEN, + HttpResponseFilter(action=ResponseAction.RETRY, http_codes={HTTPStatus.FORBIDDEN}), + None, + {}, + ResponseStatus.retry(10), + None, + ), + ], +) +def test_default_error_handler( + test_name, http_code, retry_response_filter, ignore_response_filter, response_headers, should_retry, backoff_strategy +): + response_mock = create_response(http_code, headers=response_headers, json_body={"code": "1000", "error": "found"}) + response_mock.ok = http_code < 400 + response_filters = [f for f in [retry_response_filter, ignore_response_filter] if f] + error_handler = DefaultErrorHandler(response_filters=response_filters, backoff_strategies=backoff_strategy) + actual_should_retry = error_handler.should_retry(response_mock) + assert actual_should_retry == should_retry + if 
should_retry.action == ResponseAction.RETRY: + assert actual_should_retry.retry_in == should_retry.retry_in + + +def test_default_error_handler_attempt_count_increases(): + status_code = 500 + response_mock = create_response(status_code) + error_handler = DefaultErrorHandler() + actual_should_retry = error_handler.should_retry(response_mock) + assert actual_should_retry == ResponseStatus.retry(10) + assert actual_should_retry.retry_in == 10 + + # This is the same request, so the count should increase + actual_should_retry = error_handler.should_retry(response_mock) + assert actual_should_retry == ResponseStatus.retry(20) + assert actual_should_retry.retry_in == 20 + + # This is a different request, so the count should not increase + another_identical_request = create_response(status_code) + actual_should_retry = error_handler.should_retry(another_identical_request) + assert actual_should_retry == ResponseStatus.retry(10) + assert actual_should_retry.retry_in == 10 + + +def create_response(status_code: int, headers=None, json_body=None): + url = "https://airbyte.io" + + response_mock = MagicMock() + response_mock.status_code = status_code + response_mock.ok = status_code < 400 or status_code >= 600 + response_mock.url = url + response_mock.headers = headers or {} + response_mock.json.return_value = json_body or {} + return response_mock diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_response_status.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_response_status.py new file mode 100644 index 00000000000000..8e51abb1a7be3f --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/error_handlers/test_response_status.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import pytest +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus + + +@pytest.mark.parametrize( + "test_name, response_action, retry_in, expected_action, expected_backoff", + [ + ("test_fail_with_backoff", ResponseAction.FAIL, 10, None, None), + ("test_fail_no_backoff", ResponseAction.FAIL, None, ResponseAction.FAIL, None), + ("test_ignore_with_backoff", ResponseAction.IGNORE, 10, None, None), + ("test_ignore_no_backoff", ResponseAction.IGNORE, None, ResponseAction.IGNORE, None), + ("test_success_with_backoff", ResponseAction.SUCCESS, 10, None, None), + ("test_success_no_backoff", ResponseAction.SUCCESS, None, ResponseAction.SUCCESS, None), + ("test_retry_with_backoff", ResponseAction.RETRY, 10, ResponseAction.RETRY, 10), + ("test_retry_no_backoff", ResponseAction.RETRY, None, ResponseAction.RETRY, None), + ], +) +def test_response_status(test_name, response_action, retry_in, expected_action, expected_backoff): + if expected_action or expected_backoff: + response_status = ResponseStatus(response_action, retry_in) + assert response_status.action == expected_action and response_status.retry_in == expected_backoff + else: + try: + ResponseStatus(response_action, retry_in) + assert False + except ValueError: + pass diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py index baeeae2fe715d5..f27b6f7fc2ff5b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -5,10 +5,11 @@ from unittest.mock import MagicMock import requests +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from
airbyte_cdk.sources.declarative.requesters.http_requester import HttpMethod, HttpRequester -def test(): +def test_http_requester(): http_method = "GET" request_options_provider = MagicMock() @@ -25,13 +26,13 @@ def test(): authenticator = MagicMock() - retrier = MagicMock() + error_handler = MagicMock() max_retries = 10 should_retry = True backoff_time = 1000 - retrier.max_retries = max_retries - retrier.should_retry.return_value = should_retry - retrier.backoff_time.return_value = backoff_time + error_handler.max_retries = max_retries + error_handler.should_retry.return_value = should_retry + error_handler.backoff_time.return_value = backoff_time config = {"url": "https://airbyte.io"} stream_slice = {"id": "1234"} @@ -40,12 +41,12 @@ def test(): requester = HttpRequester( name=name, - url_base="{{ config['url'] }}", - path="v1/{{ stream_slice['id'] }}", + url_base=InterpolatedString("{{ config['url'] }}"), + path=InterpolatedString("v1/{{ stream_slice['id'] }}"), http_method=http_method, request_options_provider=request_options_provider, authenticator=authenticator, - retrier=retrier, + error_handler=error_handler, config=config, ) @@ -56,6 +57,4 @@ def test(): assert requester.request_params(stream_state={}, stream_slice=None, next_page_token=None) == request_params assert requester.request_body_data(stream_state={}, stream_slice=None, next_page_token=None) == request_body_data assert requester.request_body_json(stream_state={}, stream_slice=None, next_page_token=None) == request_body_json - assert requester.max_retries == max_retries assert requester.should_retry(requests.Response()) == should_retry - assert requester.backoff_time(requests.Response()) == backoff_time diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 21443416088a7c..69e78d12021745 100644 --- 
a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -4,8 +4,12 @@ from unittest.mock import MagicMock +import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status +import pytest import requests from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever @@ -41,16 +45,9 @@ def test_simple_retriever(): requester.get_path.return_value = path http_method = HttpMethod.GET requester.get_method.return_value = http_method - raise_on_http_errors = True - requester.raise_on_http_errors = raise_on_http_errors - max_retries = 10 - requester.max_retries = max_retries - retry_factor = 2 - requester.retry_factor = retry_factor - should_retry = True - requester.should_retry.return_value = should_retry backoff_time = 60 - requester.backoff_time.return_value = backoff_time + should_retry = ResponseStatus.retry(backoff_time) + requester.should_retry.return_value = should_retry request_body_data = {"body": "data"} requester.request_body_data.return_value = request_body_data request_body_json = {"body": "json"} @@ -90,13 +87,82 @@ def test_simple_retriever(): assert retriever._last_records == records assert retriever.http_method == "GET" - assert retriever.raise_on_http_errors == raise_on_http_errors - assert retriever.max_retries == max_retries - assert retriever.retry_factor == retry_factor - assert retriever.should_retry(requests.Response()) == should_retry + assert not retriever.raise_on_http_errors + assert retriever.should_retry(requests.Response()) assert 
retriever.backoff_time(requests.Response()) == backoff_time assert retriever.request_body_data(None, None, None) == request_body_data assert retriever.request_body_json(None, None, None) == request_body_json assert retriever.request_kwargs(None, None, None) == request_kwargs assert retriever.cache_filename == cache_filename assert retriever.use_cache == use_cache + + +@pytest.mark.parametrize( + "test_name, requester_response, expected_should_retry, expected_backoff_time", + [ + ("test_should_retry_fail", response_status.FAIL, False, None), + ("test_should_retry_none_backoff", ResponseStatus.retry(None), True, None), + ("test_should_retry_custom_backoff", ResponseStatus.retry(60), True, 60), + ], +) +def test_should_retry(test_name, requester_response, expected_should_retry, expected_backoff_time): + requester = MagicMock() + retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=MagicMock()) + requester.should_retry.return_value = requester_response + assert retriever.should_retry(requests.Response()) == expected_should_retry + if requester_response.action == ResponseAction.RETRY: + assert retriever.backoff_time(requests.Response()) == expected_backoff_time + + +@pytest.mark.parametrize( + "test_name, status_code, response_status, len_expected_records", + [ + ("test_parse_response_fails_if_should_retry_is_fail", 404, response_status.FAIL, None), + ("test_parse_response_succeeds_if_should_retry_is_ok", 200, response_status.SUCCESS, 1), + ("test_parse_response_succeeds_if_should_retry_is_ignore", 404, response_status.IGNORE, 0), + ], +) +def test_parse_response(test_name, status_code, response_status, len_expected_records): + requester = MagicMock() + record_selector = MagicMock() + record_selector.select_records.return_value = [{"id": 100}] + retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector) + response = requests.Response() + response.status_code = status_code + 
requester.should_retry.return_value = response_status + if len_expected_records is None: + try: + retriever.parse_response(response, stream_state={}) + assert False + except requests.exceptions.HTTPError: + pass + else: + records = retriever.parse_response(response, stream_state={}) + assert len(records) == len_expected_records + + +@pytest.mark.parametrize( + "test_name, response_action, retry_in, expected_backoff_time", + [ + ("test_backoff_retriable_request", ResponseAction.RETRY, 10, 10), + ("test_backoff_fail_request", ResponseAction.FAIL, 10, None), + ("test_backoff_ignore_request", ResponseAction.IGNORE, 10, None), + ("test_backoff_success_request", ResponseAction.SUCCESS, 10, None), + ], +) +def test_backoff_time(test_name, response_action, retry_in, expected_backoff_time): + requester = MagicMock() + record_selector = MagicMock() + record_selector.select_records.return_value = [{"id": 100}] + response = requests.Response() + retriever = SimpleRetriever("stream_name", primary_key, requester=requester, record_selector=record_selector) + if expected_backoff_time: + requester.should_retry.return_value = ResponseStatus(response_action, retry_in) + actual_backoff_time = retriever.backoff_time(response) + assert expected_backoff_time == actual_backoff_time + else: + try: + retriever.backoff_time(response) + assert False + except ValueError: + pass diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index 0dc7202ca47f5d..0f34bc88cbf016 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -231,6 +231,7 @@ def mock_datetime_now(monkeypatch): ], ) def test_stream_slices(mock_datetime_now, test_name, stream_state, start, end, cursor, step, lookback_window,
expected_slices): + lookback_window = InterpolatedString(lookback_window) if lookback_window else None slicer = DatetimeStreamSlicer( start_datetime=start, end_datetime=end, diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 38fa66118997f5..d5c331f6eb84b5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -7,17 +7,20 @@ from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder +from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser +from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler +from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester from airbyte_cdk.sources.declarative.requesters.paginators.next_page_url_paginator import NextPageUrlPaginator from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod -from 
airbyte_cdk.sources.declarative.requesters.retriers.default_retrier import DefaultRetrier from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer @@ -103,16 +106,14 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): def test_datetime_stream_slicer(): content = """ stream_slicer: - class_name: airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer.DatetimeStreamSlicer + type: DatetimeStreamSlicer options: datetime_format: "%Y-%m-%d" start_datetime: type: MinMaxDatetime datetime: "{{ config['start_time'] }}" min_datetime: "{{ config['start_time'] + day_delta(2) }}" - end_datetime: - class_name: airbyte_cdk.sources.declarative.datetime.min_max_datetime.MinMaxDatetime - datetime: "{{ config['end_time'] }}" + end_datetime: "{{ config['end_time'] }}" step: "10d" cursor_value: "created" lookback_window: "5d" @@ -123,6 +124,7 @@ def test_datetime_stream_slicer(): assert type(stream_slicer) == DatetimeStreamSlicer assert stream_slicer._timezone == datetime.timezone.utc assert type(stream_slicer._start_datetime) == MinMaxDatetime + assert type(stream_slicer._end_datetime) == MinMaxDatetime assert stream_slicer._start_datetime._datetime_format == "%Y-%m-%d" assert stream_slicer._start_datetime._timezone == datetime.timezone.utc assert stream_slicer._start_datetime._datetime_interpolator._string == "{{ config['start_time'] }}" @@ -163,8 +165,8 @@ def test_full_config(): class_name: airbyte_cdk.sources.streams.http.requests_native_auth.token.TokenAuthenticator token: "{{ config['apikey'] }}" request_parameters_provider: "*ref(request_options_provider)" - retrier: - class_name: airbyte_cdk.sources.declarative.requesters.retriers.default_retrier.DefaultRetrier + error_handler: + type: DefaultErrorHandler retriever: class_name: 
"airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever" name: "{{ options['name'] }}" @@ -179,7 +181,7 @@ def test_full_config(): class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" schema_loader: class_name: airbyte_cdk.sources.declarative.schema.json_schema.JsonSchema - file_path: "./source_sendgrid/schemas/{{options['name']}}.json" + file_path: "./source_sendgrid/schemas/{{ name }}.json" cursor_field: [ ] list_stream: ref: "*ref(partial_stream)" @@ -210,6 +212,9 @@ def test_full_config(): assert stream_config["class_name"] == "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" assert stream_config["cursor_field"] == [] stream = factory.create_component(stream_config, input_config)() + + assert isinstance(stream._retriever._record_selector._extractor, JelloExtractor) + assert type(stream) == DeclarativeStream assert stream.primary_key == "id" assert stream.name == "lists" @@ -219,10 +224,11 @@ def test_full_config(): assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"] assert type(stream._retriever._record_selector) == RecordSelector assert type(stream._retriever._record_selector._extractor._decoder) == JsonDecoder + assert stream._retriever._record_selector._extractor._transform == ".result[]" assert type(stream._retriever._record_selector._record_filter) == RecordFilter assert stream._retriever._record_selector._record_filter._filter_interpolator._condition == "{{ record['id'] > stream_state['id'] }}" - assert stream._schema_loader._file_path._string == "./source_sendgrid/schemas/lists.json" + assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json" checker = factory.create_component(config["check"], input_config)() streams_to_check = checker._stream_names @@ -232,10 +238,32 @@ def test_full_config(): assert stream._retriever._requester._path._default == "marketing/lists" +def test_create_record_selector(): + content = """ + 
extractor: + type: JelloExtractor + transform: "_" + selector: + class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector + record_filter: + class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + extractor: + ref: "*ref(extractor)" + transform: "_" + """ + config = parser.parse(content) + selector = factory.create_component(config["selector"], input_config)() + assert isinstance(selector, RecordSelector) + assert isinstance(selector._extractor, JelloExtractor) + assert selector._extractor._transform == "_" + assert isinstance(selector._record_filter, RecordFilter) + + def test_create_requester(): content = """ requester: - class_name: airbyte_cdk.sources.declarative.requesters.http_requester.HttpRequester + type: HttpRequester path: "/v3/marketing/lists" name: lists url_base: "https://api.sendgrid.com" @@ -251,7 +279,7 @@ def test_create_requester(): config = parser.parse(content) component = factory.create_component(config["requester"], input_config)() assert isinstance(component, HttpRequester) - assert isinstance(component._retrier, DefaultRetrier) + assert isinstance(component._error_handler, DefaultErrorHandler) assert component._path._string == "/v3/marketing/lists" assert component._url_base._string == "https://api.sendgrid.com" assert isinstance(component._authenticator, TokenAuthenticator) @@ -261,16 +289,38 @@ def test_create_requester(): assert component._name == "lists" -def test_full_config_with_defaults(): +def test_create_composite_error_handler(): + content = """ + error_handler: + type: "CompositeErrorHandler" + error_handlers: + - response_filters: + - predicate: "{{ 'code' in decoded_response }}" + action: RETRY + - response_filters: + - http_codes: [ 403 ] + action: RETRY + """ + config = parser.parse(content) + component = factory.create_component(config["error_handler"], input_config)() + assert len(component._error_handlers) == 2 + 
assert isinstance(component._error_handlers[0], DefaultErrorHandler) + assert isinstance(component._error_handlers[0]._response_filters[0], HttpResponseFilter) + assert component._error_handlers[0]._response_filters[0]._predicate._condition == "{{ 'code' in decoded_response }}" + assert component._error_handlers[1]._response_filters[0]._http_codes == [403] + assert isinstance(component, CompositeErrorHandler) + + +def test_config_with_defaults(): content = """ lists_stream: - class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" + type: "DeclarativeStream" options: name: "lists" primary_key: id url_base: "https://api.sendgrid.com" schema_loader: - file_path: "./source_sendgrid/schemas/{{options.name}}.yaml" + file_path: "./source_sendgrid/schemas/{{name}}.yaml" retriever: paginator: type: "NextPageUrlPaginator" @@ -301,7 +351,7 @@ def test_full_config_with_defaults(): assert stream._retriever._requester._method == HttpMethod.GET assert stream._retriever._requester._authenticator._tokens == ["verysecrettoken"] assert stream._retriever._record_selector._extractor._transform == ".result[]" - assert stream._schema_loader._file_path._string == "./source_sendgrid/schemas/lists.yaml" + assert stream._schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.yaml" assert isinstance(stream._retriever._paginator, NextPageUrlPaginator) assert stream._retriever._paginator._url_base == "https://api.sendgrid.com" assert stream._retriever._paginator._interpolated_paginator._next_page_token_template._mapping == { @@ -330,7 +380,7 @@ class TestCreateTransformations: def test_no_transformations(self): content = f""" the_stream: - class_name: airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream + type: DeclarativeStream options: {self.base_options} """ @@ -342,7 +392,7 @@ def test_no_transformations(self): def test_remove_fields(self): content = f""" the_stream: - class_name: 
airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream + type: DeclarativeStream options: {self.base_options} transformations: