🐛 Source Shopify: Make BULK Job termination threshold limit adjustable from `input configuration` (#40526)
bazarnov committed Jun 26, 2024
1 parent 6b36211 commit 708edc6
Showing 10 changed files with 53 additions and 34 deletions.
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.4.5
dockerImageTag: 2.4.6
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
18 changes: 9 additions & 9 deletions airbyte-integrations/connectors/source-shopify/poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-shopify/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.5"
version = "2.4.6"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -17,7 +17,7 @@ include = "source_shopify"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.90.0"
airbyte-cdk = "^1"
sgqlc = "==16.3"
graphql-query = "^1.1.1"

@@ -28,6 +28,14 @@ class ShopifyErrorHandler(ErrorHandler):
def __init__(self, stream_name: str = "<no specified stream>") -> None:
self._stream_name = stream_name

@property
def max_retries(self) -> Optional[int]:
return 5

@property
def max_time(self) -> Optional[int]:
return 20

def interpret_response(self, response: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
if isinstance(response, TRANSIENT_EXCEPTIONS):
return ErrorResolution(
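The two properties added above cap the handler's retry behaviour at 5 attempts and 20 seconds; the bounds themselves are presumably consumed by the Airbyte CDK's `HttpClient` backoff handling rather than by the connector directly. As a hedged illustration only (not the CDK's actual retry loop), a generic caller could honour the same kind of limits like this:

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def call_with_bounds(
    request: Callable[[], T],
    max_retries: Optional[int] = 5,
    max_time: Optional[float] = 20.0,
) -> T:
    """Retry `request` until it succeeds, honouring attempt and wall-clock limits."""
    started = time.monotonic()
    attempt = 0
    while True:
        try:
            return request()
        except Exception:  # the connector would only retry its TRANSIENT_EXCEPTIONS here
            attempt += 1
            out_of_retries = max_retries is not None and attempt > max_retries
            out_of_time = max_time is not None and (time.monotonic() - started) >= max_time
            if out_of_retries or out_of_time:
                raise
            time.sleep(min(2 ** attempt, 5))  # simple capped exponential backoff
```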
@@ -8,10 +8,10 @@

import requests
from airbyte_cdk.sources.streams.http import HttpClient
from requests.exceptions import ConnectionError, InvalidURL, JSONDecodeError, SSLError
from requests.exceptions import InvalidURL, JSONDecodeError

from .http_request import ShopifyErrorHandler
from .utils import ShopifyAccessScopesError, ShopifyBadJsonError, ShopifyConnectionError, ShopifyWrongShopNameError
from .utils import ShopifyAccessScopesError, ShopifyBadJsonError, ShopifyWrongShopNameError

SCOPES_MAPPING: Mapping[str, set[str]] = {
# SCOPE: read_customers
@@ -29,6 +29,8 @@ class ShopifyBulkManager:
base_url: str
stream_name: str
query: ShopifyBulkQuery
job_termination_threshold: float
job_size: float

# default logger
logger: Final[logging.Logger] = logging.getLogger("airbyte")
@@ -66,24 +68,25 @@ class ShopifyBulkManager:
_job_size_min: Final[float] = 0.1
# P365D, upper boundary for slice size
_job_size_max: Final[float] = 365.0
# dynamically adjusted slice interval
job_size: float = field(init=False, default=0.0)

# expand slice factor
_job_size_expand_factor: int = field(init=False, default=2)
# reduce slice factor
_job_size_reduce_factor: int = field(init=False, default=2)
# whether or not the slicer should revert the previous start value
_job_should_revert_slice: bool = field(init=False, default=False)

# Each job ideally should be executed within the specified time (in sec),
# to maximize the performance for multi-connection syncs and control the bulk job size within +- 1 hours (3600 sec),
# Ideally the source will balance on it's own rate, based on the time taken to return the data for the slice.
_job_max_elapsed_time: Final[float] = 2700.0
# 2 sec is set as default value to cover the case with the empty-fast-completed jobs
_job_last_elapsed_time: float = field(init=False, default=2.0)

def __post_init__(self):
self._http_client = HttpClient(self.stream_name, self.logger, ShopifyErrorHandler(), session=self.session)
self._job_size = self.job_size
# Each job ideally should be executed within the specified time (in sec),
# to maximize the performance for multi-connection syncs and control the bulk job size within +- 1 hour (3600 sec).
# Ideally, the source will balance on its own rate, based on the time taken to return the data for the slice.
# This behaviour could be overridden by providing the `BULK Job termination threshold` option in the `config`.
self._job_max_elapsed_time = self.job_termination_threshold

@property
def _tools(self) -> BulkTools:
@@ -141,10 +144,10 @@ def _is_long_running_job(self) -> bool:
return False

def _expand_job_size(self) -> None:
self.job_size += self._job_size_adjusted_expand_factor
self._job_size += self._job_size_adjusted_expand_factor

def _reduce_job_size(self) -> None:
self.job_size /= self._job_size_adjusted_reduce_factor
self._job_size /= self._job_size_adjusted_reduce_factor

def _job_size_reduce_next(self) -> None:
# revert the flag
@@ -162,7 +165,7 @@ def __adjust_job_size(self, job_current_elapsed_time: float) -> None:
# set the last job time
self._job_last_elapsed_time = job_current_elapsed_time
# check the job size slice interval are acceptable
self.job_size = max(self._job_size_min, min(self.job_size, self._job_size_max))
self._job_size = max(self._job_size_min, min(self._job_size, self._job_size_max))

def __reset_state(self) -> None:
# reset the job state to default
@@ -251,7 +254,7 @@ def _on_canceling_job(self, **kwargs) -> None:
def _on_running_job(self, **kwargs) -> None:
if self._is_long_running_job:
self.logger.info(
f"Stream: `{self.stream_name}` the BULK Job: {self._job_id} runs longer than expected. Retry with the reduced `Slice Size` after self-cancelation."
f"Stream: `{self.stream_name}` the BULK Job: {self._job_id} runs longer than expected ({self._job_max_elapsed_time} sec). Retry with the reduced `Slice Size` after self-cancelation."
)
# cancel the long-running bulk job
self._job_cancel()
@@ -387,10 +390,10 @@ def job_size_normalize(self, start: datetime, end: datetime) -> datetime:
# adjust slice size when it's bigger than the loop point when it should end,
# to preserve correct job size adjustments when this is the only job we need to run, based on STATE provided
requested_slice_size = (end - start).total_days()
self.job_size = requested_slice_size if requested_slice_size < self.job_size else self.job_size
self._job_size = requested_slice_size if requested_slice_size < self._job_size else self._job_size

def get_adjusted_job_start(self, slice_start: datetime) -> datetime:
step = self.job_size if self.job_size else self._job_size_min
step = self._job_size if self._job_size else self._job_size_min
return slice_start.add(days=step)

def get_adjusted_job_end(self, slice_start: datetime, slice_end: datetime) -> datetime:
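Taken together, the manager changes route the user-supplied threshold into the long-running-job check: `job_termination_threshold` and `job_size` become init fields, `__post_init__` copies them into the private `_job_size` and `_job_max_elapsed_time`, and the formerly hard-coded 2700-second limit disappears. The sketch below is a simplified reconstruction of that flow; the field names match the diff, but `job_created`, `_job_created_at`, and the body of `_is_long_running_job` are assumptions for illustration, not code copied from the connector:

```python
import time
from dataclasses import dataclass, field

@dataclass
class BulkJobManagerSketch:
    # the two init fields added in this commit
    job_termination_threshold: float
    job_size: float

    # dynamically adjusted slice interval, now kept private
    _job_size: float = field(init=False, default=0.0)
    # formerly a hard-coded Final value of 2700.0 sec
    _job_max_elapsed_time: float = field(init=False, default=2700.0)
    _job_created_at: float = field(init=False, default=0.0)  # hypothetical bookkeeping

    def __post_init__(self) -> None:
        self._job_size = self.job_size
        # the config-provided threshold replaces the hard-coded limit
        self._job_max_elapsed_time = self.job_termination_threshold

    def job_created(self) -> None:
        self._job_created_at = time.monotonic()

    @property
    def _is_long_running_job(self) -> bool:
        # assumption: the real check compares the job's elapsed time to the threshold
        if not self._job_created_at:
            return False
        return (time.monotonic() - self._job_created_at) > self._job_max_elapsed_time
```

With the new spec default of 3600 seconds, `_on_running_job` would self-cancel a BULK job only after an hour instead of the previous 45 minutes (2700 sec), then retry it with a reduced slice size, as the updated log message indicates.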
@@ -97,6 +97,13 @@
"title": "Add `user_id` to Transactions (slower)",
"description": "Defines which API type (REST/BULK) to use to fetch `Transactions` data. If you are a `Shopify Plus` user, leave the default value to speed up the fetch.",
"default": false
},
"job_termination_threshold": {
"type": "integer",
"title": "BULK Job termination threshold",
"description": "The max time in seconds, after which the single BULK Job should be `CANCELED` and retried. The bigger the value the longer the BULK Job is allowed to run.",
"default": 3600,
"minimum": 1
}
}
},
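The new `job_termination_threshold` spec property accepts any integer of at least 1 and defaults to 3600 seconds. A hedged example of how a user configuration might carry it, and how the source reads it back, mirroring the `config.get(...)` call in the stream constructor below (the `shop` and `start_date` keys are placeholders for whatever else the spec requires):

```python
# Illustrative user config; only `job_termination_threshold` is the new option.
config = {
    "shop": "example-shop",
    "start_date": "2020-01-01",
    "job_termination_threshold": 7200,  # allow BULK jobs to run up to 2 hours
}

# Mirrors how the diff reads the option, falling back to the 3600-second default.
threshold_seconds = float(config.get("job_termination_threshold", 3600))
assert threshold_seconds >= 1, "the spec enforces a minimum of 1 second"
```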
@@ -646,11 +646,11 @@ def __init__(self, config: Dict) -> None:
base_url=f"{self.url_base}{self.path()}",
stream_name=self.name,
query=self.query,
job_termination_threshold=float(config.get("job_termination_threshold", 3600)),
# override the default job slice size, if provided (it's auto-adjusted later on)
job_size=config.get("bulk_window_in_days", 0.0),
)
# overide the default job slice size, if provided (it's auto-adjusted, later on)
self.bulk_window_in_days = config.get("bulk_window_in_days")
if self.bulk_window_in_days:
self.job_manager.job_size = self.bulk_window_in_days

# define Record Producer instance
self.record_producer: ShopifyBulkRecord = ShopifyBulkRecord(self.query)

@@ -729,7 +729,7 @@ def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Un
return self.config.get("start_date")

def emit_slice_message(self, slice_start: datetime, slice_end: datetime) -> None:
slice_size_message = f"Slice size: `P{round(self.job_manager.job_size, 1)}D`"
slice_size_message = f"Slice size: `P{round(self.job_manager._job_size, 1)}D`"
self.logger.info(f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. {slice_size_message}")

@stream_state_cache.cache_stream_state
@@ -332,7 +332,7 @@ def test_stream_slices(
auth_config["start_date"] = "2020-01-01"

stream = stream(auth_config)
stream.job_manager.job_size = 1000
stream.job_manager._job_size = 1000
test_result = list(stream.stream_slices(stream_state=stream_state))
assert test_result[0].get("start") == expected_start

@@ -368,12 +368,12 @@ def test_expand_stream_slices_job_size(

# for the sake of simplicity we fake some parts to simulate the `current_job_time_elapsed`
# fake current slice interval value
stream.job_manager.job_size = previous_slice_size
stream.job_manager._job_size = previous_slice_size
# fake `last job elapsed time`
if last_job_elapsed_time:
stream.job_manager._job_last_elapsed_time = last_job_elapsed_time

first_slice = next(stream.stream_slices())
list(stream.read_records(SyncMode.incremental, stream_slice=first_slice))
# check the next slice
assert stream.job_manager.job_size == adjusted_slice_size
assert stream.job_manager._job_size == adjusted_slice_size
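The updated tests set the now-private `_job_size` attribute directly. Not part of this commit, but a hedged sketch of a complementary unit test for the new threshold wiring, assuming the same `auth_config` fixture and parametrized `stream` factory used in the tests above:

```python
def test_job_termination_threshold_reaches_manager(stream, auth_config) -> None:
    # hypothetical test: the config option should override the manager's elapsed-time limit
    auth_config["job_termination_threshold"] = 7200
    configured_stream = stream(auth_config)
    assert configured_stream.job_manager._job_max_elapsed_time == 7200.0
```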
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
@@ -212,6 +212,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https

| Version | Date | Pull Request | Subject |
| :------ |:-----------| :------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 2.4.6 | 2024-06-26 | [40526](https://github.com/airbytehq/airbyte/pull/40526) | Made `BULK Job termination threshold` limit adjustable from `input configuration`, increased the default value to `1 hour`. |
| 2.4.5 | 2024-06-25 | [40484](https://github.com/airbytehq/airbyte/pull/40484) | Update dependencies |
| 2.4.4 | 2024-06-19 | [39594](https://github.com/airbytehq/airbyte/pull/39594) | Extended the `Discount Codes`, `Fulfillment Orders`, `Inventory Items`, `Inventory Levels`, `Products`, `Product Variants` and `Transactions` stream schemas |
| 2.4.3 | 2024-06-06 | [38084](https://github.com/airbytehq/airbyte/pull/38084) | add resiliency on some transient errors using the HttpClient |
