Skip to content

Commit

Permalink
🐛 Source Monday: fix pagination for the items stream (#25881)
Browse files Browse the repository at this point in the history
* source monday: fix pagination for the items stream

* source monday: refactor item pagination strategy

* source monday: update pagination strategy docstring

* formatting fixes and update version

---------

Co-authored-by: Mal Hancock <mallory@archangelic.space>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
  • Loading branch information
3 people committed May 23, 2023
1 parent b8b1335 commit c296687
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 12 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-monday/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_monday ./source_monday
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/source-monday
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 80a54ea2-9959-4040-aac1-eee42423ec9b
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
dockerRepository: airbyte/source-monday
githubIssueLabel: source-monday
icon: monday.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from dataclasses import dataclass
from functools import partial
from typing import Any, Mapping, MutableMapping, Optional, Type, Union

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
Expand Down Expand Up @@ -63,12 +64,12 @@ def _build_query(self, object_name: str, field_schema: dict, **object_arguments)

return f"{object_name}{arguments}{{{fields}}}"

def _build_items_query(self, object_name: str, field_schema: dict, **object_arguments) -> str:
def _build_items_query(self, object_name: str, field_schema: dict, sub_page: Optional[int], **object_arguments) -> str:
"""
Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards.
See https://developer.monday.com/api-reference/docs/items-queries#items-queries
"""
query = self._build_query(object_name, field_schema, limit=self.NESTED_OBJECTS_LIMIT_MAX_VALUE)
query = self._build_query(object_name, field_schema, limit=self.NESTED_OBJECTS_LIMIT_MAX_VALUE, page=sub_page)
arguments = self._get_object_arguments(**object_arguments)
return f"boards({arguments}){{{query}}}"

Expand Down Expand Up @@ -96,8 +97,11 @@ def get_request_params(
Combines queries to a single GraphQL query.
"""
limit = self.limit.eval(self.config)
page = next_page_token and next_page_token[self.NEXT_PAGE_TOKEN_FIELD_NAME]
if self.name == "items":
query_builder = self._build_items_query
# `items` stream use a separate pagination strategy where first level pages are across `boards` and sub-pages are across `items`
page, sub_page = page if page else (None, None)
query_builder = partial(self._build_items_query, sub_page=sub_page)
elif self.name == "teams":
query_builder = self._build_teams_query
else:
Expand All @@ -106,7 +110,7 @@ def get_request_params(
object_name=self.name,
field_schema=self._get_schema_root_properties(),
limit=limit or None,
page=next_page_token and next_page_token[self.NEXT_PAGE_TOKEN_FIELD_NAME],
page=page,
)
return {"query": f"query{{{query}}}"}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement


class ItemPaginationStrategy(PageIncrement):
"""
Page increment strategy with subpages for the `items` stream.
From the `items` documentation https://developer.monday.com/api-reference/docs/items:
Please note that you cannot return more than 100 items per query when using items at the root.
To adjust your query, try only returning items on a specific board, nesting items inside a boards query,
looping through the boards on your account, or querying less than 100 items at a time.
This pagination strategy supports nested loop through `boards` on the top level and `items` on the second.
See boards documentation for more details: https://developer.monday.com/api-reference/docs/boards#queries.
"""

def __post_init__(self, parameters: Mapping[str, Any]):
# `self._page` corresponds to board page number
# `self._sub_page` corresponds to item page number within its board
self.start_from_page = 1
self._page: Optional[int] = self.start_from_page
self._sub_page: Optional[int] = self.start_from_page

def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Tuple[Optional[int], Optional[int]]]:
"""
Determines page and subpage numbers for the `items` stream
Attributes:
response: Contains `boards` and corresponding lists of `items` for each `board`
last_records: Parsed `items` from the response
"""
if len(last_records) >= self.page_size:
self._sub_page += 1
else:
self._sub_page = self.start_from_page
if len(response.json()["data"]["boards"]):
self._page += 1
else:
return None

return self._page, self._sub_page
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ definitions:
paginator:
$ref: "#/definitions/default_paginator"
pagination_strategy:
$ref: "#/definitions/default_paginator/pagination_strategy"
page_size: 1
class_name: "source_monday.item_pagination_strategy.ItemPaginationStrategy"
type: "CustomPaginationStrategy"
page_size: 100
$parameters:
name: "items"
path: ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,46 +43,51 @@


@pytest.mark.parametrize(
("input_schema", "stream_name", "config", "graphql_query"),
("input_schema", "stream_name", "config", "graphql_query", "next_page_token"),
[
pytest.param(
nested_object_schema,
"test_stream",
{},
{"query": "query{test_stream(limit:100,page:2){root{nested{nested_of_nested}},sibling}}"},
{"next_page_token": 2},
id="test_get_request_params_produces_graphql_query_for_object_items"
),
pytest.param(
nested_array_schema,
"test_stream",
{},
{"query": "query{test_stream(limit:100,page:2){root{nested{nested_of_nested}},sibling}}"},
{"next_page_token": 2},
id="test_get_request_params_produces_graphql_query_for_array_items"
),
pytest.param(
nested_array_schema,
"items",
{},
{"query": "query{boards(limit:100,page:2){items(limit:100){root{nested{nested_of_nested}},sibling}}}"},
{"query": "query{boards(limit:100,page:2){items(limit:100,page:1){root{nested{nested_of_nested}},sibling}}}"},
{"next_page_token": (2, 1)},
id="test_get_request_params_produces_graphql_query_for_items_stream"
),
pytest.param(
nested_array_schema,
"teams",
{"teams_limit": 100},
{'query': 'query{teams(limit:100,page:2){id,name,picture_url,users(limit:100){id}}}'},
{"next_page_token": 2},
id="test_get_request_params_produces_graphql_query_for_teams_optimized_stream"
),
pytest.param(
nested_array_schema,
"teams",
{},
{'query': 'query{teams(limit:100,page:2){root{nested{nested_of_nested}},sibling}}'},
{"next_page_token": 2},
id="test_get_request_params_produces_graphql_query_for_teams_stream"
)
]
)
def test_get_request_params(mocker, input_schema, graphql_query, stream_name, config):
def test_get_request_params(mocker, input_schema, graphql_query, stream_name, config, next_page_token):
mocker.patch.object(MondayGraphqlRequester, "_get_schema_root_properties", return_value=input_schema)
requester = MondayGraphqlRequester(
name="a name",
Expand All @@ -99,5 +104,5 @@ def test_get_request_params(mocker, input_schema, graphql_query, stream_name, co
assert requester.get_request_params(
stream_state={},
stream_slice={},
next_page_token={"next_page_token": 2}
next_page_token=next_page_token
) == graphql_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

import pytest
from source_monday.item_pagination_strategy import ItemPaginationStrategy


@pytest.mark.parametrize(
("response_json", "last_records", "expected"),
[
pytest.param(
{"data": {"boards": [{"items": [{"id": "1"}]}]}},
[{"id": "1"}],
(1, 2),
id="test_next_item_page_for_the_same_board",
),
pytest.param(
{"data": {"boards": [{"items": []}]}},
[],
(2, 1),
id="test_next_board_page_with_item_page_reset",
),
pytest.param(
{"data": {"boards": []}},
[],
None,
id="test_end_pagination",
)
]
)
def test_item_pagination_strategy(response_json, last_records, expected):
strategy = ItemPaginationStrategy(
page_size=1,
parameters={"items_per_page": 1},
)
response = MagicMock()
response.json.return_value = response_json

assert strategy.next_page_token(response, last_records) == expected
1 change: 1 addition & 0 deletions docs/integrations/sources/monday.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ The Monday connector should not run into Monday API limitations under normal usa

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------|
| 0.2.5 | 2023-05-22 | [225881](https://github.com/airbytehq/airbyte/pull/25881) | Fix pagination for the items stream |
| 0.2.4 | 2023-04-26 | [25277](https://github.com/airbytehq/airbyte/pull/25277) | Increase row limit to 100 |
| 0.2.3 | 2023-03-06 | [23231](https://github.com/airbytehq/airbyte/pull/23231) | Publish using low-code CDK Beta version |
| 0.2.2 | 2023-01-04 | [20996](https://github.com/airbytehq/airbyte/pull/20996) | Fix json schema loader |
Expand Down

0 comments on commit c296687

Please sign in to comment.