Skip to content

Commit

Permalink
low-code cdk: make page_size optional for offset and page increment s…
Browse files Browse the repository at this point in the history
…trategies (#26056)

* make page_size optional

* Automated Commit - Formatting Changes

---------

Co-authored-by: brianjlai <brianjlai@users.noreply.github.com>
  • Loading branch information
brianjlai and brianjlai committed May 24, 2023
1 parent 27160be commit 5707e47
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,6 @@ definitions:
type: object
required:
- type
- page_size
properties:
type:
type: string
Expand All @@ -1533,6 +1532,7 @@ definitions:
- type: string
interpolation_context:
- config
- response
examples:
- 100
- "{{ config['page_size'] }}"
Expand All @@ -1545,7 +1545,6 @@ definitions:
type: object
required:
- type
- page_size
properties:
type:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ class Config:

class OffsetIncrement(BaseModel):
type: Literal["OffsetIncrement"]
page_size: Union[int, str] = Field(
...,
page_size: Optional[Union[int, str]] = Field(
None,
description="The number of records to include in each pages.",
examples=[100, "{{ config['page_size'] }}"],
title="Limit",
Expand All @@ -617,8 +617,8 @@ class OffsetIncrement(BaseModel):

class PageIncrement(BaseModel):
type: Literal["PageIncrement"]
page_size: int = Field(
...,
page_size: Optional[int] = Field(
None,
description="The number of records to include in each pages.",
examples=[100, "100"],
title="Page Size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, List, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.types import Config
Expand Down Expand Up @@ -36,16 +37,23 @@ class OffsetIncrement(PaginationStrategy):
"""

config: Config
page_size: Union[str, int]
page_size: Optional[Union[str, int]]
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = JsonDecoder(parameters={})

def __post_init__(self, parameters: Mapping[str, Any]):
self._offset = 0
page_size = str(self.page_size) if isinstance(self.page_size, int) else self.page_size
self._page_size = InterpolatedString(page_size, parameters=parameters)
if page_size:
self._page_size = InterpolatedString(page_size, parameters=parameters)
else:
self._page_size = None

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
if len(last_records) < self._page_size.eval(self.config):
decoded_response = self.decoder.decode(response)

# Stop paginating when there are fewer records than the page size or the current page has no records
if (self._page_size and len(last_records) < self._page_size.eval(self.config, response=decoded_response)) or len(last_records) == 0:
return None
else:
self._offset += len(last_records)
Expand All @@ -55,7 +63,10 @@ def reset(self):
self._offset = 0

def get_page_size(self) -> Optional[int]:
page_size = self._page_size.eval(self.config)
if not isinstance(page_size, int):
raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
return page_size
if self._page_size:
page_size = self._page_size.eval(self.config)
if not isinstance(page_size, int):
raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
return page_size
else:
return self._page_size
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ class PageIncrement(PaginationStrategy):
start_from_page (int): number of the initial page
"""

page_size: int
page_size: Optional[int]
parameters: InitVar[Mapping[str, Any]]
start_from_page: int = 0

def __post_init__(self, parameters: Mapping[str, Any]):
self._page = self.start_from_page

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
if len(last_records) < self.page_size:
# Stop paginating when there are fewer records than the page size or the current page has no records
if (self.page_size and len(last_records) < self.page_size) or len(last_records) == 0:
return None
else:
self._page += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,17 @@ def test_limit_page_fetched():
assert last_token

assert not paginator.next_page_token(MagicMock(), MagicMock())


def test_paginator_with_page_option_no_page_size():
pagination_strategy = OffsetIncrement(config={}, page_size=None, parameters={})

with pytest.raises(ValueError):
DefaultPaginator(
page_size_option=MagicMock(),
page_token_option=RequestOption("limit", RequestOptionType.request_parameter, parameters={}),
pagination_strategy=pagination_strategy,
config=MagicMock(),
url_base=MagicMock(),
parameters={},
),
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,24 @@


@pytest.mark.parametrize(
"test_name, page_size, parameters, expected_next_page_token, expected_offset",
"page_size, parameters, last_records, expected_next_page_token, expected_offset",
[
("test_same_page_size", "2", {}, 2, 2),
("test_same_page_size", 2, {}, 2, 2),
("test_larger_page_size", "{{ parameters['page_size'] }}", {"page_size": 3}, None, 0),
pytest.param("2", {}, [{"id": 0}, {"id": 1}], 2, 2, id="test_same_page_size"),
pytest.param(2, {}, [{"id": 0}, {"id": 1}], 2, 2, id="test_same_page_size"),
pytest.param("{{ parameters['page_size'] }}", {"page_size": 3}, [{"id": 0}, {"id": 1}], None, 0, id="test_larger_page_size"),
pytest.param(None, {}, [], None, 0, id="test_stop_if_no_records"),
pytest.param("{{ response['page_metadata']['limit'] }}", {}, [{"id": 0}, {"id": 1}], None, 0, id="test_page_size_from_response"),
],
)
def test_offset_increment_paginator_strategy(test_name, page_size, parameters, expected_next_page_token, expected_offset):
def test_offset_increment_paginator_strategy(page_size, parameters, last_records, expected_next_page_token, expected_offset):
paginator_strategy = OffsetIncrement(page_size=page_size, parameters=parameters, config={})
assert paginator_strategy._offset == 0

response = requests.Response()

response.headers = {"A_HEADER": "HEADER_VALUE"}
response_body = {"next": "https://airbyte.io/next_url"}
response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}}
response._content = json.dumps(response_body).encode("utf-8")
last_records = [{"id": 0}, {"id": 1}]

next_page_token = paginator_strategy.next_page_token(response, last_records)
assert expected_next_page_token == next_page_token
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@


@pytest.mark.parametrize(
"test_name, page_size, start_from, expected_next_page_token, expected_offset",
"page_size, start_from, last_records, expected_next_page_token, expected_offset",
[
("test_same_page_size_start_from_0", 2, 1, 2, 2),
("test_larger_page_size_start_from_0", 3, 1, None, 1),
("test_same_page_size_start_from_1", 2, 0, 1, 1),
("test_larger_page_size_start_from_0", 3, 0, None, 0),
pytest.param(2, 1, [{"id": 0}, {"id": 1}], 2, 2, id="test_same_page_size_start_from_0"),
pytest.param(3, 1, [{"id": 0}, {"id": 1}], None, 1, id="test_larger_page_size_start_from_0"),
pytest.param(2, 0, [{"id": 0}, {"id": 1}], 1, 1, id="test_same_page_size_start_from_1"),
pytest.param(3, 0, [{"id": 0}, {"id": 1}], None, 0, id="test_larger_page_size_start_from_0"),
pytest.param(None, 0, [], None, 0, id="test_no_page_size"),
],
)
def test_page_increment_paginator_strategy(test_name, page_size, start_from, expected_next_page_token, expected_offset):
def test_page_increment_paginator_strategy(page_size, start_from, last_records, expected_next_page_token, expected_offset):
paginator_strategy = PageIncrement(page_size, parameters={}, start_from_page=start_from)
assert paginator_strategy._page == start_from

Expand All @@ -27,7 +28,6 @@ def test_page_increment_paginator_strategy(test_name, page_size, start_from, exp
response.headers = {"A_HEADER": "HEADER_VALUE"}
response_body = {"next": "https://airbyte.io/next_url"}
response._content = json.dumps(response_body).encode("utf-8")
last_records = [{"id": 0}, {"id": 1}]

next_page_token = paginator_strategy.next_page_token(response, last_records)
assert expected_next_page_token == next_page_token
Expand Down

0 comments on commit 5707e47

Please sign in to comment.