Skip to content

Commit

Permalink
Add ignore_stream_slicer_parameters_on_paginated_requests flag (#35462)
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda authored Feb 21, 2024
1 parent c9b7d8a commit 5724ca0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2001,6 +2001,10 @@ definitions:
anyOf:
- "$ref": "#/definitions/DefaultPaginator"
- "$ref": "#/definitions/NoPagination"
ignore_stream_slicer_parameters_on_paginated_requests:
description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
type: boolean
default: false
partition_router:
title: Partition Router
description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,10 @@ class SimpleRetriever(BaseModel):
None,
description="Paginator component that describes how to navigate through the API's pages.",
)
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
False,
description='If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.',
)
partition_router: Optional[
Union[
CustomPartitionRouter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,12 +958,17 @@ def create_simple_retriever(
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
paginator = (
self._create_component_from_model(
model=model.paginator, config=config, url_base=url_base, cursor_used_for_stop_condition=cursor_used_for_stop_condition
model=model.paginator,
config=config,
url_base=url_base,
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
)
if model.paginator
else NoPagination(parameters={})
)

ignore_stream_slicer_parameters_on_paginated_requests = model.ignore_stream_slicer_parameters_on_paginated_requests or False

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
Expand All @@ -975,6 +980,7 @@ def create_simple_retriever(
cursor=cursor,
config=config,
maximum_number_of_slices=self._limit_slices_fetched or 5,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
)
return SimpleRetriever(
Expand All @@ -986,6 +992,7 @@ def create_simple_retriever(
stream_slicer=stream_slicer,
cursor=cursor,
config=config,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters=model.parameters or {},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class SimpleRetriever(Retriever):
paginator: Optional[Paginator] = None
stream_slicer: StreamSlicer = SinglePartitionRouter(parameters={})
cursor: Optional[Cursor] = None
ignore_stream_slicer_parameters_on_paginated_requests: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._paginator = self.paginator or NoPagination(parameters=parameters)
Expand Down Expand Up @@ -105,12 +106,12 @@ def _get_request_options(
Returned merged mapping otherwise
"""
# FIXME we should eventually remove the usage of stream_state as part of the interpolation
return combine_mappings(
[
paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
]
)
mappings = [
paginator_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
]
if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
mappings.append(stream_slicer_method(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token))
return combine_mappings(mappings)

def _request_headers(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,46 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping):
pass


@pytest.mark.parametrize(
"test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping",
[
("test_do_not_ignore_stream_slicer_params_if_ignore_is_true_but_no_next_page_token", {"key_from_pagination": "1000"}, True, None, {"key_from_pagination": "1000"}),
("test_do_not_ignore_stream_slicer_params_if_ignore_is_false_and_no_next_page_token", {"key_from_pagination": "1000"}, False, None, {"key_from_pagination": "1000", "key_from_slicer": "value"}),
("test_ignore_stream_slicer_params_on_paginated_request", {"key_from_pagination": "1000"}, True, {"page": 2}, {"key_from_pagination": "1000"}),
("test_do_not_ignore_stream_slicer_params_on_paginated_request", {"key_from_pagination": "1000"}, False, {"page": 2}, {"key_from_pagination": "1000", "key_from_slicer": "value"}),
],
)
def test_ignore_stream_slicer_parameters_on_paginated_requests(test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, next_page_token, expected_mapping):
# This test is separate from the other request options because request headers must be strings
paginator = MagicMock()
paginator.get_request_headers.return_value = paginator_mapping
requester = MagicMock(use_cache=False)

stream_slicer = MagicMock()
stream_slicer.get_request_headers.return_value = {"key_from_slicer": "value"}

record_selector = MagicMock()
retriever = SimpleRetriever(
name="stream_name",
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
paginator=paginator,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
parameters={},
config={},
)

request_option_type_to_method = {
RequestOptionType.header: retriever._request_headers,
}

for _, method in request_option_type_to_method.items():
actual_mapping = method(None, None, next_page_token={"next_page_token": "1000"})
assert expected_mapping == actual_mapping


@pytest.mark.parametrize(
"test_name, slicer_body_data, paginator_body_data, expected_body_data",
[
Expand Down

0 comments on commit 5724ca0

Please sign in to comment.