@classmethod
def resolve_dependencies(
    cls,
    model: RecordFilterModel,
    config: Config,
    dependency_constructor: Callable[..., Any],
    additional_flags: AdditionalFlags,
    **kwargs: Any,
) -> Mapping[str, Any]:
    """
    Translate a RecordFilter model into the keyword arguments of the RecordFilter component.

    The filter has no nested components, so `dependency_constructor`, `additional_flags`
    and any extra keyword arguments are accepted but not used.

    :param model: parsed manifest model providing `condition` and `parameters`
    :param config: connector configuration, handed to the component unchanged
    :return: mapping with the `condition`, `config` and `parameters` constructor arguments
    """
    # a missing (falsy) condition / parameters entry falls back to an empty value
    condition = model.condition if model.condition else ""
    parameters = model.parameters if model.parameters else {}
    return dict(condition=condition, config=config, parameters=parameters)
M = TypeVar("M", bound=BaseModel)


@dataclass
class AdditionalFlags:
    """
    Factory-level settings handed to every `ComponentConstructor.resolve_dependencies` call.

    The values are declared as dataclass fields so that the generated `__init__`,
    `__repr__` and `__eq__` actually reflect them. The generated `__init__` takes the
    same parameters, in the same order, as the previous hand-written one.
    """

    emit_connector_builder_messages: bool
    disable_retries: bool
    message_repository: MessageRepository
    connector_state_manager: ConnectorStateManager
    limit_pages_fetched_per_slice: Optional[int]
    limit_slices_fetched: Optional[int]

    @property
    def should_limit_slices_fetched(self) -> bool:
        """
        Returns True if the number of slices fetched should be limited, False otherwise.
        This is used to limit the number of slices fetched during tests.
        """
        return bool(self.limit_slices_fetched or self.emit_connector_builder_messages)


@dataclass
class ComponentConstructor(Generic[M]):
    """
    Base class for components that know how to build themselves from their model.

    Subclasses override `resolve_dependencies` to turn a model into constructor
    keyword arguments; `build` then instantiates the class with those arguments.
    """

    @classmethod
    def resolve_dependencies(
        cls,
        model: M,
        config: Config,
        dependency_constructor: Callable[..., Any],
        additional_flags: AdditionalFlags,
        **kwargs: Any,
    ) -> Mapping[str, Any]:
        """
        Resolves the component's dependencies, this method should be created in the component,
        if there are any dependencies on other components, or we need to adopt / change / adjust / fine-tune
        specific component's behavior.

        :param model: the parsed model describing the component
        :param config: the connector configuration
        :param dependency_constructor: callable used to build nested components from their models
        :param additional_flags: factory-level settings shared with every component
        :return: keyword arguments for the component's constructor; empty by default
        """
        return {}

    @classmethod
    def build(
        cls,
        model: M,
        config: Config,
        dependency_constructor: Callable[..., Any],
        additional_flags: AdditionalFlags,
        **kwargs: Any,
    ) -> "ComponentConstructor[M]":
        """
        Builds up the Component and its component-specific dependencies.
        Order of operations:
        - build the dependencies first
        - build the component with the resolved dependencies
        """

        # resolve the component dependencies first
        resolved_dependencies: Mapping[str, Any] = cls.resolve_dependencies(
            model=model,
            config=config,
            dependency_constructor=dependency_constructor,
            additional_flags=additional_flags,
            **kwargs,
        )

        # returns the instance of the component class,
        # with resolved dependencies and model-specific arguments.
        return cls(**resolved_dependencies)
+ """ + EPOCH_DATETIME_FORMAT = "%s" def __init__( @@ -674,8 +688,20 @@ def __init__( # placeholder for deprecation warnings self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = [] + # support the dependency constructors with the re-usable parts from this Factory + self._flags = AdditionalFlags( + emit_connector_builder_messages=self._emit_connector_builder_messages, + disable_retries=self._disable_retries, + message_repository=self._message_repository, + connector_state_manager=self._connector_state_manager, + limit_pages_fetched_per_slice=self._limit_pages_fetched_per_slice, + limit_slices_fetched=self._limit_slices_fetched, + ) + def _init_mappings(self) -> None: - self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { + self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Dict[ + Type[BaseModel], Union[Type[ComponentConstructor[Any]], Callable[..., Any]] + ] = { AddedFieldDefinitionModel: self.create_added_field_definition, AddFieldsModel: self.create_add_fields, ApiKeyAuthenticatorModel: self.create_api_key_authenticator, @@ -751,15 +777,15 @@ def _init_mappings(self) -> None: PredicateValidatorModel: self.create_predicate_validator, PropertiesFromEndpointModel: self.create_properties_from_endpoint, PropertyChunkingModel: self.create_property_chunking, - QueryPropertiesModel: self.create_query_properties, - RecordFilterModel: self.create_record_filter, + QueryPropertiesModel: QueryProperties, + RecordFilterModel: RecordFilter, RecordSelectorModel: self.create_record_selector, RemoveFieldsModel: self.create_remove_fields, RequestPathModel: self.create_request_path, RequestOptionModel: self.create_request_option, LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator, SelectiveAuthenticatorModel: self.create_selective_authenticator, - SimpleRetrieverModel: self.create_simple_retriever, + SimpleRetrieverModel: SimpleRetriever, StateDelegatingStreamModel: self.create_state_delegating_stream, SpecModel: 
def _create_component_from_model(
    self,
    model: BaseModel,
    config: Config,
    **kwargs: Any,
) -> Any:
    """
    Build the runtime component that corresponds to a parsed model.

    The model's class is looked up in `PYDANTIC_MODEL_TO_CONSTRUCTOR`. A mapped
    `ComponentConstructor` subclass is built through its `build` classmethod, receiving
    this method as `dependency_constructor` and the factory flags as `additional_flags`.
    A mapped factory method or function is called directly.

    :param model: parsed model of the component to create
    :param config: connector configuration forwarded to the constructor
    :param kwargs: extra keyword arguments forwarded to the constructor
    :return: the constructed component
    :raises ValueError: if the model class is not mapped, is mapped to a falsy value,
        or is mapped to something that is neither a ComponentConstructor subclass
        nor a method/function
    """
    # TODO: change -> Any to -> ComponentConstructor[BaseModel] once all components are updated with ComponentConstructor logic
    model_type = model.__class__
    if model_type not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR:
        raise ValueError(
            f"{model.__class__} with attributes {model} is not a valid component type"
        )

    component = self.PYDANTIC_MODEL_TO_CONSTRUCTOR[model_type]
    if not component:
        raise ValueError(f"Could not find constructor for {model.__class__}")

    # collect deprecation warnings for supported models.
    # This must run for both build paths, otherwise get_model_deprecations()
    # never receives anything to report.
    if isinstance(model, BaseModelWithDeprecations):
        self._collect_model_deprecations(model)

    if inspect.isclass(component) and issubclass(component, ComponentConstructor):
        # Default components flow
        component_instance: ComponentConstructor[BaseModel] = component.build(
            model=model,
            config=config,
            dependency_constructor=self._create_component_from_model,
            additional_flags=self._flags,
            **kwargs,
        )
        return component_instance

    if inspect.ismethod(component) or inspect.isfunction(component):
        # factory-method flow: the `create_*` methods of this factory
        return component(model=model, config=config, **kwargs)

    raise ValueError(
        f"Unexpected component mapping type for {model.__class__}. "
        f"Instance should be one of ComponentConstructor or method implemented in ModelToComponentFactory"
    )
DatetimeBasedCursorModel] - ] = None, - use_cache: Optional[bool] = None, - log_formatter: Optional[Callable[[Response], Any]] = None, - partition_router: Optional[PartitionRouter] = None, - **kwargs: Any, - ) -> SimpleRetriever: - def _get_url(req: Requester) -> str: - """ - Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever. - This is needed because the URL is not set until the requester is created. - """ - - _url: str = ( - model.requester.url - if hasattr(model.requester, "url") and model.requester.url is not None - else req.get_url(stream_state=None, stream_slice=None, next_page_token=None) - ) - _url_base: str = ( - model.requester.url_base - if hasattr(model.requester, "url_base") and model.requester.url_base is not None - else req.get_url_base(stream_state=None, stream_slice=None, next_page_token=None) - ) - - return _url or _url_base - - decoder = ( - self._create_component_from_model(model=model.decoder, config=config) - if model.decoder - else JsonDecoder(parameters={}) - ) - record_selector = self._create_component_from_model( - model=model.record_selector, - name=name, - config=config, - decoder=decoder, - transformations=transformations, - client_side_incremental_sync=client_side_incremental_sync, - file_uploader=file_uploader, - ) - - query_properties: Optional[QueryProperties] = None - query_properties_key: Optional[str] = None - if self._query_properties_in_request_parameters(model.requester): - # It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple - # places instead of default to request_parameters which isn't clearly documented - if ( - hasattr(model.requester, "fetch_properties_from_endpoint") - and model.requester.fetch_properties_from_endpoint - ): - raise ValueError( - f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters" - ) - 
- query_properties_definitions = [] - for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters() - if isinstance(request_parameter, QueryPropertiesModel): - query_properties_key = key - query_properties_definitions.append(request_parameter) - - if len(query_properties_definitions) > 1: - raise ValueError( - f"request_parameters only supports defining one QueryProperties field, but found {len(query_properties_definitions)} usages" - ) - - if len(query_properties_definitions) == 1: - query_properties = self._create_component_from_model( - model=query_properties_definitions[0], config=config - ) - elif ( - hasattr(model.requester, "fetch_properties_from_endpoint") - and model.requester.fetch_properties_from_endpoint - ): - # todo: Deprecate this condition once dependent connectors migrate to query_properties - query_properties_definition = QueryPropertiesModel( - type="QueryProperties", - property_list=model.requester.fetch_properties_from_endpoint, - always_include_properties=None, - property_chunking=None, - ) # type: ignore # $parameters has a default value - - query_properties = self.create_query_properties( - model=query_properties_definition, - config=config, - ) - elif hasattr(model.requester, "query_properties") and model.requester.query_properties: - query_properties = self.create_query_properties( - model=model.requester.query_properties, - config=config, - ) - - requester = self._create_component_from_model( - model=model.requester, - decoder=decoder, - name=name, - query_properties_key=query_properties_key, - use_cache=use_cache, - config=config, - ) - - if not request_options_provider: - request_options_provider = DefaultRequestOptionsProvider(parameters={}) - if isinstance(request_options_provider, DefaultRequestOptionsProvider) and isinstance( - partition_router, PartitionRouter - ): - request_options_provider = 
partition_router - - paginator = ( - self._create_component_from_model( - model=model.paginator, - config=config, - url_base=_get_url(requester), - extractor_model=model.record_selector.extractor, - decoder=decoder, - cursor_used_for_stop_condition=stop_condition_cursor or None, - ) - if model.paginator - else NoPagination(parameters={}) - ) - - ignore_stream_slicer_parameters_on_paginated_requests = ( - model.ignore_stream_slicer_parameters_on_paginated_requests or False - ) - - if ( - model.partition_router - and isinstance(model.partition_router, SubstreamPartitionRouterModel) - and not bool(self._connector_state_manager.get_stream_state(name, None)) - and any( - parent_stream_config.lazy_read_pointer - for parent_stream_config in model.partition_router.parent_stream_configs - ) - ): - if incremental_sync: - if incremental_sync.type != "DatetimeBasedCursor": - raise ValueError( - f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}." - ) - - elif incremental_sync.step or incremental_sync.cursor_granularity: - raise ValueError( - f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}." - ) - - if model.decoder and model.decoder.type != "JsonDecoder": - raise ValueError( - f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}." 
- ) - - return LazySimpleRetriever( - name=name, - paginator=paginator, - primary_key=primary_key, - requester=requester, - record_selector=record_selector, - stream_slicer=_NO_STREAM_SLICING, - request_option_provider=request_options_provider, - cursor=None, - config=config, - ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, - parameters=model.parameters or {}, - ) - - return SimpleRetriever( - name=name, - paginator=paginator, - primary_key=primary_key, - requester=requester, - record_selector=record_selector, - stream_slicer=_NO_STREAM_SLICING, - request_option_provider=request_options_provider, - cursor=None, - config=config, - ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, - additional_query_properties=query_properties, - log_formatter=self._get_log_formatter(log_formatter, name), - parameters=model.parameters or {}, - ) - def _get_log_formatter( self, log_formatter: Callable[[Response], Any] | None, name: str ) -> Callable[[Response], Any] | None: @@ -3349,19 +3166,6 @@ def _should_limit_slices_fetched(self) -> bool: """ return bool(self._limit_slices_fetched or self._emit_connector_builder_messages) - @staticmethod - def _query_properties_in_request_parameters( - requester: Union[HttpRequesterModel, CustomRequesterModel], - ) -> bool: - if not hasattr(requester, "request_parameters"): - return False - request_parameters = requester.request_parameters - if request_parameters and isinstance(request_parameters, Mapping): - for request_parameter in request_parameters.values(): - if isinstance(request_parameter, QueryPropertiesModel): - return True - return False - @staticmethod def _remove_query_properties( request_parameters: Mapping[str, Union[str, QueryPropertiesModel]], diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py 
b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py index 37b26d171..051cb4943 100644 --- a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -1,8 +1,15 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. from dataclasses import InitVar, dataclass -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Callable, Iterable, List, Mapping, Optional, Union +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + QueryProperties as QueryPropertiesModel, +) +from airbyte_cdk.sources.declarative.parsers.component_constructor import ( + AdditionalFlags, + ComponentConstructor, +) from airbyte_cdk.sources.declarative.requesters.query_properties import ( PropertiesFromEndpoint, PropertyChunking, @@ -11,7 +18,7 @@ @dataclass -class QueryProperties: +class QueryProperties(ComponentConstructor[QueryPropertiesModel]): """ Low-code component that encompasses the behavior to inject additional property values into the outbound API requests. 
@classmethod
def resolve_dependencies(
    cls,
    model: QueryPropertiesModel,
    config: Config,
    dependency_constructor: Callable[..., Any],
    additional_flags: AdditionalFlags,
    **kwargs: Any,
) -> Mapping[str, Any]:
    """
    Translate a QueryProperties model into the keyword arguments of the QueryProperties component.

    A `property_list` that is already a list is used as-is; anything else is treated as a
    nested model and built through `dependency_constructor`. `property_chunking` is built
    the same way when it is set. Extra keyword arguments are forwarded to both nested builds.

    :param model: parsed manifest model of the query properties
    :param config: connector configuration
    :param dependency_constructor: callable used to build nested components from their models
    :param additional_flags: factory-level settings (not used here)
    :return: mapping of constructor arguments for QueryProperties
    """
    declared_properties = model.property_list
    if not isinstance(declared_properties, list):
        # not a static list, so it is a nested model that has to be built first
        declared_properties = dependency_constructor(
            model=declared_properties, config=config, **kwargs
        )

    chunking = None
    if model.property_chunking:
        chunking = dependency_constructor(
            model=model.property_chunking, config=config, **kwargs
        )

    return dict(
        property_list=declared_properties,
        always_include_properties=model.always_include_properties,
        property_chunking=chunking,
        config=config,
        parameters=model.parameters or {},
    )
AirbyteMessage +from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CustomRequester as CustomRequesterModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DatetimeBasedCursor as DatetimeBasedCursorModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpRequester as HttpRequesterModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + IncrementingCountCursor as IncrementingCountCursorModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + QueryProperties as QueryPropertiesModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + SimpleRetriever as SimpleRetrieverModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + SubstreamPartitionRouter as SubstreamPartitionRouterModel, +) +from airbyte_cdk.sources.declarative.parsers.component_constructor import ( + AdditionalFlags, + ComponentConstructor, +) from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( SinglePartitionRouter, ) +from airbyte_cdk.sources.declarative.partition_routers import PartitionRouter from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.query_properties import QueryProperties @@ -39,18 +68,25 @@ RequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.requester import Requester +from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader from 
@classmethod
def _should_use_lazy_simple_retriever(
    cls,
    model: SimpleRetrieverModel,
    additional_flags: AdditionalFlags,
    incremental_sync: Optional[
        Union[IncrementingCountCursorModel, DatetimeBasedCursorModel]
    ] = None,
    name: Optional[str] = None,
) -> bool:
    """
    Decide whether the model has to be built as a LazySimpleRetriever.

    Returns True when the model has a SubstreamPartitionRouter, the connector state manager
    holds no (truthy) state for the stream `name`, and at least one parent stream config sets
    `lazy_read_pointer`. In that case the configuration is also validated.

    :raises ValueError: if the lazy conditions hold but `incremental_sync` is not a
        DatetimeBasedCursor, defines `step` / `cursor_granularity`, or the decoder
        is not a JsonDecoder
    """
    if (
        model.partition_router
        and isinstance(model.partition_router, SubstreamPartitionRouterModel)
        and not bool(additional_flags.connector_state_manager.get_stream_state(name, None))
        and any(
            parent_stream_config.lazy_read_pointer
            for parent_stream_config in model.partition_router.parent_stream_configs
        )
    ):
        if incremental_sync:
            if incremental_sync.type != "DatetimeBasedCursor":
                raise ValueError(
                    f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}."
                )

            elif incremental_sync.step or incremental_sync.cursor_granularity:
                raise ValueError(
                    f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
                )

        if model.decoder and model.decoder.type != "JsonDecoder":
            raise ValueError(
                f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}."
            )
        return True

    return False

@classmethod
def resolve_dependencies(
    cls,
    model: SimpleRetrieverModel,
    config: Config,
    dependency_constructor: Callable[..., Any],
    additional_flags: AdditionalFlags,
    *,
    name: Optional[str] = None,
    primary_key: Optional[Union[str, List[str], List[List[str]]]] = None,
    request_options_provider: Optional[RequestOptionsProvider] = None,
    stop_condition_cursor: Optional[Cursor] = None,
    client_side_incremental_sync: Optional[Dict[str, Any]] = None,
    transformations: Optional[List[RecordTransformation]] = None,
    file_uploader: Optional[DefaultFileUploader] = None,
    incremental_sync: Optional[
        Union[IncrementingCountCursorModel, DatetimeBasedCursorModel]
    ] = None,
    use_cache: Optional[bool] = None,
    log_formatter: Optional[Callable[[Response], Any]] = None,
    partition_router: Optional[PartitionRouter] = None,
    **kwargs: Any,
) -> Mapping[str, Any]:
    """
    Build the nested components of a retriever and return the retriever's constructor arguments.

    Components are built through `dependency_constructor` in this order: decoder,
    record selector, query properties (if any), requester, paginator.

    :param name: stream name; required even though the signature defaults it to None
    :return: keyword arguments for the retriever. `additional_query_properties` and
        `log_formatter` are only included when `_should_use_lazy_simple_retriever`
        returns False.
    :raises ValueError: if `name` is None, or if query properties are declared in more
        than one place / more than once in `request_parameters`
    """
    if name is None:
        raise ValueError(f"name argument is required to instance a {cls.__name__}")

    def _get_url(req: Requester) -> str:
        """
        Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever.
        This is needed because the URL is not set until the requester is created.
        """

        _url: str = (
            model.requester.url
            if hasattr(model.requester, "url") and model.requester.url is not None
            else req.get_url(stream_state=None, stream_slice=None, next_page_token=None)
        )
        _url_base: str = (
            model.requester.url_base
            if hasattr(model.requester, "url_base") and model.requester.url_base is not None
            else req.get_url_base(stream_state=None, stream_slice=None, next_page_token=None)
        )

        return _url or _url_base

    def query_properties_in_request_parameters(
        requester: Union[HttpRequesterModel, CustomRequesterModel],
    ) -> bool:
        # True when at least one value of the requester's request_parameters mapping
        # is a QueryProperties model
        if not hasattr(requester, "request_parameters"):
            return False
        request_parameters = requester.request_parameters
        if request_parameters and isinstance(request_parameters, Mapping):
            for request_parameter in request_parameters.values():
                if isinstance(request_parameter, QueryPropertiesModel):
                    return True
        return False

    def _get_log_formatter(
        log_formatter: Callable[[Response], Any] | None,
        name: str,
        additional_flags: AdditionalFlags,
    ) -> Callable[[Response], Any] | None:
        # a formatter is only returned when slices are being limited; the given
        # formatter wins, otherwise a default one built on format_http_message is used
        if additional_flags.should_limit_slices_fetched:
            return (
                (
                    lambda response: format_http_message(
                        response,
                        f"Stream '{name}' request",
                        f"Request performed in order to extract records for stream '{name}'",
                        name,
                    )
                )
                if not log_formatter
                else log_formatter
            )
        return None

    decoder = (
        dependency_constructor(model=model.decoder, config=config)
        if model.decoder
        else JsonDecoder(parameters={})
    )
    record_selector = dependency_constructor(
        model=model.record_selector,
        name=name,
        config=config,
        decoder=decoder,
        transformations=transformations,
        client_side_incremental_sync=client_side_incremental_sync,
        file_uploader=file_uploader,
    )

    query_properties: Optional[QueryProperties] = None
    query_properties_key: Optional[str] = None
    if query_properties_in_request_parameters(model.requester):
        # It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple
        # places instead of default to request_parameters which isn't clearly documented
        if (
            hasattr(model.requester, "fetch_properties_from_endpoint")
            and model.requester.fetch_properties_from_endpoint
        ):
            raise ValueError(
                f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters"
            )

        query_properties_definitions = []
        for key, request_parameter in model.requester.request_parameters.items():  # type: ignore # request_parameters is already validated to be a Mapping using query_properties_in_request_parameters()
            if isinstance(request_parameter, QueryPropertiesModel):
                query_properties_key = key
                query_properties_definitions.append(request_parameter)

        if len(query_properties_definitions) > 1:
            raise ValueError(
                f"request_parameters only supports defining one QueryProperties field, but found {len(query_properties_definitions)} usages"
            )

        if len(query_properties_definitions) == 1:
            # NOTE(review): unlike the two branches below, **kwargs is not forwarded here
            query_properties = dependency_constructor(
                model=query_properties_definitions[0], config=config
            )
    elif (
        hasattr(model.requester, "fetch_properties_from_endpoint")
        and model.requester.fetch_properties_from_endpoint
    ):
        # todo: Deprecate this condition once dependent connectors migrate to query_properties
        query_properties_definition = QueryPropertiesModel(
            type="QueryProperties",
            property_list=model.requester.fetch_properties_from_endpoint,
            always_include_properties=None,
            property_chunking=None,
        )  # type: ignore # $parameters has a default value

        query_properties = dependency_constructor(
            model=query_properties_definition,
            config=config,
            **kwargs,
        )

    elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
        query_properties = dependency_constructor(
            model=model.requester.query_properties,
            config=config,
            **kwargs,
        )

    requester = dependency_constructor(
        model=model.requester,
        decoder=decoder,
        name=name,
        query_properties_key=query_properties_key,
        use_cache=use_cache,
        config=config,
    )

    if not request_options_provider:
        request_options_provider = DefaultRequestOptionsProvider(parameters={})
    # a default provider is replaced by the partition router when one is given
    if isinstance(request_options_provider, DefaultRequestOptionsProvider) and isinstance(
        partition_router, PartitionRouter
    ):
        request_options_provider = partition_router

    paginator = (
        dependency_constructor(
            model=model.paginator,
            config=config,
            url_base=_get_url(requester),
            extractor_model=model.record_selector.extractor,
            decoder=decoder,
            cursor_used_for_stop_condition=stop_condition_cursor or None,
        )
        if model.paginator
        else NoPagination(parameters={})
    )

    ignore_stream_slicer_parameters_on_paginated_requests = (
        model.ignore_stream_slicer_parameters_on_paginated_requests or False
    )

    # arguments shared by the lazy and the regular retriever
    resolved_dependencies = {
        "name": name,
        "paginator": paginator,
        "primary_key": primary_key,
        "requester": requester,
        "record_selector": record_selector,
        "stream_slicer": _NO_STREAM_SLICING,
        "request_option_provider": request_options_provider,
        "cursor": None,
        "config": config,
        "ignore_stream_slicer_parameters_on_paginated_requests": ignore_stream_slicer_parameters_on_paginated_requests,
        "parameters": model.parameters or {},
    }

    # the lazy retriever is built from the shared arguments only
    if cls._should_use_lazy_simple_retriever(model, additional_flags, incremental_sync, name):
        return resolved_dependencies

    resolved_dependencies.update(
        {
            "additional_query_properties": query_properties,
            "log_formatter": _get_log_formatter(log_formatter, name, additional_flags),
        }
    )
    return resolved_dependencies

@classmethod
def build(
    cls,
    model: SimpleRetrieverModel,
    config: Config,
    dependency_constructor: Callable[..., Any],
    additional_flags: AdditionalFlags,
    **kwargs: Any,
) -> "SimpleRetriever":
    """
    Builds up the Component and its component-specific dependencies.
    Order of operations:
    - build the dependencies first
    - build the component with the resolved dependencies

    Returns a LazySimpleRetriever when `_should_use_lazy_simple_retriever` is True for the
    `incremental_sync` and `name` keyword arguments, otherwise a SimpleRetriever.
    """
    resolved_dependencies: Mapping[str, Any] = cls.resolve_dependencies(
        model=model,
        config=config,
        dependency_constructor=dependency_constructor,
        additional_flags=additional_flags,
        **kwargs,
    )
    # NOTE(review): this repeats the check already made inside resolve_dependencies
    if cls._should_use_lazy_simple_retriever(
        model=model,
        additional_flags=additional_flags,
        incremental_sync=kwargs.get("incremental_sync"),
        name=kwargs.get("name"),
    ):
        return LazySimpleRetriever(**resolved_dependencies)
    return SimpleRetriever(**resolved_dependencies)