Skip to content

Commit

Permalink
Source Salesforce: concurrent incremental syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Dec 15, 2023
1 parent 07519e5 commit 548bba6
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 35 deletions.
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/source-salesforce/main.py
Expand Up @@ -16,10 +16,12 @@
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceSalesforce(
SourceSalesforce.read_catalog(catalog_path) if catalog_path else None,
SourceSalesforce.read_config(config_path) if config_path else None,
SourceSalesforce.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
Expand Down
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.2.1
dockerImageTag: 2.3.0
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Expand Up @@ -16,7 +16,8 @@
from airbyte_cdk.sources.message import InMemoryMessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, NoopCursor
from airbyte_cdk.sources.streams.concurrent.state_converter import IsoMillisConcurrentStreamStateConverter
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -50,7 +51,7 @@ class SourceSalesforce(ConcurrentSourceAdapter):

message_repository = InMemoryMessageRepository(Level(AirbyteLogFormatter.level_mapping[logger.level]))

def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], **kwargs):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: AirbyteStateMessage, **kwargs):
if config:
concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
else:
Expand All @@ -61,6 +62,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
)
super().__init__(concurrent_source)
self.catalog = catalog
self.state = state

@staticmethod
def _get_sf_object(config: Mapping[str, Any]) -> Salesforce:
Expand Down Expand Up @@ -192,16 +194,37 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream_objects = sf.get_validated_streams(config=config, catalog=self.catalog)
streams = self.generate_streams(config, stream_objects, sf)
streams.append(Describe(sf_api=sf, catalog=self.catalog))
# TODO: incorporate state & ConcurrentCursor when we support incremental
state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self.state)

configured_streams = []

for stream in streams:
sync_mode = self._get_sync_mode_from_catalog(stream)
if sync_mode == SyncMode.full_refresh:
configured_streams.append(StreamFacade.create_from_stream(stream, self, logger, None, NoopCursor()))
cursor = NoopCursor()
state = None
else:
configured_streams.append(stream)
state_converter = IsoMillisConcurrentStreamStateConverter(stream.cursor_field, self.DATETIME_FORMAT)
state = state_converter.get_concurrent_stream_state(
state_manager.get_stream_state(stream.name, stream.namespace)
)
cursor = ConcurrentCursor(
stream.name,
stream.namespace,
state,
self.message_repository,
state_manager,
state_converter,
CursorField(stream.cursor_field),
self._get_slice_boundary_fields(stream, state_manager),
)

configured_streams.append(StreamFacade.create_from_stream(stream, self, logger, state, cursor))
return configured_streams

def _get_slice_boundary_fields(self, stream: Stream, state_manager: ConnectorStateManager) -> Optional[Tuple[str, str]]:
    """Return the (start, end) field names that bound a concurrent stream slice.

    Currently hardcoded to ``("start_date", "end_date")`` for every stream; the
    ``stream`` and ``state_manager`` parameters are accepted for interface
    flexibility but are not consulted.  Fed to ``ConcurrentCursor`` as its
    slice-boundary fields in ``streams()``.
    """
    return ("start_date", "end_date")

def _get_sync_mode_from_catalog(self, stream: Stream) -> Optional[SyncMode]:
if self.catalog:
for catalog_stream in self.catalog.streams:
Expand Down
Expand Up @@ -37,6 +37,7 @@

_ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []})
_ANY_CONFIG = {}
_ANY_STATE = None


@pytest.mark.parametrize(
Expand Down Expand Up @@ -65,7 +66,7 @@
def test_login_authentication_error_handler(
stream_config, requests_mock, login_status_code, login_json_resp, expected_error_msg, is_config_error
):
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
logger = logging.getLogger("airbyte")
requests_mock.register_uri(
"POST", "https://login.salesforce.com/services/oauth2/token", json=login_json_resp, status_code=login_status_code
Expand Down Expand Up @@ -345,7 +346,7 @@ def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type_he
def test_check_connection_rate_limit(
stream_config, login_status_code, login_json_resp, discovery_status_code, discovery_resp_json, expected_error_msg
):
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
logger = logging.getLogger("airbyte")

with requests_mock.Mocker() as m:
Expand Down Expand Up @@ -382,7 +383,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state):
stream_1.page_size = 6
stream_1.state_checkpoint_interval = 5

source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
source.streams = Mock()
source.streams.return_value = streams
logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -438,7 +439,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state):
stream_1.state_checkpoint_interval = 3
configure_request_params_mock(stream_1, stream_2)

source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
source.streams = Mock()
source.streams.return_value = [stream_1, stream_2]

Expand Down Expand Up @@ -623,7 +624,7 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_
],
},
)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
source.catalog = catalog
streams = source.streams(config=stream_config)
expected_names = catalog_stream_names if catalog else stream_names
Expand All @@ -638,28 +639,6 @@ def test_forwarding_sobject_options(stream_config, stream_names, catalog_stream_
return


@pytest.mark.parametrize(
    "stream_names,catalog_stream_names,",
    (
        # No configured catalog: streams are discovered from the API alone.
        (
            ["stream_1", "stream_2", "Describe"],
            None,
        ),
        # Catalog is a superset of the discovered streams.
        (
            ["stream_1", "stream_2"],
            ["stream_1", "stream_2", "Describe"],
        ),
        # Catalog selects only a subset of the discovered streams.
        (
            ["stream_1", "stream_2", "stream_3", "Describe"],
            ["stream_1", "Describe"],
        ),
    ),
)
def test_unspecified_and_incremental_streams_are_not_concurrent(stream_config, stream_names, catalog_stream_names) -> None:
    """Streams synced in incremental mode must remain plain (non-concurrent) instances.

    Builds the source's stream list via ``_get_streams`` with
    ``SyncMode.incremental`` and asserts every returned stream is a
    ``SalesforceStream`` or ``Describe`` — i.e. none were wrapped in the
    concurrent ``StreamFacade`` adapter.
    """
    for stream in _get_streams(stream_config, stream_names, catalog_stream_names, SyncMode.incremental):
        assert isinstance(stream, (SalesforceStream, Describe))


@pytest.mark.parametrize(
"stream_names,catalog_stream_names,",
(
Expand Down Expand Up @@ -723,7 +702,7 @@ def _get_streams(stream_config, stream_names, catalog_stream_names, sync_type) -
],
},
)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
source.catalog = catalog
return source.streams(config=stream_config)

Expand Down Expand Up @@ -908,7 +887,7 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
stream_config_date_format.update({"start_date": "2023-01-01"})
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api)

source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG)
source = SourceSalesforce(_ANY_CATALOG, _ANY_CONFIG, _ANY_STATE)
source.streams = Mock()
source.streams.return_value = [stream]

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Expand Up @@ -154,6 +154,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.0 | 2023-12-15 | [33522](https://github.com/airbytehq/airbyte/pull/33522) | Sync streams concurrently in all sync modes |
| 2.2.1 | 2023-12-12 | [33342](https://github.com/airbytehq/airbyte/pull/33342) | Added new ContentDocumentLink stream |
| 2.2.0 | 2023-12-12 | [33350](https://github.com/airbytehq/airbyte/pull/33350) | Sync streams concurrently on full refresh |
| 2.1.6 | 2023-11-28 | [32535](https://github.com/airbytehq/airbyte/pull/32535) | Run full refresh syncs concurrently |
Expand Down

0 comments on commit 548bba6

Please sign in to comment.