✨Source Notion: add new stream comments #30324

Merged · 33 commits · Oct 9, 2023
Changes from 22 commits
Commits (33)
0eb77d4
add comments stream
ChristoGrab Sep 5, 2023
ee4fd6e
fix schema
ChristoGrab Sep 6, 2023
b006eaf
remove bad error handling
ChristoGrab Sep 6, 2023
d76bb90
add stream_slices for comments
ChristoGrab Sep 7, 2023
f3f05d5
comments full refresh works
ChristoGrab Sep 8, 2023
2de91e8
debugging state
ChristoGrab Sep 8, 2023
1143cca
fix stream_slice test
ChristoGrab Sep 8, 2023
84a738d
update test case for read_records
ChristoGrab Sep 8, 2023
2986453
test blocks
ChristoGrab Sep 8, 2023
039c741
incremental working
ChristoGrab Sep 9, 2023
3af8555
begin cleanup
ChristoGrab Sep 9, 2023
fd0b77a
fix comments test cases
ChristoGrab Sep 11, 2023
815cc4c
refactor tests
ChristoGrab Sep 11, 2023
0225a1d
change cursor in catalogs
ChristoGrab Sep 11, 2023
7d3baae
fix
ChristoGrab Sep 11, 2023
dc29bb7
update version
ChristoGrab Sep 11, 2023
7958845
small fix
ChristoGrab Sep 11, 2023
92b9234
add page_last_edited_time to schema
ChristoGrab Sep 11, 2023
3d707e9
changelog
ChristoGrab Sep 11, 2023
b860a31
remove test comment
ChristoGrab Sep 12, 2023
ab63140
fix format
ChristoGrab Sep 12, 2023
a70b519
remove extra args
ChristoGrab Sep 12, 2023
bedd64e
update expected_records
ChristoGrab Sep 13, 2023
e418087
add block_id and missing enum values in schema
ChristoGrab Sep 14, 2023
d56dfcb
format fix
ChristoGrab Sep 14, 2023
04d1ab6
update capabilities section in docs
ChristoGrab Sep 22, 2023
70702b8
Automated Commit - Formatting Changes
ChristoGrab Sep 22, 2023
befaa16
update docs page
ChristoGrab Oct 5, 2023
81ac2d3
Merge branch 'christo/notion-add-comment-stream' of https://github.co…
ChristoGrab Oct 5, 2023
417399b
Merge branch 'master' into christo/notion-add-comment-stream
ChristoGrab Oct 9, 2023
cfff846
Automated Commit - Formatting Changes
ChristoGrab Oct 9, 2023
05584a6
delete inapp doc
ChristoGrab Oct 9, 2023
3c9752a
remove fstring
ChristoGrab Oct 9, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-notion/Dockerfile
@@ -34,5 +34,5 @@ COPY source_notion ./source_notion
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.1.2
LABEL io.airbyte.version=1.2.0
LABEL io.airbyte.name=airbyte/source-notion
@@ -26,6 +26,13 @@
"source_defined_cursor": true,
"default_cursor_field": "last_edited_time",
"json_schema": {}
},
{
"name": "comments",
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": "page_last_edited_time",
"json_schema": {}
}
]
}
@@ -52,6 +52,20 @@
"cursor_field": ["last_edited_time"],
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "comments",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_primary_key": [["id"]],
"default_cursor_field": ["page_last_edited_time"]
},
"primary_key": [["id"]],
"cursor_field": ["page_last_edited_time"],
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
@@ -31,5 +31,16 @@
"name": "blocks"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"last_edited_time": "2021-10-10T04:00:00.000Z"
},
"stream_descriptor": {
"name": "comments"
}
}
}
]
@@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6e00b415-b02e-4160-bf02-58176a0ae687
dockerImageTag: 1.1.2
dockerImageTag: 1.2.0
dockerRepository: airbyte/source-notion
githubIssueLabel: source-notion
icon: notion.svg
@@ -0,0 +1,90 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": true,
"properties": {
"object": {
"enum": ["comment"]
},
"id": {
"type": "string"
},
"parent": {
"type": "object",
"properties": {
"type": {
"enum": ["page_id"]
},
"page_id": {
"type": "string"
}
}
},
"discussion_id": {
"type": "string"
},
"created_time": {
"type": "string"
},
"last_edited_time": {
"type": "string"
},
"page_last_edited_time": {
"type": "string"
},
"created_by": {
"$ref": "user.json"
},
"rich_text": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"text": {
"type": "object",
"properties": {
"content": {
"type": "string"
},
"link": {
"type": ["null", "object"]
}
}
},
"annotations": {
"type": "object",
"properties": {
"bold": {
"type": "boolean"
},
"italic": {
"type": "boolean"
},
"strikethrough": {
"type": "boolean"
},
"underline": {
"type": "boolean"
},
"code": {
"type": "boolean"
},
"color": {
"type": "string"
}
}
},
"plain_text": {
"type": "string"
},
"href": {
"type": ["null", "string"]
}
}
}
}
}
}
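
For reference, a record conforming to this schema might look like the following. This is an illustrative sketch only: the ids, timestamps, and text values are made up, and page_last_edited_time is the field the stream injects from the parent page rather than a value returned by the Notion API.

# Illustrative only: all ids, timestamps, and text values below are hypothetical.
sample_comment = {
    "object": "comment",
    "id": "7d50f6bd-0000-0000-0000-000000000000",
    "parent": {"type": "page_id", "page_id": "59833787-0000-0000-0000-000000000000"},
    "discussion_id": "f1407351-0000-0000-0000-000000000000",
    "created_time": "2023-09-05T12:00:00.000Z",
    "last_edited_time": "2023-09-06T08:30:00.000Z",
    "page_last_edited_time": "2023-09-07T10:15:00.000Z",  # injected by the connector; used as the cursor
    "rich_text": [
        {
            "type": "text",
            "text": {"content": "Looks good to me", "link": None},
            "annotations": {
                "bold": False,
                "italic": False,
                "strikethrough": False,
                "underline": False,
                "code": False,
                "color": "default",
            },
            "plain_text": "Looks good to me",
            "href": None,
        }
    ],
}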
@@ -12,7 +12,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .streams import Blocks, Databases, Pages, Users
from .streams import Blocks, Databases, Pages, Users, Comments


class NotionAuthenticator:
@@ -64,5 +64,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
args = {"authenticator": authenticator, "config": config}
pages = Pages(**args)
blocks = Blocks(parent=pages, **args)
comments = Comments(parent=pages, **args)

return [Users(**args), Databases(**args), pages, blocks]
return [Users(**args), Databases(**args), pages, blocks, comments]
@@ -264,8 +264,8 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
# if reached recursive limit, don't read anymore
if len(self.block_id_stack) > MAX_BLOCK_DEPTH:
return

return
records = super().read_records(**kwargs)
for record in records:
if record.get("has_children", False):
@@ -298,3 +298,64 @@ def should_retry(self, response: requests.Response) -> bool:
else:
return super().should_retry(response)
return super().should_retry(response)


class Comments(HttpSubStream, IncrementalNotionStream):
"""
Comments Object Docs: https://developers.notion.com/reference/comment-object
Comments Endpoint Docs: https://developers.notion.com/reference/retrieve-a-comment
"""

http_method = "GET"
# We can use the "last edited time" of the parent Page as the cursor field,
# since we cannot guarantee the order of comments between pages.
cursor_field = "page_last_edited_time"

def path(self, **kwargs) -> str:
return f"comments"

def request_params(self, next_page_token: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
block_id = stream_slice.get("block_id")
params = {"block_id": block_id, "page_size": self.page_size}

if next_page_token:
params["start_cursor"] = next_page_token["next_cursor"]

return params

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:

# Get the parent's "last edited time" to compare against state
page_last_edited_time = stream_slice.get("page_last_edited_time", "")
records = response.json().get("results", [])

for record in records:
record["page_last_edited_time"] = page_last_edited_time
state_last_edited_time = stream_state.get(self.cursor_field, "")

if isinstance(state_last_edited_time, StateValueWrapper):
state_last_edited_time = state_last_edited_time.value

if not stream_state or page_last_edited_time >= state_last_edited_time:
yield from transform_properties(record)

def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:

yield from IncrementalNotionStream.read_records(self, **kwargs)

def stream_slices(
self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:

# Gather parent stream records in full
parent_records = self.parent.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=self.parent.cursor_field
)

# The parent stream is the Pages stream, but we have to pass its id to the request_params as "block_id"
# because pages are also blocks in the Notion API.
# We also grab the last_edited_time from the parent record to use as the cursor field.
for record in parent_records:
yield {
"block_id": record["id"],
"page_last_edited_time": record["last_edited_time"]
}
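
To make the incremental semantics above easier to follow, here is a minimal standalone sketch (not part of the connector code) of the filtering that parse_response performs: every comment in a slice is stamped with the parent page's last_edited_time, and the records in that slice are replicated only when that value is at or past the saved cursor, regardless of each comment's own last_edited_time.

def filter_comments_for_slice(comments, page_last_edited_time, state_cursor=""):
    # Stamp each record with the parent page's cursor value and emit it only
    # if the page itself was edited at or after the saved state.
    for comment in comments:
        record = {**comment, "page_last_edited_time": page_last_edited_time}
        if not state_cursor or page_last_edited_time >= state_cursor:
            yield record

# With state at 2023-01-01, a page last edited 2023-06-01 replicates all of its
# comments, including ones whose own last_edited_time predates the state value.
page_comments = [{"id": "c1", "last_edited_time": "2022-05-01T00:00:00.000Z"}]
print(list(filter_comments_for_slice(
    page_comments,
    page_last_edited_time="2023-06-01T00:00:00.000Z",
    state_cursor="2023-01-01T00:00:00.000Z",
)))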
@@ -5,8 +5,8 @@
from unittest.mock import MagicMock, patch

from airbyte_cdk.models import SyncMode
from pytest import fixture
from source_notion.streams import Blocks, IncrementalNotionStream, Pages
from pytest import fixture, mark
from source_notion.streams import Blocks, IncrementalNotionStream, Pages, Comments


@fixture
@@ -37,6 +37,11 @@ def blocks(parent, args):
return Blocks(parent=parent, **args)


@fixture
def comments(parent, args):
return Comments(parent=parent, **args)


def test_cursor_field(stream):
expected_cursor_field = "last_edited_time"
assert stream.cursor_field == expected_cursor_field
@@ -203,3 +208,100 @@ def test_invalid_start_cursor(parent, requests_mock, caplog):
list(stream.read_records(**inputs))
assert search_endpoint.call_count == 6
assert f"Skipping stream pages, error message: {error_message}" in caplog.messages


# Tests for Comments stream
def test_comments_path(comments):
assert comments.path() == 'comments'


def test_comments_request_params(comments):
"""
Test that the request_params function returns the correct parameters for the Comments endpoint
"""
params = comments.request_params(
next_page_token=None,
stream_slice={
'block_id': 'block1',
'page_last_edited_time': '2021-01-01T00:00:00.000Z'
})

assert params == {
'block_id': 'block1',
'page_size': comments.page_size
}
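
For context, with these params the stream issues a GET against the Notion comments endpoint. The sketch below shows a roughly equivalent raw request, assumed for illustration: the real request is assembled by the CDK's HttpStream with the configured authenticator, the actual default page_size is defined on the stream, and the Notion-Version header value here is a placeholder.

import requests

response = requests.get(
    "https://api.notion.com/v1/comments",
    params={"block_id": "block1", "page_size": 100},  # page_size value assumed for the example
    headers={
        "Authorization": "Bearer <access_token>",  # placeholder token
        "Notion-Version": "2022-06-28",            # placeholder API version
    },
)
comments_page = response.json().get("results", [])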


def test_comments_stream_slices(comments, requests_mock):
"""
Test that the stream_slices function returns the parent page ids as "block_id" and the last edited time as "page_last_edited_time"
"""

inputs = {
'sync_mode': SyncMode.incremental,
'cursor_field': comments.cursor_field,
'stream_state': {}
}

requests_mock.post(
"https://api.notion.com/v1/search",
json={
"results":
[
{"name": "page_1", "id": "id_1", "last_edited_time": "2021-01-01T00:00:00.000Z"},
{"name": "page_2", "id": "id_2", "last_edited_time": "2021-20-01T00:00:00.000Z"}
],
"next_cursor": None})

expected_stream_slice = [
{"block_id": "id_1", "page_last_edited_time": "2021-01-01T00:00:00.000Z"},
{"block_id": "id_2", "page_last_edited_time": "2021-20-01T00:00:00.000Z"}
]

actual_stream_slices_list = list(comments.stream_slices(**inputs))
assert actual_stream_slices_list == expected_stream_slice


@mark.parametrize(
"stream_slice, stream_state, mock_data, expected_records",
[
# Test that comments with page_last_edited_time >= stream_state are replicated, regardless of each record's LMD
(
{"block_id": "block_id_1", "page_last_edited_time": "2023-10-10T00:00:00.000Z"},
{"page_last_edited_time": "2021-10-10T00:00:00.000Z"},
[
{"id": "comment_id_1", "rich_text": [{"type": "text", "text": {"content": "I am the Alpha comment"}}], "last_edited_time": "2021-01-01T00:00:00.000Z"},
{"id": "comment_id_2", "rich_text": [{"type": "text", "text": {"content": "I am the Omega comment"}}], "last_edited_time": "2022-12-31T00:00:00.000Z"}
],
[
{"id": "comment_id_1", "rich_text": [{"type": "text", "text": {"content": "I am the Alpha comment"}}], "last_edited_time": "2021-01-01T00:00:00.000Z", "page_last_edited_time": "2023-10-10T00:00:00.000Z"},
{"id": "comment_id_2", "rich_text": [{"type": "text", "text": {"content": "I am the Omega comment"}}], "last_edited_time": "2022-12-31T00:00:00.000Z", "page_last_edited_time": "2023-10-10T00:00:00.000Z"}
]
),
# Test that comments with page_last_edited_time < stream_state are not replicated, regardless of each record's LMD
(
{"block_id": "block_id_2", "page_last_edited_time": "2021-01-01T00:00:00.000Z"},
{"page_last_edited_time": "2022-20-20T00:00:00.000Z"},
[
{"id": "comment_id_1", "rich_text": [{"type": "text", "text": {"content": "I will not be replicated"}}], "last_edited_time": "2021-10-30T00:00:00.000Z"},
{"id": "comment_id_2", "rich_text": [{"type": "text", "text": {"content": "I will also not be replicated"}}], "last_edited_time": "2023-01-01T00:00:00.000Z"}
],
[]
),
]
)
def test_comments_read_records(comments, requests_mock, stream_slice, stream_state, mock_data, expected_records):
inputs = {
"sync_mode": SyncMode.incremental,
"cursor_field": comments.cursor_field,
"stream_state": stream_state,
"stream_slice": stream_slice
}

requests_mock.get(
f"https://api.notion.com/v1/comments?block_id={stream_slice['block_id']}",
json={"results": mock_data, "next_cursor": None}
)

response = list(comments.read_records(**inputs))
assert response == expected_records
@@ -43,5 +43,5 @@ def test_streams(mocker):
source = SourceNotion()
config_mock = MagicMock()
streams = source.streams(config_mock)
expected_streams_number = 4
expected_streams_number = 5
assert len(streams) == expected_streams_number