Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Intercom: handle conversations stream HttpError: 404 - User Not Found #36277

Merged
merged 9 commits into from Mar 22, 2024
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerImageTag: 0.6.1
dockerImageTag: 0.6.2
dockerRepository: airbyte/source-intercom
documentationUrl: https://docs.airbyte.com/integrations/sources/intercom
githubIssueLabel: source-intercom
Expand Down
16 changes: 8 additions & 8 deletions airbyte-integrations/connectors/source-intercom/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.6.1"
version = "0.6.2"
name = "source-intercom"
description = "Source implementation for Intercom Yaml."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Expand Up @@ -33,6 +33,7 @@ class IncrementalSingleSliceCursor(Cursor):

def __post_init__(self, parameters: Mapping[str, Any]):
    """Interpolate the configured cursor field and initialize mutable cursor state."""
    self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
    # Per-stream state committed on close_slice; cursor value tracked during observe.
    self._state = {}
    self._cursor = None

def get_request_params(
Expand Down Expand Up @@ -87,12 +88,26 @@ def set_initial_state(self, stream_state: StreamState):
if cursor_value:
self._state[cursor_field] = cursor_value
self._state["prior_state"] = self._state.copy()
self._cursor = cursor_value

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

    :param stream_slice: The current slice, which may or may not contain the most recently observed record
    :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
      stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
    """
    record_cursor_value = record.get(self.cursor_field.eval(self.config))
    if not record_cursor_value:
        # A record without a cursor value cannot advance the state.
        return

    if self.is_greater_than_or_equal(record, self._state):
        self._cursor = record_cursor_value

def close_slice(self, stream_slice: StreamSlice) -> None:
    """Commit the most recently observed cursor value into the stream state.

    NOTE(review): the superseded two-argument `close_slice(stream_slice, most_recent_record)`
    was removed — it was a leftover of the pre-change code and conflicted with this
    one-argument signature (the record is now tracked via `observe`).
    """
    cursor_field = self.cursor_field.eval(self.config)
    self._state[cursor_field] = self._cursor

def stream_slices(self) -> Iterable[Mapping[str, Any]]:
    """Emit exactly one empty slice: this cursor does not partition the stream."""
    single_slice = StreamSlice(partition={}, cursor_slice={})
    yield single_slice
Expand Down Expand Up @@ -138,51 +153,86 @@ def __post_init__(self, parameters: Mapping[str, Any]):
self.parent_sync_mode: SyncMode = SyncMode.incremental if self.parent_stream.supports_incremental is True else SyncMode.full_refresh
self.substream_slice_field: str = self.parent_stream_configs[0].partition_field.eval(self.config)
self.parent_field: str = self.parent_stream_configs[0].parent_key.eval(self.config)
self._parent_cursor: Optional[str] = None

def set_initial_state(self, stream_state: StreamState):
    """Restore the substream cursor (via super) and the saved parent stream state.

    :param stream_state: previously persisted state, which may contain a nested
        entry keyed by the parent stream's name holding the parent cursor value.
    """
    super().set_initial_state(stream_state=stream_state)
    if self.parent_stream_name in stream_state and stream_state.get(self.parent_stream_name, {}).get(self.parent_cursor_field):
        # Removed a duplicated (dead) assignment of `parent_stream_state` that was
        # left behind by the diff rendering; the value is built exactly once.
        parent_stream_state = {
            self.parent_cursor_field: stream_state[self.parent_stream_name][self.parent_cursor_field],
        }
        self._state[self.parent_stream_name] = parent_stream_state
        if "prior_state" in self._state:
            # Keep the pre-sync snapshot in step with the restored parent state.
            self._state["prior_state"][self.parent_stream_name] = parent_stream_state

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Extended the default method to be able to track the parent STATE.
    """
    # save parent cursor value (STATE) from slice
    parent_cursor = stream_slice.get(self.parent_stream_name)
    if parent_cursor:
        self._parent_cursor = parent_cursor.get(self.parent_cursor_field)

    # observe the substream
    super().observe(stream_slice, record)

def close_slice(self, stream_slice: StreamSlice) -> None:
    """Delegate slice closing to the base cursor.

    NOTE(review): the superseded two-argument `close_slice(stream_slice, most_recent_record)`
    was removed — it was a leftover of the pre-change code and conflicted with this
    one-argument signature.
    """
    super().close_slice(stream_slice=stream_slice)

def stream_slices(self) -> Iterable[Mapping[str, Any]]:
    """Yield one slice per parent record.

    When `parent_complete_fetch` is set, the parent stream is read to completion
    before the first slice is emitted; otherwise slices are streamed lazily.
    """
    parent_state = (self._state or {}).get(self.parent_stream_name, {})
    slices_generator: Iterable[StreamSlice] = self.read_parent_stream(self.parent_sync_mode, self.parent_cursor_field, parent_state)
    # list(...) replaces `[slice for slice in slices_generator]`, which both
    # shadowed the `slice` builtin and copied element-by-element for no benefit;
    # eager-materialization behavior is unchanged.
    yield from list(slices_generator) if self.parent_complete_fetch else slices_generator

def track_parent_cursor(self, parent_record: dict) -> None:
    """Capture the parent stream's cursor value (via `parent_cursor_field`) from a parent record."""
    cursor_value = parent_record.get(self.parent_cursor_field)
    self._parent_cursor = cursor_value
    if cursor_value:
        # Mirror the tracked parent cursor into the persisted state under the parent stream's name.
        self._state[self.parent_stream_name] = {self.parent_cursor_field: cursor_value}

def read_parent_stream(
    self,
    sync_mode: SyncMode,
    cursor_field: Optional[str],
    stream_state: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Iterate the parent stream and yield one substream slice per parent record.

    NOTE(review): this body is reconstructed from a diff rendering that interleaved
    the pre- and post-change lines (duplicated signature lines, duplicated call
    kwargs, and duplicated dict keys made the rendered text syntactically invalid).

    :param sync_mode: sync mode used to read the parent stream
    :param cursor_field: parent cursor field name passed through to the parent stream
    :param stream_state: state to seed the parent stream with before reading
    """
    self.parent_stream.state = stream_state

    parent_stream_slices_gen = self.parent_stream.stream_slices(
        sync_mode=sync_mode,
        cursor_field=cursor_field,
        stream_state=stream_state,
    )

    for parent_slice in parent_stream_slices_gen:
        parent_records_gen = self.parent_stream.read_records(
            sync_mode=sync_mode,
            cursor_field=cursor_field,
            stream_slice=parent_slice,
            stream_state=stream_state,
        )

        for parent_record in parent_records_gen:
            # update parent cursor
            self.track_parent_cursor(parent_record)
            substream_slice_value = parent_record.get(self.parent_field)
            if substream_slice_value:
                # Renamed local to avoid rebinding the `cursor_field` parameter.
                substream_cursor_field = self.cursor_field.eval(self.config)
                substream_cursor_value = self._state.get(substream_cursor_field)
                parent_cursor_value = self._state.get(self.parent_stream_name, {}).get(self.parent_cursor_field)
                yield StreamSlice(
                    partition={
                        self.substream_slice_field: substream_slice_value,
                    },
                    cursor_slice={
                        substream_cursor_field: substream_cursor_value,
                        self.parent_stream_name: {
                            self.parent_cursor_field: parent_cursor_value,
                        },
                    },
                )
Expand Down
@@ -1,4 +1,4 @@
version: "0.50.2"
version: 0.72.1

definitions:
## bases
Expand All @@ -20,7 +20,8 @@ definitions:
type: BearerAuthenticator
api_token: "{{ config['access_token'] }}"
request_headers:
# API version header
# There is a bug in interpolation, causing the `2.10` string to be evaluated to `2.1`, cutting off the `0`.
# the workaround is to put the `string` inside the `string`, then it's evaluated properly to `2.10`
Intercom-Version: "'2.10'"
Accept: "application/json"
error_handler:
Expand Down Expand Up @@ -292,6 +293,16 @@ definitions:
page_size: 150
conversations:
$ref: "#/definitions/stream_incremental_search"
retriever:
$ref: "#/definitions/stream_full_refresh/retriever"
requester:
$ref: "#/definitions/requester_incremental_search"
request_headers:
# API version header
# There is a `404 - User Not Found` issue for certain users when `2.10` is used:
# https://github.com/airbytehq/oncall/issues/4514
Intercom-Version: "2.9"
Accept: "application/json"
$parameters:
name: "conversations"
path: "conversations/search"
Expand Down
Expand Up @@ -53,7 +53,8 @@ def get_requester():
def test_slicer():
    """The cursor observes a record, closes the slice, and surfaces the record's cursor value as stream state."""
    date_time_dict = {"updated_at": 1662459010}
    slicer = IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field="updated_at")
    # Removed a stale leftover call `slicer.close_slice(date_time_dict, date_time_dict)`:
    # it used the removed two-argument signature and would raise TypeError against
    # the new one-argument `close_slice`.
    slicer.observe(date_time_dict, date_time_dict)
    slicer.close_slice(date_time_dict)
    assert slicer.get_stream_state() == date_time_dict
    assert slicer.get_request_headers() == {}
    assert slicer.get_request_body_data() == {}
Expand Down Expand Up @@ -95,7 +96,8 @@ def test_sub_slicer(last_record, expected, records):
)
slicer.set_initial_state(expected)
stream_slice = next(slicer.stream_slices()) if records else {}
slicer.close_slice(stream_slice, last_record)
slicer.observe(stream_slice, last_record)
slicer.close_slice(stream_slice)
assert slicer.get_stream_state() == expected


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/intercom.md
Expand Up @@ -74,6 +74,7 @@ The Intercom connector should not run into Intercom API limitations under normal

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------|
| 0.6.2 | 2024-03-22 | [36277](https://github.com/airbytehq/airbyte/pull/36277) | Fixed the bug where the `conversations` stream failed with `404 - User Not Found` when the `2.10` API version was used |
| 0.6.1 | 2024-03-18 | [36232](https://github.com/airbytehq/airbyte/pull/36232) | Fixed the bug that caused a regression when setting the `Intercom-Version` header; updated the source to use the latest CDK version |
| 0.6.0 | 2024-02-12 | [35176](https://github.com/airbytehq/airbyte/pull/35176) | Update the connector to use `2.10` API version |
| 0.5.1 | 2024-02-12 | [35148](https://github.com/airbytehq/airbyte/pull/35148) | Manage dependencies with Poetry. |
Expand Down