diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b62f4252f..7068105cb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2279,12 +2279,14 @@ definitions: - FAIL - RETRY - IGNORE + - RESET_PAGINATION - RATE_LIMITED examples: - SUCCESS - FAIL - RETRY - IGNORE + - RESET_PAGINATION - RATE_LIMITED failure_type: title: Failure Type @@ -3707,6 +3709,9 @@ definitions: anyOf: - "$ref": "#/definitions/DefaultPaginator" - "$ref": "#/definitions/NoPagination" + pagination_reset: + description: Describes what triggers pagination reset and how to handle it. + "$ref": "#/definitions/PaginationReset" ignore_stream_slicer_parameters_on_paginated_requests: description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored. type: boolean @@ -3730,6 +3735,36 @@ definitions: $parameters: type: object additionalProperties: true + PaginationReset: + title: Pagination Reset + description: Describes what triggers pagination reset and how to handle it. If SPLIT_USING_CURSOR, the connector developer is accountable for ensuring that the records are returned in ascending order. + type: object + required: + - type + - action + properties: + type: + type: string + enum: [ PaginationReset ] + action: + type: string + enum: + - SPLIT_USING_CURSOR + - RESET + limits: + "$ref": "#/definitions/PaginationResetLimits" + PaginationResetLimits: + title: Pagination Reset Limits + description: Describes the limits that trigger pagination reset + type: object + required: + - type + properties: + type: + type: string + enum: [ PaginationResetLimits ] + number_of_records: + type: integer GzipDecoder: title: gzip description: Select 'gzip' for response data that is compressed with gzip. Requires specifying an inner data type/decoder to parse the decompressed data. 
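A minimal sketch of how the new `pagination_reset` option could be wired into a SimpleRetriever manifest, assuming a hypothetical stream (url_base, path, and the record limit are placeholders); the field names mirror the PaginationReset and PaginationResetLimits definitions above. With SPLIT_USING_CURSOR, reaching the record limit narrows the current slice to start from the most recent cursor value observed and restarts pagination from the first page; if the slice cannot be narrowed further, the sync fails rather than looping.

retriever:
  type: SimpleRetriever
  requester:
    type: HttpRequester
    url_base: "https://api.example.com"  # placeholder
    path: "/items"                       # placeholder
  record_selector:
    type: RecordSelector
    extractor:
      type: DpathExtractor
      field_path: ["results"]
  pagination_reset:
    type: PaginationReset
    action: SPLIT_USING_CURSOR  # requires the API to return records in ascending cursor order
    limits:
      type: PaginationResetLimits
      number_of_records: 10000  # placeholder: reset pagination after this many records per slice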
diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 30e2c7eac..f0379368d 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -151,7 +151,7 @@ def __init__( self._connector_state_converter = connector_state_converter self._cursor_field = cursor_field - self._cursor_factory = cursor_factory + self._cursor_factory = cursor_factory # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation self._partition_router = partition_router # The dict is ordered to ensure that once the maximum number of partitions is reached, diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 34ef7a463..49b48a232 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -539,6 +539,7 @@ class Action(Enum): FAIL = "FAIL" RETRY = "RETRY" IGNORE = "IGNORE" + RESET_PAGINATION = "RESET_PAGINATION" RATE_LIMITED = "RATE_LIMITED" @@ -553,7 +554,14 @@ class HttpResponseFilter(BaseModel): action: Optional[Action] = Field( None, description="Action to execute if a response matches the filter.", - examples=["SUCCESS", "FAIL", "RETRY", "IGNORE", "RATE_LIMITED"], + examples=[ + "SUCCESS", + "FAIL", + "RETRY", + "IGNORE", + "RESET_PAGINATION", + "RATE_LIMITED", + ], title="Action", ) failure_type: Optional[FailureType] = Field( @@ -1173,6 +1181,16 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class Action1(Enum): + SPLIT_USING_CURSOR = "SPLIT_USING_CURSOR" + RESET = "RESET" + + +class PaginationResetLimits(BaseModel): + type: Literal["PaginationResetLimits"] + number_of_records: Optional[int] = None + + class CsvDecoder(BaseModel): type: Literal["CsvDecoder"] encoding: Optional[str] = "utf-8" @@ -2054,6 +2072,12 @@ class RecordSelector(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class PaginationReset(BaseModel): + type: Literal["PaginationReset"] + action: Action1 + limits: Optional[PaginationResetLimits] = None + + class GzipDecoder(BaseModel): type: Literal["GzipDecoder"] decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder] @@ -2822,6 +2846,10 @@ class SimpleRetriever(BaseModel): None, description="Paginator component that describes how to navigate through the API's pages.", ) + pagination_reset: Optional[PaginationReset] = Field( + None, + description="Describes what triggers pagination reset and how to handle it.", + ) ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field( False, description="If true, the partition router and incremental request options will be ignored when paginating requests. 
Request options set directly on the requester will not be ignored.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 040ac4689..8db6889ec 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -116,11 +116,15 @@ ) from airbyte_cdk.sources.declarative.models import ( CustomStateMigration, + PaginationResetLimits, ) from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( DEPRECATION_LOGS_TAG, BaseModelWithDeprecations, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + Action1 as PaginationResetActionModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, ) @@ -358,6 +362,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( PageIncrement as PageIncrementModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PaginationReset as PaginationResetModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParametrizedComponentsResolver as ParametrizedComponentsResolverModel, ) @@ -529,6 +536,7 @@ LocalFileSystemFileWriter, NoopFileWriter, ) +from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker from airbyte_cdk.sources.declarative.schema import ( ComplexFieldType, DefaultSchemaLoader, @@ -644,6 +652,8 @@ # this would be a circular import MAX_SLICES = 5 +LOGGER = logging.getLogger(f"airbyte.model_to_component_factory") + class ModelToComponentFactory: EPOCH_DATETIME_FORMAT = "%s" @@ -2043,6 +2053,7 @@ def create_default_stream( if isinstance(concurrent_cursor, FinalStateCursor) else concurrent_cursor ) + retriever = self._create_component_from_model( model=model.retriever, config=config, @@ -2051,12 +2062,9 @@ def create_default_stream( request_options_provider=request_options_provider, stream_slicer=stream_slicer, partition_router=partition_router, - stop_condition_cursor=concurrent_cursor - if self._is_stop_condition_on_cursor(model) - else None, - client_side_incremental_sync={"cursor": concurrent_cursor} - if self._is_client_side_filtering_enabled(model) - else None, + has_stop_condition_cursor=self._is_stop_condition_on_cursor(model), + is_client_side_incremental_sync=self._is_client_side_filtering_enabled(model), + cursor=concurrent_cursor, transformations=transformations, file_uploader=file_uploader, incremental_sync=model.incremental_sync, @@ -3050,7 +3058,7 @@ def create_record_selector( name: str, transformations: List[RecordTransformation] | None = None, decoder: Decoder | None = None, - client_side_incremental_sync: Dict[str, Any] | None = None, + client_side_incremental_sync_cursor: Optional[Cursor] = None, file_uploader: Optional[DefaultFileUploader] = None, **kwargs: Any, ) -> RecordSelector: @@ -3066,14 +3074,14 @@ def create_record_selector( transform_before_filtering = ( False if model.transform_before_filtering is None else model.transform_before_filtering ) - if client_side_incremental_sync: + if client_side_incremental_sync_cursor: record_filter = ClientSideIncrementalRecordFilterDecorator( config=config, parameters=model.parameters, condition=model.record_filter.condition if (model.record_filter and hasattr(model.record_filter, "condition")) else None, - 
**client_side_incremental_sync, + cursor=client_side_incremental_sync_cursor, ) transform_before_filtering = ( True @@ -3151,8 +3159,9 @@ def create_simple_retriever( name: str, primary_key: Optional[Union[str, List[str], List[List[str]]]], request_options_provider: Optional[RequestOptionsProvider] = None, - stop_condition_cursor: Optional[Cursor] = None, - client_side_incremental_sync: Optional[Dict[str, Any]] = None, + cursor: Optional[Cursor] = None, + has_stop_condition_cursor: bool = False, + is_client_side_incremental_sync: bool = False, transformations: List[RecordTransformation], file_uploader: Optional[DefaultFileUploader] = None, incremental_sync: Optional[ @@ -3182,6 +3191,9 @@ def _get_url(req: Requester) -> str: return _url or _url_base + if cursor is None: + cursor = FinalStateCursor(name, None, self._message_repository) + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder @@ -3193,7 +3205,7 @@ def _get_url(req: Requester) -> str: config=config, decoder=decoder, transformations=transformations, - client_side_incremental_sync=client_side_incremental_sync, + client_side_incremental_sync_cursor=cursor if is_client_side_incremental_sync else None, file_uploader=file_uploader, ) @@ -3270,7 +3282,7 @@ def _get_url(req: Requester) -> str: url_base=_get_url(requester), extractor_model=model.record_selector.extractor, decoder=decoder, - cursor_used_for_stop_condition=stop_condition_cursor or None, + cursor_used_for_stop_condition=cursor if has_stop_condition_cursor else None, ) if model.paginator else NoPagination(parameters={}) @@ -3319,6 +3331,13 @@ def _get_url(req: Requester) -> str: parameters=model.parameters or {}, ) + if ( + model.record_selector.record_filter + and model.pagination_reset + and model.pagination_reset.limits + ): + raise ValueError("PaginationResetLimits are not supported while having record filter.") + return SimpleRetriever( name=name, paginator=paginator, @@ -3332,9 +3351,40 @@ def _get_url(req: Requester) -> str: ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, additional_query_properties=query_properties, log_formatter=self._get_log_formatter(log_formatter, name), + pagination_tracker_factory=self._create_pagination_tracker_factory( + model.pagination_reset, cursor + ), parameters=model.parameters or {}, ) + def _create_pagination_tracker_factory( + self, model: Optional[PaginationResetModel], cursor: Cursor + ) -> Callable[[], PaginationTracker]: + if model is None: + return lambda: PaginationTracker() + + # Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic + cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None + if model.action == PaginationResetActionModel.RESET: + # in that case, we will let cursor_factory to return None even if the stream has a cursor + pass + elif model.action == PaginationResetActionModel.SPLIT_USING_CURSOR: + if isinstance(cursor, ConcurrentCursor): + cursor_factory = lambda: cursor.copy_without_state() # type: ignore # the if condition validates that it is a ConcurrentCursor + elif isinstance(cursor, ConcurrentPerPartitionCursor): + cursor_factory = lambda: cursor._cursor_factory.create( # type: ignore # if this becomes a problem, we would need to extract the cursor_factory instantiation logic and make it accessible here + {}, datetime.timedelta(0) + ) + elif not isinstance(cursor, FinalStateCursor): + LOGGER.warning( + "Unknown cursor for 
PaginationTracker. Pagination resets might not work properly" + ) + else: + raise ValueError(f"Unknown PaginationReset action: {model.action}") + + limit = model.limits.number_of_records if model and model.limits else None + return lambda: PaginationTracker(cursor_factory(), limit) + def _get_log_formatter( self, log_formatter: Callable[[Response], Any] | None, name: str ) -> Callable[[Response], Any] | None: diff --git a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py index bb60f2a96..c09f18727 100644 --- a/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py +++ b/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py @@ -66,14 +66,14 @@ def interpret_response( if not isinstance(matched_error_resolution, ErrorResolution): continue - if matched_error_resolution.response_action == ResponseAction.SUCCESS: + if matched_error_resolution.response_action in [ + ResponseAction.SUCCESS, + ResponseAction.RETRY, + ResponseAction.IGNORE, + ResponseAction.RESET_PAGINATION, + ]: return matched_error_resolution - if ( - matched_error_resolution.response_action == ResponseAction.RETRY - or matched_error_resolution.response_action == ResponseAction.IGNORE - ): - return matched_error_resolution if matched_error_resolution: return matched_error_resolution diff --git a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py new file mode 100644 index 000000000..a858c04fb --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py @@ -0,0 +1,64 @@ +from typing import Optional + +from airbyte_cdk.sources.declarative.models import FailureType +from airbyte_cdk.sources.declarative.types import Record, StreamSlice +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + + +class PaginationTracker: + _record_count: int + _number_of_attempt_with_same_slice: int + + def __init__( + self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None + ) -> None: + """ + Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all + implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor` + switch to global state, we stop keeping track of the state per partition and therefore can't get an accurate + view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only + ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`. + """ + self._cursor = cursor + self._limit = max_number_of_records + self._reset() + + """ + Given we have a cursor, we do not allow for the same slice to be processed twice because we assume we will + always process the same slice. + + Given no cursor, we assume that the pagination reset is for retrying purposes and we allow to retry once. 
+ """ + self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2 + self._number_of_attempt_with_same_slice = 0 + + def observe(self, record: Record) -> None: + self._record_count += 1 + if self._cursor: + self._cursor.observe(record) + + def has_reached_limit(self) -> bool: + return self._limit is not None and self._record_count >= self._limit + + def _reset(self) -> None: + self._record_count = 0 + + def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice: + new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice + + if new_slice == stream_slice: + self._number_of_attempt_with_same_slice += 1 + if ( + self._number_of_attempt_with_same_slice + >= self._allowed_number_of_attempt_with_same_slice + ): + raise AirbyteTracedException( + internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}", + failure_type=FailureType.system_error, + ) + else: + self._number_of_attempt_with_same_slice = 0 + + self._reset() + return new_slice diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index ed83279de..56cb83fcd 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -3,10 +3,10 @@ # import json +import logging from collections import defaultdict from dataclasses import InitVar, dataclass, field from functools import partial -from itertools import islice from typing import ( Any, Callable, @@ -39,14 +39,20 @@ RequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.requester import Requester +from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.source import ExperimentalClassWarning +from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.core import StreamData +from airbyte_cdk.sources.streams.http.pagination_reset_exception import ( + PaginationResetRequiredException, +) from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.utils.mapping_helpers import combine_mappings FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete" +LOGGER = logging.getLogger("airbyte") @dataclass @@ -92,8 +98,14 @@ class SimpleRetriever(Retriever): ignore_stream_slicer_parameters_on_paginated_requests: bool = False additional_query_properties: Optional[QueryProperties] = None log_formatter: Optional[Callable[[requests.Response], Any]] = None + pagination_tracker_factory: Callable[[], PaginationTracker] = field( + default_factory=lambda: lambda: PaginationTracker() + ) def __post_init__(self, parameters: Mapping[str, Any]) -> None: + # while changing `ModelToComponentFactory.create_simple_retriever` to accept a cursor, the sources implementing + # a CustomRetriever inheriting for SimpleRetriever needed to have the following validation added. 
+ self.cursor = None if isinstance(self.cursor, Cursor) else self.cursor self._paginator = self.paginator or NoPagination(parameters=parameters) self._parameters = parameters self._name = ( @@ -362,90 +374,97 @@ def _read_pages( stream_state: Mapping[str, Any], stream_slice: StreamSlice, ) -> Iterable[Record]: - pagination_complete = False - initial_token = self._paginator.get_initial_token() - next_page_token: Optional[Mapping[str, Any]] = ( - {"next_page_token": initial_token} if initial_token is not None else None - ) - while not pagination_complete: - property_chunks: List[List[str]] = ( - list( - self.additional_query_properties.get_request_property_chunks( - stream_slice=stream_slice - ) - ) - if self.additional_query_properties - else [ - [] - ] # A single empty property chunk represents the case where property chunking is not configured - ) - + pagination_tracker = self.pagination_tracker_factory() + reset_pagination = False + next_page_token = self._get_initial_next_page_token() + while True: merged_records: MutableMapping[str, Any] = defaultdict(dict) last_page_size = 0 last_record: Optional[Record] = None - response: Optional[requests.Response] = None - for properties in property_chunks: - if len(properties) > 0: - stream_slice = StreamSlice( - partition=stream_slice.partition or {}, - cursor_slice=stream_slice.cursor_slice or {}, - extra_fields={"query_properties": properties}, - ) - - response = self._fetch_next_page(stream_state, stream_slice, next_page_token) - for current_record in records_generator_fn(response): - if ( - current_record - and self.additional_query_properties - and self.additional_query_properties.property_chunking + + response = None + try: + if ( + self.additional_query_properties + and self.additional_query_properties.property_chunking + ): + for properties in self.additional_query_properties.get_request_property_chunks( + stream_slice=stream_slice ): - merge_key = ( - self.additional_query_properties.property_chunking.get_merge_key( - current_record + stream_slice = StreamSlice( + partition=stream_slice.partition or {}, + cursor_slice=stream_slice.cursor_slice or {}, + extra_fields={"query_properties": properties}, + ) + response = self._fetch_next_page( + stream_state, stream_slice, next_page_token + ) + + for current_record in records_generator_fn(response): + merge_key = ( + self.additional_query_properties.property_chunking.get_merge_key( + current_record + ) ) + if merge_key: + _deep_merge(merged_records[merge_key], current_record) + else: + # We should still emit records even if the record did not have a merge key + pagination_tracker.observe(current_record) + last_page_size += 1 + last_record = current_record + yield current_record + + for merged_record in merged_records.values(): + record = Record( + data=merged_record, stream_name=self.name, associated_slice=stream_slice ) - if merge_key: - _deep_merge(merged_records[merge_key], current_record) - else: - # We should still emit records even if the record did not have a merge key - last_page_size += 1 - last_record = current_record - yield current_record - else: + pagination_tracker.observe(record) + last_page_size += 1 + last_record = record + yield record + else: + response = self._fetch_next_page(stream_state, stream_slice, next_page_token) + for current_record in records_generator_fn(response): + pagination_tracker.observe(current_record) last_page_size += 1 last_record = current_record yield current_record - - if ( - self.additional_query_properties - and 
self.additional_query_properties.property_chunking - ): - for merged_record in merged_records.values(): - record = Record( - data=merged_record, stream_name=self.name, associated_slice=stream_slice - ) - last_page_size += 1 - last_record = record - yield record - - if not response: - pagination_complete = True + except PaginationResetRequiredException: + reset_pagination = True + else: + if not response: + break + + if reset_pagination or pagination_tracker.has_reached_limit(): + next_page_token = self._get_initial_next_page_token() + previous_slice = stream_slice + stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice) + LOGGER.info( + f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}" + ) + reset_pagination = False else: last_page_token_value = ( next_page_token.get("next_page_token") if next_page_token else None ) next_page_token = self._next_page_token( - response=response, + response=response, # type:ignore # we are breaking from the loop on the try/else if there are no response so this should be fine last_page_size=last_page_size, last_record=last_record, last_page_token_value=last_page_token_value, ) if not next_page_token: - pagination_complete = True + break # Always return an empty generator just in case no records were ever yielded yield from [] + def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]: + initial_token = self._paginator.get_initial_token() + next_page_token = {"next_page_token": initial_token} if initial_token is not None else None + return next_page_token + def _read_single_page( self, records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index ca63a6901..5210164dd 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -19,7 +19,7 @@ ) from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager -from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY from airbyte_cdk.sources.streams.concurrent.clamping import ClampingStrategy, NoClamping from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType, GapType @@ -41,7 +41,7 @@ class CursorField: def __init__(self, cursor_field_key: str) -> None: self.cursor_field_key = cursor_field_key - def extract_value(self, record: Record) -> CursorValueType: + def extract_value(self, record: Record) -> Any: cursor_value = record.data.get(self.cursor_field_key) if cursor_value is None: raise ValueError(f"Could not find cursor field {self.cursor_field_key} in record") @@ -136,6 +136,24 @@ class ConcurrentCursor(Cursor): _START_BOUNDARY = 0 _END_BOUNDARY = 1 + def copy_without_state(self) -> "ConcurrentCursor": + return self.__class__( + stream_name=self._stream_name, + stream_namespace=self._stream_namespace, + stream_state={}, + message_repository=NoopMessageRepository(), + connector_state_manager=ConnectorStateManager(), + connector_state_converter=self._connector_state_converter, + cursor_field=self._cursor_field, + slice_boundary_fields=self._slice_boundary_fields, + start=self._start, + end_provider=self._end_provider, + lookback_window=self._lookback_window, + slice_range=self._slice_range, + cursor_granularity=self._cursor_granularity, + 
clamping_strategy=self._clamping_strategy, + ) + def __init__( self, stream_name: str, @@ -174,6 +192,7 @@ def __init__( # Flag to track if the logger has been triggered (per stream) self._should_be_synced_logger_triggered = False self._clamping_strategy = clamping_strategy + self._is_ascending_order = True # A lock is required when closing a partition because updating the cursor's concurrent_state is # not thread safe. When multiple partitions are being closed by the cursor at the same time, it is @@ -245,6 +264,8 @@ def observe(self, record: Record) -> None: if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value: self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value + elif most_recent_cursor_value > cursor_value: + self._is_ascending_order = False except ValueError: self._log_for_record_without_cursor_value() @@ -516,3 +537,31 @@ def _log_for_record_without_cursor_value(self) -> None: f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced" ) self._should_be_synced_logger_triggered = True + + def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice: + # In theory, we might be more flexible here meaning that it doesn't need to be in ascending order but it just + # needs to be ordered. For now though, we will only support ascending order. + if not self._is_ascending_order: + LOGGER.warning( + "Attempting to reduce slice while records are not returned in incremental order might lead to missing records" + ) + + if stream_slice in self._most_recent_cursor_value_per_partition: + return StreamSlice( + partition=stream_slice.partition, + cursor_slice={ + self._slice_boundary_fields_wrapper[ + self._START_BOUNDARY + ]: self._connector_state_converter.output_format( + self._most_recent_cursor_value_per_partition[stream_slice] + ), + self._slice_boundary_fields_wrapper[ + self._END_BOUNDARY + ]: stream_slice.cursor_slice[ + self._slice_boundary_fields_wrapper[self._END_BOUNDARY] + ], + }, + extra_fields=stream_slice.extra_fields, + ) + else: + return stream_slice diff --git a/airbyte_cdk/sources/streams/http/error_handlers/response_models.py b/airbyte_cdk/sources/streams/http/error_handlers/response_models.py index e882b89bd..7199d1982 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/response_models.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/response_models.py @@ -16,6 +16,7 @@ class ResponseAction(Enum): RETRY = "RETRY" FAIL = "FAIL" IGNORE = "IGNORE" + RESET_PAGINATION = "RESET_PAGINATION" RATE_LIMITED = "RATE_LIMITED" diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index bade96c9c..e9fc5add2 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -42,6 +42,9 @@ RequestBodyException, UserDefinedBackoffException, ) +from airbyte_cdk.sources.streams.http.pagination_reset_exception import ( + PaginationResetRequiredException, +) from airbyte_cdk.sources.streams.http.rate_limiting import ( http_client_default_backoff_handler, rate_limit_default_backoff_handler, @@ -428,6 +431,9 @@ def _handle_error_resolution( if error_resolution.response_action not in self._ACTIONS_TO_RETRY_ON: self._evict_key(request) + if error_resolution.response_action == ResponseAction.RESET_PAGINATION: + raise PaginationResetRequiredException() + # Emit stream status RUNNING with the reason RATE_LIMITED 
to log that the rate limit has been reached if error_resolution.response_action == ResponseAction.RATE_LIMITED: # TODO: Update to handle with message repository when concurrent message repository is ready diff --git a/airbyte_cdk/sources/streams/http/pagination_reset_exception.py b/airbyte_cdk/sources/streams/http/pagination_reset_exception.py new file mode 100644 index 000000000..1f371c7b9 --- /dev/null +++ b/airbyte_cdk/sources/streams/http/pagination_reset_exception.py @@ -0,0 +1,2 @@ +class PaginationResetRequiredException(Exception): + pass diff --git a/airbyte_cdk/test/mock_http/response_builder.py b/airbyte_cdk/test/mock_http/response_builder.py index fd67461da..994869c72 100644 --- a/airbyte_cdk/test/mock_http/response_builder.py +++ b/airbyte_cdk/test/mock_http/response_builder.py @@ -75,6 +75,25 @@ def __str__(self) -> str: return f"NestedPath(path={self._path})" +class RootPath: + """ + Path to use when the root of the response is an array. + """ + + def write(self, template: List[Dict[str, Any]], value: List[Dict[str, Any]]) -> None: + template.extend(value) + + def update(self, template: List[Dict[str, Any]], value: List[Any]) -> None: + template.clear() + template.extend(value) + + def extract(self, template: List[Dict[str, Any]]) -> Any: + return template + + def __str__(self) -> str: + return f"RootPath" + + class PaginationStrategy(ABC): @abstractmethod def update(self, response: Dict[str, Any]) -> None: @@ -149,12 +168,14 @@ def build(self) -> Dict[str, Any]: class HttpResponseBuilder: def __init__( self, - template: Dict[str, Any], - records_path: Union[FieldPath, NestedPath], + template: Union[Dict[str, Any], List[Dict[str, Any]]], + records_path: Union[FieldPath, NestedPath, RootPath], pagination_strategy: Optional[PaginationStrategy], ): - self._response = template + _validate_path_with_response(records_path, template) + self._records: List[RecordBuilder] = [] + self._response = template self._records_path = records_path self._pagination_strategy = pagination_strategy self._status_code = 200 @@ -169,6 +190,9 @@ def with_pagination(self) -> "HttpResponseBuilder": "`pagination_strategy` was not provided and hence, fields related to the pagination can't be modified. Please provide " "`pagination_strategy` while instantiating ResponseBuilder to leverage this capability" ) + elif isinstance(self._response, List): + raise ValueError("pagination_strategy requires the response to be a dict but was list") + self._pagination_strategy.update(self._response) return self @@ -177,7 +201,7 @@ def with_status_code(self, status_code: int) -> "HttpResponseBuilder": return self def build(self) -> HttpResponse: - self._records_path.update(self._response, [record.build() for record in self._records]) + self._records_path.update(self._response, [record.build() for record in self._records]) # type: ignore # validated using _validate_path_with_response return HttpResponse(json.dumps(self._response), self._status_code) @@ -208,15 +232,16 @@ def find_binary_response(resource: str, execution_folder: str) -> bytes: def create_record_builder( response_template: Dict[str, Any], - records_path: Union[FieldPath, NestedPath], + records_path: Union[FieldPath, NestedPath, RootPath], record_id_path: Optional[Path] = None, record_cursor_path: Optional[Union[FieldPath, NestedPath]] = None, ) -> RecordBuilder: """ This will use the first record define at `records_path` as a template for the records. 
If more records are defined, they will be ignored """ + _validate_path_with_response(records_path, response_template) try: - record_template = records_path.extract(response_template)[0] + record_template = records_path.extract(response_template)[0] # type: ignore # validated using _validate_path_with_response if not record_template: raise ValueError( f"Could not extract any record from template at path `{records_path}`. " @@ -230,8 +255,20 @@ def create_record_builder( def create_response_builder( - response_template: Dict[str, Any], - records_path: Union[FieldPath, NestedPath], + response_template: Union[Dict[str, Any], List[Dict[str, Any]]], + records_path: Union[FieldPath, NestedPath, RootPath], pagination_strategy: Optional[PaginationStrategy] = None, ) -> HttpResponseBuilder: return HttpResponseBuilder(response_template, records_path, pagination_strategy) + + +def _validate_path_with_response( + records_path: Union[FieldPath, NestedPath, RootPath], + response_template: Union[Dict[str, Any], List[Dict[str, Any]]], +) -> None: + if isinstance(response_template, List) and not isinstance(records_path, RootPath): + raise ValueError("templates of type lists require RootPath") + elif isinstance(response_template, Dict) and not isinstance( + records_path, (FieldPath, NestedPath) + ): + raise ValueError("templates of type dict either require FieldPath or NestedPath") diff --git a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py new file mode 100644 index 000000000..3573fb4cd --- /dev/null +++ b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py @@ -0,0 +1,87 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# +from unittest import TestCase +from unittest.mock import Mock + +import pytest + +from airbyte_cdk.sources.declarative.models import FailureType +from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker +from airbyte_cdk.sources.declarative.types import Record, StreamSlice +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + +_A_RECORD = Record( + data={"id": 1}, + associated_slice=StreamSlice(partition={"id": 11}, cursor_slice={}), + stream_name="a_stream_name", +) +_A_STREAM_SLICE = StreamSlice(cursor_slice={"stream slice": "slice value"}, partition={}) + + +class TestPaginationTracker(TestCase): + def setUp(self) -> None: + self._cursor = Mock(spec=ConcurrentCursor) + + def test_given_cursor_when_observe_then_forward_to_cursor(self): + tracker = PaginationTracker(cursor=self._cursor) + + tracker.observe(_A_RECORD) + + self._cursor.observe.assert_called_once_with(_A_RECORD) + + def test_given_not_enough_records_when_has_reached_limit_return_false(self): + tracker = PaginationTracker(max_number_of_records=100) + tracker.observe(_A_RECORD) + assert not tracker.has_reached_limit() + + def test_given_enough_records_when_has_reached_limit_return_true(self): + tracker = PaginationTracker(max_number_of_records=2) + + tracker.observe(_A_RECORD) + tracker.observe(_A_RECORD) + + assert tracker.has_reached_limit() + + def test_given_reduce_slice_before_limit_reached_when_has_reached_limit_return_true(self): + tracker = PaginationTracker(max_number_of_records=2) + + tracker.observe(_A_RECORD) + tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE) + tracker.observe(_A_RECORD) + + assert not tracker.has_reached_limit() + + def 
test_given_no_cursor_when_reduce_slice_range_then_return_same_slice(self): + tracker = PaginationTracker() + original_slice = StreamSlice(partition={}, cursor_slice={}) + + result_slice = tracker.reduce_slice_range_if_possible(original_slice) + + assert result_slice == original_slice + + def test_given_no_cursor_when_reduce_slice_range_multiple_times_then_raise(self): + tracker = PaginationTracker() + original_slice = StreamSlice(partition={}, cursor_slice={}) + + tracker.reduce_slice_range_if_possible(original_slice) + with pytest.raises(AirbyteTracedException): + tracker.reduce_slice_range_if_possible(original_slice) + + def test_given_cursor_when_reduce_slice_range_then_return_cursor_stream_slice(self): + tracker = PaginationTracker(cursor=self._cursor) + self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE + + new_slice = tracker.reduce_slice_range_if_possible( + StreamSlice(partition={}, cursor_slice={}) + ) + + assert new_slice == _A_STREAM_SLICE + + def test_given_cursor_cant_reduce_slice_when_reduce_slice_range_then_raise(self): + tracker = PaginationTracker(cursor=self._cursor) + self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE + + with pytest.raises(AirbyteTracedException): + tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 9150ed43e..fda3a124b 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -24,9 +24,9 @@ ) from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth from airbyte_cdk.sources.declarative.decoders import JsonDecoder -from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordSelector +from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter -from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator +from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator from airbyte_cdk.sources.declarative.requesters.paginators.strategies import ( CursorPaginationStrategy, PageIncrement, @@ -40,13 +40,19 @@ PropertyLimitType, ) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType -from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod +from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester +from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator +from airbyte_cdk.sources.streams.http.pagination_reset_exception import ( + PaginationResetRequiredException, +) from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +A_RECORD_SCHEMA = {} A_SLICE_STATE = {"slice_state": "slice state value"} +A_STREAM_NAME = "stream_name" A_STREAM_SLICE = StreamSlice(cursor_slice={"stream slice": "slice value"}, partition={}) A_STREAM_STATE = {"stream state": "state value"} @@ -1446,3 +1452,115 @@ def test_simple_retriever_still_emit_records_if_no_merge_key(): assert len(actual_records) == 10 assert 
actual_records == expected_records + + +def test_given_requester_raise_pagination_reset_exception_when_read_records_than_reduce_slice_range_and_retry_with_new_slice(): + requester = Mock(spec=Requester) + requester.send_request.side_effect = [ + [{"id": 1}], + PaginationResetRequiredException(), + [{"id": 2}], + ] + record_selector = Mock(spec=HttpSelector) + record_selector.select_records.side_effect = [ + [{"id": 1}], + [{"id": 2}], + ] + pagination_tracker = Mock(spec=PaginationTracker) + pagination_tracker.has_reached_limit.return_value = False + paginator = _mock_paginator() + paginator.get_initial_token.return_value = 1 + paginator.next_page_token.side_effect = [ + {"next_page_token": 2}, + None, + ] + retriever = SimpleRetriever( + name=A_STREAM_NAME, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + paginator=paginator, + pagination_tracker_factory=lambda: pagination_tracker, + parameters={}, + config={}, + ) + + x = list(retriever.read_records(A_RECORD_SCHEMA, A_STREAM_SLICE)) + + assert len(x) == 2 + assert pagination_tracker.reduce_slice_range_if_possible.call_count == 1 + assert requester.send_request.call_count == 3 + assert requester.send_request.call_args_list[1].kwargs["stream_slice"] == A_STREAM_SLICE + assert requester.send_request.call_args_list[1].kwargs["next_page_token"] == { + "next_page_token": 2 + } + assert ( + requester.send_request.call_args_list[2].kwargs["stream_slice"] + == pagination_tracker.reduce_slice_range_if_possible.return_value + ) + assert requester.send_request.call_args_list[2].kwargs["next_page_token"] == { + "next_page_token": 1 + } + + +def test_given_reach_pagination_limit_after_two_pages_when_read_records_than_reduce_slice_range_and_retry_with_new_slice(): + requester = Mock(spec=Requester) + requester.send_request.side_effect = [ + [{"id": 1}], + [{"id": 2}], + [{"id": 3}], + ] + record_selector = Mock(spec=HttpSelector) + record_selector.select_records.side_effect = [ + [{"id": 1}], + [{"id": 2}], + [{"id": 3}], + ] + pagination_tracker = Mock(spec=PaginationTracker) + pagination_tracker.has_reached_limit.side_effect = [ + False, + True, + False, + ] + paginator = _mock_paginator() + paginator.get_initial_token.return_value = 1 + paginator.next_page_token.side_effect = [ + {"next_page_token": 2}, + None, + ] + retriever = SimpleRetriever( + name=A_STREAM_NAME, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + paginator=paginator, + pagination_tracker_factory=lambda: pagination_tracker, + parameters={}, + config={}, + ) + + x = list(retriever.read_records(A_RECORD_SCHEMA, A_STREAM_SLICE)) + + assert len(x) == 3 + assert pagination_tracker.reduce_slice_range_if_possible.call_count == 1 + assert requester.send_request.call_count == 3 + assert requester.send_request.call_args_list[1].kwargs["stream_slice"] == A_STREAM_SLICE + assert requester.send_request.call_args_list[1].kwargs["next_page_token"] == { + "next_page_token": 2 + } + assert ( + requester.send_request.call_args_list[2].kwargs["stream_slice"] + == pagination_tracker.reduce_slice_range_if_possible.return_value + ) + assert requester.send_request.call_args_list[2].kwargs["next_page_token"] == { + "next_page_token": 1 + } + + +def _mock_paginator(): + paginator = Mock(spec=Paginator) + paginator.get_request_params.__name__ = "get_request_params" + paginator.get_request_headers.__name__ = "get_request_headers" + paginator.get_request_body_data.__name__ = "get_request_body_data" + 
paginator.get_request_body_json.__name__ = "get_request_body_json" + return paginator diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 7661c7fc4..ce2e17ce5 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4610,3 +4610,427 @@ def test_parameter_propagation_for_concurrent_cursor(): streams = source.streams({}) assert streams[0].cursor.cursor_field.cursor_field_key == cursor_field_parameter_override + + +def test_given_response_action_is_pagination_reset_when_read_then_reset_pagination(): + input_config = {} + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test", + "authenticator": {"type": "NoAuth"}, + "error_handler": { + "type": "DefaultErrorHandler", + "response_filters": [ + { + "http_codes": [400], + "action": "RESET_PAGINATION", + "failure_type": "system_error", + }, + ], + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + }, + } + + catalog = create_catalog("Test") + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=input_config, + catalog=catalog, + state=None, + ) + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest("https://example.org/test"), + [ + HttpResponse("", 400), + HttpResponse(json.dumps([{"id": 1}]), 200), + ], + ) + messages = list( + source.read(logger=source.logger, config=input_config, catalog=catalog, state=[]) + ) + + assert len(list(filter(lambda message: message.type == Type.RECORD, messages))) + + +def test_given_pagination_limit_reached_when_read_then_reset_pagination(): + input_config = {} + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test?from={{ stream_interval.start_time }}", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "pagination_reset": { + "type": "PaginationReset", + "action": "SPLIT_USING_CURSOR", + "limits": { + "type": "PaginationResetLimits", + "number_of_records": 2, + }, + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": {"datetime": "2022-01-01"}, + "end_datetime": "2023-12-31", + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d"], + "cursor_granularity": "P1D", + "step": "P1Y", + "cursor_field": "updated_at", + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + }, + } + + catalog = 
create_catalog("Test") + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=input_config, + catalog=catalog, + state=None, + ) + + with HttpMocker() as http_mocker: + # Slice from 2022-01-01 to 2022-12-31 + http_mocker.get( + HttpRequest("https://example.org/test?from=2022-01-01"), + HttpResponse( + json.dumps( + [{"id": 1, "updated_at": "2022-02-01"}, {"id": 2, "updated_at": "2022-03-01"}] + ), + 200, + ), + ) + http_mocker.get( + HttpRequest("https://example.org/test?from=2022-03-01"), + HttpResponse(json.dumps([{"id": 3, "updated_at": "2022-04-01"}]), 200), + ) + # Slice from 2023-01-01 to 2023-12-31 + http_mocker.get( + HttpRequest("https://example.org/test?from=2023-01-01"), + HttpResponse(json.dumps([{"id": 4, "updated_at": "2023-04-01"}]), 200), + ) + messages = list( + source.read(logger=source.logger, config=input_config, catalog=catalog, state=[]) + ) + + assert len(list(filter(lambda message: message.type == Type.RECORD, messages))) == 4 + + +def test_given_per_partition_cursor_when_read_then_reset_pagination(): + input_config = {} + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test?partition={{ stream_partition.parent_id }}&from={{ stream_interval.start_time }}", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "pagination_reset": { + "type": "PaginationReset", + "action": "SPLIT_USING_CURSOR", + "limits": { + "type": "PaginationResetLimits", + "number_of_records": 2, + }, + }, + "partition_router": { + "type": "ListPartitionRouter", + "cursor_field": "parent_id", + "values": ["1", "2"], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": {"datetime": "2022-01-01"}, + "end_datetime": "2022-12-31", + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d"], + "cursor_granularity": "P1D", + "step": "P1Y", + "cursor_field": "updated_at", + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + }, + } + + catalog = create_catalog("Test") + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=input_config, + catalog=catalog, + state=None, + ) + + with HttpMocker() as http_mocker: + # Partition 1 + http_mocker.get( + HttpRequest("https://example.org/test?partition=1&from=2022-01-01"), + HttpResponse( + json.dumps( + [{"id": 1, "updated_at": "2022-02-01"}, {"id": 2, "updated_at": "2022-03-01"}] + ), + 200, + ), + ) + http_mocker.get( + HttpRequest("https://example.org/test?partition=1&from=2022-03-01"), + HttpResponse(json.dumps([{"id": 3, "updated_at": "2022-04-01"}]), 200), + ) + # Partition 2 + http_mocker.get( + HttpRequest("https://example.org/test?partition=2&from=2022-01-01"), + HttpResponse(json.dumps([{"id": 4, "updated_at": "2023-04-01"}]), 200), + ) + messages = list( + source.read(logger=source.logger, config=input_config, catalog=catalog, state=[]) + ) + + assert len(list(filter(lambda message: message.type == Type.RECORD, messages))) == 4 + + +def 
test_given_pagination_reset_action_is_reset_even_though_stream_is_incremental_when_read_then_reset_pagination(): + input_config = {} + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test?from={{ stream_interval.start_time }}", + "authenticator": {"type": "NoAuth"}, + "error_handler": { + "type": "DefaultErrorHandler", + "response_filters": [ + { + "http_codes": [400], + "action": "RESET_PAGINATION", + "failure_type": "system_error", + }, + ], + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["results"]}, + }, + "paginator": { + "type": "DefaultPaginator", + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response.next }}", + }, + }, + "pagination_reset": { + "type": "PaginationReset", + "action": "RESET", + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": {"datetime": "2022-01-01"}, + "end_datetime": "2022-12-31", + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d"], + "cursor_granularity": "P1D", + "step": "P1Y", + "cursor_field": "updated_at", + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + }, + } + + catalog = create_catalog("Test") + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=input_config, + catalog=catalog, + state=None, + ) + + with HttpMocker() as http_mocker: + # Slice from 2022-01-01 to 2022-12-31 + http_mocker.get( + HttpRequest("https://example.org/test?from=2022-01-01"), + HttpResponse( + json.dumps( + { + "results": [ + {"id": 1, "updated_at": "2022-02-01"}, + {"id": 2, "updated_at": "2022-03-01"}, + ], + "next": "https://example.org/test?from=2022-01-01&cursor=toto", + } + ), + 200, + ), + ) + http_mocker.get( + HttpRequest("https://example.org/test?from=2022-01-01&cursor=toto"), + [ + HttpResponse(json.dumps({}), 400), + HttpResponse(json.dumps({"results": [{"id": 3, "updated_at": "2022-04-01"}]}), 200), + ], + ) + messages = list( + source.read(logger=source.logger, config=input_config, catalog=catalog, state=[]) + ) + + assert len(list(filter(lambda message: message.type == Type.RECORD, messages))) == 5 + + +def test_given_record_selector_is_filtering_when_read_then_raise_error(): + """ + This test is here to show the limitations of pagination reset. If it starts failing, maybe we just want to delete + it. Basically, since the filtering happens before we count the number of entries, than we might not have an + accurate picture of the number of records passed through the HTTP response. 
+ """ + input_config = {} + manifest = { + "version": "0.34.2", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Test"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "Test", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"type": "object"}, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.org", + "path": "/test?from={{ stream_interval.start_time }}", + "authenticator": {"type": "NoAuth"}, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + "record_filter": { + "type": "RecordFilter", + "condition": "{{ record['id'] != 1 }}", + }, + }, + "pagination_reset": { + "type": "PaginationReset", + "action": "SPLIT_USING_CURSOR", + "limits": { + "type": "PaginationResetLimits", + "number_of_records": 2, + }, + }, + }, + } + ], + "spec": { + "type": "Spec", + "documentation_url": "https://example.org", + "connection_specification": {}, + }, + } + + catalog = create_catalog("Test") + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=input_config, + catalog=catalog, + state=None, + ) + + with pytest.raises(ValueError): + list(source.read(logger=source.logger, config=input_config, catalog=catalog, state=[]))