Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Notion: Add Availability Strategy #30750

Merged
merged 18 commits into from Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-notion/Dockerfile
Expand Up @@ -34,5 +34,5 @@ COPY source_notion ./source_notion
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.2.0
LABEL io.airbyte.version=1.2.1
LABEL io.airbyte.name=airbyte/source-notion
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6e00b415-b02e-4160-bf02-58176a0ae687
dockerImageTag: 1.2.0
dockerImageTag: 1.2.1
dockerRepository: airbyte/source-notion
githubIssueLabel: source-notion
icon: notion.svg
Expand Down
Expand Up @@ -3,21 +3,39 @@
#

from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, TypeVar
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, TypeVar

import pydantic
import requests
from airbyte_cdk.logger import AirbyteLogger as Logger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.exceptions import UserDefinedBackoffException
from requests import HTTPError

from .utils import transform_properties

# maximum block hierarchy recursive request depth
MAX_BLOCK_DEPTH = 30


class NotionAvailabilityStrategy(HttpAvailabilityStrategy):
"""
Inherit from HttpAvailabilityStrategy with slight modification to 403 error message.
"""

def reasons_for_unavailable_status_codes(self, stream: Stream, logger: Logger, source: Source, error: HTTPError) -> Dict[int, str]:

reasons_for_codes: Dict[int, str] = {
requests.codes.FORBIDDEN: "This is likely due to insufficient permissions for your Notion integration. "
"Please make sure your integration has read access for the resources you are trying to sync"
}
return reasons_for_codes


class NotionStream(HttpStream, ABC):

url_base = "https://api.notion.com/v1/"
Expand All @@ -33,8 +51,8 @@ def __init__(self, config: Mapping[str, Any], **kwargs):
self.start_date = config["start_date"]

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None
def availability_strategy(self) -> HttpAvailabilityStrategy:
return NotionAvailabilityStrategy()

@staticmethod
def check_invalid_start_cursor(response: requests.Response):
Expand Down Expand Up @@ -70,9 +88,9 @@ def next_page_token(
"has_more": true,
"results": [ ... ]
}
Doc: https://developers.notion.com/reference/pagination
Doc: https://developers.notion.com/reference/intro#pagination
"""
next_cursor = response.json()["next_cursor"]
next_cursor = response.json().get("next_cursor")
if next_cursor:
return {"next_cursor": next_cursor}

Expand Down
Expand Up @@ -2,14 +2,15 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import random
from http import HTTPStatus
from unittest.mock import MagicMock

import pytest
import requests
from airbyte_cdk.models import SyncMode
from source_notion.streams import Blocks, NotionStream, Users
from source_notion.streams import Blocks, NotionStream, Pages, Users


@pytest.fixture
Expand All @@ -35,6 +36,18 @@ def test_next_page_token(patch_base_class, requests_mock):
assert stream.next_page_token(**inputs) == expected_token


@pytest.mark.parametrize(
"response_json, expected_output",
[({"next_cursor": "some_cursor", "has_more": True}, {"next_cursor": "some_cursor"}), ({"has_more": False}, None), ({}, None)],
)
def test_next_page_token_with_no_cursor(patch_base_class, response_json, expected_output):
stream = NotionStream(config=MagicMock())
mock_response = MagicMock()
mock_response.json.return_value = response_json
result = stream.next_page_token(mock_response)
assert result == expected_output


def test_parse_response(patch_base_class, requests_mock):
stream = NotionStream(config=MagicMock())
requests_mock.get("https://dummy", json={"results": [{"a": 123}, {"b": "xx"}]})
Expand Down Expand Up @@ -167,3 +180,62 @@ def test_user_stream_handles_pagination_correctly(requests_mock):
records = stream.read_records(sync_mode=SyncMode.full_refresh)
records_length = sum(1 for _ in records)
assert records_length == 220


@pytest.mark.parametrize(
"stream,parent,url,status_code,response_content,expected_availability,expected_reason_substring",
[
(
Users,
None,
"https://api.notion.com/v1/users",
403,
b'{"object": "error", "status": 403, "code": "restricted_resource"}',
False,
"This is likely due to insufficient permissions for your Notion integration.",
),
(
Blocks,
Pages,
"https://api.notion.com/v1/blocks/123/children",
403,
b'{"object": "error", "status": 403, "code": "restricted_resource"}',
False,
"This is likely due to insufficient permissions for your Notion integration.",
),
(
Users,
None,
"https://api.notion.com/v1/users",
200,
b'{"object": "list", "results": [{"id": "123", "object": "user", "type": "person"}]}',
True,
None,
),
],
)
def test_403_error_handling(
requests_mock, stream, parent, url, status_code, response_content, expected_availability, expected_reason_substring
):
"""
Test that availability strategy flags streams with 403 error as unavailable
and returns custom Notion integration message.
"""

requests_mock.get(url=url, status_code=status_code, content=response_content)

if parent:
stream = stream(parent=parent, config=MagicMock())
stream.parent.stream_slices = MagicMock(return_value=[{"id": "123"}])
stream.parent.read_records = MagicMock(return_value=[{"id": "123", "object": "page"}])
else:
stream = stream(config=MagicMock())

is_available, reason = stream.check_availability(logger=logging.Logger, source=MagicMock())

assert is_available is expected_availability

if expected_reason_substring:
assert expected_reason_substring in reason
else:
assert reason is None
3 changes: 2 additions & 1 deletion docs/integrations/sources/notion.md
Expand Up @@ -106,7 +106,8 @@ The connector is restricted by Notion [request limits](https://developers.notion
## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------|
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------------------- |
| 1.2.1 | 2023-10-08 | [30750](https://github.com/airbytehq/airbyte/pull/30750) | Add availability strategy |
| 1.2.0 | 2023-10-04 | [31053](https://github.com/airbytehq/airbyte/pull/31053) | Add undeclared fields for blocks and pages streams |
| 1.1.2 | 2023-08-30 | [29999](https://github.com/airbytehq/airbyte/pull/29999) | Update error handling during connection check |
| 1.1.1 | 2023-06-14 | [26535](https://github.com/airbytehq/airbyte/pull/26535) | Migrate from deprecated `authSpecification` to `advancedAuth` |
Expand Down