🐛 Source Shopify: Allow the known HTTP errors to be retried more than once for the BULK streams #37589

Merged 17 commits on May 2, 2024
@@ -11,7 +11,7 @@ data:
  connectorSubtype: api
  connectorType: source
  definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
  dockerImageTag: 2.0.6
  dockerImageTag: 2.0.7
  dockerRepository: airbyte/source-shopify
  documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
  githubIssueLabel: source-shopify
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.0.6"
version = "2.0.7"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]

Large diffs are not rendered by default.

@@ -0,0 +1,50 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from functools import wraps
from time import sleep
from typing import Any, Callable, Final, Optional, Tuple, Type

from airbyte_cdk import AirbyteLogger

from .exceptions import ShopifyBulkExceptions

BULK_RETRY_ERRORS: Final[Tuple] = (
    ShopifyBulkExceptions.BulkJobBadResponse,
    ShopifyBulkExceptions.BulkJobUnknownError,
)


def bulk_retry_on_exception(logger: AirbyteLogger, more_exceptions: Optional[Tuple[Type[Exception], ...]] = None) -> Callable:
    """
    A decorator to retry a function when specified exceptions are raised.

    :param logger: The logger instance used to emit retry warnings and errors.
    :param more_exceptions: An optional tuple of extra exception types to retry on, in addition to `BULK_RETRY_ERRORS`.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(self, *args, **kwargs) -> Any:
            # mandatory class attributes
            max_retries = self._job_max_retries
            stream_name = self.stream_name
            backoff_time = self._job_backoff_time

            # retryable errors: the defaults plus any extra exception types supplied by the caller
            retry_on: Tuple[Type[Exception], ...] = BULK_RETRY_ERRORS + (more_exceptions or ())

            current_retries = 0
            while True:
                try:
                    return func(self, *args, **kwargs)
                except retry_on as ex:
                    current_retries += 1
                    if current_retries > max_retries:
                        logger.error("Exceeded retry limit. Giving up.")
                        raise
                    else:
                        logger.warning(
                            f"Stream `{stream_name}`: {ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds."
                        )
                        sleep(backoff_time)

        return wrapper

    return decorator
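
For illustration, here is a minimal usage sketch of the decorator, not part of this diff: the `FakeJobManager` class, its attribute values, the always-failing `job_healthcheck` method, and the decorator's import path are all hypothetical stand-ins for the real `ShopifyBulkManager`.

# Hypothetical usage sketch -- not part of this PR.
import logging

# assumed module path for the decorator shown above; adjust to the actual location
from source_shopify.shopify_graphql.bulk.retry import bulk_retry_on_exception
from source_shopify.shopify_graphql.bulk.exceptions import ShopifyBulkExceptions

# a standard logger stands in for `AirbyteLogger` here
logger = logging.getLogger("airbyte")


class FakeJobManager:
    # the three attributes the wrapper reads from `self`:
    stream_name = "metafield_orders"  # used in the retry warning message
    _job_max_retries = 6              # retry budget before re-raising
    _job_backoff_time = 5             # seconds slept between attempts

    @bulk_retry_on_exception(logger)
    def job_healthcheck(self) -> None:
        # any `BulkJobBadResponse` / `BulkJobUnknownError` raised here is
        # retried up to `_job_max_retries` times, then re-raised
        raise ShopifyBulkExceptions.BulkJobUnknownError("transient BULK error")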
@@ -0,0 +1,14 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from enum import Enum


class ShopifyBulkJobStatus(Enum):
    CREATED = "CREATED"
    CANCELED = "CANCELED"
    CANCELING = "CANCELING"
    COMPLETED = "COMPLETED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    TIMEOUT = "TIMEOUT"
    ACCESS_DENIED = "ACCESS_DENIED"
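
As a quick illustration of how this enum is consumed (a hypothetical sketch: the payload shape mirrors the `data.node` structure used in the tests below, and the terminal/non-terminal split is an assumption, not the manager's actual logic):

# Hypothetical sketch -- not part of this PR.
raw = {"data": {"node": {"status": "RUNNING"}}}  # assumed GraphQL payload shape

# parse the raw status string from the BULK status-check response into the enum
status = ShopifyBulkJobStatus(raw["data"]["node"]["status"])

# assumed split: states that end the polling loop vs. states that keep it going
TERMINAL = {
    ShopifyBulkJobStatus.COMPLETED,
    ShopifyBulkJobStatus.CANCELED,
    ShopifyBulkJobStatus.FAILED,
    ShopifyBulkJobStatus.TIMEOUT,
    ShopifyBulkJobStatus.ACCESS_DENIED,
}
assert status not in TERMINAL  # CREATED / CANCELING / RUNNING keep the check loop polling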
@@ -642,7 +642,7 @@ def __init__(self, config: Dict) -> None:
        # define BULK Manager instance
        self.job_manager: ShopifyBulkManager = ShopifyBulkManager(
            session=self._session,
            base_url=f"{self.url_base}/{self.path()}",
            base_url=f"{self.url_base}{self.path()}",
            stream_name=self.name,
        )
        # override the default job slice size, if provided (it is auto-adjusted later on)
@@ -748,7 +748,7 @@ def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Un
        return self.config.get("start_date")

    def emit_slice_message(self, slice_start: datetime, slice_end: datetime) -> None:
        slice_size_message = f"Slice size: `P{round(self.job_manager.job_size, 1)}D`"
        slice_size_message = f"Slice size: `P{round(self.job_manager._job_size, 1)}D`"
        self.logger.info(f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. {slice_size_message}")

    @stream_state_cache.cache_stream_state
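
The `P{n}D` token in the slice-size message above is an ISO-8601 duration in days; a quick hypothetical sanity check:

# Hypothetical sanity check -- not part of this PR.
job_size = 30.0  # assumed slice size in days
assert f"Slice size: `P{round(job_size, 1)}D`" == "Slice size: `P30.0D`"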
@@ -773,8 +773,10 @@ def process_bulk_results(
        response: requests.Response,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # get results fetched from COMPLETED BULK Job or `None`
        filename = self.job_manager.job_check(response)
        # process the CREATED Job prior to other actions
        self.job_manager.job_process_created(response)
        # get results fetched from the COMPLETED BULK Job
        filename = self.job_manager.job_check_for_completion()
        # the `filename` could be `None`, meaning no data is available for the slice period.
        if filename:
            # add the `shop_url` field to each record produced
@@ -6,7 +6,7 @@
import pytest
import requests
from source_shopify.shopify_graphql.bulk.exceptions import ShopifyBulkExceptions
from source_shopify.shopify_graphql.bulk.job import ShopifyBulkStatus
from source_shopify.shopify_graphql.bulk.status import ShopifyBulkJobStatus
from source_shopify.streams.streams import (
    Collections,
    CustomerAddress,
@@ -31,7 +31,7 @@ def test_check_for_errors(request, requests_mock, bulk_job_response, expected_le
    stream = MetafieldOrders(auth_config)
    requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response))
    test_response = requests.get(stream.job_manager.base_url)
    test_errors = stream.job_manager.job_check_for_errors(test_response)
    test_errors = stream.job_manager._job_check_for_errors(test_response)
    assert len(test_errors) == expected_len


@@ -42,7 +42,7 @@ def test_get_errors_from_response_invalid_response(auth_config) -> None:
    response.status_code = 404
    response.url = "https://example.com/invalid"
    with pytest.raises(ShopifyBulkExceptions.BulkJobBadResponse) as error:
        stream.job_manager.job_check_for_errors(response)
        stream.job_manager._job_check_for_errors(response)
    assert expected in repr(error.value)


@@ -58,29 +58,30 @@ def test_has_running_concurrent_job(request, requests_mock, bulk_job_response, a
    stream = MetafieldOrders(auth_config)
    requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response))
    test_response = requests.get(stream.job_manager.base_url)
    test_errors = stream.job_manager.job_check_for_errors(test_response)
    assert stream.job_manager.has_running_concurrent_job(test_errors) == expected
    test_errors = stream.job_manager._job_check_for_errors(test_response)
    assert stream.job_manager._has_running_concurrent_job(test_errors) == expected


@pytest.mark.parametrize(
    "bulk_job_response, expected",
    [
        ("bulk_successful_response", "gid://shopify/BulkOperation/4046733967549"),
        ("bulk_error", None),
        ("bulk_successful_response_with_no_id", None),
    ],
)
def test_job_get_id(request, requests_mock, bulk_job_response, auth_config, expected) -> None:
def test_job_process_created(request, requests_mock, bulk_job_response, auth_config, expected) -> None:
    stream = MetafieldOrders(auth_config)
    requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response))
    test_response = requests.get(stream.job_manager.base_url)
    assert stream.job_manager.job_get_id(test_response) == expected
    # process the job with the id (typically the CREATED one)
    stream.job_manager.job_process_created(test_response)
    assert stream.job_manager._job_id == expected


def test_job_state_completed(auth_config) -> None:
    stream = MetafieldOrders(auth_config)
    stream.job_manager.job_state = ShopifyBulkStatus.COMPLETED.value
    assert stream.job_manager.job_completed() == True
    stream.job_manager._job_state = ShopifyBulkJobStatus.COMPLETED.value
    assert stream.job_manager._job_completed() == True


@pytest.mark.parametrize(
@@ -104,17 +105,21 @@ def test_job_retry_on_concurrency(request, requests_mock, bulk_job_response, con
def test_job_retry_on_concurrency(request, requests_mock, bulk_job_response, concurrent_max_retry, error_type, auth_config, expected) -> None:
    stream = MetafieldOrders(auth_config)
    # patching concurrent settings
    stream.job_manager.concurrent_max_retry = concurrent_max_retry
    stream.job_manager.concurrent_interval_sec = 1
    stream.job_manager._concurrent_max_retry = concurrent_max_retry
    stream.job_manager._concurrent_interval = 1

    requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response))
    stream.job_manager._request = requests.get(stream.job_manager.base_url).request

    if error_type:
        with pytest.raises(error_type) as error:
            stream.job_manager.job_retry_on_concurrency(requests.get(stream.job_manager.base_url).request)
        assert expected in repr(error.value)
            stream.job_manager._job_retry_on_concurrency()
        assert expected in repr(error.value) and requests_mock.call_count == 2
    else:
        result = stream.job_manager.job_retry_on_concurrency(requests.get(stream.job_manager.base_url).request)
        assert stream.job_manager.job_get_id(result) == expected

        # simulate the real job_id from the created job
        stream.job_manager._job_id = expected
        stream.job_manager._job_retry_on_concurrency()
        assert requests_mock.call_count == 2


@pytest.mark.parametrize(
@@ -132,33 +137,28 @@
        "access_denied",
    ],
)
def test_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
def test_job_check_for_completion(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
    stream = MetafieldOrders(auth_config)
    # modify the sleep time for the test
    stream.job_manager.concurrent_max_retry = 1
    stream.job_manager.concurrent_interval_sec = 1
    stream.job_manager.job_check_interval_sec = 1
    # get job_id from FIXTURE
    job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
    # patching the method to get the right ID checks
    if job_id:
        mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
    stream.job_manager._concurrent_max_retry = 1
    stream.job_manager._concurrent_interval = 1
    stream.job_manager._job_check_interval = 1
    # mocking the response for STATUS CHECKS
    requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
    test_job_status_response = requests.post(stream.job_manager.base_url)
    job_result_url = test_job_status_response.json().get("data", {}).get("node", {}).get("url")
    if error_type:
        with pytest.raises(error_type) as error:
            stream.job_manager.job_check(test_job_status_response)
            stream.job_manager.job_check_for_completion()
        assert expected in repr(error.value)
    else:
        if job_result_url:
            # mocking the nested request call to retrieve the data from result URL
            requests_mock.get(job_result_url, json=request.getfixturevalue(job_response))
            result = stream.job_manager.job_check(test_job_status_response)
            result = stream.job_manager.job_check_for_completion()
            assert expected == result



@pytest.mark.parametrize(
"job_response, error_type, expected",
[
Expand All @@ -167,38 +167,43 @@ def test_job_check(mocker, request, requests_mock, job_response, auth_config, er
ShopifyBulkExceptions.BulkJobUnknownError,
"Could not validate the status of the BULK Job",
),
(
"bulk_successful_response_with_errors",
ShopifyBulkExceptions.BulkJobBadResponse,
"Couldn't check the `response` for `errors`",
),
],
ids=[
"success with errors (edge)",
"BulkJobUnknownError",
"BulkJobBadResponse",
],
)
def test_one_time_retry_job_check(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
def test_retry_on_job_exception(mocker, request, requests_mock, job_response, auth_config, error_type, expected) -> None:
    stream = MetafieldOrders(auth_config)
    # modify the sleep time for the test
    stream.job_manager.concurrent_max_retry = 1
    stream.job_manager.concurrent_interval_sec = 1
    stream.job_manager.job_check_interval_sec = 1
    # get job_id from FIXTURE
    job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
    # patching the method to get the right ID checks
    if job_id:
        mocker.patch("source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager.job_get_id", value=job_id)
    stream.job_manager._job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
    stream.job_manager._job_backoff_time = 0
    stream.job_manager._job_max_retries = 2
    # make the check method raise the specific error
    mocker.patch(
        "source_shopify.shopify_graphql.bulk.job.ShopifyBulkManager._job_check_for_errors",
        side_effect=error_type(expected),
    )
    # mocking the response for STATUS CHECKS
    requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
    test_job_status_response = requests.post(stream.job_manager.base_url)
    # testing raised exception and backoff
    with pytest.raises(error_type) as error:
        stream.job_manager.job_check(test_job_status_response)
    # The retried request should FAIL here, because we still want to see the Exception raised
    # We expect the call count to be 4 due to the status checks; the non-retried request would take 2 calls.
    assert expected in repr(error.value) and requests_mock.call_count == 4
        stream.job_manager._job_check_state()

    assert expected in repr(error.value) and requests_mock.call_count == 3


@pytest.mark.parametrize(
"job_response, expected",
[
("bulk_job_created_response", ShopifyBulkStatus.CREATED.value),
("bulk_job_running_response", ShopifyBulkStatus.RUNNING.value),
("bulk_job_running_response_without_id", ShopifyBulkStatus.RUNNING.value),
("bulk_job_created_response", ShopifyBulkJobStatus.CREATED.value),
("bulk_job_running_response", ShopifyBulkJobStatus.RUNNING.value),
("bulk_job_running_response_without_id", ShopifyBulkJobStatus.RUNNING.value),
],
ids=[
"created",
Expand All @@ -209,23 +214,23 @@ def test_one_time_retry_job_check(mocker, request, requests_mock, job_response,
def test_job_check_with_running_scenario(request, requests_mock, job_response, auth_config, expected) -> None:
    stream = MetafieldOrders(auth_config)
    # modify the sleep time for the test
    stream.job_manager.job_check_interval_sec = 0
    stream.job_manager._job_check_interval = 0
    # get job_id from FIXTURE
    job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id")
    # mocking the response for STATUS CHECKS
    requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))
    test_job_status_response = requests.post(stream.job_manager.base_url)
    job_result_url = test_job_status_response.json().get("data", {}).get("node", {}).get("url")
    # test that the state of the job isn't assigned yet
    assert stream.job_manager.job_state == None
    assert stream.job_manager._job_state == None

    # mocking the nested request call to retrieve the data from result URL
    stream.job_manager.job_id = job_id
    stream.job_manager._job_id = job_id
    requests_mock.get(job_result_url, json=request.getfixturevalue(job_response))

    # calling the scenario processing
    stream.job_manager.job_track_running()
    assert stream.job_manager.job_state == expected
    stream.job_manager._job_track_running()
    assert stream.job_manager._job_state == expected



@@ -318,7 +323,7 @@ def test_stream_slices(
    auth_config["start_date"] = "2020-01-01"

    stream = stream(auth_config)
    stream.job_manager.job_size = 1000
    stream.job_manager._job_size = 1000
    test_result = list(stream.stream_slices(stream_state=stream_state))
    test_query_from_slice = test_result[0].get("query")
    assert expected in test_query_from_slice
@@ -357,11 +362,11 @@ def test_expand_stream_slices_job_size(

    # for the sake of simplicity we fake some parts to simulate the `current_job_time_elapsed`
    # fake current slice interval value
    stream.job_manager.job_size = previous_slice_size
    stream.job_manager._job_size = previous_slice_size
    # fake `last job elapsed time`
    if last_job_elapsed_time:
        stream.job_manager.job_last_elapsed_time = last_job_elapsed_time
        stream.job_manager._job_last_elapsed_time = last_job_elapsed_time
    # parsing result from completed job
    list(stream.parse_response(test_bulk_response))
    # check the next slice
    assert stream.job_manager.job_size == adjusted_slice_size
    assert stream.job_manager._job_size == adjusted_slice_size
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
@@ -207,6 +207,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.7 | 2024-04-25 | [37589](https://github.com/airbytehq/airbyte/pull/37589) | Added retry for known HTTP Errors for BULK streams |
| 2.0.6 | 2024-04-22 | [37468](https://github.com/airbytehq/airbyte/pull/37468) | Fixed one time retry for `Internal Server Error` for BULK streams |
| 2.0.5 | 2024-04-03 | [36788](https://github.com/airbytehq/airbyte/pull/36788) | Added ability to dynamically adjust the size of the `slice` |
| 2.0.4 | 2024-03-22 | [36355](https://github.com/airbytehq/airbyte/pull/36355) | Update CDK version to ensure Per-Stream Error Messaging and Record Counts In State (features were already there so just upping the version) |