🐛 Source Shopify: Allow the known HTTP errors to be retried more than once for the BULK streams (#37589)
bazarnov committed May 2, 2024
1 parent 8665eaf commit d34b521
Showing 10 changed files with 403 additions and 325 deletions.
@@ -11,7 +11,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
-  dockerImageTag: 2.0.7
+  dockerImageTag: 2.0.8
   dockerRepository: airbyte/source-shopify
   documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
   githubIssueLabel: source-shopify
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
 build-backend = "poetry.core.masonry.api"

 [tool.poetry]
-version = "2.0.7"
+version = "2.0.8"
 name = "source-shopify"
 description = "Source CDK implementation for Shopify."
 authors = [ "Airbyte <contact@airbyte.io>",]

Large diffs are not rendered by default.

@@ -0,0 +1,50 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from functools import wraps
from time import sleep
from typing import Any, Callable, Final, Optional, Tuple, Type

from airbyte_cdk import AirbyteLogger

from .exceptions import ShopifyBulkExceptions

BULK_RETRY_ERRORS: Final[Tuple] = (
    ShopifyBulkExceptions.BulkJobBadResponse,
    ShopifyBulkExceptions.BulkJobUnknownError,
)


def bulk_retry_on_exception(logger: AirbyteLogger, more_exceptions: Optional[Tuple[Type[Exception], ...]] = None) -> Callable:
    """
    A decorator to retry a method when specified exceptions are raised.
    :param logger: The logger instance used to report retry attempts.
    :param more_exceptions: An optional tuple of additional exception types to retry on.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(self, *args, **kwargs) -> Any:
            # mandatory attributes of the decorated class instance
            max_retries = self._job_max_retries
            stream_name = self.stream_name
            backoff_time = self._job_backoff_time

            current_retries = 0
            while True:
                try:
                    return func(self, *args, **kwargs)
                # catch the known BULK retry errors together with any extra exception types passed in
                except BULK_RETRY_ERRORS + (more_exceptions or ()) as ex:
                    current_retries += 1
                    if current_retries > max_retries:
                        logger.error("Exceeded retry limit. Giving up.")
                        raise
                    else:
                        logger.warning(
                            f"Stream `{stream_name}`: {ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds."
                        )
                        sleep(backoff_time)

        return wrapper

    return decorator
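
A rough usage sketch (hypothetical; only the decorator name and the instance attributes it reads - `stream_name`, `_job_max_retries`, `_job_backoff_time` - come from the code above, and the import path and method name are assumptions):

import logging

# assumption: the decorator above is importable from the BULK helpers module,
# e.g. `from .retries import bulk_retry_on_exception`

class DemoBulkManager:
    stream_name = "products"     # name used in the retry log messages
    _job_max_retries = 6         # attempts before giving up
    _job_backoff_time = 5        # seconds to sleep between attempts

    @bulk_retry_on_exception(logger=logging.getLogger("airbyte"))
    def job_healthcheck(self, response):
        # any BulkJobBadResponse / BulkJobUnknownError raised here is retried
        # up to `_job_max_retries` times, then re-raised
        ...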
@@ -0,0 +1,14 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from enum import Enum


class ShopifyBulkJobStatus(Enum):
    CREATED = "CREATED"
    CANCELED = "CANCELED"
    CANCELING = "CANCELING"
    COMPLETED = "COMPLETED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    TIMEOUT = "TIMEOUT"
    ACCESS_DENIED = "ACCESS_DENIED"
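
For context, a minimal sketch of how a status value coming back from the API could be mapped onto this enum (the raw value and the branching below are illustrative only, not taken from the connector):

# illustrative only: turn a raw status string into the enum and branch on it
raw_status = "COMPLETED"  # e.g. parsed from a BULK job status response

status = ShopifyBulkJobStatus(raw_status)
if status == ShopifyBulkJobStatus.COMPLETED:
    print("job finished - fetch the result file")
elif status in (ShopifyBulkJobStatus.FAILED, ShopifyBulkJobStatus.ACCESS_DENIED):
    print("job cannot succeed - give up")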
@@ -642,7 +642,7 @@ def __init__(self, config: Dict) -> None:
         # define BULK Manager instance
         self.job_manager: ShopifyBulkManager = ShopifyBulkManager(
             session=self._session,
-            base_url=f"{self.url_base}/{self.path()}",
+            base_url=f"{self.url_base}{self.path()}",
             stream_name=self.name,
         )
         # override the default job slice size, if provided (it's auto-adjusted later on)
@@ -748,7 +748,7 @@ def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Un
             return self.config.get("start_date")

     def emit_slice_message(self, slice_start: datetime, slice_end: datetime) -> None:
-        slice_size_message = f"Slice size: `P{round(self.job_manager.job_size, 1)}D`"
+        slice_size_message = f"Slice size: `P{round(self.job_manager._job_size, 1)}D`"
         self.logger.info(f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. {slice_size_message}")

     @stream_state_cache.cache_stream_state
@@ -773,8 +773,10 @@ def process_bulk_results(
         response: requests.Response,
         stream_state: Optional[Mapping[str, Any]] = None,
     ) -> Iterable[Mapping[str, Any]]:
-        # get results fetched from COMPLETED BULK Job or `None`
-        filename = self.job_manager.job_check(response)
+        # process the CREATED Job prior to other actions
+        self.job_manager.job_process_created(response)
+        # get results fetched from COMPLETED BULK Job
+        filename = self.job_manager.job_check_for_completion()
         # the `filename` could be `None`, meaning there are no data available for the slice period.
         if filename:
             # add `shop_url` field to each record produced
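Taken together, the reworked read path splits the old single `job_check(response)` call into two explicit steps. A minimal sketch of that flow, assuming only the two manager methods shown in the diff (the function name and the JSONL handling below are illustrative):

import json
from typing import Any, Iterable, Mapping, Optional

def read_bulk_slice(job_manager, created_job_response) -> Iterable[Mapping[str, Any]]:
    # step 1: validate/register the freshly CREATED BULK job
    job_manager.job_process_created(created_job_response)
    # step 2: wait for completion and get the result file name, if any
    filename: Optional[str] = job_manager.job_check_for_completion()
    if not filename:
        return []  # no data for this slice period
    # BULK results are delivered as JSONL: one record per line
    with open(filename) as jsonl_file:
        return [json.loads(line) for line in jsonl_file if line.strip()]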
@@ -86,6 +86,7 @@ class ShopifyRateLimiter:
"""

on_unknown_load: float = 1.0
on_very_low_load: float = 0.0
on_low_load: float = 0.2
on_mid_load: float = 1.5
on_high_load: float = 5.0
@@ -122,20 +123,26 @@ def _convert_load_to_time(load: Optional[float], threshold: float) -> float:
         :: wait_time - time to wait between each request in seconds
         """
-        mid_load = threshold / 2  # average load based on threshold
+
+        half_of_threshold = threshold / 2  # average load based on threshold
+        quarter_of_threshold = threshold / 4  # low load based on threshold
+
         if not load:
             # when there is no rate_limits from header, use the `sleep_on_unknown_load`
             wait_time = ShopifyRateLimiter.on_unknown_load
             ShopifyRateLimiter.log_message_counter("API Load: `REGULAR`")
-        elif load >= threshold:
+        elif threshold <= load:
             wait_time = ShopifyRateLimiter.on_high_load
             ShopifyRateLimiter.log_message_counter("API Load: `HIGH`")
-        elif load >= mid_load:
+        elif half_of_threshold <= load < threshold:
             wait_time = ShopifyRateLimiter.on_mid_load
             ShopifyRateLimiter.log_message_counter("API Load: `MID`")
-        elif load < mid_load:
+        elif quarter_of_threshold <= load < half_of_threshold:
             wait_time = ShopifyRateLimiter.on_low_load
             ShopifyRateLimiter.log_message_counter("API Load: `LOW`")
+        elif load < quarter_of_threshold:
+            wait_time = ShopifyRateLimiter.on_very_low_load

         return wait_time

     @staticmethod
@@ -219,22 +226,6 @@ def get_graphql_api_wait_time(*args, threshold: float = 0.9) -> float:
         wait_time = ShopifyRateLimiter._convert_load_to_time(load, threshold)
         return wait_time

-    def _debug_info(*args) -> Any:
-        # find the requests.Response inside args list
-        response = ShopifyRateLimiter.get_response_from_args(*args)
-
-        if response:
-            try:
-                content = response.json()
-                content_keys = list(content.keys())
-                stream_name = content_keys[0] if len(content_keys) > 0 else None
-                content_lengh = len(content.get(stream_name, [])) if stream_name else None
-                debug_info = {"stream": stream_name, "url": response.request.url, "n_records": content_lengh}
-                return debug_info
-            except (requests.JSONDecodeError, Exception):
-                # bypassing the errors, we don't care about it here
-                pass
-
     @staticmethod
     def wait_time(wait_time: float) -> None:
         return sleep(wait_time)
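To make the new load banding concrete, here is a worked example of what `_convert_load_to_time` returns at the default `threshold=0.9` (so half = 0.45, quarter = 0.225), assuming `ShopifyRateLimiter` is importable from the connector:

# expected mapping at threshold = 0.9:
#   load is None or 0      -> on_unknown_load  = 1.0  ("REGULAR")
#   load >= 0.9            -> on_high_load     = 5.0  ("HIGH")
#   0.45 <= load < 0.9     -> on_mid_load      = 1.5  ("MID")
#   0.225 <= load < 0.45   -> on_low_load      = 0.2  ("LOW")
#   load < 0.225           -> on_very_low_load = 0.0  (no wait, no log message)
for load in (None, 0.1, 0.3, 0.6, 0.95):
    print(load, ShopifyRateLimiter._convert_load_to_time(load, threshold=0.9))
# expected output: 1.0, 0.0, 0.2, 1.5, 5.0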
