From c4ca314cf04c3fe2608a90a00d879c4e3777f2c1 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 12 Mar 2025 16:53:59 +0100 Subject: [PATCH 1/2] Fix set_initial_state for incremental_dependency --- .../substream_partition_router.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 73a747f02..90ce0e690 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -280,20 +280,12 @@ def set_initial_state(self, stream_state: StreamState) -> None: parent_state = stream_state.get("parent_state", {}) - # If `parent_state` doesn't exist and at least one parent stream has an incremental dependency, - # copy the child state to parent streams with incremental dependencies. - incremental_dependency = any( - [parent_config.incremental_dependency for parent_config in self.parent_stream_configs] - ) - if not parent_state and not incremental_dependency: - return - - if not parent_state and incremental_dependency: - # Migrate child state to parent state format - parent_state = self._migrate_child_state_to_parent_state(stream_state) - # Set state for each parent stream with an incremental dependency for parent_config in self.parent_stream_configs: + if not parent_state.get(parent_config.stream.name, {}) and parent_config.incremental_dependency: + # Migrate child state to parent state format + parent_state = self._migrate_child_state_to_parent_state(stream_state) + if parent_config.incremental_dependency: parent_config.stream.state = parent_state.get(parent_config.stream.name, {}) From 530a78736dbd74ae2493e121e2a2bf3338b5c2ae Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 12 Mar 2025 16:05:07 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../partition_routers/substream_partition_router.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 90ce0e690..fa9264843 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -282,7 +282,10 @@ def set_initial_state(self, stream_state: StreamState) -> None: # Set state for each parent stream with an incremental dependency for parent_config in self.parent_stream_configs: - if not parent_state.get(parent_config.stream.name, {}) and parent_config.incremental_dependency: + if ( + not parent_state.get(parent_config.stream.name, {}) + and parent_config.incremental_dependency + ): # Migrate child state to parent state format parent_state = self._migrate_child_state_to_parent_state(stream_state)