diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index bdccf22eaa19..49479c5cc78e 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -4,7 +4,7 @@ import logging import os -from datetime import datetime, timedelta +from datetime import timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -45,6 +45,10 @@ _MAX_CONCURRENCY = 20 _DEFAULT_CONCURRENCY = 10 _CACHE_DISABLED = os.environ.get("CACHE_DISABLED") +_REFUND_STREAM_NAME = "refunds" +_INCREMENTAL_CONCURRENCY_EXCLUSION = { + _REFUND_STREAM_NAME, # excluded because of the upcoming changes in terms of cursor https://github.com/airbytehq/airbyte/issues/34332 +} USE_CACHE = not _CACHE_DISABLED STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_" @@ -54,6 +58,7 @@ class SourceStripe(ConcurrentSourceAdapter): message_repository = InMemoryMessageRepository(entrypoint_logger.level) _SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION = { Events: ("created[gte]", "created[lte]"), + CreatedCursorIncrementalStripeStream: ("created[gte]", "created[lte]"), } def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs): @@ -292,7 +297,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]: # The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs. # Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe. # See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428 - CreatedCursorIncrementalStripeStream(name="refunds", path="refunds", **incremental_args), + CreatedCursorIncrementalStripeStream(name=_REFUND_STREAM_NAME, path="refunds", **incremental_args), UpdatedCursorIncrementalStripeStream( name="payment_methods", path="payment_methods", @@ -523,7 +528,7 @@ def _to_concurrent(self, stream: Stream, fallback_start, state_manager: Connecto state = state_manager.get_stream_state(stream.name, stream.namespace) slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream)) - if slice_boundary_fields: + if slice_boundary_fields and stream.name not in _INCREMENTAL_CONCURRENCY_EXCLUSION: cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0]) converter = EpochValueConcurrentStreamStateConverter() cursor = ConcurrentCursor(