diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index ef666dc51..781bb64d1 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -169,6 +169,7 @@ def __init__( component_factory = ModelToComponentFactory( emit_connector_builder_messages=emit_connector_builder_messages, message_repository=ConcurrentMessageRepository(queue, message_repository), + configured_catalog=catalog, connector_state_manager=self._connector_state_manager, max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None, diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a77bc34ea..f834ca20e 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2520,6 +2520,34 @@ definitions: type: type: string enum: [JsonlDecoder] + JsonSchemaPropertySelector: + title: Json Schema Property Selector + description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record. + type: object + required: + - type + properties: + type: + type: string + enum: [JsonSchemaPropertySelector] + transformations: + title: Transformations + description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests. + linkable: true + type: array + items: + anyOf: + - "$ref": "#/definitions/AddFields" + - "$ref": "#/definitions/RemoveFields" + - "$ref": "#/definitions/KeysToLower" + - "$ref": "#/definitions/KeysToSnakeCase" + - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/DpathFlattenFields" + - "$ref": "#/definitions/KeysReplace" + - "$ref": "#/definitions/CustomTransformation" + $parameters: + type: object + additionalProperties: true KeysToLower: title: Keys to Lower Case description: A transformation that renames all keys to lower case. @@ -3410,6 +3438,10 @@ definitions: title: Property Chunking description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request. "$ref": "#/definitions/PropertyChunking" + property_selector: + title: Property Selector + description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request. + "$ref": "#/definitions/JsonSchemaPropertySelector" $parameters: type: object additionalProperties: true @@ -3746,7 +3778,7 @@ definitions: properties: type: type: string - enum: [ PaginationReset ] + enum: [PaginationReset] action: type: string enum: @@ -3763,7 +3795,7 @@ definitions: properties: type: type: string - enum: [ PaginationResetLimits ] + enum: [PaginationResetLimits] number_of_records: type: integer GzipDecoder: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 915f942d0..35186ef71 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,3 +1,5 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel): ) +class JsonSchemaPropertySelector(BaseModel): + type: Literal["JsonSchemaPropertySelector"] + transformations: Optional[ + List[ + Union[ + AddFields, + RemoveFields, + KeysToLower, + KeysToSnakeCase, + FlattenFields, + DpathFlattenFields, + KeysReplace, + CustomTransformation, + ] + ] + ] = Field( + None, + description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.", + title="Transformations", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ListPartitionRouter(BaseModel): type: Literal["ListPartitionRouter"] cursor_field: str = Field( @@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel): description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.", title="Property Chunking", ) + property_selector: Optional[JsonSchemaPropertySelector] = Field( + None, + description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.", + title="Property Selector", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index b674b5f90..4809f5151 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -26,6 +26,7 @@ get_type_hints, ) +from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream from isodate import parse_duration from pydantic.v1 import BaseModel from requests import Response @@ -42,6 +43,7 @@ AirbyteStateMessage, AirbyteStateType, AirbyteStreamState, + ConfiguredAirbyteCatalog, FailureType, Level, StreamDescriptor, @@ -314,6 +316,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -501,6 +506,9 @@ from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import ( GroupByKey, ) @@ -668,6 +676,7 @@ def __init__( message_repository: Optional[MessageRepository] = None, connector_state_manager: Optional[ConnectorStateManager] = None, max_concurrent_async_job_count: Optional[int] = None, + configured_catalog: Optional[ConfiguredAirbyteCatalog] = None, ): self._init_mappings() self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice @@ -678,6 +687,9 @@ def __init__( self._message_repository = message_repository or InMemoryMessageRepository( self._evaluate_log_level(emit_connector_builder_messages) ) + self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream( + configured_catalog + ) self._connector_state_manager = connector_state_manager or ConnectorStateManager() self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) @@ -734,6 +746,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector, GzipDecoderModel: self.create_gzip_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, @@ -796,6 +809,16 @@ def _init_mappings(self) -> None: # Needed for the case where we need to perform a second parse on the fields of a custom component self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR} + @staticmethod + def _create_stream_name_to_configured_stream( + configured_catalog: Optional[ConfiguredAirbyteCatalog], + ) -> Mapping[str, ConfiguredAirbyteStream]: + return ( + {stream.stream.name: stream for stream in configured_catalog.streams} + if configured_catalog + else {} + ) + def create_component( self, model_type: Type[BaseModel], @@ -2987,7 +3010,7 @@ def create_property_chunking( ) def create_query_properties( - self, model: QueryPropertiesModel, config: Config, **kwargs: Any + self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any ) -> QueryProperties: if isinstance(model.property_list, list): property_list = model.property_list @@ -3004,10 +3027,43 @@ def create_query_properties( else None ) + property_selector = ( + self._create_component_from_model( + model=model.property_selector, config=config, stream_name=stream_name, **kwargs + ) + if model.property_selector + else None + ) + return QueryProperties( property_list=property_list, always_include_properties=model.always_include_properties, property_chunking=property_chunking, + property_selector=property_selector, + config=config, + parameters=model.parameters or {}, + ) + + def create_json_schema_property_selector( + self, + model: JsonSchemaPropertySelectorModel, + config: Config, + *, + stream_name: str, + **kwargs: Any, + ) -> JsonSchemaPropertySelector: + configured_stream = self._stream_name_to_configured_stream.get(stream_name) + + transformations = [] + if model.transformations: + for transformation_model in model.transformations: + transformations.append( + self._create_component_from_model(model=transformation_model, config=config) + ) + + return JsonSchemaPropertySelector( + configured_stream=configured_stream, + properties_transformations=transformations, config=config, parameters=model.parameters or {}, ) @@ -3235,7 +3291,7 @@ def _get_url(req: Requester) -> str: if len(query_properties_definitions) == 1: query_properties = self._create_component_from_model( - model=query_properties_definitions[0], config=config + model=query_properties_definitions[0], stream_name=name, config=config ) # Removes QueryProperties components from the interpolated mappings because it has been designed @@ -3261,11 +3317,13 @@ def _get_url(req: Requester) -> str: query_properties = self.create_query_properties( model=query_properties_definition, + stream_name=name, config=config, ) elif hasattr(model.requester, "query_properties") and model.requester.query_properties: query_properties = self.create_query_properties( model=model.requester.query_properties, + stream_name=name, config=config, ) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py index 06ade8253..0a3400df8 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py @@ -2,7 +2,7 @@ from dataclasses import InitVar, dataclass from enum import Enum -from typing import Any, Iterable, List, Mapping, Optional +from typing import Any, Iterable, List, Mapping, Optional, Set from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( @@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: ) def get_request_property_chunks( - self, property_fields: Iterable[str], always_include_properties: Optional[List[str]] + self, + property_fields: Iterable[str], + always_include_properties: Optional[List[str]], + configured_properties: Optional[Set[str]], ) -> Iterable[List[str]]: if not self.property_limit: single_property_chunk = list(property_fields) @@ -53,6 +56,8 @@ def get_request_property_chunks( for property_field in property_fields: # If property_limit_type is not defined, we default to property_count which is just an incrementing count # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size + if configured_properties is not None and property_field not in configured_properties: + continue property_field_size = ( len(property_field) + 3 # The +3 represents the extra characters for encoding the delimiter in between properties diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py new file mode 100644 index 000000000..e9dbbd8d9 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import ( + JsonSchemaPropertySelector, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import ( + PropertySelector, +) + +__all__ = ["JsonSchemaPropertySelector", "PropertySelector"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py new file mode 100644 index 000000000..821a4586e --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/json_schema_property_selector.py @@ -0,0 +1,54 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +import copy +from dataclasses import InitVar, dataclass, field +from typing import Any, List, Mapping, Optional, Set + +from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream + +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import ( + PropertySelector, +) +from airbyte_cdk.sources.declarative.transformations import RecordTransformation +from airbyte_cdk.sources.types import Config + + +@dataclass +class JsonSchemaPropertySelector(PropertySelector): + """ + A class that contains a list of transformations to apply to properties. + """ + + config: Config + parameters: InitVar[Mapping[str, Any]] + # For other non-read operations, there is no configured catalog and therefore no schema selection + configured_stream: Optional[ConfiguredAirbyteStream] = None + properties_transformations: List[RecordTransformation] = field(default_factory=lambda: []) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._parameters = parameters + + def select(self) -> Optional[Set[str]]: + """ + Returns the set of properties that have been selected for the configured stream. The intent being that + we should only query for selected properties not all since disabled properties are discarded. + + When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. + This is different from the empty set where the json_schema was empty and no schema fields were selected. + """ + + # For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected + # columns. In this case we return None which is interpreted by the QueryProperties component to not + # perform any filtering of schema properties and fetch all of them + if self.configured_stream is None: + return None + + schema_properties = copy.deepcopy( + self.configured_stream.stream.json_schema.get("properties", {}) + ) + if self.properties_transformations: + for transformation in self.properties_transformations: + transformation.transform( + record=schema_properties, + config=self.config, + ) + return set(schema_properties.keys()) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py new file mode 100644 index 000000000..bd1d05c75 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/property_selector.py @@ -0,0 +1,24 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional, Set + + +@dataclass +class PropertySelector(ABC): + """ + Describes the interface for selecting and transforming properties from a configured stream's schema + to determine which properties should be queried from the API. + """ + + @abstractmethod + def select(self) -> Optional[Set[str]]: + """ + Selects and returns the set of properties that should be queried from the API based on the + configured stream's schema and any applicable transformations. + + Returns: + Set[str]: The set of property names to query + """ + pass diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 37b26d171..424caad77 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -1,12 +1,16 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. from dataclasses import InitVar, dataclass -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Iterable, List, Mapping, Optional, Set, Union +from airbyte_cdk.models import ConfiguredAirbyteStream from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, PropertyChunking, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + PropertySelector, +) from airbyte_cdk.sources.types import Config, StreamSlice @@ -22,18 +26,24 @@ class QueryProperties: property_list: Optional[Union[List[str], PropertiesFromEndpoint]] always_include_properties: Optional[List[str]] property_chunking: Optional[PropertyChunking] + property_selector: Optional[PropertySelector] config: Config parameters: InitVar[Mapping[str, Any]] def get_request_property_chunks( - self, stream_slice: Optional[StreamSlice] = None + self, + stream_slice: Optional[StreamSlice] = None, ) -> Iterable[List[str]]: """ Uses the defined property_list to fetch the total set of properties dynamically or from a static list and based on the resulting properties, performs property chunking if applicable. :param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object + :param configured_stream: The customer configured stream being synced which is needed to identify which + record fields to query for and emit. """ + configured_properties = self.property_selector.select() if self.property_selector else None + fields: Union[Iterable[str], List[str]] if isinstance(self.property_list, PropertiesFromEndpoint): fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice) @@ -42,7 +52,21 @@ def get_request_property_chunks( if self.property_chunking: yield from self.property_chunking.get_request_property_chunks( - property_fields=fields, always_include_properties=self.always_include_properties + property_fields=fields, + always_include_properties=self.always_include_properties, + configured_properties=configured_properties, ) else: - yield list(fields) + if configured_properties is not None: + all_fields = ( + [field for field in fields if field in configured_properties] + if configured_properties is not None + else list(fields) + ) + else: + all_fields = list(fields) + + if self.always_include_properties: + all_fields = list(self.always_include_properties) + all_fields + + yield all_fields diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 7e40dc9c1..55b956e4d 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -386,7 +386,7 @@ def _read_pages( try: if self.additional_query_properties: for properties in self.additional_query_properties.get_request_property_chunks( - stream_slice=stream_slice + stream_slice=stream_slice, ): stream_slice = StreamSlice( partition=stream_slice.partition or {}, diff --git a/unit_tests/connector_builder/test_property_chunking.py b/unit_tests/connector_builder/test_property_chunking.py index ccae4a336..88346b78d 100644 --- a/unit_tests/connector_builder/test_property_chunking.py +++ b/unit_tests/connector_builder/test_property_chunking.py @@ -186,7 +186,12 @@ "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": {}, + "properties": { + "one": {"type": ["null", "string"]}, + "two": {"type": ["null", "string"]}, + "three": {"type": ["null", "string"]}, + "four": {"type": ["null", "string"]}, + }, }, "supported_sync_modes": ["full_refresh"], "source_defined_cursor": False, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 903eeb14c..4463358c0 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -12,6 +12,13 @@ import freezegun import pytest import requests +from airbyte_protocol_dataclasses.models.airbyte_protocol import ( + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) from freezegun.api import FakeDatetime from pydantic.v1 import ValidationError @@ -133,6 +140,9 @@ from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, @@ -156,6 +166,9 @@ ) from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy @@ -230,6 +243,28 @@ def test_create_component_type_mismatch(): factory.create_component(CheckStreamModel, manifest["check"], {}) +def test_create_component_with_configured_catalog(): + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="test", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=[SyncMode.full_refresh], + ), + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + ) + ] + ) + + factory_with_catalog = ModelToComponentFactory(configured_catalog=configured_catalog) + + assert factory_with_catalog._stream_name_to_configured_stream == { + "test": configured_catalog.streams[0] + } + + def test_full_config_stream(): content = """ decoder: @@ -1217,7 +1252,7 @@ def test_stream_with_custom_requester_and_query_properties(requests_mock): http_method: "GET" request_parameters: not_query: 1 - query: + query: type: QueryProperties property_list: - id @@ -4481,6 +4516,12 @@ def test_simple_retriever_with_query_properties(): record_merge_strategy: type: GroupByKeyMergeStrategy key: ["id"] + property_selector: + type: JsonSchemaPropertySelector + transformations: + - type: KeysReplace + old: "properties_" + new: "" analytics_stream: type: DeclarativeStream incremental_sync: @@ -4522,6 +4563,13 @@ def test_simple_retriever_with_query_properties(): ] assert query_properties.always_include_properties == ["id"] + property_selector = query_properties.property_selector + assert isinstance(property_selector, JsonSchemaPropertySelector) + assert len(property_selector.properties_transformations) == 1 + assert property_selector.properties_transformations == [ + KeysReplaceTransformation(old="properties_", new="", parameters={}) + ] + property_chunking = retriever.additional_query_properties.property_chunking assert isinstance(property_chunking, PropertyChunking) assert property_chunking.property_limit_type == PropertyLimitType.property_count diff --git a/unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py b/unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py new file mode 100644 index 000000000..58b636bf9 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/property_selector/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. diff --git a/unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py b/unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py new file mode 100644 index 000000000..12f1d8d93 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/property_selector/test_json_schema_property_selector.py @@ -0,0 +1,94 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from typing import List, Optional, Set + +import pytest +from airbyte_protocol_dataclasses.models import ( + AirbyteStream, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) + +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) +from airbyte_cdk.sources.declarative.transformations import RecordTransformation, RemoveFields +from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import ( + KeysReplaceTransformation, +) + +CONFIG = {} + + +def _create_configured_airbyte_stream(configured_properties: List[str]): + return ConfiguredAirbyteStream( + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + stream=AirbyteStream( + name="players", + namespace=None, + json_schema={ + "properties": { + property: {"type": ["null", "string"]} for property in configured_properties + } + }, + supported_sync_modes=[SyncMode.full_refresh], + ), + ) + + +@pytest.mark.parametrize( + "configured_stream, transformations, expected_properties", + [ + pytest.param( + # Test case 1: configured stream with 5 fields, RemoveFields and KeysReplaceTransformation + _create_configured_airbyte_stream( + ["id", "first_name", "last_name", "number", "team", "properties_statistics"] + ), + [ + RemoveFields(field_pointers=[["id"]], parameters={}), + KeysReplaceTransformation(old="properties_", new="", parameters={}), + ], + {"first_name", "last_name", "number", "team", "statistics"}, + id="test_select_properties_with_transformations", + ), + pytest.param( + None, + [], + None, + id="configured_stream_is_none", + ), + pytest.param( + # Test case 3: configured stream has no properties + ConfiguredAirbyteStream( + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + stream=AirbyteStream( + name="players", + namespace=None, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh], + ), + ), + [], + set(), + id="configured_stream_no_properties_key_in_json_schema", + ), + ], +) +def test_select_properties( + configured_stream: Optional[ConfiguredAirbyteStream], + transformations: List[RecordTransformation], + expected_properties: Optional[Set[str]], +): + json_schema_property_selector = JsonSchemaPropertySelector( + configured_stream=configured_stream, + properties_transformations=transformations, + config=CONFIG, + parameters={}, + ) + + selected_properties = json_schema_property_selector.select() + + assert selected_properties == expected_properties diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py index 671cc26bc..c20658e14 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py @@ -72,6 +72,7 @@ def test_get_request_property_chunks( property_limit, expected_property_chunks, ): + configured_properties = set(property_fields) property_fields = iter(property_fields) property_chunking = PropertyChunking( property_limit_type=property_limit_type, @@ -83,7 +84,9 @@ def test_get_request_property_chunks( property_chunks = list( property_chunking.get_request_property_chunks( - property_fields=property_fields, always_include_properties=always_include_properties + property_fields=property_fields, + always_include_properties=always_include_properties, + configured_properties=configured_properties, ) ) @@ -92,6 +95,54 @@ def test_get_request_property_chunks( assert property_chunks[i] == expected_property_chunk +def test_get_request_property_chunks_empty_configured_properties(): + expected_property_chunks = [["white", "lotus"]] + + always_include_properties = ["white", "lotus"] + property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"] + configured_properties = set() + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + property_chunks = list( + property_chunking.get_request_property_chunks( + property_fields=property_fields, + always_include_properties=always_include_properties, + configured_properties=configured_properties, + ) + ) + assert property_chunks == expected_property_chunks + + +def test_get_request_property_chunks_none_configured_properties(): + expected_property_chunks = [ + ["white", "lotus", "maui", "taormina"], + ["white", "lotus", "koh_samui", "saint_jean_cap_ferrat"], + ] + + always_include_properties = ["white", "lotus"] + property_fields = ["maui", "taormina", "koh_samui", "saint_jean_cap_ferrat"] + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=2, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + property_chunks = list( + property_chunking.get_request_property_chunks( + property_fields=property_fields, + always_include_properties=always_include_properties, + configured_properties=None, + ) + ) + assert property_chunks == expected_property_chunks + + def test_get_merge_key(): record = Record(stream_name="test", data={"id": "0"}) property_chunking = PropertyChunking( diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py index e67383951..dc4721b07 100644 --- a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -1,7 +1,15 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. +from typing import List from unittest.mock import Mock +from airbyte_protocol_dataclasses.models import ( + AirbyteStream, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, +) + from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, QueryProperties, @@ -10,13 +18,38 @@ PropertyChunking, PropertyLimitType, ) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import ( + JsonSchemaPropertySelector, +) from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey +from airbyte_cdk.sources.declarative.transformations import RemoveFields from airbyte_cdk.sources.types import StreamSlice CONFIG = {} -def test_get_request_property_chunks_static_list_with_chunking(): +def _create_configured_airbyte_stream(configured_properties: List[str]): + return ConfiguredAirbyteStream( + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + stream=AirbyteStream( + name="nonary_game_01", + namespace=None, + json_schema={ + "properties": { + property: {"type": ["null", "string"]} for property in configured_properties + } + }, + supported_sync_modes=[SyncMode.full_refresh], + ), + ) + + +def test_get_request_property_chunks_static_list_with_chunking_property_selection(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["santa", "clover", "junpei", "june", "remove_me"] + ) + stream_slice = StreamSlice(cursor_slice={}, partition={}) query_properties = QueryProperties( @@ -30,6 +63,7 @@ def test_get_request_property_chunks_static_list_with_chunking(): "seven", "lotus", "nine", + "remove_me", ], always_include_properties=None, property_chunking=PropertyChunking( @@ -39,19 +73,40 @@ def test_get_request_property_chunks_static_list_with_chunking(): config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) - assert len(property_chunks) == 3 - assert property_chunks[0] == ["ace", "snake", "santa"] - assert property_chunks[1] == ["clover", "junpei", "june"] - assert property_chunks[2] == ["seven", "lotus", "nine"] + assert len(property_chunks) == 2 + assert property_chunks[0] == ["santa", "clover", "junpei"] + assert property_chunks[1] == ["june"] def test_get_request_property_chunks_static_list_with_always_include_properties(): + configured_airbyte_stream = _create_configured_airbyte_stream( + [ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ] + ) + stream_slice = StreamSlice(cursor_slice={}, partition={}) query_properties = QueryProperties( @@ -74,6 +129,14 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) @@ -86,7 +149,110 @@ def test_get_request_property_chunks_static_list_with_always_include_properties( assert property_chunks[2] == ["zero", "seven", "lotus", "nine"] +def test_get_request_no_property_chunking_selected_properties_always_include_properties(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["santa", "clover", "junpei", "june", "remove_me"] + ) + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=["zero"], + property_chunking=None, + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 1 + assert property_chunks[0] == ["zero", "santa", "clover", "junpei", "june"] + + +def test_get_request_no_property_chunking_always_include_properties(): + configured_airbyte_stream = _create_configured_airbyte_stream( + [ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ] + ) + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=["zero"], + property_chunking=None, + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 1 + assert property_chunks[0] == [ + "zero", + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ] + + def test_get_request_property_chunks_dynamic_endpoint(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] + ) + stream_slice = StreamSlice(cursor_slice={}, partition={}) properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) @@ -104,6 +270,14 @@ def test_get_request_property_chunks_dynamic_endpoint(): config=CONFIG, parameters={}, ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), config=CONFIG, parameters={}, ) @@ -113,3 +287,125 @@ def test_get_request_property_chunks_dynamic_endpoint(): assert len(property_chunks) == 2 assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"] + + +def test_get_request_property_chunks_with_configured_catalog_static_list(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + # Simulate configured_airbyte_stream whose json_schema only enables 'luna', 'phi', 'sigma' + configured_airbyte_stream = _create_configured_airbyte_stream( + ["santa", "clover", "junpei", "june"] + ) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 2 + assert property_chunks[0] == ["santa", "clover", "junpei"] + assert property_chunks[1] == ["june"] + + +def test_get_request_property_chunks_with_configured_catalog_dynamic_endpoint(): + configured_airbyte_stream = _create_configured_airbyte_stream( + ["luna", "phi", "sigma", "remove_me"] + ) + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) + properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter( + ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] + ) + + query_properties = QueryProperties( + property_list=properties_from_endpoint_mock, + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=5, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + property_selector=JsonSchemaPropertySelector( + configured_stream=configured_airbyte_stream, + properties_transformations=[ + RemoveFields(field_pointers=[["remove_me"]], parameters={}), + ], + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 1 + assert property_chunks[0] == ["luna", "phi", "sigma"] + + +def test_get_request_property_no_property_selection(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + property_selector=None, + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 3 + assert property_chunks[0] == ["ace", "snake", "santa"] + assert property_chunks[1] == ["clover", "junpei", "june"] + assert property_chunks[2] == ["seven", "lotus", "nine"] diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 63a375823..5caec9a34 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1096,6 +1096,7 @@ def test_simple_retriever_with_additional_query_properties(): config=config, parameters={}, ), + property_selector=None, config=config, parameters={}, ) @@ -1167,6 +1168,7 @@ def test_simple_retriever_with_additional_query_properties_but_without_property_ property_list=["first_name", "last_name", "nonary", "bracelet"], always_include_properties=[], property_chunking=None, + property_selector=None, config=config, parameters={}, ) @@ -1349,6 +1351,7 @@ def test_simple_retriever_with_additional_query_properties_single_chunk(): config=config, parameters={}, ), + property_selector=None, config=config, parameters={}, ) @@ -1508,6 +1511,7 @@ def test_simple_retriever_still_emit_records_if_no_merge_key(): config=config, parameters={}, ), + property_selector=None, config=config, parameters={}, )