diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 73a747f02..fa9264843 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -280,20 +280,15 @@ def set_initial_state(self, stream_state: StreamState) -> None: parent_state = stream_state.get("parent_state", {}) - # If `parent_state` doesn't exist and at least one parent stream has an incremental dependency, - # copy the child state to parent streams with incremental dependencies. - incremental_dependency = any( - [parent_config.incremental_dependency for parent_config in self.parent_stream_configs] - ) - if not parent_state and not incremental_dependency: - return - - if not parent_state and incremental_dependency: - # Migrate child state to parent state format - parent_state = self._migrate_child_state_to_parent_state(stream_state) - # Set state for each parent stream with an incremental dependency for parent_config in self.parent_stream_configs: + if ( + not parent_state.get(parent_config.stream.name, {}) + and parent_config.incremental_dependency + ): + # Migrate child state to parent state format + parent_state = self._migrate_child_state_to_parent_state(stream_state) + if parent_config.incremental_dependency: parent_config.stream.state = parent_state.get(parent_config.stream.name, {})