From 5b9cb34db5773f91446af6f4f7709d9eb79d5379 Mon Sep 17 00:00:00 2001
From: Catherine Noll
Date: Wed, 13 Dec 2023 22:43:21 -0500
Subject: [PATCH] Source Salesforce: concurrent incremental syncs

---
 .../connectors/source-salesforce/main.py      |  2 +
 .../source-salesforce/metadata.yaml           |  2 +-
 .../source_salesforce/source.py               | 33 ++++++++++++++---
 .../source-salesforce/unit_tests/api_test.py  | 37 ++++---------------
 docs/integrations/sources/salesforce.md       |  1 +
 5 files changed, 40 insertions(+), 35 deletions(-)

diff --git a/airbyte-integrations/connectors/source-salesforce/main.py b/airbyte-integrations/connectors/source-salesforce/main.py
index 5ec9f05e104205..db0011c2616d23 100644
--- a/airbyte-integrations/connectors/source-salesforce/main.py
+++ b/airbyte-integrations/connectors/source-salesforce/main.py
@@ -16,10 +16,12 @@ def _get_source(args: List[str]):
     catalog_path = AirbyteEntrypoint.extract_catalog(args)
     config_path = AirbyteEntrypoint.extract_config(args)
+    state_path = AirbyteEntrypoint.extract_state(args)
     try:
         return SourceSalesforce(
             SourceSalesforce.read_catalog(catalog_path) if catalog_path else None,
             SourceSalesforce.read_config(config_path) if config_path else None,
+            SourceSalesforce.read_state(state_path) if state_path else None,
         )
     except Exception as error:
         print(
diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml
index c1a446b30196e0..9688083f23188e 100644
--- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml
+++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.2.1
+  dockerImageTag: 2.3.0
   dockerRepository: airbyte/source-salesforce
   documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
   githubIssueLabel: source-salesforce
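For reference (not part of the patch): a minimal sketch of what the updated `_get_source()` wiring above does, assuming the `source_salesforce` package import used by `main.py`; the file paths are placeholders.

```python
# Sketch only: mirrors the constructor call _get_source() makes above.
# Paths are placeholders; on a first sync there may be no state file,
# in which case None is passed, matching the `if state_path else None` guard.
from source_salesforce import SourceSalesforce

catalog = SourceSalesforce.read_catalog("sample_files/configured_catalog.json")
config = SourceSalesforce.read_config("secrets/config.json")
state = SourceSalesforce.read_state("sample_files/state.json")

source = SourceSalesforce(catalog, config, state)
```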
diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
index 30eea954dfe0fa..c3d567d274d405 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
@@ -16,7 +16,8 @@
 from airbyte_cdk.sources.message import InMemoryMessageRepository
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
-from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
+from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, NoopCursor
+from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import IsoMillisConcurrentStreamStateConverter
 from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
 from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -50,7 +51,7 @@ class SourceSalesforce(ConcurrentSourceAdapter):
 
     message_repository = InMemoryMessageRepository(Level(AirbyteLogFormatter.level_mapping[logger.level]))
 
-    def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], **kwargs):
+    def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: AirbyteStateMessage, **kwargs):
         if config:
             concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
         else:
@@ -61,6 +62,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
         )
         super().__init__(concurrent_source)
         self.catalog = catalog
+        self.state = state
 
     @staticmethod
     def _get_sf_object(config: Mapping[str, Any]) -> Salesforce:
@@ -192,16 +194,37 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         stream_objects = sf.get_validated_streams(config=config, catalog=self.catalog)
         streams = self.generate_streams(config, stream_objects, sf)
         streams.append(Describe(sf_api=sf, catalog=self.catalog))
-        # TODO: incorporate state & ConcurrentCursor when we support incremental
+        state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self.state)
+
         configured_streams = []
+
         for stream in streams:
             sync_mode = self._get_sync_mode_from_catalog(stream)
             if sync_mode == SyncMode.full_refresh:
-                configured_streams.append(StreamFacade.create_from_stream(stream, self, logger, None, NoopCursor()))
+                cursor = NoopCursor()
+                state = None
             else:
-                configured_streams.append(stream)
+                state_converter = IsoMillisConcurrentStreamStateConverter(stream.cursor_field, self.DATETIME_FORMAT)
+                state = state_converter.get_concurrent_stream_state(
+                    state_manager.get_stream_state(stream.name, stream.namespace)
+                )
+                cursor = ConcurrentCursor(
+                    stream.name,
+                    stream.namespace,
+                    state,
+                    self.message_repository,
+                    state_manager,
+                    state_converter,
+                    CursorField(stream.cursor_field),
+                    self._get_slice_boundary_fields(stream, state_manager),
+                )
+
+            configured_streams.append(StreamFacade.create_from_stream(stream, self, logger, state, cursor))
         return configured_streams
 
+    def _get_slice_boundary_fields(self, stream: Stream, state_manager: ConnectorStateManager) -> Optional[Tuple[str, str]]:
+        return ("start_date", "end_date")
+
     def _get_sync_mode_from_catalog(self, stream: Stream) -> Optional[SyncMode]:
         if self.catalog:
             for catalog_stream in self.catalog.streams:
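For reference (not part of the patch): the branch above keys off the sync mode recorded in the configured catalog, so a stream only receives a `ConcurrentCursor` when the catalog marks it incremental. A minimal sketch of such a catalog entry follows; the `json_schema` is illustrative and `SystemModstamp` is the field Salesforce streams commonly use as a cursor.

```python
# Sketch only: a configured catalog entry that opts the Account stream into
# incremental mode, which is what routes it to the ConcurrentCursor branch in
# streams() above. The empty json_schema and the cursor field are illustrative.
from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="Account",
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
            ),
            sync_mode=SyncMode.incremental,  # anything else keeps the NoopCursor path
            destination_sync_mode=DestinationSyncMode.append,
            cursor_field=["SystemModstamp"],
        )
    ]
)
```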
diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
index 8f87e2bd58cddc..be9bc963664414 100644
--- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
+++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py
@@ -37,6 +37,7 @@
 
 _ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []})
 _ANY_CONFIG = {}
+_ANY_STATE = None
 
 
 @pytest.mark.parametrize(
@@ -65,7 +66,7 @@ def test_login_authentication_error_handler(
     stream_config, requests_mock, login_status_code, login_json_resp, expected_error_msg, is_config_error
 ):
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     logger = logging.getLogger("airbyte")
     requests_mock.register_uri(
         "POST", "https://login.salesforce.com/services/oauth2/token", json=login_json_resp, status_code=login_status_code
     )
@@ -345,7 +346,7 @@ def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type_he
 def test_check_connection_rate_limit(
     stream_config, login_status_code, login_json_resp, discovery_status_code, discovery_resp_json, expected_error_msg
 ):
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     logger = logging.getLogger("airbyte")
 
     with requests_mock.Mocker() as m:
@@ -382,7 +383,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state):
     stream_1.page_size = 6
     stream_1.state_checkpoint_interval = 5
 
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     source.streams = Mock()
     source.streams.return_value = streams
     logger = logging.getLogger("airbyte")
@@ -438,7 +439,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state):
     stream_1.state_checkpoint_interval = 3
     configure_request_params_mock(stream_1, stream_2)
 
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     source.streams = Mock()
     source.streams.return_value = [stream_1, stream_2]
 
@@ -623,7 +624,7 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_
             ],
         },
     )
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     source.catalog = catalog
     streams = source.streams(config=stream_config)
     expected_names = catalog_stream_names if catalog else stream_names
@@ -638,28 +639,6 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_
     return
 
 
-@pytest.mark.parametrize(
-    "stream_names,catalog_stream_names,",
-    (
-        (
-            ["stream_1", "stream_2", "Describe"],
-            None,
-        ),
-        (
-            ["stream_1", "stream_2"],
-            ["stream_1", "stream_2", "Describe"],
-        ),
-        (
-            ["stream_1", "stream_2", "stream_3", "Describe"],
-            ["stream_1", "Describe"],
-        ),
-    ),
-)
-def test_unspecified_and_incremental_streams_are_not_concurrent(stream_config, stream_names, catalog_stream_names) -> None:
-    for stream in _get_streams(stream_config, stream_names, catalog_stream_names, SyncMode.incremental):
-        assert isinstance(stream, (SalesforceStream, Describe))
-
-
 @pytest.mark.parametrize(
     "stream_names,catalog_stream_names,",
     (
@@ -723,7 +702,7 @@ def _get_streams(stream_config, stream_names, catalog_stream_names, sync_type) -
             ],
         },
     )
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     source.catalog = catalog
     return source.streams(config=stream_config)
 
@@ -908,7 +887,7 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
     stream_config_date_format.update({"start_date": "2023-01-01"})
     stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api)
 
-    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
+    source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
     source.streams = Mock()
     source.streams.return_value = [stream]
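For reference (not part of the patch): the tests above pin `_ANY_STATE = None`, which matches a first sync. A sketch of a non-empty state that a future test could pass instead to exercise resuming; the model names come from `airbyte_cdk.models`, and the stream name and cursor value are made up.

```python
# Sketch only: per-stream state in the shape ConnectorStateManager accepts.
# The Account stream name and the SystemModstamp checkpoint are illustrative.
from airbyte_cdk.models import (
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)

_SAMPLE_STATE = [
    AirbyteStateMessage(
        type=AirbyteStateType.STREAM,
        stream=AirbyteStreamState(
            stream_descriptor=StreamDescriptor(name="Account"),
            stream_state=AirbyteStateBlob.parse_obj({"SystemModstamp": "2023-01-01T00:00:00.000Z"}),
        ),
    )
]

# e.g. source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _SAMPLE_STATE)
```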
diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md
index 80b29c6ba9378f..fb0402cf43e46a 100644
--- a/docs/integrations/sources/salesforce.md
+++ b/docs/integrations/sources/salesforce.md
@@ -154,6 +154,7 @@ Now that you have set up the Salesforce source connector, check out the followin
 
 | Version | Date       | Pull Request                                              | Subject                                       |
 |:--------|:-----------|:----------------------------------------------------------|:----------------------------------------------|
+| 2.3.0   | 2023-12-15 | [33522](https://github.com/airbytehq/airbyte/pull/33522)  | Sync streams concurrently in all sync modes   |
 | 2.2.1   | 2023-12-12 | [33342](https://github.com/airbytehq/airbyte/pull/33342)  | Added new ContentDocumentLink stream          |
 | 2.2.0   | 2023-12-12 | [33350](https://github.com/airbytehq/airbyte/pull/33350)  | Sync streams concurrently on full refresh     |
 | 2.1.6   | 2023-11-28 | [32535](https://github.com/airbytehq/airbyte/pull/32535)  | Run full refresh syncs concurrently           |