Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from typing import Optional

from airbyte_cdk.sources.declarative.models import FailureType
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -44,10 +46,21 @@ def has_reached_limit(self) -> bool:
# Set the tracker's record counter back to zero.
def _reset(self) -> None:
self._record_count = 0

def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice:
new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
def reduce_slice_range_if_possible(
self, previous_stream_slice: StreamSlice, original_stream_slice: StreamSlice
) -> StreamSlice:
"""
:param previous_stream_slice: Stream slice that was just processed (it can be the same as original_stream_slice or an already reduced slice)
:param original_stream_slice: The original stream slice before any reduction
:return: Reduced stream slice
"""
new_slice = (
self._cursor.reduce_slice_range(original_stream_slice)
if self._cursor
else previous_stream_slice
)

if new_slice == stream_slice:
if new_slice == previous_stream_slice:
self._number_of_attempt_with_same_slice += 1
if (
self._number_of_attempt_with_same_slice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def _read_pages(
stream_state: Mapping[str, Any],
stream_slice: StreamSlice,
) -> Iterable[Record]:
original_stream_slice = stream_slice
pagination_tracker = self.pagination_tracker_factory()
reset_pagination = False
next_page_token = self._get_initial_next_page_token()
Expand Down Expand Up @@ -440,7 +441,9 @@ def _read_pages(
if reset_pagination or pagination_tracker.has_reached_limit():
next_page_token = self._get_initial_next_page_token()
previous_slice = stream_slice
stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice)
stream_slice = pagination_tracker.reduce_slice_range_if_possible(
stream_slice, original_stream_slice
)
LOGGER.info(
f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_given_reduce_slice_before_limit_reached_when_has_reached_limit_return_t
tracker = PaginationTracker(max_number_of_records=2)

tracker.observe(_A_RECORD)
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE)
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, _A_STREAM_SLICE)
tracker.observe(_A_RECORD)

assert not tracker.has_reached_limit()
Expand All @@ -57,31 +57,40 @@ def test_given_no_cursor_when_reduce_slice_range_then_return_same_slice(self):
tracker = PaginationTracker()
original_slice = StreamSlice(partition={}, cursor_slice={})

result_slice = tracker.reduce_slice_range_if_possible(original_slice)
result_slice = tracker.reduce_slice_range_if_possible(original_slice, original_slice)

assert result_slice == original_slice

def test_given_no_cursor_when_reduce_slice_range_multiple_times_then_raise(self):
tracker = PaginationTracker()
original_slice = StreamSlice(partition={}, cursor_slice={})

tracker.reduce_slice_range_if_possible(original_slice)
tracker.reduce_slice_range_if_possible(original_slice, original_slice)
with pytest.raises(AirbyteTracedException):
tracker.reduce_slice_range_if_possible(original_slice)
tracker.reduce_slice_range_if_possible(original_slice, original_slice)

def test_given_cursor_when_reduce_slice_range_then_return_cursor_stream_slice(self):
tracker = PaginationTracker(cursor=self._cursor)
self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE

new_slice = tracker.reduce_slice_range_if_possible(
StreamSlice(partition={}, cursor_slice={})
StreamSlice(partition={}, cursor_slice={}), StreamSlice(partition={}, cursor_slice={})
)

assert new_slice == _A_STREAM_SLICE

def test_given_cursor_cant_reduce_slice_when_reduce_slice_range_then_raise(self):
tracker = PaginationTracker(cursor=self._cursor)
original_slice = StreamSlice(partition={}, cursor_slice={})
self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE

with pytest.raises(AirbyteTracedException):
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE)
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice)

# Checks that reduce_slice_range_if_possible hands the cursor the ORIGINAL slice
# (its second argument) and not the previously processed slice (its first argument).
def test_cursor_called_with_original_slice_when_reduce_slice_range(self):
tracker = PaginationTracker(cursor=self._cursor)
original_slice = StreamSlice(partition={}, cursor_slice={})

# _A_STREAM_SLICE is passed in the "previous slice" position, original_slice in the "original" position.
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice)

# Exactly one call is expected, and it must receive original_slice rather than _A_STREAM_SLICE.
# NOTE(review): presumably self._cursor is a unittest.mock object set up in the test class — confirm.
self._cursor.reduce_slice_range.assert_called_once_with(original_slice)
Loading