[airbyte-cdk] Updates Low Code CDK ErrorHandlers and BackoffStrategies to align with Python CDK Interfaces #38743

Merged Jun 18, 2024 (51 commits)
Changes from 31 commits
e09401c
Update reference for `ResponseAction`
pnilan May 23, 2024
3d4037a
Updates `ErrorHandler`, `BackoffStrategy`, and `HttpRequester` for ne…
pnilan May 24, 2024
b49b0db
Update backoff strategies for new format
pnilan May 24, 2024
2f02df9
Updates factory method to provide default http response filter
pnilan May 28, 2024
1c0848e
Update composite handler to include valid max_retries
pnilan May 28, 2024
be1c8bf
Update error handler, http_requester, and model_to_componet_factory, …
pnilan May 28, 2024
d79e994
Updated tests for default_error_handler
pnilan May 28, 2024
5c629d8
Add method for determining optimal error resolution when provided mul…
pnilan May 29, 2024
2bd6c60
Update default error handler and composite error handler unit tests. …
pnilan May 29, 2024
1523bae
Updates error handlers to maintain current behavior on response actio…
pnilan May 29, 2024
d8f4857
Updates error handler tests to expected behavior
pnilan May 29, 2024
be01417
Updates reference to ResponseAction
pnilan May 29, 2024
a656980
Update http_response_filter tests to reflect updated default error ma…
pnilan May 29, 2024
153eea5
Update HttpResponseFilter to prioritize user inputted error messages
pnilan May 29, 2024
0a33f2f
Update declarative error handlers tests for updated backoff_time meth…
pnilan May 29, 2024
2dc8a89
chore: format code
pnilan May 29, 2024
3a93bbf
Update http_requester and associated tests
pnilan May 29, 2024
6c3315c
chore: lint
pnilan May 29, 2024
f737791
Merge branch 'master' into pnilan/airbyte-cdk-low-code-http-error-han…
pnilan May 29, 2024
bef56d2
Update http requester to return response if no error handler, alignin…
pnilan May 29, 2024
572933e
mypy type checking
pnilan May 29, 2024
0cc790d
chore: format code
pnilan May 29, 2024
42f9fd7
revert source-harvest files
pnilan May 29, 2024
d231eff
Updates poetry.lock
pnilan May 30, 2024
546c1d2
Update wait_until_header tests for MagicMock spec
pnilan May 30, 2024
9740aa6
Update test for http response filter for correct behavior
pnilan May 30, 2024
1467382
Update variable type
pnilan May 30, 2024
06258b2
Update conditional for type checking
pnilan May 30, 2024
a3cdc2e
Update http response filter test
pnilan May 30, 2024
f5fdf7d
Update tests
pnilan May 30, 2024
9ca5af1
Merge branch 'master' into pnilan/airbyte-cdk-low-code-http-error-han…
pnilan May 30, 2024
3a045ce
Merge branch 'master' into pnilan/airbyte-cdk-low-code-http-error-han…
pnilan May 30, 2024
fec77c3
Merge branch 'pnilan/airbyte-cdk-low-code-http-error-handler' of gith…
pnilan May 30, 2024
3949c2a
Update for `attempt_count`, CompositeErrorHandler compatibility, and …
pnilan Jun 4, 2024
a52315a
chore: format code
pnilan Jun 4, 2024
b6d3c14
chore: linting
pnilan Jun 4, 2024
c010124
Update per comments, adds DefaultHttpResponseFilter, adds request_att…
pnilan Jun 6, 2024
9e331ca
Update mapped_key type for mypy
pnilan Jun 6, 2024
34071f4
chore: format code
pnilan Jun 6, 2024
7cc7ffb
chore: format code
pnilan Jun 7, 2024
7fb7b4d
chore: lint & format code
pnilan Jun 7, 2024
ffec9c0
revert poetry.lock
pnilan Jun 7, 2024
522db58
Update for response with ResponseAction.FAIL to raise ReadException t…
pnilan Jun 7, 2024
33eef17
Merge branch 'master' into pnilan/airbyte-cdk-low-code-http-error-han…
pnilan Jun 9, 2024
83d598b
Adds ability for abstract source to emit control messages when Airbyt…
pnilan Jun 11, 2024
cb29218
Update declarative error handler and backoff strategy interface names
pnilan Jun 11, 2024
389cc39
Update http response filter per comments
pnilan Jun 11, 2024
ba759f0
Merge branch 'master' into pnilan/airbyte-cdk-low-code-http-error-han…
pnilan Jun 11, 2024
57031fe
update wait until header time backoff strategy
pnilan Jun 11, 2024
ed10145
Update tests for updated type
pnilan Jun 11, 2024
b6892a9
chore: format code
pnilan Jun 11, 2024
1 change: 0 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/__init__.py
@@ -42,7 +42,6 @@
from .sources.declarative.requesters import Requester, HttpRequester

from .sources.declarative.requesters.error_handlers import BackoffStrategy
from .sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from .sources.declarative.requesters.paginators import DefaultPaginator, PaginationStrategy
from .sources.declarative.requesters.paginators.strategies import OffsetIncrement, CursorPaginationStrategy, PageIncrement, StopConditionPaginationStrategyDecorator

@@ -1546,7 +1546,6 @@ definitions:
type: object
required:
- type
- action
properties:
type:
type: string
@@ -481,8 +481,8 @@ class Action(Enum):

class HttpResponseFilter(BaseModel):
type: Literal['HttpResponseFilter']
action: Action = Field(
...,
action: Optional[Action] = Field(
None,
description='Action to execute if a response matches the filter.',
examples=['SUCCESS', 'FAIL', 'RETRY', 'IGNORE'],
title='Action',
@@ -1086,7 +1086,7 @@ class Config:
type: Literal['DeclarativeSource']
check: CheckStream
streams: List[DeclarativeStream]
version: str = Field(..., description='The version of the CDK used to build and test the source.')
version: str = Field(..., description='The version of the Airbyte CDK used to build and test the source.')
schemas: Optional[Schemas] = None
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
@@ -113,7 +113,6 @@
WaitTimeFromHeaderBackoffStrategy,
WaitUntilTimeFromHeaderBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, NoPagination, PaginatorTestReadDecorator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import (
CursorPaginationStrategy,
@@ -133,6 +132,7 @@
from airbyte_cdk.sources.declarative.transformations import AddFields, RecordTransformation, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.message import InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.transform import TypeTransformer
from isodate import parse_duration
@@ -333,7 +333,7 @@ def create_legacy_to_per_partition_state_migration(
declarative_stream: DeclarativeStreamModel,
) -> LegacyToPerPartitionStateMigration:
retriever = declarative_stream.retriever
partition_router = retriever.partition_router
partition_router = retriever.partition_router # type: ignore # CustomRetriever would inherit from SimpleRetriever and therefore have partition_router

if not isinstance(retriever, SimpleRetrieverModel):
raise ValueError(
@@ -696,15 +696,7 @@ def create_default_error_handler(self, model: DefaultErrorHandlerModel, config:
for response_filter_model in model.response_filters:
response_filters.append(self._create_component_from_model(model=response_filter_model, config=config))
else:
response_filters.append(
HttpResponseFilter(
ResponseAction.RETRY,
http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS,
config=config,
parameters=model.parameters or {},
)
)
response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config=config, parameters=model.parameters or {}))
response_filters.append(HttpResponseFilter(config=config, parameters=model.parameters or {}))

return DefaultErrorHandler(
backoff_strategies=backoff_strategies,
@@ -800,7 +792,10 @@ def create_http_requester(self, model: HttpRequesterModel, config: Config, *, na

@staticmethod
def create_http_response_filter(model: HttpResponseFilterModel, config: Config, **kwargs: Any) -> HttpResponseFilter:
action = ResponseAction(model.action.value)
if model.action is not None:
action = ResponseAction(model.action.value)
else:
action = None
http_codes = (
set(model.http_codes) if model.http_codes else set()
) # JSON schema notation has no set data type. The schema enforces an array of unique elements
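The optional `action` handling in this hunk can be sketched in isolation. This is a minimal illustration under stated assumptions: `ModelAction` and `to_response_action` are hypothetical names standing in for the generated model enum and the factory logic, and `ResponseAction` below is a local stub rather than the CDK enum.

```python
from enum import Enum
from typing import Optional


class ModelAction(Enum):  # hypothetical stand-in for the generated model's Action enum
    SUCCESS = "SUCCESS"
    FAIL = "FAIL"
    RETRY = "RETRY"
    IGNORE = "IGNORE"


class ResponseAction(Enum):  # local stub, not the CDK ResponseAction
    SUCCESS = "SUCCESS"
    FAIL = "FAIL"
    RETRY = "RETRY"
    IGNORE = "IGNORE"


def to_response_action(model_action: Optional[ModelAction]) -> Optional[ResponseAction]:
    # Convert by value when an action is configured; otherwise pass None through
    # so the filter can fall back to its own default behavior.
    if model_action is not None:
        return ResponseAction(model_action.value)
    return None


configured = to_response_action(ModelAction.RETRY)
unset = to_response_action(None)
```

Passing `None` through, rather than picking a default in the factory, leaves the choice of default resolution to the filter itself.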
@@ -7,7 +7,7 @@

import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy
from airbyte_cdk.sources.types import Config


@@ -32,5 +32,5 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
else:
self.backoff_time_in_seconds = InterpolatedString.create(self.backoff_time_in_seconds, parameters=parameters)

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
def backoff_time(self, response_or_exception: Optional[Union[requests.Response, Exception]], attempt_count: int) -> Optional[float]: # type: ignore # attempt_count maintained for compatibility with low code CDK
return self.backoff_time_in_seconds.eval(self.config) # type: ignore # backoff_time_in_seconds is always cast to an interpolated string
@@ -7,7 +7,7 @@

import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy
from airbyte_cdk.sources.types import Config


@@ -32,5 +32,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
else:
self.factor = InterpolatedString.create(self.factor, parameters=parameters)

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
def backoff_time( # type: ignore # attempt_count maintained for compatibility with low code CDK
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int
) -> Optional[float]:
return self.factor.eval(self.config) * 2**attempt_count # type: ignore # factor is always cast to an interpolated string
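As a rough illustration of the renamed `backoff_time` method in this hunk, the sketch below reproduces only the arithmetic `factor * 2**attempt_count`. `ExponentialBackoffSketch` is a hypothetical stand-in, not the CDK class, and it assumes `factor` is already a plain float instead of an interpolated string.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExponentialBackoffSketch:
    """Hypothetical stand-in: factor is a plain float, not an InterpolatedString."""

    factor: float = 5.0

    def backoff_time(self, response_or_exception: Optional[object], attempt_count: int) -> Optional[float]:
        # Same arithmetic as the diff: the wait doubles with every attempt.
        return self.factor * 2**attempt_count


strategy = ExponentialBackoffSketch(factor=5.0)
# Wait times for attempts 1, 2 and 3; the first argument is unused by this strategy.
delays = [strategy.backoff_time(None, attempt) for attempt in range(1, 4)]
```

With a factor of 5, the three attempts wait 10, 20 and 40 seconds.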
@@ -32,7 +32,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.regex = InterpolatedString.create(self.regex, parameters=parameters) if self.regex else None
self.header = InterpolatedString.create(self.header, parameters=parameters)

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
def backoff_time(self, response: requests.Response, attempt_count: int) -> Optional[float]: # type: ignore # attempt_count maintained for compatibility with low code CDK
header = self.header.eval(config=self.config) # type: ignore # header is always cast to an interpolated stream
if self.regex:
evaled_regex = self.regex.eval(self.config) # type: ignore # header is always cast to an interpolated string
@@ -39,15 +39,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not isinstance(self.min_wait, InterpolatedString):
self.min_wait = InterpolatedString.create(str(self.min_wait), parameters=parameters)

def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
def backoff_time(self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int) -> Optional[float]: # type: ignore # attempt_count maintained for compatibility with low code CDK
now = time.time()
header = self.header.eval(self.config) # type: ignore # header is always cast to an interpolated string
if self.regex:
evaled_regex = self.regex.eval(self.config) # type: ignore # header is always cast to an interpolated string
regex = re.compile(evaled_regex)
else:
regex = None
wait_until = get_numeric_value_from_header(response, header, regex)
wait_until = None
if isinstance(response_or_exception, requests.Response):
wait_until = get_numeric_value_from_header(response_or_exception, header, regex)
min_wait = self.min_wait.eval(self.config) # type: ignore # header is always cast to an interpolated string
if wait_until is None or not wait_until:
return float(min_wait) if min_wait else None
@@ -2,24 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from dataclasses import dataclass
from typing import Optional
from abc import ABC

import requests
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy as HttpBackoffStrategy


@dataclass
class BackoffStrategy:
class BackoffStrategy(HttpBackoffStrategy, ABC):
"""
Backoff strategy defining how long to wait before retrying a request that resulted in an error.
References Python CDK BackoffStrategy
"""

@abstractmethod
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
"""
Return time to wait before retrying the request.
:param response: response received for the request to retry
:param attempt_count: number of attempts to submit the request
:return: time to wait in seconds
"""
@@ -3,13 +3,11 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union
from typing import Any, List, Mapping, Optional, Union

import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, ErrorResolution, ResponseAction


@dataclass
@@ -45,19 +43,28 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
raise ValueError("CompositeErrorHandler expects at least 1 underlying error handler")

@property
def max_retries(self) -> Union[int, None]:
return self.error_handlers[0].max_retries
def max_retries(self) -> Optional[int]:
return self.error_handlers[0].max_retries # type: ignore # property not defined in ErrorHandler

@property
def max_time(self) -> Union[int, None]:
return max([error_handler.max_time or 0 for error_handler in self.error_handlers])

def interpret_response(self, response: requests.Response) -> ResponseStatus:
should_retry = ResponseStatus(ResponseAction.FAIL)
for retrier in self.error_handlers:
should_retry = retrier.interpret_response(response)
if should_retry.action == ResponseAction.SUCCESS:
return response_status.SUCCESS
if should_retry == response_status.IGNORE or should_retry.action == ResponseAction.RETRY:
return should_retry
return should_retry
return max([error_handler.max_time or 0 for error_handler in self.error_handlers]) # type: ignore # property not defined in ErrorHandler
Review comment (Contributor):

This is interesting. What is the exact error that mypy raises when we don't have this ignore annotation? The ErrorHandler interface does have a max_time property, so I'm surprised this gets raised. Not a blocker, but I'm curious whether we can avoid the annotation, because the code itself looks fine.

Reply (Contributor Author):

airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py:51: error: "ErrorHandler" has no attribute "max_time" [attr-defined]

This is because the new parent class is the Python CDK ErrorHandler, which does not define max_time, max_retries, etc.
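The situation described in this thread can be reproduced with a small sketch. All names here (`BaseHandler`, `TimedHandler`, `longest_max_time`) are hypothetical stand-ins, not CDK classes. The `getattr` fallback is only one possible alternative to a `# type: ignore`; the PR itself keeps the ignore annotation.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class BaseHandler(ABC):
    """Hypothetical parent interface: it declares no max_time attribute."""

    @abstractmethod
    def interpret_response(self, response: object) -> str: ...


class TimedHandler(BaseHandler):
    """Hypothetical subclass that adds max_time on its own."""

    def __init__(self, max_time: Optional[int]) -> None:
        self.max_time = max_time

    def interpret_response(self, response: object) -> str:
        return "RETRY"


def longest_max_time(handlers: List[BaseHandler]) -> int:
    # A static checker only knows each element is a BaseHandler, so writing
    # handler.max_time directly would be reported as attr-defined.
    # getattr with a default sidesteps that, at the cost of losing type information.
    return max(getattr(handler, "max_time", None) or 0 for handler in handlers)


result = longest_max_time([TimedHandler(30), TimedHandler(None), TimedHandler(120)])
```

Declaring the property on a shared declarative base class would be another option; which is preferable depends on how the two interfaces are meant to diverge.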


def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
matched_error_resolution = None
for error_handler in self.error_handlers:
matched_error_resolution = error_handler.interpret_response(response_or_exception)

if matched_error_resolution.response_action == ResponseAction.SUCCESS:
return matched_error_resolution

if (
matched_error_resolution.response_action == ResponseAction.RETRY
or matched_error_resolution.response_action == ResponseAction.IGNORE
):
return matched_error_resolution

if matched_error_resolution is not None:
return matched_error_resolution

return DEFAULT_ERROR_RESOLUTION
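The selection logic of the new `interpret_response` can be sketched with stand-in types. `Action`, `Resolution`, `DEFAULT` and `interpret` are hypothetical names used only for illustration. The default is assumed to be a retry, following the "Return default error resolution (retry)" comment elsewhere in this diff.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List, Optional


class Action(Enum):  # hypothetical stand-in for ResponseAction
    SUCCESS = "SUCCESS"
    RETRY = "RETRY"
    IGNORE = "IGNORE"
    FAIL = "FAIL"


@dataclass
class Resolution:  # hypothetical stand-in for ErrorResolution
    response_action: Action


DEFAULT = Resolution(Action.RETRY)  # assumed default resolution

Handler = Callable[[object], Resolution]


def interpret(handlers: List[Handler], response: object) -> Resolution:
    matched: Optional[Resolution] = None
    for handler in handlers:
        matched = handler(response)
        # SUCCESS, RETRY and IGNORE short-circuit; FAIL falls through to the next handler.
        if matched.response_action in (Action.SUCCESS, Action.RETRY, Action.IGNORE):
            return matched
    # Every handler said FAIL: return the last result; with no handlers, use the default.
    return matched if matched is not None else DEFAULT


always_fail: Handler = lambda _: Resolution(Action.FAIL)
always_ignore: Handler = lambda _: Resolution(Action.IGNORE)

first_match = interpret([always_fail, always_ignore], response=None)
all_fail = interpret([always_fail, always_fail], response=None)
```

In this sketch a FAIL from one handler does not end the search, so a later handler can still turn the response into a retry or an ignore.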
@@ -5,16 +5,13 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.exponential_backoff_strategy import (
ExponentialBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, SUCCESS_RESOLUTION, ErrorResolution
from airbyte_cdk.sources.types import Config


@@ -101,57 +98,46 @@ class DefaultErrorHandler(ErrorHandler):
backoff_strategies: Optional[List[BackoffStrategy]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.response_filters = self.response_filters or []

if not self.response_filters:
self.response_filters.append(
HttpResponseFilter(
ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, config=self.config, parameters={}
)
)
self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config={}, parameters={}))
self.response_filters = [HttpResponseFilter(config=self.config, parameters={})]

if not self.backoff_strategies:
self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(parameters=parameters, config=self.config)]

self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {}

@property # type: ignore # overwrite the property to handle the case where max_retries is not provided in the constructor
def max_retries(self) -> Union[int, None]:
return self._max_retries

@max_retries.setter
def max_retries(self, value: int) -> None:
# Covers the case where max_retries is not provided in the constructor, which causes the property object
# to be set which we need to avoid doing
if not isinstance(value, property):
self._max_retries = value

def interpret_response(self, response: requests.Response) -> ResponseStatus:
request = response.request

if request not in self._last_request_to_attempt_count:
self._last_request_to_attempt_count = {request: 1}
else:
self._last_request_to_attempt_count[request] += 1
if self.response_filters:
for response_filter in self.response_filters:
matched_status = response_filter.matches(
response=response, backoff_time=self._backoff_time(response, self._last_request_to_attempt_count[request])
)
if matched_status is not None:
return matched_status

if response.ok:
return response_status.SUCCESS
# Fail if the response matches no filters
return response_status.FAIL

def _backoff_time(self, response: requests.Response, attempt_count: int) -> Optional[float]:
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:

if isinstance(response_or_exception, requests.Response):

request = response_or_exception.request

if request not in self._last_request_to_attempt_count:
self._last_request_to_attempt_count = {request: 1}
else:
self._last_request_to_attempt_count[request] += 1
if self.response_filters:

for response_filter in self.response_filters:
matched_error_resolution = response_filter.matches(response_or_exception=response_or_exception)

if matched_error_resolution is not None:
return matched_error_resolution

if response_or_exception.ok:
return SUCCESS_RESOLUTION

# Return default error resolution (retry)
return DEFAULT_ERROR_RESOLUTION
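The attempt counting in `interpret_response` has one detail that is easy to miss: a request not seen before replaces the whole mapping instead of being added to it. A minimal sketch of that behavior, using a hypothetical `AttemptCounter` helper and plain strings in place of prepared requests:

```python
from typing import Dict, Hashable


class AttemptCounter:
    """Hypothetical helper mirroring the counting logic shown in the diff."""

    def __init__(self) -> None:
        self._counts: Dict[Hashable, int] = {}

    def record(self, request: Hashable) -> int:
        if request not in self._counts:
            # A new request replaces the mapping, so only the latest request is tracked.
            self._counts = {request: 1}
        else:
            self._counts[request] += 1
        return self._counts[request]


counter = AttemptCounter()
attempts = [
    counter.record("GET /a"),  # first attempt for /a
    counter.record("GET /a"),  # retry of /a
    counter.record("GET /b"),  # new request, mapping is reset
    counter.record("GET /a"),  # /a is no longer tracked, so it starts over
]
```

Under this logic the count is only meaningful for consecutive attempts of the same request.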

def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int = 0
) -> Optional[float]:
backoff = None
if self.backoff_strategies:
for backoff_strategies in self.backoff_strategies:
backoff = backoff_strategies.backoff(response, attempt_count)
backoff = backoff_strategies.backoff_time(response_or_exception, attempt_count) # type: ignore # attempt_count maintained for compatibility with low code CDK
if backoff:
return backoff
return backoff
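The loop in `backoff_time` returns the first truthy value produced by a strategy. A small sketch of that rule, with strategies modeled as plain callables; `first_backoff`, `from_header` and `constant` are hypothetical names, not CDK APIs.

```python
from typing import Callable, List, Optional

Strategy = Callable[[object, int], Optional[float]]


def first_backoff(strategies: List[Strategy], response: object, attempt_count: int = 0) -> Optional[float]:
    backoff = None
    for strategy in strategies:
        backoff = strategy(response, attempt_count)
        if backoff:  # None and 0 are both falsy, so they defer to the next strategy
            return backoff
    return backoff


def from_header(response: object, attempt_count: int) -> Optional[float]:
    return None  # pretend the header was missing


def constant(response: object, attempt_count: int) -> Optional[float]:
    return 3.0


chosen = first_backoff([from_header, constant], response=None, attempt_count=1)
nothing = first_backoff([from_header], response=None, attempt_count=1)
```

Because the check is on truthiness, a strategy that returns `0` is treated the same as one that returns `None`.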
@@ -2,42 +2,15 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from abc import ABC
from dataclasses import dataclass
from typing import Union

import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler as HttpErrorHandler


@dataclass
class ErrorHandler:
class ErrorHandler(HttpErrorHandler, ABC):
"""
Defines whether a request was successful and how to handle a failure.
References Python CDK ErrorHandler
"""

@property
@abstractmethod
def max_retries(self) -> Union[int, None]:
"""
Specifies maximum amount of retries for backoff policy. Return None for no limit.
"""
pass

@property
@abstractmethod
def max_time(self) -> Union[int, None]:
"""
Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
"""
pass

@abstractmethod
def interpret_response(self, response: requests.Response) -> ResponseStatus:
"""
Evaluate response status describing whether a failing request should be retried or ignored.

:param response: response to evaluate
:return: response status
"""
pass