diff --git a/airbyte-integrations/connectors/source-intercom/metadata.yaml b/airbyte-integrations/connectors/source-intercom/metadata.yaml index aff68e95b18b2..88db91af4a149 100644 --- a/airbyte-integrations/connectors/source-intercom/metadata.yaml +++ b/airbyte-integrations/connectors/source-intercom/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a - dockerImageTag: 0.6.1 + dockerImageTag: 0.6.2 dockerRepository: airbyte/source-intercom documentationUrl: https://docs.airbyte.com/integrations/sources/intercom githubIssueLabel: source-intercom diff --git a/airbyte-integrations/connectors/source-intercom/poetry.lock b/airbyte-integrations/connectors/source-intercom/poetry.lock index bd559e58c17ab..57a3a3020baab 100644 --- a/airbyte-integrations/connectors/source-intercom/poetry.lock +++ b/airbyte-integrations/connectors/source-intercom/poetry.lock @@ -2,13 +2,13 @@ [[package]] name = "airbyte-cdk" -version = "0.71.0" +version = "0.74.0" description = "A framework for writing Airbyte Connectors." 
optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "airbyte-cdk-0.71.0.tar.gz", hash = "sha256:110959840681b770e9378f9bcbca7a4b50c75b11de74e9fb809112407c4f50fa"}, - {file = "airbyte_cdk-0.71.0-py3-none-any.whl", hash = "sha256:730365365e826311d88dc0a8a5ebbd6227cc41b3dc342ef1525061b6d93f889c"}, + {file = "airbyte-cdk-0.74.0.tar.gz", hash = "sha256:74241a055c205403a951383f43801067b7f451370e14d553d13d0cc476cbfff7"}, + {file = "airbyte_cdk-0.74.0-py3-none-any.whl", hash = "sha256:7e5b201d69ec0e7daab7e627dbc6add4dbba4a2f779132e86aaf6713650ff4d5"}, ] [package.dependencies] @@ -662,17 +662,17 @@ testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygm [[package]] name = "pytest-mock" -version = "3.12.0" +version = "3.14.0" description = "Thin-wrapper around the mock package for easier use with pytest" optional = false python-versions = ">=3.8" files = [ - {file = "pytest-mock-3.12.0.tar.gz", hash = "sha256:31a40f038c22cad32287bb43932054451ff5583ff094bca6f675df2f8bc1a6e9"}, - {file = "pytest_mock-3.12.0-py3-none-any.whl", hash = "sha256:0972719a7263072da3a21c7f4773069bcc7486027d7e8e1f81d98a47e701bc4f"}, + {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"}, + {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"}, ] [package.dependencies] -pytest = ">=5.0" +pytest = ">=6.2.5" [package.extras] dev = ["pre-commit", "pytest-asyncio", "tox"] diff --git a/airbyte-integrations/connectors/source-intercom/pyproject.toml b/airbyte-integrations/connectors/source-intercom/pyproject.toml index 853926f5be58f..0aba623b0c8ba 100644 --- a/airbyte-integrations/connectors/source-intercom/pyproject.toml +++ b/airbyte-integrations/connectors/source-intercom/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = 
"0.6.1" +version = "0.6.2" name = "source-intercom" description = "Source implementation for Intercom Yaml." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-intercom/source_intercom/components.py b/airbyte-integrations/connectors/source-intercom/source_intercom/components.py index 6d9d29b45dfcb..75eb81e807bb1 100644 --- a/airbyte-integrations/connectors/source-intercom/source_intercom/components.py +++ b/airbyte-integrations/connectors/source-intercom/source_intercom/components.py @@ -33,6 +33,7 @@ class IncrementalSingleSliceCursor(Cursor): def __post_init__(self, parameters: Mapping[str, Any]): self._state = {} + self._cursor = None self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters) def get_request_params( @@ -87,12 +88,26 @@ def set_initial_state(self, stream_state: StreamState): if cursor_value: self._state[cursor_field] = cursor_value self._state["prior_state"] = self._state.copy() + self._cursor = cursor_value - def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None: - latest_record = self._state if self.is_greater_than_or_equal(self._state, most_recent_record) else most_recent_record - if latest_record: - cursor_field = self.cursor_field.eval(self.config) - self._state[cursor_field] = latest_record[cursor_field] + def observe(self, stream_slice: StreamSlice, record: Record) -> None: + """ + Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read. + + :param stream_slice: The current slice, which may or may not contain the most recently observed record + :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the + stream state may need to be deferred depending on whether the source reliably orders records by the cursor field. 
+ """ + record_cursor_value = record.get(self.cursor_field.eval(self.config)) + if not record_cursor_value: + return + + if self.is_greater_than_or_equal(record, self._state): + self._cursor = record_cursor_value + + def close_slice(self, stream_slice: StreamSlice) -> None: + cursor_field = self.cursor_field.eval(self.config) + self._state[cursor_field] = self._cursor def stream_slices(self) -> Iterable[Mapping[str, Any]]: yield StreamSlice(partition={}, cursor_slice={}) @@ -138,51 +153,86 @@ def __post_init__(self, parameters: Mapping[str, Any]): self.parent_sync_mode: SyncMode = SyncMode.incremental if self.parent_stream.supports_incremental is True else SyncMode.full_refresh self.substream_slice_field: str = self.parent_stream_configs[0].partition_field.eval(self.config) self.parent_field: str = self.parent_stream_configs[0].parent_key.eval(self.config) + self._parent_cursor: Optional[str] = None def set_initial_state(self, stream_state: StreamState): super().set_initial_state(stream_state=stream_state) if self.parent_stream_name in stream_state and stream_state.get(self.parent_stream_name, {}).get(self.parent_cursor_field): - parent_stream_state = {self.parent_cursor_field: stream_state[self.parent_stream_name][self.parent_cursor_field]} + parent_stream_state = { + self.parent_cursor_field: stream_state[self.parent_stream_name][self.parent_cursor_field], + } self._state[self.parent_stream_name] = parent_stream_state if "prior_state" in self._state: self._state["prior_state"][self.parent_stream_name] = parent_stream_state - def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None: - super().close_slice(stream_slice=stream_slice, most_recent_record=most_recent_record) - if self.parent_stream: - self._state[self.parent_stream_name] = self.parent_stream.state + def observe(self, stream_slice: StreamSlice, record: Record) -> None: + """ + Extended the default method to be able to track the parent STATE. 
+ """ + + # save parent cursor value (STATE) from slice + parent_cursor = stream_slice.get(self.parent_stream_name) + if parent_cursor: + self._parent_cursor = parent_cursor.get(self.parent_cursor_field) + + # observe the substream + super().observe(stream_slice, record) + + def close_slice(self, stream_slice: StreamSlice) -> None: + super().close_slice(stream_slice=stream_slice) def stream_slices(self) -> Iterable[Mapping[str, Any]]: parent_state = (self._state or {}).get(self.parent_stream_name, {}) slices_generator: Iterable[StreamSlice] = self.read_parent_stream(self.parent_sync_mode, self.parent_cursor_field, parent_state) yield from [slice for slice in slices_generator] if self.parent_complete_fetch else slices_generator + def track_parent_cursor(self, parent_record: dict) -> None: + """ + Tracks the Parent Stream Cursor, using `parent_cursor_field`. + """ + self._parent_cursor = parent_record.get(self.parent_cursor_field) + if self._parent_cursor: + self._state[self.parent_stream_name] = {self.parent_cursor_field: self._parent_cursor} + def read_parent_stream( - self, sync_mode: SyncMode, cursor_field: Optional[str], stream_state: Mapping[str, Any] + self, + sync_mode: SyncMode, + cursor_field: Optional[str], + stream_state: Mapping[str, Any], ) -> Iterable[Mapping[str, Any]]: + self.parent_stream.state = stream_state parent_stream_slices_gen = self.parent_stream.stream_slices( - sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state + sync_mode=sync_mode, + cursor_field=cursor_field, + stream_state=stream_state, ) for parent_slice in parent_stream_slices_gen: parent_records_gen = self.parent_stream.read_records( - sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=parent_slice, stream_state=stream_state + sync_mode=sync_mode, + cursor_field=cursor_field, + stream_slice=parent_slice, + stream_state=stream_state, ) for parent_record in parent_records_gen: + # update parent cursor + self.track_parent_cursor(parent_record) 
substream_slice_value = parent_record.get(self.parent_field) if substream_slice_value: cursor_field = self.cursor_field.eval(self.config) + substream_cursor_value = self._state.get(cursor_field) + parent_cursor_value = self._state.get(self.parent_stream_name, {}).get(self.parent_cursor_field) yield StreamSlice( partition={ self.substream_slice_field: substream_slice_value, }, cursor_slice={ - cursor_field: self._state.get(cursor_field), + cursor_field: substream_cursor_value, self.parent_stream_name: { - self.parent_cursor_field: self._state.get(self.parent_stream_name, {}).get(self.parent_cursor_field) + self.parent_cursor_field: parent_cursor_value, }, }, ) diff --git a/airbyte-integrations/connectors/source-intercom/source_intercom/manifest.yaml b/airbyte-integrations/connectors/source-intercom/source_intercom/manifest.yaml index 69d2219095fb6..dc9c32e65f371 100644 --- a/airbyte-integrations/connectors/source-intercom/source_intercom/manifest.yaml +++ b/airbyte-integrations/connectors/source-intercom/source_intercom/manifest.yaml @@ -1,4 +1,4 @@ -version: "0.50.2" +version: 0.72.1 definitions: ## bases @@ -20,7 +20,8 @@ definitions: type: BearerAuthenticator api_token: "{{ config['access_token'] }}" request_headers: - # API version header + # There is a bug in interpolation, causing the `2.10` string to be evaluated to `2.1`, cutting off the `0`. 
+ # the workaround is to put the `string` inside the `string`, then it's evaluated properly to `2.10` Intercom-Version: "'2.10'" Accept: "application/json" error_handler: @@ -292,6 +293,16 @@ definitions: page_size: 150 conversations: $ref: "#/definitions/stream_incremental_search" + retriever: + $ref: "#/definitions/stream_full_refresh/retriever" + requester: + $ref: "#/definitions/requester_incremental_search" + request_headers: + # API version header + # For certain users, the API returns `404 - User Not Found` when version `2.10` is used: + # https://github.com/airbytehq/oncall/issues/4514 + Intercom-Version: "2.9" + Accept: "application/json" $parameters: name: "conversations" path: "conversations/search" diff --git a/airbyte-integrations/connectors/source-intercom/unit_tests/test_components.py b/airbyte-integrations/connectors/source-intercom/unit_tests/test_components.py index 2da6517110ece..1a1681aabdb73 100644 --- a/airbyte-integrations/connectors/source-intercom/unit_tests/test_components.py +++ b/airbyte-integrations/connectors/source-intercom/unit_tests/test_components.py @@ -53,7 +53,8 @@ def get_requester(): def test_slicer(): date_time_dict = {"updated_at": 1662459010} slicer = IncrementalSingleSliceCursor(config={}, parameters={}, cursor_field="updated_at") - slicer.close_slice(date_time_dict, date_time_dict) + slicer.observe(date_time_dict, date_time_dict) + slicer.close_slice(date_time_dict) assert slicer.get_stream_state() == date_time_dict assert slicer.get_request_headers() == {} assert slicer.get_request_body_data() == {} @@ -95,7 +96,8 @@ def test_sub_slicer(last_record, expected, records): ) slicer.set_initial_state(expected) stream_slice = next(slicer.stream_slices()) if records else {} - slicer.close_slice(stream_slice, last_record) + slicer.observe(stream_slice, last_record) + slicer.close_slice(stream_slice) assert slicer.get_stream_state() == expected diff --git a/docs/integrations/sources/intercom.md b/docs/integrations/sources/intercom.md 
index 80e5ebef4eb05..0a61208ec0a7a 100644 --- a/docs/integrations/sources/intercom.md +++ b/docs/integrations/sources/intercom.md @@ -74,6 +74,7 @@ The Intercom connector should not run into Intercom API limitations under normal | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------| | 0.6.2 | 2024-03-22 | [36277](https://github.com/airbytehq/airbyte/pull/36277) | Fixed the `conversations` stream failing with `404 - User Not Found` when the `2.10` API version is used | | 0.6.1 | 2024-03-18 | [36232](https://github.com/airbytehq/airbyte/pull/36232) | Fixed the bug caused the regression when setting the `Intercom-Version` header, updated the source to use the latest CDK version | | 0.6.0 | 2024-02-12 | [35176](https://github.com/airbytehq/airbyte/pull/35176) | Update the connector to use `2.10` API version | | 0.5.1 | 2024-02-12 | [35148](https://github.com/airbytehq/airbyte/pull/35148) | Manage dependencies with Poetry. |