Skip to content

Commit

Permalink
🐛 Source Intercom: handle conversations stream HttpError: `404 - Us…
Browse files Browse the repository at this point in the history
…er Not Found` (#36277)
  • Loading branch information
bazarnov committed Mar 22, 2024
1 parent 878eb09 commit f11f1cb
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 29 deletions.
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerImageTag: 0.6.1
dockerImageTag: 0.6.2
dockerRepository: airbyte/source-intercom
documentationUrl: https://docs.airbyte.com/integrations/sources/intercom
githubIssueLabel: source-intercom
Expand Down
16 changes: 8 additions & 8 deletions airbyte-integrations/connectors/source-intercom/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.6.1"
version = "0.6.2"
name = "source-intercom"
description = "Source implementation for Intercom Yaml."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Expand Up @@ -33,6 +33,7 @@ class IncrementalSingleSliceCursor(Cursor):

def __post_init__(self, parameters: Mapping[str, Any]):
self._state = {}
self._cursor = None
self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)

def get_request_params(
Expand Down Expand Up @@ -87,12 +88,26 @@ def set_initial_state(self, stream_state: StreamState):
if cursor_value:
self._state[cursor_field] = cursor_value
self._state["prior_state"] = self._state.copy()
self._cursor = cursor_value

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
latest_record = self._state if self.is_greater_than_or_equal(self._state, most_recent_record) else most_recent_record
if latest_record:
cursor_field = self.cursor_field.eval(self.config)
self._state[cursor_field] = latest_record[cursor_field]
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
:param stream_slice: The current slice, which may or may not contain the most recently observed record
:param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
"""
record_cursor_value = record.get(self.cursor_field.eval(self.config))
if not record_cursor_value:
return

if self.is_greater_than_or_equal(record, self._state):
self._cursor = record_cursor_value

def close_slice(self, stream_slice: StreamSlice) -> None:
cursor_field = self.cursor_field.eval(self.config)
self._state[cursor_field] = self._cursor

def stream_slices(self) -> Iterable[Mapping[str, Any]]:
yield StreamSlice(partition={}, cursor_slice={})
Expand Down Expand Up @@ -138,51 +153,86 @@ def __post_init__(self, parameters: Mapping[str, Any]):
self.parent_sync_mode: SyncMode = SyncMode.incremental if self.parent_stream.supports_incremental is True else SyncMode.full_refresh
self.substream_slice_field: str = self.parent_stream_configs[0].partition_field.eval(self.config)
self.parent_field: str = self.parent_stream_configs[0].parent_key.eval(self.config)
self._parent_cursor: Optional[str] = None

def set_initial_state(self, stream_state: StreamState):
super().set_initial_state(stream_state=stream_state)
if self.parent_stream_name in stream_state and stream_state.get(self.parent_stream_name, {}).get(self.parent_cursor_field):
parent_stream_state = {self.parent_cursor_field: stream_state[self.parent_stream_name][self.parent_cursor_field]}
parent_stream_state = {
self.parent_cursor_field: stream_state[self.parent_stream_name][self.parent_cursor_field],
}
self._state[self.parent_stream_name] = parent_stream_state
if "prior_state" in self._state:
self._state["prior_state"][self.parent_stream_name] = parent_stream_state

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
super().close_slice(stream_slice=stream_slice, most_recent_record=most_recent_record)
if self.parent_stream:
self._state[self.parent_stream_name] = self.parent_stream.state
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Extended the default method to be able to track the parent STATE.
"""

# save parent cursor value (STATE) from slice
parent_cursor = stream_slice.get(self.parent_stream_name)
if parent_cursor:
self._parent_cursor = parent_cursor.get(self.parent_cursor_field)

# observe the substream
super().observe(stream_slice, record)

def close_slice(self, stream_slice: StreamSlice) -> None:
super().close_slice(stream_slice=stream_slice)

def stream_slices(self) -> Iterable[Mapping[str, Any]]:
parent_state = (self._state or {}).get(self.parent_stream_name, {})
slices_generator: Iterable[StreamSlice] = self.read_parent_stream(self.parent_sync_mode, self.parent_cursor_field, parent_state)
yield from [slice for slice in slices_generator] if self.parent_complete_fetch else slices_generator

def track_parent_cursor(self, parent_record: dict) -> None:
"""
Tracks the Parent Stream Cursor, using `parent_cursor_field`.
"""
self._parent_cursor = parent_record.get(self.parent_cursor_field)
if self._parent_cursor:
self._state[self.parent_stream_name] = {self.parent_cursor_field: self._parent_cursor}

def read_parent_stream(
self, sync_mode: SyncMode, cursor_field: Optional[str], stream_state: Mapping[str, Any]
self,
sync_mode: SyncMode,
cursor_field: Optional[str],
stream_state: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:

self.parent_stream.state = stream_state

parent_stream_slices_gen = self.parent_stream.stream_slices(
sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state
sync_mode=sync_mode,
cursor_field=cursor_field,
stream_state=stream_state,
)

for parent_slice in parent_stream_slices_gen:
parent_records_gen = self.parent_stream.read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=parent_slice, stream_state=stream_state
sync_mode=sync_mode,
cursor_field=cursor_field,
stream_slice=parent_slice,
stream_state=stream_state,
)

for parent_record in parent_records_gen:
# update parent cursor
self.track_parent_cursor(parent_record)
substream_slice_value = parent_record.get(self.parent_field)
if substream_slice_value:
cursor_field = self.cursor_field.eval(self.config)
substream_cursor_value = self._state.get(cursor_field)
parent_cursor_value = self._state.get(self.parent_stream_name, {}).get(self.parent_cursor_field)
yield StreamSlice(
partition={
self.substream_slice_field: substream_slice_value,
},
cursor_slice={
cursor_field: self._state.get(cursor_field),
cursor_field: substream_cursor_value,
self.parent_stream_name: {
self.parent_cursor_field: self._state.get(self.parent_stream_name, {}).get(self.parent_cursor_field)
self.parent_cursor_field: parent_cursor_value,
},
},
)
Expand Down
@@ -1,4 +1,4 @@
version: "0.50.2"
version: 0.72.1

definitions:
## bases
Expand All @@ -20,7 +20,8 @@ definitions:
type: BearerAuthenticator
api_token: "{{ config['access_token'] }}"
request_headers:
# API version header
# There is a bug in interpolation, causing the `2.10` string to be evaluated to `2.1`, cutting off the `0`.
# the workaround is to put the `string` inside the `string`, then it's evaluated properly to `2.10`
Intercom-Version: "'2.10'"
Accept: "application/json"
error_handler:
Expand Down Expand Up @@ -292,6 +293,16 @@ definitions:
page_size: 150
conversations:
$ref: "#/definitions/stream_incremental_search"
retriever:
$ref: "#/definitions/stream_full_refresh/retriever"
requester:
$ref: "#/definitions/requester_incremental_search"
request_headers:
# API version header
# There are 404 - User Not Found issue, when `2.10` is used, for certain users:
# https://github.com/airbytehq/oncall/issues/4514
Intercom-Version: "2.9"
Accept: "application/json"
$parameters:
name: "conversations"
path: "conversations/search"
Expand Down
Expand Up @@ -53,7 +53,8 @@ def get_requester():
def test_slicer():
date_time_dict = {"updated_at": 1662459010}
slicer = IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field="updated_at")
slicer.close_slice(date_time_dict, date_time_dict)
slicer.observe(date_time_dict, date_time_dict)
slicer.close_slice(date_time_dict)
assert slicer.get_stream_state() == date_time_dict
assert slicer.get_request_headers() == {}
assert slicer.get_request_body_data() == {}
Expand Down Expand Up @@ -95,7 +96,8 @@ def test_sub_slicer(last_record, expected, records):
)
slicer.set_initial_state(expected)
stream_slice = next(slicer.stream_slices()) if records else {}
slicer.close_slice(stream_slice, last_record)
slicer.observe(stream_slice, last_record)
slicer.close_slice(stream_slice)
assert slicer.get_stream_state() == expected


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/intercom.md
Expand Up @@ -74,6 +74,7 @@ The Intercom connector should not run into Intercom API limitations under normal

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------|
| 0.6.2 | 2024-03-22 | [36277](https://github.com/airbytehq/airbyte/pull/36277) | Fixed the bug for `conversations` stream failed due to `404 - User Not Found`, when the `2.10` API version is used |
| 0.6.1 | 2024-03-18 | [36232](https://github.com/airbytehq/airbyte/pull/36232) | Fixed the bug caused the regression when setting the `Intercom-Version` header, updated the source to use the latest CDK version |
| 0.6.0 | 2024-02-12 | [35176](https://github.com/airbytehq/airbyte/pull/35176) | Update the connector to use `2.10` API version |
| 0.5.1 | 2024-02-12 | [35148](https://github.com/airbytehq/airbyte/pull/35148) | Manage dependencies with Poetry. |
Expand Down

0 comments on commit f11f1cb

Please sign in to comment.