Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2279,12 +2279,14 @@ definitions:
- FAIL
- RETRY
- IGNORE
- RESET_PAGINATION
- RATE_LIMITED
examples:
- SUCCESS
- FAIL
- RETRY
- IGNORE
- RESET_PAGINATION
- RATE_LIMITED
failure_type:
title: Failure Type
Expand Down Expand Up @@ -3707,6 +3709,9 @@ definitions:
anyOf:
- "$ref": "#/definitions/DefaultPaginator"
- "$ref": "#/definitions/NoPagination"
pagination_reset:
description: Describes what triggers pagination reset and how to handle it.
"$ref": "#/definitions/PaginationReset"
ignore_stream_slicer_parameters_on_paginated_requests:
description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
type: boolean
Expand All @@ -3730,6 +3735,36 @@ definitions:
$parameters:
type: object
additionalProperties: true
PaginationReset:
title: Pagination Reset
description: Describes what triggers pagination reset and how to handle it. If SPLIT_USING_CURSOR, the connector developer is accountable for ensuring that the records are returned in ascending order.
type: object
required:
- type
- action
properties:
type:
type: string
enum: [ PaginationReset ]
action:
type: string
enum:
- SPLIT_USING_CURSOR
- RESET
limits:
"$ref": "#/definitions/PaginationResetLimits"
PaginationResetLimits:
title: Pagination Reset Limits
description: Describes the limits that trigger pagination reset
type: object
required:
- type
properties:
type:
type: string
enum: [ PaginationResetLimits ]
number_of_records:
type: integer
GzipDecoder:
title: gzip
description: Select 'gzip' for response data that is compressed with gzip. Requires specifying an inner data type/decoder to parse the decompressed data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def __init__(
self._connector_state_converter = connector_state_converter
self._cursor_field = cursor_field

self._cursor_factory = cursor_factory
self._cursor_factory = cursor_factory # self._cursor_factory is flagged as private but is used in model_to_component_factory to ease pagination reset instantiation
self._partition_router = partition_router

# The dict is ordered to ensure that once the maximum number of partitions is reached,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ class Action(Enum):
FAIL = "FAIL"
RETRY = "RETRY"
IGNORE = "IGNORE"
RESET_PAGINATION = "RESET_PAGINATION"
RATE_LIMITED = "RATE_LIMITED"


Expand All @@ -553,7 +554,14 @@ class HttpResponseFilter(BaseModel):
action: Optional[Action] = Field(
None,
description="Action to execute if a response matches the filter.",
examples=["SUCCESS", "FAIL", "RETRY", "IGNORE", "RATE_LIMITED"],
examples=[
"SUCCESS",
"FAIL",
"RETRY",
"IGNORE",
"RESET_PAGINATION",
"RATE_LIMITED",
],
title="Action",
)
failure_type: Optional[FailureType] = Field(
Expand Down Expand Up @@ -1173,6 +1181,16 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class Action1(Enum):
    """Pagination-reset action (name auto-generated by datamodel-codegen from
    the anonymous `action` enum in the `PaginationReset` schema definition).

    Per the schema description: with SPLIT_USING_CURSOR the connector developer
    is accountable for ensuring that records are returned in ascending order;
    RESET restarts pagination outright.
    """

    SPLIT_USING_CURSOR = "SPLIT_USING_CURSOR"
    RESET = "RESET"


class PaginationResetLimits(BaseModel):
    """Limits that trigger a pagination reset.

    Generated from the `PaginationResetLimits` definition in
    declarative_component_schema.yaml; edit the YAML and regenerate rather
    than modifying this class directly.
    """

    type: Literal["PaginationResetLimits"]
    # Optional threshold: reset pagination after this many records.
    number_of_records: Optional[int] = None


class CsvDecoder(BaseModel):
type: Literal["CsvDecoder"]
encoding: Optional[str] = "utf-8"
Expand Down Expand Up @@ -2054,6 +2072,12 @@ class RecordSelector(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class PaginationReset(BaseModel):
    """Describes what triggers a pagination reset and how to handle it.

    Generated from the `PaginationReset` definition in
    declarative_component_schema.yaml; edit the YAML and regenerate rather
    than modifying this class directly.
    """

    type: Literal["PaginationReset"]
    # Required handling strategy; `Action1` is the generator's name for the
    # anonymous SPLIT_USING_CURSOR / RESET enum.
    action: Action1
    # Optional trigger thresholds (e.g. number_of_records).
    limits: Optional[PaginationResetLimits] = None


class GzipDecoder(BaseModel):
type: Literal["GzipDecoder"]
decoder: Union[CsvDecoder, GzipDecoder, JsonDecoder, JsonlDecoder]
Expand Down Expand Up @@ -2822,6 +2846,10 @@ class SimpleRetriever(BaseModel):
None,
description="Paginator component that describes how to navigate through the API's pages.",
)
pagination_reset: Optional[PaginationReset] = Field(
None,
description="Describes what triggers pagination reset and how to handle it.",
)
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
False,
description="If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@
)
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
PaginationResetLimits,
)
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
DEPRECATION_LOGS_TAG,
BaseModelWithDeprecations,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
Action1 as PaginationResetActionModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
)
Expand Down Expand Up @@ -358,6 +362,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
PageIncrement as PageIncrementModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
PaginationReset as PaginationResetModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
)
Expand Down Expand Up @@ -529,6 +536,7 @@
LocalFileSystemFileWriter,
NoopFileWriter,
)
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -644,6 +652,8 @@
# this would be a circular import
MAX_SLICES = 5

# Module-level logger for factory diagnostics (e.g. pagination-reset cursor
# selection warnings). Plain string literal: the original used an f-string
# with no placeholders (ruff F541).
LOGGER = logging.getLogger("airbyte.model_to_component_factory")


class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
Expand Down Expand Up @@ -2043,6 +2053,7 @@ def create_default_stream(
if isinstance(concurrent_cursor, FinalStateCursor)
else concurrent_cursor
)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
Expand All @@ -2051,12 +2062,9 @@ def create_default_stream(
request_options_provider=request_options_provider,
stream_slicer=stream_slicer,
partition_router=partition_router,
stop_condition_cursor=concurrent_cursor
if self._is_stop_condition_on_cursor(model)
else None,
client_side_incremental_sync={"cursor": concurrent_cursor}
if self._is_client_side_filtering_enabled(model)
else None,
has_stop_condition_cursor=self._is_stop_condition_on_cursor(model),
is_client_side_incremental_sync=self._is_client_side_filtering_enabled(model),
cursor=concurrent_cursor,
transformations=transformations,
file_uploader=file_uploader,
incremental_sync=model.incremental_sync,
Expand Down Expand Up @@ -3050,7 +3058,7 @@ def create_record_selector(
name: str,
transformations: List[RecordTransformation] | None = None,
decoder: Decoder | None = None,
client_side_incremental_sync: Dict[str, Any] | None = None,
client_side_incremental_sync_cursor: Optional[Cursor] = None,
file_uploader: Optional[DefaultFileUploader] = None,
**kwargs: Any,
) -> RecordSelector:
Expand All @@ -3066,14 +3074,14 @@ def create_record_selector(
transform_before_filtering = (
False if model.transform_before_filtering is None else model.transform_before_filtering
)
if client_side_incremental_sync:
if client_side_incremental_sync_cursor:
record_filter = ClientSideIncrementalRecordFilterDecorator(
config=config,
parameters=model.parameters,
condition=model.record_filter.condition
if (model.record_filter and hasattr(model.record_filter, "condition"))
else None,
**client_side_incremental_sync,
cursor=client_side_incremental_sync_cursor,
)
transform_before_filtering = (
True
Expand Down Expand Up @@ -3151,8 +3159,9 @@ def create_simple_retriever(
name: str,
primary_key: Optional[Union[str, List[str], List[List[str]]]],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_cursor: Optional[Cursor] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
cursor: Optional[Cursor] = None,
has_stop_condition_cursor: bool = False,
is_client_side_incremental_sync: bool = False,
transformations: List[RecordTransformation],
file_uploader: Optional[DefaultFileUploader] = None,
incremental_sync: Optional[
Expand Down Expand Up @@ -3182,6 +3191,9 @@ def _get_url(req: Requester) -> str:

return _url or _url_base

if cursor is None:
cursor = FinalStateCursor(name, None, self._message_repository)

decoder = (
self._create_component_from_model(model=model.decoder, config=config)
if model.decoder
Expand All @@ -3193,7 +3205,7 @@ def _get_url(req: Requester) -> str:
config=config,
decoder=decoder,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
client_side_incremental_sync_cursor=cursor if is_client_side_incremental_sync else None,
file_uploader=file_uploader,
)

Expand Down Expand Up @@ -3270,7 +3282,7 @@ def _get_url(req: Requester) -> str:
url_base=_get_url(requester),
extractor_model=model.record_selector.extractor,
decoder=decoder,
cursor_used_for_stop_condition=stop_condition_cursor or None,
cursor_used_for_stop_condition=cursor if has_stop_condition_cursor else None,
)
if model.paginator
else NoPagination(parameters={})
Expand Down Expand Up @@ -3319,6 +3331,13 @@ def _get_url(req: Requester) -> str:
parameters=model.parameters or {},
)

if (
model.record_selector.record_filter
and model.pagination_reset
and model.pagination_reset.limits
):
raise ValueError("PaginationResetLimits are not supported while having record filter.")

return SimpleRetriever(
name=name,
paginator=paginator,
Expand All @@ -3332,9 +3351,40 @@ def _get_url(req: Requester) -> str:
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
additional_query_properties=query_properties,
log_formatter=self._get_log_formatter(log_formatter, name),
pagination_tracker_factory=self._create_pagination_tracker_factory(
model.pagination_reset, cursor
),
parameters=model.parameters or {},
)

def _create_pagination_tracker_factory(
    self, model: Optional[PaginationResetModel], cursor: Cursor
) -> Callable[[], PaginationTracker]:
    """Build a factory that produces `PaginationTracker` instances for a retriever.

    :param model: the declarative `PaginationReset` config, or None when the
        stream does not configure pagination resets
    :param cursor: the stream cursor, used to derive a stateless cursor copy
        when the reset action is SPLIT_USING_CURSOR
    :return: a zero-argument callable producing a fresh `PaginationTracker`
    :raises ValueError: if the configured reset action is not recognized
    """
    if model is None:
        # No pagination-reset config: trackers get neither cursor nor limit.
        return lambda: PaginationTracker()

    # Until we figure out a way to use any cursor for PaginationTracker, we
    # need this cursor-selection logic keyed on the configured action.
    cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None
    if model.action == PaginationResetActionModel.RESET:
        # RESET deliberately leaves cursor_factory returning None even if the
        # stream has a cursor.
        pass
    elif model.action == PaginationResetActionModel.SPLIT_USING_CURSOR:
        if isinstance(cursor, ConcurrentCursor):
            cursor_factory = lambda: cursor.copy_without_state()  # type: ignore # the if condition validates that it is a ConcurrentCursor
        elif isinstance(cursor, ConcurrentPerPartitionCursor):
            # NOTE(review): reaches into the cursor's private `_cursor_factory`;
            # if this becomes a problem, extract the cursor_factory
            # instantiation logic and make it accessible here.
            cursor_factory = lambda: cursor._cursor_factory.create(  # type: ignore
                {}, datetime.timedelta(0)
            )
        elif not isinstance(cursor, FinalStateCursor):
            LOGGER.warning(
                "Unknown cursor for PaginationTracker. Pagination resets might not work properly"
            )
    else:
        raise ValueError(f"Unknown PaginationReset action: {model.action}")

    # `model` is guaranteed non-None past the early return above, so only
    # `model.limits` needs checking (the original re-tested `model` — dead code).
    limit = model.limits.number_of_records if model.limits else None
    return lambda: PaginationTracker(cursor_factory(), limit)

def _get_log_formatter(
self, log_formatter: Callable[[Response], Any] | None, name: str
) -> Callable[[Response], Any] | None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ def interpret_response(
if not isinstance(matched_error_resolution, ErrorResolution):
continue

if matched_error_resolution.response_action == ResponseAction.SUCCESS:
if matched_error_resolution.response_action in [
ResponseAction.SUCCESS,
ResponseAction.RETRY,
ResponseAction.IGNORE,
ResponseAction.RESET_PAGINATION,
]:
return matched_error_resolution

if (
matched_error_resolution.response_action == ResponseAction.RETRY
or matched_error_resolution.response_action == ResponseAction.IGNORE
):
return matched_error_resolution
if matched_error_resolution:
return matched_error_resolution

Expand Down
Loading
Loading