Skip to content

Commit

Permalink
Salesforce refactor: add CheckpointMixin for state management (#39517)
Browse files Browse the repository at this point in the history
Co-authored-by: cristina.mariscal <cristina.mariscal@cristina.mariscal--MacBook-Pro---DFJ27FJFXX>
  • Loading branch information
cmm-airbyte and cristina.mariscal committed Jun 17, 2024
1 parent 94234e5 commit 4ac9552
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.5.14
dockerImageTag: 2.5.15
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.14"
version = "2.5.15"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import IsoMillisConcurrentStreamStateConverter
from airbyte_cdk.sources.streams.core import Stream, StreamData
from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream, StreamData
from airbyte_cdk.sources.streams.http import HttpClient, HttpStream, HttpSubStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils import AirbyteTracedException
Expand Down Expand Up @@ -695,7 +695,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):
return instance


class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
class IncrementalRestSalesforceStream(RestSalesforceStream, CheckpointMixin, ABC):
state_checkpoint_interval = 500
_slice = None

Expand All @@ -704,6 +704,7 @@ def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwar
self.replication_key = replication_key
self._stream_slice_step = stream_slice_step
self._stream_slicer_cursor = None
self._state = {}

def set_cursor(self, cursor: Cursor) -> None:
self._stream_slicer_cursor = cursor
Expand Down Expand Up @@ -764,7 +765,15 @@ def request_params(
def cursor_field(self) -> str:
return self.replication_key

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
@property
def state(self):
return self._state

@state.setter
def state(self, value):
self._state = value

def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state
object and returning an updated state object. Check if latest record is IN stream slice interval => ignore if not
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| 2.5.15 | 2024-06-16 | [39517](https://github.com/airbytehq/airbyte/pull/39517) | Salesforce refactor: add CheckpointMixin for state management |
| 2.5.14 | 2024-06-06 | [39269](https://github.com/airbytehq/airbyte/pull/39269) | [autopull] Upgrade base image to v1.2.2 |
| 2.5.13 | 2024-05-23 | [38563](https://github.com/airbytehq/airbyte/pull/38563) | Use HttpClient to perform HTTP requests for bulk, authentication and schema discovery |
| 2.5.12 | 2024-05-16 | [38255](https://github.com/airbytehq/airbyte/pull/38255) | Replace AirbyteLogger with logging.Logger |
Expand Down

0 comments on commit 4ac9552

Please sign in to comment.