From 4ac955228601027799f010874e3c9394898052da Mon Sep 17 00:00:00 2001 From: Cristina Mariscal <166420606+cmm-airbyte@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:51:19 -0600 Subject: [PATCH] Salesforce refactor: add CheckpointMixin for state management (#39517) Co-authored-by: cristina.mariscal --- .../connectors/source-salesforce/metadata.yaml | 2 +- .../connectors/source-salesforce/pyproject.toml | 2 +- .../source_salesforce/streams.py | 15 ++++++++++++--- docs/integrations/sources/salesforce.md | 1 + 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml index cf6160bab57377..8a060feabd66c3 100644 --- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml +++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: b117307c-14b6-41aa-9422-947e34922962 - dockerImageTag: 2.5.14 + dockerImageTag: 2.5.15 dockerRepository: airbyte/source-salesforce documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce githubIssueLabel: source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/pyproject.toml b/airbyte-integrations/connectors/source-salesforce/pyproject.toml index 56b352f84dbc53..00474028f4f5f9 100644 --- a/airbyte-integrations/connectors/source-salesforce/pyproject.toml +++ b/airbyte-integrations/connectors/source-salesforce/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.5.14" +version = "2.5.15" name = "source-salesforce" description = "Source implementation for Salesforce." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index f5ddead7d70567..7c8fc6ab146e08 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -21,7 +21,7 @@ from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import IsoMillisConcurrentStreamStateConverter -from airbyte_cdk.sources.streams.core import Stream, StreamData +from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream, StreamData from airbyte_cdk.sources.streams.http import HttpClient, HttpStream, HttpSubStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from airbyte_cdk.utils import AirbyteTracedException @@ -695,7 +695,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any): return instance -class IncrementalRestSalesforceStream(RestSalesforceStream, ABC): +class IncrementalRestSalesforceStream(RestSalesforceStream, CheckpointMixin, ABC): state_checkpoint_interval = 500 _slice = None @@ -704,6 +704,7 @@ def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwar self.replication_key = replication_key self._stream_slice_step = stream_slice_step self._stream_slicer_cursor = None + self._state = {} def set_cursor(self, cursor: Cursor) -> None: self._stream_slicer_cursor = cursor @@ -764,7 +765,15 @@ def request_params( def cursor_field(self) -> str: return self.replication_key - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + @property + def state(self): + return self._state + + @state.setter + def state(self, value): + self._state = value + + def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object and returning an updated state object. Check if latest record is IN stream slice interval => ignore if not diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 113be9a9edd697..310f06e32e11e8 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -195,6 +195,7 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| +| 2.5.15 | 2024-06-16 | [39517](https://github.com/airbytehq/airbyte/pull/39517) | Salesforce refactor: add CheckpointMixin for state management | | 2.5.14 | 2024-06-06 | [39269](https://github.com/airbytehq/airbyte/pull/39269) | [autopull] Upgrade base image to v1.2.2 | | 2.5.13 | 2024-05-23 | [38563](https://github.com/airbytehq/airbyte/pull/38563) | Use HttpClient to perform HTTP requests for bulk, authentication and schema discovery | | 2.5.12 | 2024-05-16 | [38255](https://github.com/airbytehq/airbyte/pull/38255) | Replace AirbyteLogger with logging.Logger |