Skip to content

Commit

Permalink
[airbyte-cdk] - Integrate HttpClient into HttpRequester (#38906)
Browse files Browse the repository at this point in the history
  • Loading branch information
pnilan committed Jun 18, 2024
1 parent 7a63966 commit 18e82d9
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 778 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,6 @@ def create_http_requester(self, model: HttpRequesterModel, config: Config, *, na
assert model.use_cache is not None # for mypy
assert model.http_method is not None # for mypy

assert model.use_cache is not None # for mypy

return HttpRequester(
name=name,
url_base=model.url_base,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if not isinstance(self.factor, InterpolatedString):
self.factor = str(self.factor)
if isinstance(self.factor, float):
self.factor = InterpolatedString.create(str(self.factor), parameters=parameters)
self._factor = InterpolatedString.create(str(self.factor), parameters=parameters)
else:
self.factor = InterpolatedString.create(self.factor, parameters=parameters)
self._factor = InterpolatedString.create(self.factor, parameters=parameters)

@property
def _retry_factor(self) -> float:
return self._factor.eval(self.config) # type: ignore # factor is always cast to an interpolated string

def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
Expand All @@ -40,4 +44,4 @@ def backoff_time(
raise ValueError("ExponentialBackoffStrategy requires an attempt_count")
if not isinstance(attempt_count, int):
raise ValueError("ExponentialBackoffStrategy requires an attempt_count that is an integer")
return self.factor.eval(self.config) * 2**attempt_count # type: ignore # factor is always cast to an interpolated string
return self._retry_factor * 2**attempt_count # type: ignore # factor is always cast to an interpolated string
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

@property
def max_retries(self) -> Optional[int]:
return self.error_handlers[0].max_retries # type: ignore # property not defined in ErrorHandler
return self.error_handlers[0].max_retries

@property
def max_time(self) -> Union[int, None]:
return max([error_handler.max_time or 0 for error_handler in self.error_handlers]) # type: ignore # property not defined in ErrorHandler
def max_time(self) -> Optional[int]:
return max([error_handler.max_time or 0 for error_handler in self.error_handlers])

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
matched_error_resolution = None
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution
from airbyte_cdk.sources.types import StreamSlice, StreamState


Expand Down Expand Up @@ -68,18 +67,6 @@ def get_request_params(
E.g: you might want to define query parameters for paging if next_page_token is not None.
"""

@abstractmethod
def interpret_response_status(self, response: requests.Response) -> ErrorResolution:
"""
Specifies conditions for backoff, error handling and reporting based on the response from the server.
By default, back off on the following HTTP response statuses:
- 429 (Too Many Requests) indicating rate limiting
- 500s to handle transient server errors
Unexpected but transient exceptions (connection timeout, DNS resolution failed, etc..) are retried by default.
"""

@abstractmethod
def get_request_headers(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from datetime import timedelta
from typing import Any, Optional, Union

import requests
Expand All @@ -10,14 +9,6 @@


class DefaultBackoffStrategy(BackoffStrategy):
def __init__(
self,
max_retries: int = 5,
max_time: timedelta = timedelta(seconds=600),
):
self.max_retries = max_retries
self.max_time = max_time.total_seconds()

def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], **kwargs: Any
) -> Optional[float]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction
from requests import RequestException
from requests.exceptions import InvalidSchema, InvalidURL, RequestException

DEFAULT_ERROR_MAPPING: Mapping[Union[int, str, Type[Exception]], ErrorResolution] = {
InvalidSchema: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Invalid Protocol Schema: The endpoint that data is being requested from is using an invalid or insecure. Exception: requests.exceptions.InvalidSchema",
),
InvalidURL: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Invalid URL specified: The endpoint that data is being requested from is not a valid URL. Exception: requests.exceptions.InvalidURL",
),
RequestException: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="An exception occurred when making the request.",
error_message="An exception occurred when making the request. Exception: requests.exceptions.RequestException",
),
400: ErrorResolution(
response_action=ResponseAction.FAIL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@ class ErrorHandler(ABC):
Abstract base class to determine how to handle a failed HTTP request.
"""

@property
@abstractmethod
def max_retries(self) -> Optional[int]:
"""
The maximum number of retries to attempt before giving up.
"""
pass

@property
@abstractmethod
def max_time(self) -> Optional[int]:
"""
The maximum amount of time in seconds to retry before giving up.
"""
pass

@abstractmethod
def interpret_response(self, response: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import logging
from datetime import timedelta
from typing import Mapping, Optional, Union

import requests
Expand All @@ -17,6 +18,8 @@ def __init__(
self,
logger: logging.Logger,
error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None,
max_retries: int = 5,
max_time: timedelta = timedelta(seconds=600),
) -> None:
"""
Initialize the HttpStatusErrorHandler.
Expand All @@ -25,6 +28,16 @@ def __init__(
"""
self._logger = logger
self._error_mapping = error_mapping or DEFAULT_ERROR_MAPPING
self._max_retries = max_retries
self._max_time = int(max_time.total_seconds())

@property
def max_retries(self) -> Optional[int]:
return self._max_retries

@property
def max_time(self) -> Optional[int]:
return self._max_time

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
"""
Expand Down
118 changes: 87 additions & 31 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
import os
import urllib
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union

import requests
import requests_cache
from airbyte_cdk.models import Level
from airbyte_cdk.sources.http_config import MAX_CONNECTION_POOL_SIZE
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.call_rate import APIBudget, CachedLimiterSession, LimiterSession
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests.auth import AuthBase

from .error_handlers import (
from airbyte_cdk.sources.streams.http.error_handlers import (
BackoffStrategy,
DefaultBackoffStrategy,
ErrorHandler,
Expand All @@ -26,17 +24,19 @@
JsonErrorMessageParser,
ResponseAction,
)
from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from .rate_limiting import http_client_default_backoff_handler, user_defined_backoff_handler
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from airbyte_cdk.sources.streams.http.rate_limiting import http_client_default_backoff_handler, user_defined_backoff_handler
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests.auth import AuthBase

BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")


class HttpClient:

_DEFAULT_MAX_RETRY = 5
_DEFAULT_RETRY_FACTOR = 5
_DEFAULT_MAX_TIME = 60 * 10
_DEFAULT_MAX_RETRY: int = 5
_DEFAULT_MAX_TIME: int = 60 * 10

def __init__(
self,
Expand All @@ -47,8 +47,10 @@ def __init__(
session: Optional[Union[requests.Session, requests_cache.CachedSession]] = None,
authenticator: Optional[AuthBase] = None,
use_cache: bool = False,
backoff_strategy: Optional[BackoffStrategy] = None,
backoff_strategy: Optional[Union[BackoffStrategy, List[BackoffStrategy]]] = None,
error_message_parser: Optional[ErrorMessageParser] = None,
disable_retries: bool = False,
message_respository: Optional[MessageRepository] = None,
):
self._name = name
self._api_budget: APIBudget = api_budget or APIBudget(policies=[])
Expand All @@ -64,9 +66,17 @@ def __init__(
self._session.auth = authenticator
self._logger = logger
self._error_handler = error_handler or HttpStatusErrorHandler(self._logger)
self._backoff_strategy = backoff_strategy or DefaultBackoffStrategy()
if backoff_strategy is not None:
if isinstance(backoff_strategy, list):
self._backoff_strategies = backoff_strategy
else:
self._backoff_strategies = [backoff_strategy]
else:
self._backoff_strategies = [DefaultBackoffStrategy()]
self._error_message_parser = error_message_parser or JsonErrorMessageParser()
self._request_attempt_count: Dict[requests.PreparedRequest, int] = {}
self._disable_retries = disable_retries
self._message_repository = message_respository

@property
def cache_filename(self) -> str:
Expand Down Expand Up @@ -136,27 +146,59 @@ def _create_prepared_request(

return prepared_request

def _send_with_retry(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
@property
def _max_retries(self) -> int:
"""
Backoff package has max_tries parameter that means total number of
tries before giving up, so if this number is 0 no calls expected to be done.
But for this class we call it max_REtries assuming there would be at
least one attempt and some retry attempts, to comply this logic we add
1 to expected retries attempts.
Determines the max retries based on the provided error handler.
"""
max_retries = self._backoff_strategy.max_retries or self._DEFAULT_MAX_RETRY # type: ignore # max_retries is included in default implemention but optional
max_tries = max(0, max_retries) + 1
max_retries = None
if self._disable_retries:
max_retries = 0
else:
max_retries = self._error_handler.max_retries
return max_retries if max_retries is not None else self._DEFAULT_MAX_RETRY

@property
def _max_time(self) -> int:
"""
Determines the max time based on the provided error handler.
"""
return self._error_handler.max_time if self._error_handler.max_time is not None else self._DEFAULT_MAX_TIME

def _send_with_retry(
self,
request: requests.PreparedRequest,
request_kwargs: Mapping[str, Any],
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> requests.Response:
"""
Sends a request with retry logic.
Args:
request (requests.PreparedRequest): The prepared HTTP request to send.
request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request.
max_time = self._backoff_strategy.max_time or self._DEFAULT_MAX_TIME # type: ignore # max_time is included in default implemention but optional
Returns:
requests.Response: The HTTP response received from the server after retries.
"""

max_retries = self._max_retries
max_tries = max(0, max_retries) + 1
max_time = self._max_time

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(self._send)
backoff_handler = http_client_default_backoff_handler(max_tries=max_tries, max_time=max_time, factor=self._DEFAULT_RETRY_FACTOR)
backoff_handler = http_client_default_backoff_handler(max_tries=max_tries, max_time=max_time)
# backoff handlers wrap _send, so it will always return a response
response = backoff_handler(user_backoff_handler)(request, request_kwargs)
response = backoff_handler(user_backoff_handler)(request, request_kwargs, log_formatter=log_formatter) # type: ignore # mypy can't infer that backoff_handler wraps _send

return response

def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
def _send(
self,
request: requests.PreparedRequest,
request_kwargs: Mapping[str, Any],
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> requests.Response:

if request not in self._request_attempt_count:
self._request_attempt_count[request] = 1
Expand Down Expand Up @@ -184,6 +226,14 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
"Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text}
)

# Request/repsonse logging for declarative cdk.
if log_formatter is not None and response is not None and self._message_repository is not None:
formatter = log_formatter
self._message_repository.log_message(
Level.DEBUG,
lambda: formatter(response), # type: ignore # log_formatter is always cast to a callable
)

if error_resolution.response_action == ResponseAction.FAIL:
if response:
error_message = f"'{request.method}' request to '{request.url}' failed with status code '{response.status_code}' and error message '{self._error_message_parser.parse_response_error_message(response)}'"
Expand All @@ -208,16 +258,21 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,

# TODO: Consider dynamic retry count depending on subsequent error codes
elif error_resolution.response_action == ResponseAction.RETRY:
custom_backoff_time = self._backoff_strategy.backoff_time(
response_or_exception=response if response is not None else exc, attempt_count=self._request_attempt_count[request]
)
user_defined_backoff_time = None
for backoff_strategy in self._backoff_strategies:
backoff_time = backoff_strategy.backoff_time(
response_or_exception=response if response is not None else exc, attempt_count=self._request_attempt_count[request]
)
if backoff_time:
user_defined_backoff_time = backoff_time
break
error_message = (
error_resolution.error_message
or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
)
if custom_backoff_time:
if user_defined_backoff_time:
raise UserDefinedBackoffException(
backoff=custom_backoff_time,
backoff=user_defined_backoff_time,
request=request,
response=(response if response is not None else exc),
error_message=error_message,
Expand Down Expand Up @@ -246,6 +301,7 @@ def send_request(
json: Optional[Mapping[str, Any]] = None,
data: Optional[Union[str, Mapping[str, Any]]] = None,
dedupe_query_params: bool = False,
log_formatter: Optional[Callable[[requests.Response], Any]] = None,
) -> Tuple[requests.PreparedRequest, requests.Response]:
"""
Prepares and sends request and return request and response objects.
Expand All @@ -255,6 +311,6 @@ def send_request(
http_method=http_method, url=url, dedupe_query_params=dedupe_query_params, headers=headers, params=params, json=json, data=data
)

response: requests.Response = self._send_with_retry(request=request, request_kwargs=request_kwargs)
response: requests.Response = self._send_with_retry(request=request, request_kwargs=request_kwargs, log_formatter=log_formatter)

return request, response
Loading

0 comments on commit 18e82d9

Please sign in to comment.