Merged
84 commits
1d5b468
remove
maxi297 Jul 31, 2025
76ac6f7
Auto-fix lint and format issues
Jul 31, 2025
2d1e2f4
remove unused file
maxi297 Jul 31, 2025
b4a5fec
have declarative availability check support AbstractStream
maxi297 Jul 31, 2025
fc6c6b6
Auto-fix lint and format issues
Jul 31, 2025
5fe2e02
mypy
maxi297 Jul 31, 2025
1e8e968
Auto-fix lint and format issues
Jul 31, 2025
689e792
Remove RFR stuff
maxi297 Aug 1, 2025
5399436
have bland stream be instantiated as DefaultStream
maxi297 Aug 4, 2025
dff2559
fix test
maxi297 Aug 4, 2025
7dc2164
fix test, format, lint and a bit of mypy
maxi297 Aug 4, 2025
0bfbdfe
mypy
maxi297 Aug 4, 2025
0b454bb
format
maxi297 Aug 4, 2025
13c17f4
remove unused line
maxi297 Aug 4, 2025
0f36dc5
Merge branch 'main' into maxi297/remove-availability-strategy-except-…
maxi297 Aug 4, 2025
6f95ebb
Merge branch 'maxi297/remove-availability-strategy-except-for-filebas…
maxi297 Aug 4, 2025
c94892a
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
maxi297 Aug 4, 2025
fb75765
fix test
maxi297 Aug 4, 2025
c078395
lint
maxi297 Aug 4, 2025
decc557
format
maxi297 Aug 4, 2025
b8daf64
code review
maxi297 Aug 4, 2025
e8edc4b
Merge branch 'main' into maxi297/availability_strategy_to_support_abs…
maxi297 Aug 5, 2025
2bc4b30
code review
maxi297 Aug 5, 2025
98e2227
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
maxi297 Aug 5, 2025
d9d09f0
incremental without partition router as DefaultStream
maxi297 Aug 5, 2025
1af2264
refactor regarding async stuff
maxi297 Aug 5, 2025
8c771bb
partially fix mypy
maxi297 Aug 5, 2025
fb40a6b
fix mypy
maxi297 Aug 5, 2025
9175283
format
maxi297 Aug 5, 2025
1d84a49
mypy
maxi297 Aug 5, 2025
2cba5ff
fix
maxi297 Aug 5, 2025
8181c83
fix condition where we might override FinalStateCursor with null
maxi297 Aug 5, 2025
1079629
supports_file_transfer
maxi297 Aug 6, 2025
f196ea7
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 6, 2025
7f643e4
format
maxi297 Aug 6, 2025
8566607
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 6, 2025
96f15c3
Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…
maxi297 Aug 11, 2025
6cb012b
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
11e3a35
format
maxi297 Aug 11, 2025
86909df
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
cee6157
[WIP still tests failing] Remove DeclarativeStream instantiation
maxi297 Aug 11, 2025
ebb4b28
more fixes for DefaultStream in Connector Builder
maxi297 Aug 11, 2025
6fef39b
mypy and format
maxi297 Aug 11, 2025
e31fed9
format broke mypy
maxi297 Aug 11, 2025
e996805
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
c68ae59
Merge branch 'maxi297/incremental_without_partition_router_as_default…
maxi297 Aug 11, 2025
23c9712
fix connector builder tests and format
maxi297 Aug 11, 2025
f5d1b71
Merging main into branch
maxi297 Aug 21, 2025
7b704e0
Merge branch 'main' into maxi297/remove_declarative_stream
maxi297 Aug 21, 2025
6b35969
fix failed merge
maxi297 Aug 25, 2025
fbfcfd2
adding comments based on code review
maxi297 Aug 26, 2025
8c6bd42
format
maxi297 Aug 26, 2025
35db9d7
add test for per_partition_request_option_provider
maxi297 Aug 26, 2025
f74aa7e
add comment to explain parameter not being propagated sometimes for c…
maxi297 Aug 26, 2025
76492e9
remove legacy tests that can't be maintained
maxi297 Aug 26, 2025
2135f18
re-add test
maxi297 Aug 26, 2025
03ebdc8
Merge branch 'main' into maxi297/remove_declarative_stream
brianjlai Aug 26, 2025
485502e
add slice_limit parameter to StreamSlicerPartitionGenerator so connec…
brianjlai Aug 26, 2025
1bd76e8
fix 4 of 6 test_per_partition_cursor_integration.py tests
brianjlai Aug 27, 2025
9158b67
fix 2 more tests
brianjlai Aug 27, 2025
bb424b8
whatever
brianjlai Aug 27, 2025
6c8771c
clean tests
brianjlai Aug 27, 2025
f9eb050
fix remaining tests in test_per_partition_cursor_integration
maxi297 Aug 27, 2025
cf8b084
coderabbitai code review
maxi297 Aug 27, 2025
5286442
add tests, format and lint
maxi297 Aug 27, 2025
4f46c3e
Merge branch 'main' into maxi297/remove_declarative_stream
maxi297 Aug 27, 2025
eb635b1
fix linting but break test_per_partition_cursor.py for now
maxi297 Aug 27, 2025
2b849f7
fix test_per_partition_cursor.py
maxi297 Aug 27, 2025
ace8739
remove unused code and improve tests
maxi297 Aug 28, 2025
0f99c9e
emit updated parent before last record, not after
maxi297 Aug 28, 2025
efd1040
mypy
maxi297 Aug 28, 2025
a04aead
add typing
maxi297 Aug 28, 2025
f1cf1de
Merge branch 'main' into maxi297/remove_declarative_stream
maxi297 Aug 28, 2025
c5b4837
mypy
maxi297 Aug 28, 2025
bb96293
remove comments that documented the new behavior
maxi297 Aug 29, 2025
639a734
a bit more cleanup
maxi297 Aug 29, 2025
1b4b756
more clean up
maxi297 Aug 29, 2025
c004637
remove unecessary test
maxi297 Aug 29, 2025
25ca5b8
allow for specific parameters to be passed to custom components
maxi297 Sep 2, 2025
e5ecf41
fix internal _get_url
maxi297 Sep 2, 2025
f2f1363
fix case where request option provider is stream slicer
maxi297 Sep 2, 2025
38cd657
add migration guide
maxi297 Sep 3, 2025
5bafeca
code review comment
maxi297 Sep 3, 2025
8bef9dd
format
maxi297 Sep 3, 2025
Original file line number Diff line number Diff line change
@@ -146,8 +146,10 @@ def set_initial_state(self, stream_state: StreamState) -> None:
             if "state" in stream_state:
                 self._state_to_migrate_from = stream_state["state"]

-        # Set parent state for partition routers based on parent streams
-        self._partition_router.set_initial_state(stream_state)
+        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
+        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
+        # We are still keeping this line as a comment to be explicit about the past behavior.
+        # self._partition_router.set_initial_state(stream_state)

     def observe(self, stream_slice: StreamSlice, record: Record) -> None:
         self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
Original file line number Diff line number Diff line change
@@ -703,7 +703,7 @@ def _group_streams(
                             stream_slicer=declarative_stream.retriever.stream_slicer,
                             slice_limit=self._limits.max_slices
                             if self._limits
-                            else None,  # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
+                            else None,  # technically not needed because create_default_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
                         )
                     else:
                         if (
@@ -772,7 +772,7 @@ def _group_streams(
                     declarative_stream.retriever.stream_slicer,
                     slice_limit=self._limits.max_slices
                     if self._limits
-                    else None,  # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
+                    else None,  # technically not needed because create_default_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
                 )

                 final_state_cursor = FinalStateCursor(
Original file line number Diff line number Diff line change
@@ -11,6 +11,13 @@
 from datetime import timedelta
 from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional

+from airbyte_cdk.models import (
+    AirbyteStateBlob,
+    AirbyteStateMessage,
+    AirbyteStateType,
+    AirbyteStreamState,
+    StreamDescriptor,
+)
 from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
     Timer,
@@ -48,7 +55,7 @@ class ConcurrentPerPartitionCursor(Cursor):
     Manages state per partition when a stream has many partitions, preventing data loss or duplication.

     Attributes:
-        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 10,000).
+        DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 10,000). This limit needs to be higher than the number of threads we might enqueue (which is represented by ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE). If not, we could have partitions that have been generated and submitted to the ThreadPool but got deleted from the ConcurrentPerPartitionCursor and when closing them, it will generate KeyError.

     - **Partition Limitation Logic**
       Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached.
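The constraint described in the updated docstring can be illustrated with a minimal sketch. This is not the CDK implementation: `TinyPartitionTracker` and `close_all` are hypothetical names, and the eviction policy (drop the oldest entry once the limit is reached) is an assumption based on the "Oldest partitions are removed" wording. It only shows why a limit smaller than the number of in-flight partitions can end in a `KeyError` at close time.

```python
from collections import OrderedDict
from typing import List


class TinyPartitionTracker:
    """Hypothetical stand-in for the cursor's per-partition bookkeeping."""

    def __init__(self, max_partitions: int) -> None:
        self._max_partitions = max_partitions
        self._cursor_per_partition: "OrderedDict[str, dict]" = OrderedDict()

    def open_partition(self, key: str) -> None:
        # Evict the oldest tracked partitions once the limit is reached.
        while (
            self._cursor_per_partition
            and len(self._cursor_per_partition) >= self._max_partitions
        ):
            self._cursor_per_partition.popitem(last=False)
        self._cursor_per_partition[key] = {"closed": False}

    def close_partition(self, key: str) -> None:
        # Raises KeyError if the partition was evicted while still queued.
        self._cursor_per_partition[key]["closed"] = True


def close_all(max_partitions: int, queued: List[str]) -> List[str]:
    """Open every queued partition, then close them; return the keys that failed."""
    tracker = TinyPartitionTracker(max_partitions)
    for key in queued:
        tracker.open_partition(key)
    failed = []
    for key in queued:
        try:
            tracker.close_partition(key)
        except KeyError:
            failed.append(key)
    return failed
```

With three partitions queued and a limit of two, the first partition is evicted before it is closed; with a limit at least as large as the queue, every close succeeds.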
@@ -128,6 +135,7 @@ def __init__(

         # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
         self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
+        self._synced_some_data = False

     @property
     def cursor_field(self) -> CursorField:
@@ -168,8 +176,8 @@ def close_partition(self, partition: Partition) -> None:
         with self._lock:
             self._semaphore_per_partition[partition_key].acquire()
             if not self._use_global_cursor:
-                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                 cursor = self._cursor_per_partition[partition_key]
+                cursor.close_partition(partition=partition)
                 if (
                     partition_key in self._partitions_done_generating_stream_slices
                     and self._semaphore_per_partition[partition_key]._value == 0
@@ -213,8 +221,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
         if not any(
             semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
         ):
-            self._global_cursor = self._new_global_cursor
-            self._lookback_window = self._timer.finish()
+            if self._synced_some_data:
+                # we only update those if we actually synced some data
+                self._global_cursor = self._new_global_cursor
+                self._lookback_window = self._timer.finish()
             self._parent_state = self._partition_router.get_stream_state()
         self._emit_state_message(throttle=False)

@@ -422,9 +432,6 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
         if stream_state.get("parent_state"):
             self._parent_state = stream_state["parent_state"]

-        # Set parent state for partition routers based on parent streams
-        self._partition_router.set_initial_state(stream_state)
-
     def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
         """
         Initializes the global cursor state from the provided stream state.
@@ -458,6 +465,7 @@ def observe(self, record: Record) -> None:
         except ValueError:
             return

+        self._synced_some_data = True
         record_cursor = self._connector_state_converter.output_format(
             self._connector_state_converter.parse_value(record_cursor_value)
         )
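The interplay between the `_synced_some_data` flag set in `observe` and the guard added in `ensure_at_least_one_state_emitted` can be sketched in isolation. This is a simplified illustration, not the CDK class: `GlobalCursorSketch` and its `finish` method are hypothetical, cursor values are assumed to be ISO date strings, and the reading that the guard exists to keep a previously restored value when a sync observes no records is an interpretation of the inline comment in the diff.

```python
from typing import Optional


class GlobalCursorSketch:
    """Hypothetical, stripped-down model of the guarded global cursor update."""

    def __init__(self, global_cursor: Optional[str]) -> None:
        # Value restored from the previous sync's state, if any.
        self.global_cursor = global_cursor
        self._new_global_cursor: Optional[str] = None
        self._synced_some_data = False

    def observe(self, cursor_value: str) -> None:
        # Mirrors the diff: flag that at least one record was seen.
        self._synced_some_data = True
        if self._new_global_cursor is None or cursor_value > self._new_global_cursor:
            self._new_global_cursor = cursor_value

    def finish(self) -> Optional[str]:
        # Only replace the stored cursor when some data was actually synced.
        if self._synced_some_data:
            self.global_cursor = self._new_global_cursor
        return self.global_cursor
```

In this sketch, a sync that observes nothing leaves the restored cursor in place instead of replacing it with an unset value.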
@@ -541,3 +549,45 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:

     def limit_reached(self) -> bool:
         return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
+
+    @staticmethod
+    def get_parent_state(
+        stream_state: Optional[StreamState], parent_stream_name: str
+    ) -> Optional[AirbyteStateMessage]:
+        if not stream_state:
+            return None
+
+        if "parent_state" not in stream_state:
+            logger.warning(
+                f"Trying to get_parent_state for stream `{parent_stream_name}` when there is no parent state in the state"
+            )
+            return None
+        elif parent_stream_name not in stream_state["parent_state"]:
+            logger.info(
+                f"Could not find parent state for stream `{parent_stream_name}`. Only parents available are {list(stream_state['parent_state'].keys())}"
+            )
+            return None
+
+        return AirbyteStateMessage(
+            type=AirbyteStateType.STREAM,
+            stream=AirbyteStreamState(
+                stream_descriptor=StreamDescriptor(parent_stream_name, None),
+                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
+            ),
+        )
+
+    @staticmethod
+    def get_global_state(
+        stream_state: Optional[StreamState], parent_stream_name: str
+    ) -> Optional[AirbyteStateMessage]:
+        return (
+            AirbyteStateMessage(
+                type=AirbyteStateType.STREAM,
+                stream=AirbyteStreamState(
+                    stream_descriptor=StreamDescriptor(parent_stream_name, None),
+                    stream_state=AirbyteStateBlob(stream_state["state"]),
+                ),
+            )
+            if stream_state and "state" in stream_state
+            else None
+        )
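The lookup rules in `get_parent_state` can be exercised without the Airbyte models. The sketch below is an assumption-laden simplification: `lookup_parent_state` is a hypothetical helper that returns the raw nested mapping rather than an `AirbyteStateMessage`, and it omits the logging. The state shape (a `parent_state` mapping keyed by parent stream name) is taken from the diff above.

```python
from typing import Any, Mapping, Optional


def lookup_parent_state(
    stream_state: Optional[Mapping[str, Any]], parent_stream_name: str
) -> Optional[Mapping[str, Any]]:
    """Hypothetical plain-dict version of the parent state lookup."""
    if not stream_state:
        # No state at all: nothing to hand to the parent stream.
        return None
    if "parent_state" not in stream_state:
        # State exists but carries no parent section.
        return None
    if parent_stream_name not in stream_state["parent_state"]:
        # Parent section exists but not for this parent stream.
        return None
    return stream_state["parent_state"][parent_stream_name]


example_state = {
    "state": {"updated_at": "2025-08-01"},
    "parent_state": {"parents": {"updated_at": "2025-07-15"}},
}
```

Each of the three early returns corresponds to one branch in the method added by the diff; only the final branch yields a value.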
Original file line number Diff line number Diff line change
@@ -192,8 +192,10 @@ def set_initial_state(self, stream_state: StreamState) -> None:
             # Example: {"global_state_format_key": "global_state_format_value"}
             self._stream_cursor.set_initial_state(stream_state)

-        # Set parent state for partition routers based on parent streams
-        self._partition_router.set_initial_state(stream_state)
+        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
+        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
+        # We are still keeping this line as a comment to be explicit about the past behavior.
+        # self._partition_router.set_initial_state(stream_state)

     def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
         """