diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 040ac4689..f271c68bb 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2208,6 +2208,14 @@ def _build_concurrent_cursor( and stream_slicer and not isinstance(stream_slicer, SinglePartitionRouter) ): + if isinstance(model.incremental_sync, IncrementingCountCursorModel): + # We don't currently support usage of partition routing and IncrementingCountCursor at the + # same time because we didn't solve for design questions like what the lookback window would + # be as well as global cursor fall backs. We have not seen customers that have needed both + # at the same time yet and are currently punting on this until we need to solve it. + raise ValueError( + f"The low-code framework does not currently support usage of a PartitionRouter and an IncrementingCountCursor at the same time. Please specify only one of these options for stream {stream_name}." + ) return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing state_manager=self._connector_state_manager, model_type=DatetimeBasedCursorModel, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 4825a9a3f..743be4a7b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3669,6 +3669,58 @@ def test_create_concurrent_cursor_from_perpartition_cursor_runs_state_migrations ) +def test_incrementing_count_cursor_with_partition_router_raises_error(): + content = """ + type: DeclarativeStream + primary_key: "id" + name: test + schema_loader: + type: InlineSchemaLoader + schema: + $schema: "http://json-schema.org/draft-07/schema" + type: object + properties: + id: + type: string + incremental_sync: + type: "IncrementingCountCursor" + cursor_field: "mid" + start_value: "0" + retriever: + type: SimpleRetriever + name: test + requester: + type: HttpRequester + name: "test" + url_base: "https://api.test.com/v3/" + http_method: "GET" + authenticator: + type: NoAuth + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + partition_router: + type: ListPartitionRouter + cursor_field: arbitrary + values: + - item_1 + - item_2 + """ + + factory = ModelToComponentFactory( + emit_connector_builder_messages=True, connector_state_manager=ConnectorStateManager() + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=YamlDeclarativeSource._parse(content), + config=input_config, + ) + + def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): """ Validates a special case for when the start_time.datetime_format and end_time.datetime_format are defined, the date to