Skip to content

Commit

Permalink
✨Source Notion: add new stream comments (airbytehq#30324)
Browse files Browse the repository at this point in the history
Co-authored-by: ChristoGrab <ChristoGrab@users.noreply.github.com>
  • Loading branch information
2 people authored and ariesgun committed Oct 20, 2023
1 parent 354b77a commit f80aa39
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 92 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-notion/Dockerfile
Expand Up @@ -34,5 +34,5 @@ COPY source_notion ./source_notion
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.2.2
LABEL io.airbyte.version=1.3.0
LABEL io.airbyte.name=airbyte/source-notion
Expand Up @@ -26,6 +26,13 @@
"source_defined_cursor": true,
"default_cursor_field": "last_edited_time",
"json_schema": {}
},
{
"name": "comments",
"supported_sync_modes": ["incremental"],
"source_defined_cursor": true,
"default_cursor_field": "page_last_edited_time",
"json_schema": {}
}
]
}
Expand Up @@ -52,6 +52,20 @@
"cursor_field": ["last_edited_time"],
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "comments",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_primary_key": [["id"]],
"default_cursor_field": ["page_last_edited_time"]
},
"primary_key": [["id"]],
"cursor_field": ["page_last_edited_time"],
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}

Large diffs are not rendered by default.

Expand Up @@ -31,5 +31,16 @@
"name": "blocks"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"page_last_edited_time": "2021-10-10T04:00:00.000Z"
},
"stream_descriptor": {
"name": "comments"
}
}
}
]
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 6e00b415-b02e-4160-bf02-58176a0ae687
dockerImageTag: 1.2.2
dockerImageTag: 1.3.0
dockerRepository: airbyte/source-notion
githubIssueLabel: source-notion
icon: notion.svg
Expand Down
Expand Up @@ -26,32 +26,38 @@
},
"type": {
"enum": [
"paragraph",
"bookmark",
"breadcrumb",
"bulleted_list_item",
"callout",
"child_database",
"child_page",
"code",
"column",
"column_list",
"divider",
"embed",
"equation",
"file",
"heading_1",
"heading_2",
"heading_3",
"callout",
"bulleted_list_item",
"image",
"link_preview",
"link_to_page",
"numbered_list_item",
"paragraph",
"pdf",
"quote",
"synced_block",
"table",
"table_of_contents",
"table_row",
"template",
"to_do",
"toggle",
"code",
"child_page",
"child_database",
"embed",
"image",
"video",
"file",
"pdf",
"bookmark",
"equation",
"unsupported",
"table_of_contents",
"column_list",
"column",
"divider",
"link_to_page",
"quote"
"video"
]
},
"paragraph": { "$ref": "text_element.json" },
Expand Down
@@ -0,0 +1,90 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": true,
"properties": {
"object": {
"enum": ["comment"]
},
"id": {
"type": "string"
},
"parent": {
"type": "object",
"properties": {
"type": {
"enum": ["page_id"]
},
"page_id": {
"type": "string"
}
}
},
"discussion_id": {
"type": "string"
},
"created_time": {
"type": "string"
},
"last_edited_time": {
"type": "string"
},
"page_last_edited_time": {
"type": "string"
},
"created_by": {
"$ref": "user.json"
},
"rich_text": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"text": {
"type": "object",
"properties": {
"content": {
"type": "string"
},
"link": {
"type": ["null", "object"]
}
}
},
"annotations": {
"type": "object",
"properties": {
"bold": {
"type": "boolean"
},
"italic": {
"type": "boolean"
},
"strikethrough": {
"type": "boolean"
},
"underline": {
"type": "boolean"
},
"code": {
"type": "boolean"
},
"color": {
"type": "string"
}
}
},
"plain_text": {
"type": "string"
},
"href": {
"type": ["null", "string"]
}
}
}
}
}
}
Expand Up @@ -17,6 +17,9 @@
},
"workspace": {
"type": "boolean"
},
"block_id": {
"type": "string"
}
}
}
Expand Up @@ -13,7 +13,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .streams import Blocks, Databases, Pages, Users
from .streams import Blocks, Comments, Databases, Pages, Users


class SourceNotion(AbstractSource):
Expand Down Expand Up @@ -61,5 +61,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
args = {"authenticator": authenticator, "config": config}
pages = Pages(**args)
blocks = Blocks(parent=pages, **args)
comments = Comments(parent=pages, **args)

return [Users(**args), Databases(**args), pages, blocks]
return [Users(**args), Databases(**args), pages, blocks, comments]
Expand Up @@ -322,3 +322,64 @@ def should_retry(self, response: requests.Response) -> bool:
else:
return super().should_retry(response)
return super().should_retry(response)


class Comments(HttpSubStream, IncrementalNotionStream):
    """
    Comments stream, read as a sub-stream of the parent stream passed in as ``parent``.

    One slice is produced per parent record; each slice issues a GET request to the
    "comments" path with the parent's id sent as ``block_id``.

    Comments Object Docs: https://developers.notion.com/reference/comment-object
    Comments Endpoint Docs: https://developers.notion.com/reference/retrieve-a-comment
    """

    http_method = "GET"
    # We can use the "last edited time" of the parent Page as the cursor field,
    # since we cannot guarantee the order of comments between pages.
    cursor_field = "page_last_edited_time"

    def path(self, **kwargs) -> str:
        """Return the request path of the comments endpoint."""
        return "comments"

    def request_params(
        self, next_page_token: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        """
        Build the query parameters for one request.

        :param next_page_token: pagination token; when truthy, its "next_cursor" value is sent as "start_cursor".
        :param stream_slice: slice produced by ``stream_slices``; its "block_id" identifies the parent to query.
        :return: mapping with "block_id", "page_size" and, when paginating, "start_cursor".
        """
        params = {"block_id": stream_slice.get("block_id"), "page_size": self.page_size}

        if next_page_token:
            params["start_cursor"] = next_page_token["next_cursor"]

        return params

    def parse_response(
        self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs
    ) -> Iterable[Mapping]:
        """
        Yield the comments contained in ``response``, stamped with the parent's last edited time.

        Every record of a response shares the same cursor value (the parent's
        "page_last_edited_time" carried by the slice), so the comparison against the
        saved state is made once per response instead of once per record.

        :param response: HTTP response whose JSON body holds the comments under "results".
        :param stream_state: saved state; an empty or missing state disables filtering.
        :param stream_slice: slice carrying the parent's "page_last_edited_time".
        :return: iterable of records passed through ``transform_properties``.
        """
        # Get the parent's "last edited time" to compare against state
        page_last_edited_time = stream_slice.get("page_last_edited_time", "")
        records = response.json().get("results", [])
        if not records:
            return

        # Tolerate a missing state the same way as an empty one: nothing is filtered out.
        state = stream_state or {}
        state_last_edited_time = state.get(self.cursor_field, "")
        if isinstance(state_last_edited_time, StateValueWrapper):
            state_last_edited_time = state_last_edited_time.value

        # The parent was last edited before the saved cursor: skip the whole response.
        if state and not page_last_edited_time >= state_last_edited_time:
            return

        for record in records:
            record["page_last_edited_time"] = page_last_edited_time
            yield from transform_properties(record)

    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        """
        Read records through ``IncrementalNotionStream.read_records``.

        The call names the base class explicitly instead of using ``super()``, so it does
        not depend on which base comes first in the method resolution order.
        """
        yield from IncrementalNotionStream.read_records(self, **kwargs)

    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, **kwargs
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Yield one slice per parent record.

        The parent is always read in full refresh mode, whatever ``sync_mode`` is requested
        for this stream; filtering against state happens later, in ``parse_response``.

        :return: slices of the form {"block_id": ..., "page_last_edited_time": ...}.
        """
        # Gather parent stream records in full
        parent_records = self.parent.read_records(sync_mode=SyncMode.full_refresh, cursor_field=self.parent.cursor_field)

        # The parent stream is the Pages stream, but we have to pass its id to the request_params as "block_id"
        # because pages are also blocks in the Notion API.
        # We also grab the last_edited_time from the parent record to use as the cursor field.
        for parent_record in parent_records:
            yield {"block_id": parent_record["id"], "page_last_edited_time": parent_record["last_edited_time"]}

0 comments on commit f80aa39

Please sign in to comment.