Skip to content

Commit

Permalink
low-code: Add last_page_size and last_record to pagination context (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Apr 2, 2024
1 parent a649cbc commit 4af69fc
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 30 deletions.
Expand Up @@ -353,11 +353,12 @@ definitions:
interpolation_context:
- config
- headers
- last_records
- last_page_size
- last_record
- response
examples:
- "{{ headers.link.next.cursor }}"
- "{{ last_records[-1]['key'] }}"
- "{{ last_record['key'] }}"
- "{{ response['nextPage'] }}"
page_size:
title: Page Size
Expand All @@ -372,7 +373,7 @@ definitions:
interpolation_context:
- config
- headers
- last_records
- last_record
- response
examples:
- "{{ response.data.has_more is false }}"
Expand Down Expand Up @@ -2306,20 +2307,20 @@ interpolation:
x-ratelimit-limit: "600"
x-ratelimit-remaining: "598"
x-ratelimit-reset: "39"
- title: last_records
description: List of records extracted from the last response received from the API.
type: list
- title: last_record
description: Last record extracted from the response received from the API.
type: object
examples:
- name: "Test List: 19"
id: 0236d6d2
contact_count: 20
_metadata:
self: https://api.sendgrid.com/v3/marketing/lists/0236d6d2
- title: last_page_size
description: Number of records extracted from the last response received from the API.
type: object
examples:
- - name: "Test List: 19"
id: 0236d6d2
contact_count: 20
_metadata:
self: https://api.sendgrid.com/v3/marketing/lists/0236d6d2
- name: List for CI tests, number 30
id: 041ee031
contact_count: 0
_metadata:
self: https://api.sendgrid.com/v3/marketing/lists/041ee031
- 2
- title: next_page_token
description: Object describing the token to fetch the next page of records. The object has a single key "next_page_token".
type: object
Expand Down
Expand Up @@ -880,7 +880,7 @@ class CursorPagination(BaseModel):
description='Value of the cursor defining the next page to fetch.',
examples=[
'{{ headers.link.next.cursor }}',
"{{ last_records[-1]['key'] }}",
"{{ last_record['key'] }}",
"{{ response['nextPage'] }}",
],
title='Cursor Value',
Expand Down
Expand Up @@ -3,15 +3,15 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Dict, List, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.types import Config, Record


@dataclass
Expand All @@ -34,32 +34,52 @@ class CursorPaginationStrategy(PaginationStrategy):
stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
decoder: Decoder = JsonDecoder(parameters={})

def __post_init__(self, parameters: Mapping[str, Any]):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if isinstance(self.cursor_value, str):
self.cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
else:
self._cursor_value = self.cursor_value
if isinstance(self.stop_condition, str):
self.stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
else:
self._stop_condition = self.stop_condition

@property
def initial_token(self) -> Optional[Any]:
return None

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)

# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
headers = response.headers
headers: Dict[str, Any] = dict(response.headers)
headers["link"] = response.links

if self.stop_condition:
should_stop = self.stop_condition.eval(self.config, response=decoded_response, headers=headers, last_records=last_records)
last_record = last_records[-1] if last_records else None

if self._stop_condition:
should_stop = self._stop_condition.eval(
self.config,
response=decoded_response,
headers=headers,
last_records=last_records,
last_record=last_record,
last_page_size=len(last_records),
)
if should_stop:
return None
token = self.cursor_value.eval(config=self.config, last_records=last_records, response=decoded_response, headers=headers)
token = self._cursor_value.eval(
config=self.config,
last_records=last_records,
response=decoded_response,
headers=headers,
last_record=last_record,
last_page_size=len(last_records),
)
return token if token else None

def reset(self):
def reset(self) -> None:
# No state to reset
pass

Expand Down
Expand Up @@ -243,8 +243,8 @@ def test_full_config_stream():

assert isinstance(stream.retriever.paginator.pagination_strategy, CursorPaginationStrategy)
assert isinstance(stream.retriever.paginator.pagination_strategy.decoder, JsonDecoder)
assert stream.retriever.paginator.pagination_strategy.cursor_value.string == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy.cursor_value.default == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy._cursor_value.default == "{{ response._metadata.next }}"
assert stream.retriever.paginator.pagination_strategy.page_size == 10

assert isinstance(stream.retriever.requester, HttpRequester)
Expand Down Expand Up @@ -1128,7 +1128,7 @@ def test_create_default_paginator():

assert isinstance(paginator.pagination_strategy, CursorPaginationStrategy)
assert paginator.pagination_strategy.page_size == 50
assert paginator.pagination_strategy.cursor_value.string == "{{ response._metadata.next }}"
assert paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}"

assert isinstance(paginator.page_size_option, RequestOption)
assert paginator.page_size_option.inject_into == RequestOptionType.request_parameter
Expand Down
Expand Up @@ -62,3 +62,31 @@ def test_cursor_pagination_strategy(test_name, template_string, stop_condition,
token = strategy.next_page_token(response, last_records)
assert expected_token == token
assert page_size == strategy.get_page_size()


def test_last_record_points_to_the_last_item_in_last_records_array():
last_records = [{"id": 0, "more_records": True}, {"id": 1, "more_records": True}]
strategy = CursorPaginationStrategy(
page_size=1,
cursor_value="{{ last_record.id }}",
config={},
parameters={},
)

response = requests.Response()
next_page_token = strategy.next_page_token(response, last_records)
assert next_page_token == 1


def test_last_record_is_node_if_no_records():
last_records = []
strategy = CursorPaginationStrategy(
page_size=1,
cursor_value="{{ last_record.id }}",
config={},
parameters={},
)

response = requests.Response()
next_page_token = strategy.next_page_token(response, last_records)
assert next_page_token is None

0 comments on commit 4af69fc

Please sign in to comment.