Skip to content

Commit

Permalink
airbyte-cdk offset pagination strategy: page_size to be interpolated … (
Browse files Browse the repository at this point in the history
#19646)

* airbyte-cdk offset pagination strategy: page_size to be interpolated string
* airbyte-cdk offset pagination strategy: bump version
  • Loading branch information
roman-yermilov-gl authored Dec 6, 2022
1 parent 22c1e12 commit bedc3b9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,51 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, List, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class OffsetIncrement(PaginationStrategy, JsonSchemaMixin):
"""
Pagination strategy that returns the number of records reads so far and returns it as the next page token
Examples:
# page_size to be a constant integer value
pagination_strategy:
type: OffsetIncrement
page_size: 2
# page_size to be a constant string value
pagination_strategy:
type: OffsetIncrement
page_size: "2"
# page_size to be an interpolated string value
pagination_strategy:
type: OffsetIncrement
page_size: "{{ options['items_per_page'] }}"
Attributes:
page_size (int): the number of records to request
page_size (InterpolatedString): the number of records to request
"""

page_size: int
config: Config
page_size: Union[InterpolatedString, str, int]
options: InitVar[Mapping[str, Any]]

def __post_init__(self, options: Mapping[str, Any]):
self._offset = 0
# InterpolatedString page_size may contain one of int/str types,
# so we need to ensure that its `.string` attribute is of *string* type
self.page_size.string = str(self.page_size.string)

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
if len(last_records) < self.page_size:
if len(last_records) < self.page_size.eval(self.config):
return None
else:
self._offset += len(last_records)
Expand All @@ -36,4 +57,7 @@ def reset(self):
self._offset = 0

def get_page_size(self) -> Optional[int]:
return self.page_size
page_size = self.page_size.eval(self.config)
if not isinstance(page_size, int):
raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
return page_size
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.11.3",
version="0.11.4",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@

import pytest
import requests

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement


@pytest.mark.parametrize(
"test_name, page_size, expected_next_page_token, expected_offset",
[
("test_same_page_size", 2, 2, 2),
("test_larger_page_size", 3, None, 0),
("test_same_page_size", InterpolatedString(string="2", options={}), 2, 2),
("test_same_page_size", InterpolatedString(string=2, options={}), 2, 2),
("test_larger_page_size", InterpolatedString(string="{{ options['page_size'] }}", options={"page_size": 3}), None, 0),
],
)
def test_offset_increment_paginator_strategy(test_name, page_size, expected_next_page_token, expected_offset):
paginator_strategy = OffsetIncrement(page_size, options={})
paginator_strategy = OffsetIncrement(page_size=page_size, options={}, config={})
assert paginator_strategy._offset == 0

response = requests.Response()
Expand All @@ -33,3 +36,14 @@ def test_offset_increment_paginator_strategy(test_name, page_size, expected_next

paginator_strategy.reset()
assert 0 == paginator_strategy._offset


def test_offset_increment_paginator_strategy_rises():
paginator_strategy = OffsetIncrement(
page_size=InterpolatedString(string="{{ options['page_size'] }}", options={"page_size": "invalid value"}),
options={},
config={}
)
with pytest.raises(Exception) as exc:
paginator_strategy.get_page_size()
assert str(exc.value) == 'invalid value is of type <class \'str\'>. Expected <class \'int\'>'

0 comments on commit bedc3b9

Please sign in to comment.