Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Shopify: Allow the known HTTP errors to be retried more than once for the BULK streams #37589

Merged
merged 17 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.0.6
dockerImageTag: 2.0.7
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.0.6"
version = "2.0.7"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class ShopifyBulkManager:
# to maximize the performance for multi-connection syncs and control the bulk job size within +- 1 hours (3600 sec),
# Ideally the source will balance on its own rate, based on the time taken to return the data for the slice.
job_max_elapsed_time_sec: Final[float] = 1800.0
# retry max limit
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
_job_retry_on_error_max_limit: Final[int] = 6
# 0.1 ~= P2H, default value, lower boundary for slice size
job_size_min: Final[float] = 0.1
# P365D, upper boundary for slice size
Expand Down Expand Up @@ -83,6 +85,14 @@ class ShopifyBulkManager:
log_job_state_msg_count: int = field(init=False, default=0)
# one time retryable error counter
_one_time_error_retried: bool = field(init=False, default=False)
# retry counter
_job_retry_on_error_count: int = field(init=False, default=0)

# the set of retryable errors
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
_retryable_errors: Final[set] = (
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
ShopifyBulkExceptions.BulkJobBadResponse,
ShopifyBulkExceptions.BulkJobUnknownError,
)

@property
def tools(self) -> BulkTools:
Expand Down Expand Up @@ -302,6 +312,18 @@ def on_access_denied_job(self, **kwagrs) -> AirbyteTracedException:
def on_job_with_errors(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException:
    """Terminate the current BULK Job by raising `BulkJobUnknownError` with the collected errors."""
    message = f"Could not validate the status of the BULK Job `{self.job_id}`. Errors: {errors}."
    raise ShopifyBulkExceptions.BulkJobUnknownError(message)

def on_retryable_error(self, response: requests.Response, exception: Exception) -> Optional[requests.Response]:
    """Retry a failed BULK Job request, bounded by `_job_retry_on_error_max_limit`.

    While attempts remain, increments the retry counter, logs the failure, and
    re-issues the original request via `job_retry_request`, returning the new
    response. Once the limit is reached, escalates through `on_job_with_errors`,
    which raises `BulkJobUnknownError` (so `None` is never actually returned on
    that path).
    """
    # NOTE(review): the original pasted body contained review-UI artifact lines
    # ("... marked this conversation as resolved.") interleaved with the code,
    # which made it syntactically invalid; they are removed here.
    if self._job_retry_on_error_count == self._job_retry_on_error_max_limit:
        # retry budget exhausted — surface the underlying errors as a terminal exception
        self.on_job_with_errors(self.job_check_for_errors(response))
    else:
        # increment the attempt
        self._job_retry_on_error_count += 1
        # retry the request
        self.logger.info(
            f"Stream: `{self.stream_name}`, retrying bad request, attempt: {self._job_retry_on_error_count}. Error: {repr(exception)}."
        )
        return self.job_retry_request(response.request)

def job_check_for_errors(self, response: requests.Response) -> Union[AirbyteTracedException, Iterable[Mapping[str, Any]]]:
try:
return response.json().get("errors") or response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("userErrors", [])
Expand All @@ -310,23 +332,14 @@ def job_check_for_errors(self, response: requests.Response) -> Union[AirbyteTrac
f"Couldn't check the `response` for `errors`, status: {response.status_code}, response: `{response.text}`. Trace: {repr(e)}."
)

def job_one_time_retry_error(self, response: requests.Response, exception: Exception) -> Optional[requests.Response]:
    """Retry the failed BULK request exactly once; a repeat failure is escalated via `on_job_with_errors`."""
    if self._one_time_error_retried:
        # the single retry budget is already spent — raise the collected errors
        self.on_job_with_errors(self.job_check_for_errors(response))
        return None
    request = response.request
    self.logger.info(f"Stream: `{self.stream_name}`, retrying `Bad Request`: {request.body}. Error: {repr(exception)}.")
    self._one_time_error_retried = True
    return self.job_retry_request(request)

def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
# format Job state check args
status_args = self.job_get_request_args(ShopifyBulkTemplates.status)
# re-use of `self._session(*, **)` to make BULK Job status checks
with self.session as track_running_job:
response = track_running_job.request(**status_args)
# errors check
try:
# re-use of `self._session(*, **)` to make BULK Job status checks
with self.session as track_running_job:
response = track_running_job.request(**status_args)
errors = self.job_check_for_errors(response)
if not errors:
self.job_update_state(response)
Expand All @@ -335,11 +348,8 @@ def job_track_running(self) -> Union[AirbyteTracedException, requests.Response]:
else:
# execute ERRORS scenario
self.on_job_with_errors(errors)
except (
ShopifyBulkExceptions.BulkJobBadResponse,
ShopifyBulkExceptions.BulkJobUnknownError,
) as error:
return self.job_one_time_retry_error(response, error)
except self._retryable_errors as error:
return self.on_retryable_error(response, error)
bazarnov marked this conversation as resolved.
Show resolved Hide resolved

def job_check_state(self) -> Optional[str]:
response: Optional[requests.Response] = None
Expand Down Expand Up @@ -421,11 +431,8 @@ def job_healthcheck(self, response: requests.Response) -> Optional[requests.Resp
return response if not errors else None
else:
return self.job_retry_on_concurrency(request)
except (ShopifyBulkExceptions.BulkJobBadResponse, ShopifyBulkExceptions.BulkJobUnknownError) as err:
# sometimes we face with `HTTP-500 Internal Server Error`
# we should retry such at least once
self.logger.info(f"Stream: `{self.stream_name}`, retrying Bad Request: {request.body}, error: {repr(err)}.")
return self.job_retry_request(request)
except self._retryable_errors as error:
return self.on_retryable_error(response, error)

@limiter.balance_rate_limit(api_type=ApiTypeEnum.graphql.value)
def job_check(self, created_job_response: requests.Response) -> Optional[str]:
Expand All @@ -443,7 +450,8 @@ def job_check(self, created_job_response: requests.Response) -> Optional[str]:
ShopifyBulkExceptions.BulkJobFailed,
ShopifyBulkExceptions.BulkJobTimout,
ShopifyBulkExceptions.BulkJobAccessDenied,
# this one is one-time retriable
# this one is retriable, but still needs to be raised,
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
# if the max attempts value is reached.
ShopifyBulkExceptions.BulkJobUnknownError,
) as bulk_job_error:
raise bulk_job_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,30 +167,40 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
ShopifyBulkExceptions.BulkJobUnknownError,
"Could not validate the status of the BULK Job",
),
(
"bulk_successful_response_with_errors",
ShopifyBulkExceptions.BulkJobBadResponse,
"Couldn't check the `response` for `errors`",
),
],
ids=[
"success with errors (edge)",
"BulkJobUnknownError",
"BulkJobBadResponse",
],
)
def test_one_time_retry_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
def test_job_retry_on_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
stream = MetafieldOrders(auth_config)
# modify the sleep time for the test
stream.job_manager.concurrent_max_retry = 1
stream.job_manager.concurrent_interval_sec = 1
stream.job_manager.job_check_interval_sec = 1
# patch the retries number for the test
stream.job_manager._job_retry_on_error_max_limit = 2
# get job_id from FIXTURE
job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
# patching the method to get the right ID checks
if job_id:
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_check_for_errors", side_effect=error_type(expected))
# mocking the response for STATUS CHECKS
requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
test_job_status_response = requests.post(stream.job_manager.base_url)
with pytest.raises(error_type) as error:
stream.job_manager.job_check(test_job_status_response)
# The retried request should FAIL here, because we stil want to see the Exception raised
# We expect the call count to be 4 due to the status checks, the non-retried request would take 2 calls.
assert expected in repr(error.value) and requests_mock.call_count == 4
# We expect the call count to be 5 due to the status checks and max retries = 2,
# the non-retried request would take 2 calls.
assert expected in repr(error.value) and requests_mock.call_count == 5


@pytest.mark.parametrize(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.7 | 2024-04-25 | [37589](https://github.com/airbytehq/airbyte/pull/37589) | Added retry for known HTTP Errors for BULK streams |
| 2.0.6 | 2024-04-22 | [37468](https://github.com/airbytehq/airbyte/pull/37468) | Fixed one time retry for `Internal Server Error` for BULK streams |
| 2.0.5 | 2024-04-03 | [36788](https://github.com/airbytehq/airbyte/pull/36788) | Added ability to dynamically adjust the size of the `slice` |
| 2.0.4 | 2024-03-22 | [36355](https://github.com/airbytehq/airbyte/pull/36355) | Update CDK version to ensure Per-Stream Error Messaging and Record Counts In State (features were already there so just upping the version) |
Expand Down
Loading