From 104cf4b22a9f3fe15e62b28910732156c4d69f9a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 12 Jul 2023 20:17:41 +0200 Subject: [PATCH 01/46] Source SalesForce: bump airbyte_cdk version --- airbyte-integrations/connectors/source-salesforce/Dockerfile | 2 +- .../connectors/source-salesforce/metadata.yaml | 3 +-- airbyte-integrations/connectors/source-salesforce/setup.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 84f30cbc8d4ef..6e152d6446254 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -34,5 +34,5 @@ RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=2.1.1 +LABEL io.airbyte.version=2.1.2 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml index 0dbb0e59ec818..d8ec8c3901f87 100644 --- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml +++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: b117307c-14b6-41aa-9422-947e34922962 - dockerImageTag: 2.1.1 + dockerImageTag: 2.1.2 dockerRepository: airbyte/source-salesforce githubIssueLabel: source-salesforce icon: salesforce.svg @@ -13,7 +13,6 @@ data: name: Salesforce registries: cloud: - dockerImageTag: 2.0.9 enabled: true oss: enabled: true diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index 7e47fce763a01..2204c7f03aecb 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "vcrpy==4.1.1", "pandas"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.44", "pandas"] TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"] From a536748841c2426cd0b86c2d69df040b5c652075 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 18 Jul 2023 18:03:19 +0200 Subject: [PATCH 02/46] Source SalesForce: handle bulk API errors --- .../source_salesforce/streams.py | 31 ++++++++--- .../source-salesforce/unit_tests/api_test.py | 53 ++++++++++++++++++- 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 10c19e9537ed3..4652963828211 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -15,11 +15,12 @@ import pandas as pd import pendulum import requests # type: ignore[import] -from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode +from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import Stream, StreamData from airbyte_cdk.sources.streams.http import HttpStream from 
airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from airbyte_cdk.utils import AirbyteTracedException from numpy import nan from pendulum import DateTime # type: ignore[attr-defined] from requests import codes, exceptions @@ -347,11 +348,16 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: f"The stream '{self.name}' is not queryable, " f"sobject options: {self.sobject_options}, error message: '{error_message}'" ) + elif ( + error.response.status_code == codes.BAD_REQUEST + and error_code == "API_ERROR" + and error_message.startswith("Implementation restriction") + ): + message = f"Unable to sync '{self.name}'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions." + self.logger.error(message) elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED": - self.logger.error( - f"Cannot receive data for stream '{self.name}' ," - f"sobject options: {self.sobject_options}, error message: '{error_message}'" - ) + message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed." + self.logger.error(message) else: raise error else: @@ -368,7 +374,20 @@ def wait_for_job(self, url: str) -> str: # this value was received empirically time.sleep(0.5) while pendulum.now() < expiration_time: - job_info = self._send_http_request("GET", url=url).json() + try: + job_info = self._send_http_request("GET", url=url).json() + except exceptions.HTTPError as error: + error_data = error.response.json()[0] + error_code = error_data.get("errorCode") + error_message = error_data.get("message", "") + if ( + "We can't complete the action because enabled transaction security policies took too long to complete." in error_message + and error_code == "TXN_SECURITY_METERING_ERROR" + ): + message = 'A transient authentication error occurred. To prevent future syncs from failing, assign the "Exempt from Transaction Security" user permission to the authenticated user.' + raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error) + else: + raise error job_status = job_info["state"] if job_status in ["JobComplete", "Aborted", "Failed"]: if job_status != "JobComplete": diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 49f10f73bef4e..64b9565ee7530 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -13,6 +13,7 @@ import pytest import requests_mock from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type +from airbyte_cdk.utils import AirbyteTracedException from conftest import encoding_symbols_parameters, generate_stream from requests.exceptions import HTTPError from source_salesforce.api import Salesforce @@ -61,7 +62,7 @@ def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stre assert list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)) == rest_stream_records -def test_stream_unsupported_by_bulk(stream_config, stream_api, caplog): +def test_stream_unsupported_by_bulk(stream_config, stream_api): """ Stream `AcceptedEventRelation` is not supported by BULK API, so that REST API stream will be used for it. 
""" @@ -661,3 +662,53 @@ def test_stream_with_no_records_in_response(stream_config, stream_api_v2_pk_too_ ) records = list(stream.read_records(sync_mode=SyncMode.full_refresh)) assert records == [] + + +@pytest.mark.parametrize( + "status_code,response_json,log_message", + [ + (400, [{"errorCode": "INVALIDENTITY", "message": "Account is not supported by the Bulk API"}], "Account is not supported by the Bulk API"), + (403, [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "API limit reached"}], "API limit reached"), + (400, [{"errorCode": "API_ERROR", "message": "API does not support query"}], "The stream 'Account' is not queryable,"), + (400, [{"errorCode": "API_ERROR", "message": "Implementation restriction: Account only allows security evaluation for non-admin users when LIMIT is specified and at most 1000"}], f"Unable to sync 'Account'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions."), + (400, [{"errorCode": "LIMIT_EXCEEDED", "message": "Max bulk v2 query jobs (10000) per 24 hrs has been reached (10021)"}], "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed.") + ] +) +def test_bulk_stream_error_in_logs_on_create_job(requests_mock, stream_config, stream_api, status_code, response_json, log_message, caplog): + """ + """ + stream = generate_stream("Account", stream_config, stream_api) + url = f"{stream.sf_api.instance_url}/services/data/{stream.sf_api.version}/jobs/query" + requests_mock.register_uri( + "POST", + url, + status_code=status_code, + json=response_json, + ) + query = "Select Id, Subject from Account" + with caplog.at_level(logging.ERROR): + assert stream.create_stream_job(query, url) is None, "this stream should be skipped" + + # check logs + assert log_message in caplog.records[-1].message + + +@pytest.mark.parametrize( + "status_code,response_json,error_message", + [ + (400, [{"errorCode": "TXN_SECURITY_METERING_ERROR", "message": "We can't complete the action because enabled transaction security policies took too long to complete."}], 'A transient authentication error occurred. 
To prevent future syncs from failing, assign the "Exempt from Transaction Security" user permission to the authenticated user.'), + ] +) +def test_bulk_stream_error_on_wait_for_job(requests_mock, stream_config, stream_api, status_code, response_json, error_message): + + stream = generate_stream("Account", stream_config, stream_api) + url = f"{stream.sf_api.instance_url}/services/data/{stream.sf_api.version}/jobs/query/queryJobId" + requests_mock.register_uri( + "GET", + url, + status_code=status_code, + json=response_json, + ) + with pytest.raises(AirbyteTracedException) as e: + stream.wait_for_job(url=url) + assert e.value.message == error_message \ No newline at end of file From 096ed102da26ca6b3f9c3ce808e3b56963ccfce7 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Tue, 18 Jul 2023 22:00:20 +0200 Subject: [PATCH 03/46] Source SalesForce: update docs --- docs/integrations/sources/salesforce.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index c960f149f59ae..7f43f7ca5a46f 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -1,8 +1,6 @@ # Salesforce -Setting up the Salesforce source connector involves creating a read-only Salesforce user and configuring the Salesforce connector through the Airbyte UI. - -This page guides you through the process of setting up the Salesforce source connector. +This page contains the setup guide and reference information for the Salesforce source connector. ## Prerequisites @@ -40,7 +38,7 @@ To create a dedicated read only Salesforce user: -**For Airbyte Cloud:** +**For Airbyte Cloud:** To set up Salesforce as a source in Airbyte Cloud: @@ -67,7 +65,7 @@ To set up Salesforce as a source in Airbyte Open Source: 2. When running a curl command, run it with the `-L` option to follow any redirects. 3. If you [created a read-only user](https://docs.google.com/document/d/1wZR8pz4MRdc2zUculc9IqoF8JxN87U40IqVnTtcqdrI/edit#heading=h.w5v6h7b2a9y4), use the user credentials when logging in to generate OAuth tokens. -2. Navigate to the Airbute Open Source dashboard and follow the same steps as [setting up Salesforce as a source in Airbyte Cloud](#for-airbyte-cloud). +2. Navigate to the Airbyte Open Source dashboard and follow the same steps as [setting up Salesforce as a source in Airbyte Cloud](#airbyte-cloud-setup). 
## Supported sync modes @@ -120,7 +118,7 @@ Airbyte fetches and handles all the possible and available streams dynamically b - TaskStatus - UndecidedEventRelation -## Salesforce tutorials +## Tutorials Now that you have set up the Salesforce source connector, check out the following Salesforce tutorials: From 8916568060440424e0472c0cbb951343d7c10b4e Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 19 Jul 2023 12:49:43 +0200 Subject: [PATCH 04/46] Source SalesForce: bump CDK version --- airbyte-integrations/connectors/source-salesforce/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index 2204c7f03aecb..e8f9de037b860 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.44", "pandas"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.46", "pandas"] TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"] From 8ffcf4101f6f66d5429bedd9f1028b121f35468c Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 21 Jul 2023 18:57:23 +0200 Subject: [PATCH 05/46] Source SalesForce: raise config error --- .../connectors/source-salesforce/source_salesforce/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 4652963828211..5ca55e0e633d2 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -354,7 +354,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: and error_message.startswith("Implementation restriction") ): message = f"Unable to sync '{self.name}'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions." - self.logger.error(message) + raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error) elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED": message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed." 
self.logger.error(message) From 9d3445660b7f1c27930c25fdd49c75c02ab46215 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 24 Jul 2023 18:48:41 +0200 Subject: [PATCH 06/46] Source SalesForce: add stream slice test --- .../source-salesforce/unit_tests/api_test.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 64b9565ee7530..c6bd2a7cdf73d 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -10,6 +10,8 @@ from datetime import datetime from unittest.mock import Mock +import freezegun +import pendulum import pytest import requests_mock from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type @@ -670,7 +672,7 @@ def test_stream_with_no_records_in_response(stream_config, stream_api_v2_pk_too_ (400, [{"errorCode": "INVALIDENTITY", "message": "Account is not supported by the Bulk API"}], "Account is not supported by the Bulk API"), (403, [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "API limit reached"}], "API limit reached"), (400, [{"errorCode": "API_ERROR", "message": "API does not support query"}], "The stream 'Account' is not queryable,"), - (400, [{"errorCode": "API_ERROR", "message": "Implementation restriction: Account only allows security evaluation for non-admin users when LIMIT is specified and at most 1000"}], f"Unable to sync 'Account'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions."), + (400, [{"errorCode": "API_ERROR", "message": "Implementation restriction: Account only allows security evaluation for non-admin users when LIMIT is specified and at most 1000"}], "Unable to sync 'Account'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions."), (400, [{"errorCode": "LIMIT_EXCEEDED", "message": "Max bulk v2 query jobs (10000) per 24 hrs has been reached (10021)"}], "Your API key for Salesforce has reached its limit for the 24-hour period. 
We will resume replication once the limit has elapsed.") ] ) @@ -711,4 +713,20 @@ def test_bulk_stream_error_on_wait_for_job(requests_mock, stream_config, stream_ ) with pytest.raises(AirbyteTracedException) as e: stream.wait_for_job(url=url) - assert e.value.message == error_message \ No newline at end of file + assert e.value.message == error_message + + +@freezegun.freeze_time("2023-01-01") +def test_bulk_stream_slices(stream_config_date_format, stream_api): + stream: BulkIncrementalSalesforceStream = generate_stream("FakeBulkStream", stream_config_date_format, stream_api) + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) + expected_slices = [] + today = pendulum.today(tz="UTC") + start_date = pendulum.parse(stream.start_date, tz="UTC") + while start_date < today: + expected_slices.append({ + 'start_date': start_date.isoformat(timespec="milliseconds"), + 'end_date': min(today, start_date.add(days=stream.STREAM_SLICE_STEP)).isoformat(timespec="milliseconds") + }) + start_date = start_date.add(days=stream.STREAM_SLICE_STEP) + assert expected_slices == stream_slices From 4ddb4642dd6673c42cecc623f2d9935baa43750d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 24 Jul 2023 22:40:37 +0200 Subject: [PATCH 07/46] Source SalesForce: add bulk pagination tests + fixes --- .../source_salesforce/source.py | 19 ++-- .../source_salesforce/spec.yaml | 7 +- .../source_salesforce/streams.py | 82 ++++++++--------- .../source-salesforce/unit_tests/api_test.py | 89 ++++++++++--------- .../unit_tests/test_memory.py | 7 +- 5 files changed, 105 insertions(+), 99 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 77243ea2b57ab..af9361e312519 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -21,6 +21,8 @@ from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalRestSalesforceStream, RestSalesforceStream +logger = logging.getLogger("airbyte") + class AirbyteStopSync(AirbyteTracedException): pass @@ -59,13 +61,21 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return True, None @classmethod - def _get_api_type(cls, stream_name, properties): + def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: bool) -> str: # Salesforce BULK API currently does not support loading fields with data type base64 and compound data properties_not_supported_by_bulk = { key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"] } - rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk - if rest_required: + rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS + if rest_only: + logger.warning(f"BULK API is not supported for stream: {stream_name}") + return "rest" + if force_use_bulk_api and properties_not_supported_by_bulk: + logger.warning( + f"Following properties will be excluded from stream: {stream_name} due to BULK API limitations: {list(properties_not_supported_by_bulk)}" + ) + return "bulk" + if properties_not_supported_by_bulk: return "rest" return "bulk" @@ -77,7 +87,6 @@ def generate_streams( sf_object: 
Salesforce, ) -> List[Stream]: """ "Generates a list of stream by their names. It can be used for different tests too""" - logger = logging.getLogger() authenticator = TokenAuthenticator(sf_object.access_token) stream_properties = sf_object.generate_schemas(stream_objects) streams = [] @@ -85,7 +94,7 @@ def generate_streams( streams_kwargs = {"sobject_options": sobject_options} selected_properties = stream_properties.get(stream_name, {}).get("properties", {}) - api_type = cls._get_api_type(stream_name, selected_properties) + api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False)) if api_type == "rest": full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream elif api_type == "bulk": diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml index af3a7655217ce..175610b6fccc3 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml @@ -53,9 +53,14 @@ connectionSpecification: - "2021-07-25T00:00:00Z" format: date-time order: 5 + force_use_bulk_api: + type: boolean + description: Toggle to Bulk API (this might cause empty fields for some streams) + default: false + order: 6 streams_criteria: type: array - order: 6 + order: 7 items: type: object required: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 5ca55e0e633d2..2d015a28d29dd 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -24,6 +24,7 @@ from numpy import nan from pendulum import DateTime # type: ignore[attr-defined] from requests import codes, exceptions +from requests.models import PreparedRequest from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce from .availability_strategy import SalesforceAvailabilityStrategy @@ -281,7 +282,6 @@ def _fetch_next_page_for_chunk( class BulkSalesforceStream(SalesforceStream): - page_size = 15000 DEFAULT_WAIT_TIMEOUT_SECONDS = 86400 # 24-hour bulk job running time MAX_CHECK_INTERVAL_SECONDS = 2.0 MAX_RETRY_NUMBER = 3 @@ -292,8 +292,8 @@ def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str: transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization) @default_backoff_handler(max_tries=5, factor=15) - def _send_http_request(self, method: str, url: str, json: dict = None, stream: bool = False): - headers = self.authenticator.get_auth_header() + def _send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False): + headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header() response = self._session.request(method, url=url, headers=headers, json=json, stream=stream) if response.status_code not in [200, 204]: self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}") @@ -441,7 +441,7 @@ def filter_null_bytes(self, b: bytes): self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)) return res - def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: + def 
download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]: """ Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations. @ url: string - the url of the `executed_job` @@ -450,13 +450,16 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: """ # set filepath for binary data from response tmp_file = os.path.realpath(os.path.basename(url)) - with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file: + with closing(self._send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open( + tmp_file, "wb" + ) as data_file: response_encoding = response.encoding or self.encoding + response_headers = response.headers for chunk in response.iter_content(chunk_size=chunk_size): data_file.write(self.filter_null_bytes(chunk)) # check the file exists if os.path.isfile(tmp_file): - return tmp_file, response_encoding + return tmp_file, response_encoding, response_headers else: raise TmpFileIOError(f"The IO/Error occured while verifying binary data. Stream: {self.name}, file {tmp_file} doesn't exist.") @@ -496,10 +499,17 @@ def availability_strategy(self) -> Optional["AvailabilityStrategy"]: return None def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: - if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - return {"next_token": f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "} # type: ignore[index] return None + def get_query_select_fields(self) -> str: + return ", ".join( + { + key: value + for key, value in self.get_json_schema().get("properties", {}).items() + if value.get("format") != "base64" and "object" not in value["type"] + } + ) + def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: @@ -507,13 +517,11 @@ def request_params( Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm """ - selected_properties = self.get_json_schema().get("properties", {}) - query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " + select_fields = self.get_query_select_fields() + query = f"SELECT {select_fields} FROM {self.name}" if next_page_token: query += next_page_token["next_token"] - if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}" return {"q": query} def read_records( @@ -547,24 +555,19 @@ def read_records( ) return raise SalesforceException(f"Job for {self.name} stream using BULK API was failed.") + salesforce_bulk_api_locator = None + while True: + req = PreparedRequest() + req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) # 'maxRecords': 5 + tmp_file, response_encoding, response_headers = self.download_data(url=req.url) + for record in self.read_with_chunks(tmp_file, response_encoding): + yield record - count = 0 - record: Mapping[str, Any] = {} - for record in self.read_with_chunks(*self.download_data(url=job_full_url)): - count += 1 - yield record + if response_headers.get("Sforce-Locator", "null") == "null": + break + salesforce_bulk_api_locator = response_headers.get("Sforce-Locator") self.delete_job(url=job_full_url) - - if count < self.page_size: - # Salesforce doesn't give a next token or something to know 
the request was - # the last page. The connectors will sync batches in `page_size` and - # considers that batch is smaller than the `page_size` it must be the last page. - break - - next_page_token = self.next_page_token(record) - if not next_page_token: - # not found a next page data. - break + break def get_standard_instance(self) -> SalesforceStream: """Returns a instance of standard logic(non-BULK) with same settings""" @@ -651,17 +654,14 @@ def request_params( select_fields = ",".join(property_chunk.keys()) table_name = self.name where_conditions = [] - order_by_clause = "" if start_date: where_conditions.append(f"{self.cursor_field} >= {start_date}") if end_date: where_conditions.append(f"{self.cursor_field} < {end_date}") - if self.name not in UNSUPPORTED_FILTERING_STREAMS: - order_by_clause = f"ORDER BY {self.cursor_field} ASC" where_clause = f"WHERE {' AND '.join(where_conditions)}" - query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}" + query = f"SELECT {select_fields} FROM {table_name} {where_clause}" return {"q": query} @@ -681,30 +681,20 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream): - def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: - return None + state_checkpoint_interval = None def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - start_date = max( - (stream_state or {}).get(self.cursor_field, ""), - (stream_slice or {}).get("start_date", ""), - (next_page_token or {}).get("start_date", ""), - ) + start_date = max((stream_state or {}).get(self.cursor_field, ""), (stream_slice or {}).get("start_date", "")) end_date = stream_slice["end_date"] - select_fields = ", ".join(self.get_json_schema().get("properties", {}).keys()) + select_fields = self.get_query_select_fields() table_name = self.name where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"] - order_by_clause = "" - - if self.name not in UNSUPPORTED_FILTERING_STREAMS: - order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]) - order_by_clause = f"ORDER BY {order_by_fields} ASC" where_clause = f"WHERE {' AND '.join(where_conditions)}" - query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}" + query = f"SELECT {select_fields} FROM {table_name} {where_clause}" return {"q": query} diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index c6bd2a7cdf73d..1865a9b08c828 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -82,31 +82,26 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_ap stream = generate_stream(stream_name, stream_config, stream_api_v2) assert not isinstance(stream, BulkSalesforceStream) - -@pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 3000]) -def test_bulk_sync_pagination(item_number, stream_config, stream_api): +def test_bulk_sync_pagination(stream_config, stream_api, requests_mock): stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) - test_ids = [i for i 
in range(1, item_number)] - pages = [test_ids[i : i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] - if not pages: - pages = [[]] - with requests_mock.Mocker() as m: - creation_responses = [] + job_id = f"fake_job" + requests_mock.register_uri("POST", stream.path(), json={"id": job_id}) + requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) + resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)] + result_uri = requests_mock.register_uri("GET", stream.path() + f"/{job_id}/results", + [{'text': "\n".join(resp_text), "headers":{"Sforce-Locator": "somelocator_1"}}, + {'text': "\n".join(resp_text), "headers":{"Sforce-Locator": "somelocator_2"}}, + {'text': "\n".join(resp_text), "headers":{"Sforce-Locator": "null"}} + ]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}") - for page in range(len(pages)): - job_id = f"fake_job_{page}" - creation_responses.append({"json": {"id": job_id}}) - m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) - resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in pages[page]] - m.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join(resp)) - m.register_uri("DELETE", stream.path() + f"/{job_id}") - m.register_uri("POST", stream.path(), creation_responses) - - stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental))) - loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)] - assert not set(test_ids).symmetric_difference(set(loaded_ids)) - post_request_count = len([r for r in m.request_history if r.method == "POST"]) - assert post_request_count == len(pages) + stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental))) + loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)] + assert loaded_ids == [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4] + assert result_uri.call_count == 3 + assert result_uri.request_history[1].query == "locator=somelocator_1" + assert result_uri.request_history[2].query == "locator=somelocator_2" + print(2) def _prepare_mock(m, stream): @@ -213,46 +208,51 @@ def test_stream_start_datetime_format_should_not_changed(stream_config, stream_a def test_download_data_filter_null_bytes(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b"\x00") - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b"\x00") + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [] - m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') + tmp_file, 
response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": "false"}] def test_read_with_chunks_should_return_only_object_data_type(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b'"IsDeleted","Age"\n"false",24\n') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"IsDeleted","Age"\n"false",24\n') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"IsDeleted": "false", "Age": "24"}] def test_read_with_chunks_should_return_a_string_when_a_string_with_only_digits_is_provided(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b'"ZipCode"\n"01234"\n') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"ZipCode"\n"01234"\n') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"ZipCode": "01234"}] def test_read_with_chunks_should_return_null_value_when_no_data_is_provided(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b'"IsDeleted","Age","Name"\n"false",,"Airbyte"\n') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"IsDeleted","Age","Name"\n"false",,"Airbyte"\n') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"IsDeleted": "false", "Age": None, "Name": "Airbyte"}] @@ -262,12 +262,13 @@ def test_read_with_chunks_should_return_null_value_when_no_data_is_provided(stre ids=[f"charset: {x[1]}, chunk_size: {x[0]}" for x in encoding_symbols_parameters()], ) def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type, content, expected_result): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" 
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", headers={"Content-Type": f"text/html; charset={content_type}"}, content=content) - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url, chunk_size=chunk_size))) + m.register_uri("GET", job_full_url_results, headers={"Content-Type": f"text/html; charset={content_type}"}, content=content) + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == expected_result @@ -477,7 +478,7 @@ def test_pagination_rest(stream_config, stream_api): def test_csv_reader_dialect_unix(): stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, sf_api=None, pk=None) - url = "https://fake-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + url_results = "https://fake-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" data = [ {"Id": "1", "Name": '"first_name" "last_name"'}, @@ -493,8 +494,9 @@ def test_csv_reader_dialect_unix(): text = csvfile.getvalue() with requests_mock.Mocker() as m: - m.register_uri("GET", url + "/results", text=text) - result = [i for i in stream.read_with_chunks(*stream.download_data(url))] + m.register_uri("GET", url_results, text=text) + tmp_file, response_encoding, _ = stream.download_data(url=url_results) + result = [i for i in stream.read_with_chunks(tmp_file, response_encoding)] assert result == data @@ -672,7 +674,6 @@ def test_stream_with_no_records_in_response(stream_config, stream_api_v2_pk_too_ (400, [{"errorCode": "INVALIDENTITY", "message": "Account is not supported by the Bulk API"}], "Account is not supported by the Bulk API"), (403, [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "API limit reached"}], "API limit reached"), (400, [{"errorCode": "API_ERROR", "message": "API does not support query"}], "The stream 'Account' is not queryable,"), - (400, [{"errorCode": "API_ERROR", "message": "Implementation restriction: Account only allows security evaluation for non-admin users when LIMIT is specified and at most 1000"}], "Unable to sync 'Account'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions."), (400, [{"errorCode": "LIMIT_EXCEEDED", "message": "Max bulk v2 query jobs (10000) per 24 hrs has been reached (10021)"}], "Your API key for Salesforce has reached its limit for the 24-hour period. 
We will resume replication once the limit has elapsed.") ] ) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py index 197566f7637aa..75780d6938220 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py @@ -27,16 +27,17 @@ ], ) def test_memory_download_data(stream_config, stream_api, n_records, first_size, first_peak): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) content = b'"Id","IsDeleted"' for _ in range(n_records): content += b'"0014W000027f6UwQAI","false"\n' with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=content) + m.register_uri("GET", job_full_url_results, content=content) tracemalloc.start() - for x in stream.read_with_chunks(*stream.download_data(url=job_full_url)): + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + for x in stream.read_with_chunks(tmp_file, response_encoding): pass fs, fp = tracemalloc.get_traced_memory() first_size_in_mb, first_peak_in_mb = fs / 1024**2, fp / 1024**2 From bfdc7b6b3cd86237d2d88f45c5af5abb799d3852 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 24 Jul 2023 22:58:34 +0200 Subject: [PATCH 08/46] Source SalesForce: add logging + raise error instead of logs --- .../source-salesforce/source_salesforce/source.py | 2 ++ .../source-salesforce/source_salesforce/streams.py | 13 +++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index af9361e312519..721788683a955 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -67,6 +67,7 @@ def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: b key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"] } rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS + logger.info(f"{stream_name=}, object-like and binary properties {properties_not_supported_by_bulk=}") if rest_only: logger.warning(f"BULK API is not supported for stream: {stream_name}") return "rest" @@ -95,6 +96,7 @@ def generate_streams( selected_properties = stream_properties.get(stream_name, {}).get("properties", {}) api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False)) + logger.info(f"{stream_name=} is of {api_type=}") if api_type == "rest": full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream elif api_type == "bulk": diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 2d015a28d29dd..23aaa800db36f 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ 
b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -334,15 +334,15 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: f"sobject options: {self.sobject_options}, error message: '{error_message}'" ) elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED": - self.logger.error( + raise SalesforceException( f"Cannot receive data for stream '{self.name}' ," f"sobject options: {self.sobject_options}, error message: '{error_message}'" - ) + ) from error elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": - self.logger.error( + raise SalesforceException( f"Cannot receive data for stream '{self.name}' ," f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'" - ) + ) from error elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"): self.logger.error( f"The stream '{self.name}' is not queryable, " @@ -357,7 +357,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error) elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED": message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed." - self.logger.error(message) + raise SalesforceException(message) from error else: raise error else: @@ -455,6 +455,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dic ) as data_file: response_encoding = response.encoding or self.encoding response_headers = response.headers + self.logger.info(f"job with {url=} download_data {response.headers=}") for chunk in response.iter_content(chunk_size=chunk_size): data_file.write(self.filter_null_bytes(chunk)) # check the file exists @@ -521,7 +522,7 @@ def request_params( query = f"SELECT {select_fields} FROM {self.name}" if next_page_token: query += next_page_token["next_token"] - + self.logger.info(f"{query=}") return {"q": query} def read_records( From 5cc3b04dc8fa9a06e069e4e572e5c120e0de22fa Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Mon, 24 Jul 2023 20:40:01 -0700 Subject: [PATCH 09/46] Update Dockerfile --- .../connectors/source-salesforce/Dockerfile | 29 +++---------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 6e152d6446254..4ee90ce8a6a15 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -1,17 +1,7 @@ -# Using alpine to remove several vulnerabilities frm slim image -# https://security-tracker.debian.org/tracker/CVE-2023-29383 -# https://security-tracker.debian.org/tracker/CVE-2023-31484 -# https://security-tracker.debian.org/tracker/CVE-2016-2781 -FROM python:3.9-alpine3.18 - - -RUN apk add --update --no-cache \ - build-base \ - openssl-dev \ - libffi-dev \ - zlib-dev \ - bzip2-dev +FROM python:3.9-slim +# Bash is installed for more convenient debugging. 
+RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" @@ -19,18 +9,7 @@ WORKDIR /airbyte/integration_code COPY source_salesforce ./source_salesforce COPY setup.py ./ COPY main.py ./ - -# Fixing https://nvd.nist.gov/vuln/detail/CVE-2022-40897 -# calling this twice as one upgrades the system pip /usr/local/bin/pip the -# seconf time upgrades the under for the venv /opt/.venv/bin/pip -RUN pip install --upgrade pip setuptools wheel && \ - pip install . -RUN pip install --upgrade pip setuptools - -# add default timezone settings -ENV TZ UTC -RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone - +RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] From e703670e19d461472f2904a734bc712b6bb80351 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 26 Jul 2023 10:48:48 +0200 Subject: [PATCH 10/46] Source SalesForce: fix start_date in BULK request params --- .../connectors/source-salesforce/source_salesforce/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 23aaa800db36f..809fb2fb6220f 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -687,7 +687,7 @@ class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSales def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - start_date = max((stream_state or {}).get(self.cursor_field, ""), (stream_slice or {}).get("start_date", "")) + start_date = stream_slice["start_date"] end_date = stream_slice["end_date"] select_fields = self.get_query_select_fields() From 64e3f5c94aeb971041d26532d48c32d2ce74b5d1 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 26 Jul 2023 14:30:30 +0200 Subject: [PATCH 11/46] Source SalesForce: add pattern descriptor --- .../connectors/source-salesforce/source_salesforce/spec.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml index 175610b6fccc3..28394669515fa 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml @@ -44,10 +44,11 @@ connectionSpecification: start_date: title: Start Date description: >- - Enter the date in the YYYY-MM-DD format. Airbyte will replicate the data added on and after this date. If this field is blank, Airbyte will replicate the data for last two years. + Enter the date (or date-time) in the YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ format. Airbyte will replicate the data added on and after this date. If this field is blank, Airbyte will replicate the data for last two years. 
       type: string
       pattern: >-
         ^([0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?)$
+      pattern_descriptor: "YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ"
       examples:
         - "2021-07-25"
         - "2021-07-25T00:00:00Z"
       format: date-time
       order: 5

From f2c51bc97f6b516ebf039f0657f097a6d59802f9 Mon Sep 17 00:00:00 2001
From: Artem Inzhyyants
Date: Wed, 26 Jul 2023 15:41:17 +0200
Subject: [PATCH 12/46] Source SalesForce: update description in spec

---
 .../connectors/source-salesforce/source_salesforce/spec.yaml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml
index 28394669515fa..ffcd53197ff06 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml
@@ -87,8 +87,7 @@ connectionSpecification:
             title: Search value
             order: 2
       title: Filter Salesforce Objects
-      description: >-
-        Filter streams relevant to you
+      description: Add filters to select only required stream based on `SObject` name. See the docs for more information.
   advanced_auth:
     auth_flow_type: oauth2.0
     predicate_key:

From 954445ff4dc4ffafd3fc00cb00dceeffac969bd9 Mon Sep 17 00:00:00 2001
From: Artem Inzhyyants
Date: Wed, 26 Jul 2023 18:46:42 +0200
Subject: [PATCH 13/46] Source SalesForce: update docs

---
 .../source-salesforce/source_salesforce/spec.yaml | 3 ++-
 docs/integrations/sources/salesforce.md           | 7 +++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml
index ffcd53197ff06..1e518beefc404 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml
@@ -55,8 +55,9 @@ connectionSpecification:
       format: date-time
       order: 5
     force_use_bulk_api:
+      title: Force to use BULK API
       type: boolean
-      description: Toggle to Bulk API (this might cause empty fields for some streams)
+      description: Toggle to use Bulk API (this might cause empty fields for some streams)
       default: false
       order: 6
     streams_criteria:
diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md
index 7f43f7ca5a46f..6efe2c638650e 100644
--- a/docs/integrations/sources/salesforce.md
+++ b/docs/integrations/sources/salesforce.md
@@ -85,6 +85,13 @@ The Salesforce connector retrieves deleted records from Salesforce. For the stre
 The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_api.htm). The connector syncs data until it hits the daily rate limit, then ends the sync early with success status, and starts the next sync from where it left off. Note that picking up from where it ends will work only for incremental sync, which is why we recommend using the [Incremental Sync - Deduped History](https://docs.airbyte.com/understanding-airbyte/connections/incremental-deduped-history) sync mode.
+:::tip
+
+Airbyte recommends using at least the [Enterprise edition](https://help.salesforce.com/s/articleView?id=sf.users_add_products_subscription_management.htm&type=5) for daily syncs.
+You may also enable the `Force to use BULK API` option for faster syncs.
+ +::: + ## Supported Objects The Salesforce connector supports reading both Standard Objects and Custom Objects from Salesforce. Each object is read as a separate stream. See a list of all Salesforce Standard Objects [here](https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_list.htm). From 83f58062fa4b75b08fe6ed94cd474b14853dd603 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Jul 2023 16:16:40 +0200 Subject: [PATCH 14/46] Revert "Source SalesForce: add logging + raise error instead of logs" This reverts commit bfdc7b6b3cd86237d2d88f45c5af5abb799d3852. --- .../source-salesforce/source_salesforce/source.py | 2 -- .../source-salesforce/source_salesforce/streams.py | 13 ++++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 721788683a955..af9361e312519 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -67,7 +67,6 @@ def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: b key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"] } rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS - logger.info(f"{stream_name=}, object-like and binary properties {properties_not_supported_by_bulk=}") if rest_only: logger.warning(f"BULK API is not supported for stream: {stream_name}") return "rest" @@ -96,7 +95,6 @@ def generate_streams( selected_properties = stream_properties.get(stream_name, {}).get("properties", {}) api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False)) - logger.info(f"{stream_name=} is of {api_type=}") if api_type == "rest": full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream elif api_type == "bulk": diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 809fb2fb6220f..38193822995f8 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -334,15 +334,15 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: f"sobject options: {self.sobject_options}, error message: '{error_message}'" ) elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED": - raise SalesforceException( + self.logger.error( f"Cannot receive data for stream '{self.name}' ," f"sobject options: {self.sobject_options}, error message: '{error_message}'" - ) from error + ) elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": - raise SalesforceException( + self.logger.error( f"Cannot receive data for stream '{self.name}' ," f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'" - ) from error + ) elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"): self.logger.error( f"The stream '{self.name}' is not queryable, " @@ -357,7 +357,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: raise AirbyteTracedException(message=message, 
failure_type=FailureType.config_error, exception=error) elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED": message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed." - raise SalesforceException(message) from error + self.logger.error(message) else: raise error else: @@ -455,7 +455,6 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dic ) as data_file: response_encoding = response.encoding or self.encoding response_headers = response.headers - self.logger.info(f"job with {url=} download_data {response.headers=}") for chunk in response.iter_content(chunk_size=chunk_size): data_file.write(self.filter_null_bytes(chunk)) # check the file exists @@ -522,7 +521,7 @@ def request_params( query = f"SELECT {select_fields} FROM {self.name}" if next_page_token: query += next_page_token["next_token"] - self.logger.info(f"{query=}") + return {"q": query} def read_records( From 66273acff78dcf715cc1d123f9803ce121dba221 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Jul 2023 22:15:05 +0200 Subject: [PATCH 15/46] Source Salesforce: update docs --- docs/integrations/sources/salesforce.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index caebec22a2273..4b696312fc815 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -138,6 +138,7 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| +| 2.1.2 | 2023-07-06 | [28781](https://github.com/airbytehq/airbyte/pull/28781) | Fix pagination for BULK API jobs; Addoption to force use BULK API | | 2.1.1 | 2023-07-06 | [28021](https://github.com/airbytehq/airbyte/pull/28021) | Several Vulnerabilities Fixes; switched to use alpine instead of slim, CVE-2022-40897, CVE-2023-29383, CVE-2023-31484, CVE-2016-2781 | | 2.1.0 | 2023-06-26 | [27726](https://github.com/airbytehq/airbyte/pull/27726) | License Update: Elv2 | | 2.0.14 | 2023-05-04 | [25794](https://github.com/airbytehq/airbyte/pull/25794) | Avoid pandas inferring wrong data types by forcing all data type as object | From c4f3fe19b9835b57c66b8bf90d76f0e8794fa21d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Jul 2023 22:35:06 +0200 Subject: [PATCH 16/46] Source Salesforce: Code format --- .../source-salesforce/unit_tests/api_test.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 1865a9b08c828..bd9af16276e28 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -82,17 +82,19 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_ap stream = generate_stream(stream_name, stream_config, stream_api_v2) assert not isinstance(stream, BulkSalesforceStream) + def test_bulk_sync_pagination(stream_config, stream_api, requests_mock): stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, 
stream_api) - job_id = f"fake_job" + job_id = "fake_job" requests_mock.register_uri("POST", stream.path(), json={"id": job_id}) requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)] result_uri = requests_mock.register_uri("GET", stream.path() + f"/{job_id}/results", - [{'text': "\n".join(resp_text), "headers":{"Sforce-Locator": "somelocator_1"}}, - {'text': "\n".join(resp_text), "headers":{"Sforce-Locator": "somelocator_2"}}, - {'text': "\n".join(resp_text), "headers":{"Sforce-Locator": "null"}} - ]) + [{'text': "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}}, + {'text': "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}}, + {'text': "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}} + ] + ) requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}") stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental))) From a4d939c23997e41d0e4daa9f1af84ada7fe22f4f Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 27 Jul 2023 22:36:20 +0200 Subject: [PATCH 17/46] Source Salesforce: Add test deps --- airbyte-integrations/connectors/source-salesforce/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index e8f9de037b860..376a5671bd3e6 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -7,7 +7,7 @@ MAIN_REQUIREMENTS = ["airbyte-cdk~=0.46", "pandas"] -TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"] +TEST_REQUIREMENTS = ["freezegun", "pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "connector-acceptance-test", "pytest-timeout"] setup( name="source_salesforce", From d6d0c1be0665910e42f696339c1b769a64e04f46 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 31 Jul 2023 21:45:08 +0200 Subject: [PATCH 18/46] Source SalesForce: Ref --- .../connectors/source-salesforce/unit_tests/api_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index bd9af16276e28..6e44ac1818246 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -103,7 +103,6 @@ def test_bulk_sync_pagination(stream_config, stream_api, requests_mock): assert result_uri.call_count == 3 assert result_uri.request_history[1].query == "locator=somelocator_1" assert result_uri.request_history[2].query == "locator=somelocator_2" - print(2) def _prepare_mock(m, stream): From 8fea378166f0a1382e9bd79f9d76f8430d14c936 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 31 Jul 2023 21:46:43 +0200 Subject: [PATCH 19/46] Source SalesForce: check stream_state < stream_slice --- .../source-salesforce/source_salesforce/streams.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 38193822995f8..a36ff90d9b216 100644 --- 
a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -602,6 +602,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any): class IncrementalRestSalesforceStream(RestSalesforceStream, ABC): state_checkpoint_interval = 500 STREAM_SLICE_STEP = 30 + _slice = None def __init__(self, replication_key: str, start_date: Optional[str], **kwargs): super().__init__(**kwargs) @@ -626,6 +627,7 @@ def stream_slices( while not end == now: start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP) end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP)) + self._slice = {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")} yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")} slice_number = slice_number + 1 @@ -674,10 +676,13 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object and returning an updated state object. """ - latest_benchmark = latest_record[self.cursor_field] + latest_record_value: pendulum.DateTime = pendulum.parse(latest_record[self.cursor_field]) + slice_max_value: pendulum.DateTime = pendulum.parse(self._slice.get('end_date')) + max_possible_value = min(latest_record_value, slice_max_value) if current_stream_state.get(self.cursor_field): - return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} - return {self.cursor_field: latest_benchmark} + max_cursor_value = max(latest_record_value, pendulum.parse(current_stream_state[self.cursor_field])) + max_possible_value = min(max_cursor_value, slice_max_value) + return {self.cursor_field: max_possible_value.isoformat()} class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream): From e4aa7003d59cfe02cd6af569d062d8ff0e384fea Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 31 Jul 2023 21:59:40 +0200 Subject: [PATCH 20/46] Source SalesForce: fix stream_state < stream_slice --- .../source-salesforce/source_salesforce/streams.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index a36ff90d9b216..9c11d5692a73f 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -680,8 +680,9 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late slice_max_value: pendulum.DateTime = pendulum.parse(self._slice.get('end_date')) max_possible_value = min(latest_record_value, slice_max_value) if current_stream_state.get(self.cursor_field): - max_cursor_value = max(latest_record_value, pendulum.parse(current_stream_state[self.cursor_field])) - max_possible_value = min(max_cursor_value, slice_max_value) + if latest_record_value > slice_max_value: + return {self.cursor_field: current_stream_state[self.cursor_field]} + max_possible_value = max(latest_record_value, pendulum.parse(current_stream_state[self.cursor_field])) return {self.cursor_field: max_possible_value.isoformat()} From 32141cc9da48e90f2f843cfddc93747d27c477fa Mon Sep 17 00:00:00 2001 
From: Artem Inzhyyants Date: Mon, 31 Jul 2023 22:10:01 +0200 Subject: [PATCH 21/46] Source SalesForce: update comments --- .../connectors/source-salesforce/source_salesforce/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 9c11d5692a73f..d24cc501ef06a 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -674,7 +674,7 @@ def cursor_field(self) -> str: def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state - object and returning an updated state object. + object and returning an updated state object. Check if latest record is IN stream slice interval => ignore if not """ latest_record_value: pendulum.DateTime = pendulum.parse(latest_record[self.cursor_field]) slice_max_value: pendulum.DateTime = pendulum.parse(self._slice.get('end_date')) From cb34d2f92a0ac35af8b34adfbc5e56ec28a4fc20 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 31 Jul 2023 22:20:31 +0200 Subject: [PATCH 22/46] Empty-Commit From de21c1ae76419d019f8edfee423573db0a1b668c Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 31 Jul 2023 22:24:32 +0200 Subject: [PATCH 23/46] Empty-Commit From 4427a1859f2df68a5f376e12fb26cd214ec7b188 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 31 Jul 2023 22:28:32 +0200 Subject: [PATCH 24/46] Empty-Commit From fe12f3dd6f7b5a47e80ce6efcb50743efd95d9ed Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Wed, 9 Aug 2023 21:44:38 +0200 Subject: [PATCH 25/46] Source Salesforce: update docs + spec description --- .../connectors/source-salesforce/source_salesforce/spec.yaml | 4 ++-- docs/integrations/sources/salesforce.md | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml index 1e518beefc404..642e180c078ba 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml @@ -44,7 +44,7 @@ connectionSpecification: start_date: title: Start Date description: >- - Enter the date (or date-time) in the YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ format. Airbyte will replicate the data added on and after this date. If this field is blank, Airbyte will replicate the data for last two years. + Enter the date (or date-time) in the YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ format. Airbyte will replicate the data updated on and after this date. If this field is blank, Airbyte will replicate the data for last two years. type: string pattern: >- ^([0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?)$ @@ -88,7 +88,7 @@ connectionSpecification: title: Search value order: 2 title: Filter Salesforce Objects - description: Add filters to select only required stream based on `SObject` name. See the docs for more information. + description: Add filters to select only required stream based on `SObject` name. Use this field to filter which tables are displayed by this connector. 
This is useful if your Salesforce account has a large number of tables (>1000), in which case you may find it easier to navigate the UI and speed up the connector's performance if you restrict the tables displayed by this connector. advanced_auth: auth_flow_type: oauth2.0 predicate_key: diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 4b696312fc815..73a628b241b00 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -90,7 +90,6 @@ The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](htt :::tip Airbyte recommends to use at least [Enterpise edition](https://help.salesforce.com/s/articleView?id=sf.users_add_products_subscription_management.htm&type=5) for daily base syncs. -You may also turn on feature `Force to use BULK API` for faster syncs. ::: From 7cd490533ff5873aaa23a13809687a5735c7bbac Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 00:10:01 +0200 Subject: [PATCH 26/46] Source Salesforce: update docs --- docs/integrations/sources/salesforce.md | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 73a628b241b00..3e688f497092a 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -89,10 +89,27 @@ The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](htt :::tip -Airbyte recommends to use at least [Enterpise edition](https://help.salesforce.com/s/articleView?id=sf.users_add_products_subscription_management.htm&type=5) for daily base syncs. +Airbyte recommends to use at least [Enterprise edition](https://help.salesforce.com/s/articleView?id=sf.users_add_products_subscription_management.htm&type=5) for daily base syncs. This subscription level provides the minimum required API limits. ::: +### BULK API vs REST API + +The main difference between BULK API and REST API is that **REST** API uses the standard synchronous approach (request -> response), while **BULK** uses an asynchronous one (create job with query -> wait for completion -> download response). +Bulk API is recommended to use by SalesForce if data operation includes more than 2,000 records. + +Connector uses BULK API if it is possible, unless any of conditions met: + +- Stream has unsupported properties in schema: `base64` or `object`-like +- Stream is not supported by BULK API (list was obtained experimentally) + +:::danger BULK API + +If you set an option `Force Use Bulk API` to `true`, connector will ignore unsupported properties and sync stream using BULK API. + +::: + + ## Supported Objects The Salesforce connector supports reading both Standard Objects and Custom Objects from Salesforce. Each object is read as a separate stream. See a list of all Salesforce Standard Objects [here](https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_list.htm). @@ -103,7 +120,9 @@ Airbyte fetches and handles all the possible and available streams dynamically b - If the stream has the queryable property set to true. Airbyte can fetch only queryable streams via the API. If you don’t see your object available via Airbyte, check if it is API-accessible to the Salesforce user you authenticated with. 
-**Note:** [BULK API](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_intro.htm) cannot be used to receive data from the following streams due to Salesforce API limitations. The Salesforce connector syncs them using the REST API which will occasionally cost more of your API quota: +:::note BULK API Limitations + +[BULK API](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_intro.htm) cannot be used to receive data from the following streams due to Salesforce API limitations. The Salesforce connector syncs them using the REST API which will occasionally cost more of your API quota: - AcceptedEventRelation - Attachment @@ -126,6 +145,8 @@ Airbyte fetches and handles all the possible and available streams dynamically b - TaskStatus - UndecidedEventRelation +::: + ## Tutorials Now that you have set up the Salesforce source connector, check out the following Salesforce tutorials: From 62994ae10eb5c43e9a909cbda73667256c74271b Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 14:03:34 +0200 Subject: [PATCH 27/46] Source Salesforce: update docs --- docs/integrations/sources/salesforce.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 3e688f497092a..edb914cc29819 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -103,9 +103,9 @@ Connector uses BULK API if it is possible, unless any of conditions met: - Stream has unsupported properties in schema: `base64` or `object`-like - Stream is not supported by BULK API (list was obtained experimentally) -:::danger BULK API +:::danger Force Use Bulk API -If you set an option `Force Use Bulk API` to `true`, connector will ignore unsupported properties and sync stream using BULK API. +If you set an option `Force Use Bulk API` to `true`, connector will ignore unsupported properties and sync Stream using BULK API. 
::: @@ -158,7 +158,7 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| -| 2.1.2 | 2023-07-06 | [28781](https://github.com/airbytehq/airbyte/pull/28781) | Fix pagination for BULK API jobs; Addoption to force use BULK API | +| 2.1.2 | 2023-07-06 | [28781](https://github.com/airbytehq/airbyte/pull/28781) | Fix pagination for BULK API jobs; Add option to force use BULK API | | 2.1.1 | 2023-07-06 | [28021](https://github.com/airbytehq/airbyte/pull/28021) | Several Vulnerabilities Fixes; switched to use alpine instead of slim, CVE-2022-40897, CVE-2023-29383, CVE-2023-31484, CVE-2016-2781 | | 2.1.0 | 2023-06-26 | [27726](https://github.com/airbytehq/airbyte/pull/27726) | License Update: Elv2 | | 2.0.14 | 2023-05-04 | [25794](https://github.com/airbytehq/airbyte/pull/25794) | Avoid pandas inferring wrong data types by forcing all data type as object | From 4883426bf29633697d2bf9e2ed000504deefb1f7 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 18:32:36 +0200 Subject: [PATCH 28/46] Source Salesforce: add unittest --- .../source_salesforce/streams.py | 2 +- .../source-salesforce/unit_tests/api_test.py | 34 +++++++++++++++++++ .../source-salesforce/unit_tests/utils.py | 14 ++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index d24cc501ef06a..c1d9e0600f0df 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -677,7 +677,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late object and returning an updated state object. 
Check if latest record is IN stream slice interval => ignore if not """ latest_record_value: pendulum.DateTime = pendulum.parse(latest_record[self.cursor_field]) - slice_max_value: pendulum.DateTime = pendulum.parse(self._slice.get('end_date')) + slice_max_value: pendulum.DateTime = pendulum.parse(self._slice.get("end_date")) max_possible_value = min(latest_record_value, slice_max_value) if current_stream_state.get(self.cursor_field): if latest_record_value > slice_max_value: diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 6e44ac1818246..0eee1c109c322 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -28,6 +28,8 @@ RestSalesforceStream, ) +from .utils import read_full_refresh + def test_bulk_sync_creation_failed(stream_config, stream_api): stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) @@ -732,3 +734,35 @@ def test_bulk_stream_slices(stream_config_date_format, stream_api): }) start_date = start_date.add(days=stream.STREAM_SLICE_STEP) assert expected_slices == stream_slices + + +@freezegun.freeze_time("2023-04-01") +def test_bulk_stream_request_params(stream_config_date_format, stream_api, requests_mock): + """Check that request params ignore records cursor and use start date from slice ONLY""" + stream_config_date_format.update({'start_date': '2023-01-01'}) + stream: BulkIncrementalSalesforceStream = generate_stream("FakeBulkStream", stream_config_date_format, stream_api) + + job_id_1 = "fake_job_1" + requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": {"state": "JobComplete"}}]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-03-15,1") + requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}") + + job_id_2 = "fake_job_2" + requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": {"state": "JobComplete"}}]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2") + requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_2}") + + job_id_3 = "fake_job_3" + queries_history = requests_mock.register_uri("POST", stream.path(), [{"json": {"id": job_id_1}}, + {"json": {"id": job_id_2}}, + {"json": {"id": job_id_3}}]) + requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": {"state": "JobComplete"}}]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3") + requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}") + list(read_full_refresh(stream)) + assert "LastModifiedDate >= 2023-01-01T00:00:00.000+00:00 AND LastModifiedDate < 2023-01-31T00:00:00.000+00:00" in queries_history.request_history[0].text + assert "LastModifiedDate >= 2023-01-31T00:00:00.000+00:00 AND LastModifiedDate < 2023-03-02T00:00:00.000+00:00" in queries_history.request_history[1].text + assert "LastModifiedDate >= 2023-03-02T00:00:00.000+00:00 AND LastModifiedDate < 2023-04-01T00:00:00.000+00:00" in 
queries_history.request_history[2].text diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py new file mode 100644 index 0000000000000..4de4a92bef0bd --- /dev/null +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.streams import Stream +from airbyte_protocol.models import SyncMode + + +def read_full_refresh(stream_instance: Stream): + slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh) + for _slice in slices: + records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh) + for record in records: + yield record From 28e7e156b2ccd3f96cea6087ed8601c387df411c Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 18:54:31 +0200 Subject: [PATCH 29/46] Source Salesforce: add typing --- .../connectors/source-salesforce/source_salesforce/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index af9361e312519..9def53730d791 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -61,7 +61,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return True, None @classmethod - def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: bool) -> str: + def _get_api_type(cls, stream_name: str, properties: Mapping[str, Any], force_use_bulk_api: bool) -> str: # Salesforce BULK API currently does not support loading fields with data type base64 and compound data properties_not_supported_by_bulk = { key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"] From eddc86d614fd34f85c8cbade0d65890caf3c2de9 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 19:01:18 +0200 Subject: [PATCH 30/46] Source Salesforce: fix import --- .../connectors/source-salesforce/unit_tests/api_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 0eee1c109c322..76d80245622ff 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -28,7 +28,7 @@ RestSalesforceStream, ) -from .utils import read_full_refresh +from utils import read_full_refresh def test_bulk_sync_creation_failed(stream_config, stream_api): From 8d44d4b56d4fe734ea153349ca497159c28bcd0a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 19:23:33 +0200 Subject: [PATCH 31/46] Source Salesforce: fix formatting --- .../connectors/source-salesforce/source_salesforce/api.py | 2 +- .../source-salesforce/source_salesforce/rate_limiting.py | 5 ++--- .../connectors/source-salesforce/source_salesforce/source.py | 2 +- .../connectors/source-salesforce/unit_tests/api_test.py | 1 - 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py 
b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index aea3bfb0f876f..68fa3c35dc413 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -296,7 +296,7 @@ def _make_request( resp = self.session.post(url, headers=headers, data=body) resp.raise_for_status() except HTTPError as err: - self.logger.warn(f"http error body: {err.response.text}") + self.logger.warning(f"http error body: {err.response.text}") raise return resp diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py index 286339fcbe1cf..344a6412e0245 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py @@ -2,11 +2,10 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import logging import sys import backoff -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException from requests import codes, exceptions # type: ignore[import] @@ -18,7 +17,7 @@ exceptions.HTTPError, ) -logger = AirbyteLogger() +logger = logging.getLogger("airbyte") def default_backoff_handler(max_tries: int, factor: int, **kwargs): diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 9def53730d791..744658872754d 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -12,7 +12,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from airbyte_cdk.sources.utils.schema_helpers import InternalConfig from airbyte_cdk.utils.traced_exception import AirbyteTracedException from dateutil.relativedelta import relativedelta diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 76d80245622ff..86b9c31fa2c53 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -27,7 +27,6 @@ IncrementalRestSalesforceStream, RestSalesforceStream, ) - from utils import read_full_refresh From 15b8a556d58bdc7d5fca1b221881c08935f5d2a3 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 19:26:24 +0200 Subject: [PATCH 32/46] Source Salesforce: remove extra comment --- .../connectors/source-salesforce/source_salesforce/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index c1d9e0600f0df..837c2f5990e4a 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ 
-558,7 +558,7 @@ def read_records( salesforce_bulk_api_locator = None while True: req = PreparedRequest() - req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) # 'maxRecords': 5 + req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) tmp_file, response_encoding, response_headers = self.download_data(url=req.url) for record in self.read_with_chunks(tmp_file, response_encoding): yield record From c77d3c9a46cde66166a51c0241c74bacce115a56 Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Thu, 10 Aug 2023 10:30:11 -0700 Subject: [PATCH 33/46] Apply suggestions from code review --- docs/integrations/sources/salesforce.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 665e133917377..c15ea475a31e5 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -89,11 +89,11 @@ The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](htt :::tip -Airbyte recommends to use at least [Enterprise edition](https://help.salesforce.com/s/articleView?id=sf.users_add_products_subscription_management.htm&type=5) for daily base syncs. This subscription level provides the minimum required API limits. +To use this connector, you'll need at least the Enterprise edition of Salesforce or the Professional Edition with API access purchased as an add-on. Reference the [Salesforce docs about API access](https://help.salesforce.com/s/articleView?id=000385436&type=1) for more information. ::: -### BULK API vs REST API +### A note on the BULK API vs REST API The main difference between BULK API and REST API is that **REST** API uses the standard synchronous approach (request -> response), while **BULK** uses an asynchronous one (create job with query -> wait for completion -> download response). Bulk API is recommended to use by SalesForce if data operation includes more than 2,000 records. From 6f6dc63364ac88f4f306120c4a017d583efbeeaf Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 20:39:25 +0200 Subject: [PATCH 34/46] Source Salesforce: fix unit tests --- .../connectors/source-salesforce/unit_tests/api_test.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 86b9c31fa2c53..4a43958819e54 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -318,6 +318,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state): While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process. Next streams should not be executed. 
""" + stream_config.update({'start_date': '2021-10-01'}) stream_1: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) stream_2: BulkIncrementalSalesforceStream = generate_stream("Asset", stream_config, stream_api) streams = [stream_1, stream_2] @@ -341,7 +342,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state): m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) - resp = ["Field1,LastModifiedDate,Id"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)] # 6 records per page + resp = ["Field1,LastModifiedDate,Id"] + [f"test,2021-10-0{i},{i}" for i in range(1, 7)] # 6 records per page if page == 1: # Read the first page successfully @@ -364,7 +365,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state): assert len(records) == 6 # stream page size: 6 state_record = [item for item in result if item.type == Type.STATE][0] - assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-11-05" # state checkpoint interval is 5. + assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-10-05T00:00:00+00:00" # state checkpoint interval is 5. def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state): @@ -374,6 +375,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state): While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process. Next streams should not be executed. """ + stream_config.update({'start_date': '2021-11-01'}) stream_1: IncrementalRestSalesforceStream = generate_stream("KnowledgeArticle", stream_config, stream_api) stream_2: IncrementalRestSalesforceStream = generate_stream("AcceptedEventRelation", stream_config, stream_api) @@ -432,7 +434,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state): assert len(records) == 5 state_record = [item for item in result if item.type == Type.STATE][0] - assert state_record.state.data["KnowledgeArticle"]["LastModifiedDate"] == "2021-11-17" + assert state_record.state.data["KnowledgeArticle"]["LastModifiedDate"] == "2021-11-17T00:00:00+00:00" def test_pagination_rest(stream_config, stream_api): From 363c8dc9b48dcea4ec2840b12de06203d84d6782 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 20:41:46 +0200 Subject: [PATCH 35/46] Source Salesforce: refactor --- .../source_salesforce/streams.py | 62 +++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 837c2f5990e4a..0d8be9092092d 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -534,40 +534,38 @@ def read_records( stream_state = stream_state or {} next_page_token = None - while True: - params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - job_full_url, job_status = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") - if not job_full_url: - if job_status == "Failed": - # As rule as BULK logic returns unhandled error. For instance: - # error message: 'Unexpected exception encountered in query processing. 
- # Please contact support with the following id: 326566388-63578 (-436445966)'" - # Thus we can try to switch to GET sync request because its response returns obvious error message - standard_instance = self.get_standard_instance() - self.logger.warning("switch to STANDARD(non-BULK) sync. Because the SalesForce BULK job has returned a failed status") - stream_is_available, error = standard_instance.check_availability(self.logger, None) - if not stream_is_available: - self.logger.warning(f"Skipped syncing stream '{standard_instance.name}' because it was unavailable. Error: {error}") - return - yield from standard_instance.read_records( - sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state - ) + params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + job_full_url, job_status = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") + if not job_full_url: + if job_status == "Failed": + # As rule as BULK logic returns unhandled error. For instance: + # error message: 'Unexpected exception encountered in query processing. + # Please contact support with the following id: 326566388-63578 (-436445966)'" + # Thus we can try to switch to GET sync request because its response returns obvious error message + standard_instance = self.get_standard_instance() + self.logger.warning("switch to STANDARD(non-BULK) sync. Because the SalesForce BULK job has returned a failed status") + stream_is_available, error = standard_instance.check_availability(self.logger, None) + if not stream_is_available: + self.logger.warning(f"Skipped syncing stream '{standard_instance.name}' because it was unavailable. Error: {error}") return - raise SalesforceException(f"Job for {self.name} stream using BULK API was failed.") - salesforce_bulk_api_locator = None - while True: - req = PreparedRequest() - req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) - tmp_file, response_encoding, response_headers = self.download_data(url=req.url) - for record in self.read_with_chunks(tmp_file, response_encoding): - yield record + yield from standard_instance.read_records( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ) + return + raise SalesforceException(f"Job for {self.name} stream using BULK API was failed.") + salesforce_bulk_api_locator = None + while True: + req = PreparedRequest() + req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) + tmp_file, response_encoding, response_headers = self.download_data(url=req.url) + for record in self.read_with_chunks(tmp_file, response_encoding): + yield record - if response_headers.get("Sforce-Locator", "null") == "null": - break - salesforce_bulk_api_locator = response_headers.get("Sforce-Locator") - self.delete_job(url=job_full_url) - break + if response_headers.get("Sforce-Locator", "null") == "null": + break + salesforce_bulk_api_locator = response_headers.get("Sforce-Locator") + self.delete_job(url=job_full_url) def get_standard_instance(self) -> SalesforceStream: """Returns a instance of standard logic(non-BULK) with same settings""" From b162418df5dd51304a79f403784cc1b4aaf635a3 Mon Sep 17 00:00:00 2001 From: "Sherif A. 
Nada" Date: Thu, 10 Aug 2023 12:34:33 -0700 Subject: [PATCH 36/46] Update salesforce.md --- docs/integrations/sources/salesforce.md | 94 +++++++++++-------------- 1 file changed, 43 insertions(+), 51 deletions(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index c15ea475a31e5..d1c4ed6643003 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -10,6 +10,13 @@ This page contains the setup guide and reference information for the Salesforce - (For Airbyte Open Source) Salesforce [OAuth](https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_tokens_scopes.htm&type=5) credentials + +:::tip + +To use this connector, you'll need at least the Enterprise edition of Salesforce or the Professional Edition with API access purchased as an add-on. Reference the [Salesforce docs about API access](https://help.salesforce.com/s/articleView?id=000385436&type=1) for more information. + +::: + ## Setup guide ### Step 1: (Optional, Recommended) Create a read-only Salesforce user @@ -81,35 +88,12 @@ The Salesforce source connector supports the following sync modes: ### Incremental Deletes sync -The Salesforce connector retrieves deleted records from Salesforce. For the streams which support it, a deleted record will be marked with the `isDeleted=true` value in the respective field. +The Salesforce connector retrieves deleted records from Salesforce. For the streams which support it, a deleted record will be marked with `isDeleted=true`. ## Performance considerations The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_api.htm). The connector syncs data until it hits the daily rate limit, then ends the sync early with success status, and starts the next sync from where it left off. Note that picking up from where it ends will work only for incremental sync, which is why we recommend using the [Incremental Sync - Append + Deduped](https://docs.airbyte.com/understanding-airbyte/connections/incremental-append-deduped) sync mode. -:::tip - -To use this connector, you'll need at least the Enterprise edition of Salesforce or the Professional Edition with API access purchased as an add-on. Reference the [Salesforce docs about API access](https://help.salesforce.com/s/articleView?id=000385436&type=1) for more information. - -::: - -### A note on the BULK API vs REST API - -The main difference between BULK API and REST API is that **REST** API uses the standard synchronous approach (request -> response), while **BULK** uses an asynchronous one (create job with query -> wait for completion -> download response). -Bulk API is recommended to use by SalesForce if data operation includes more than 2,000 records. - -Connector uses BULK API if it is possible, unless any of conditions met: - -- Stream has unsupported properties in schema: `base64` or `object`-like -- Stream is not supported by BULK API (list was obtained experimentally) - -:::danger Force Use Bulk API - -If you set an option `Force Use Bulk API` to `true`, connector will ignore unsupported properties and sync Stream using BULK API. - -::: - - ## Supported Objects The Salesforce connector supports reading both Standard Objects and Custom Objects from Salesforce. Each object is read as a separate stream. 
See a list of all Salesforce Standard Objects [here](https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_list.htm). @@ -117,36 +101,44 @@ The Salesforce connector supports reading both Standard Objects and Custom Objec Airbyte fetches and handles all the possible and available streams dynamically based on: - If the authenticated Salesforce user has the Role and Permissions to read and fetch objects - -- If the stream has the queryable property set to true. Airbyte can fetch only queryable streams via the API. If you don’t see your object available via Airbyte, check if it is API-accessible to the Salesforce user you authenticated with. - -:::note BULK API Limitations - -[BULK API](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_intro.htm) cannot be used to receive data from the following streams due to Salesforce API limitations. The Salesforce connector syncs them using the REST API which will occasionally cost more of your API quota: - -- AcceptedEventRelation -- Attachment -- CaseStatus -- ContractStatus -- DeclinedEventRelation -- FieldSecurityClassification -- KnowledgeArticle -- KnowledgeArticleVersion -- KnowledgeArticleVersionHistory -- KnowledgeArticleViewStat -- KnowledgeArticleVoteStat -- OrderStatus -- PartnerRole -- RecentlyViewed -- ServiceAppointmentStatus -- ShiftStatus -- SolutionStatus -- TaskPriority -- TaskStatus -- UndecidedEventRelation +- If the stream has the queryable property set to true. Airbyte can fetch only queryable streams via the API. If you don’t see your object available via Airbyte, and it is queryable, check if it is API-accessible to the Salesforce user you authenticated with. + +### A note on the BULK API vs REST API and their limitations + +Salesforce allows extracting data using either the [BULK API](https://developer.salesforce.com/docs/atlas.en-us.236.0.api_asynch.meta/api_asynch/asynch_api_intro.htm) or [REST API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/intro_what_is_rest_api.htm). To achieve fast performance, Salesforce recommends using the BULK API for extracting larger amounts of data (more than 2,000 records). For this reason, the Salesforce connector uses the BULK API by default to extract any Salesforce objects, unless any of the following conditions are met: + +- The Salesforce object has columns which are unsupported by the BULK API, like properties in schema: `base64` or `object`-like +- The Salesforce object is not supported by BULK API. In this case we sync the objects via the REST API which will occasionalyl cost more of your API quota. This list of objects was obtained experimentally, and includes the following objects: + - AcceptedEventRelation + - Attachment + - CaseStatus + - ContractStatus + - DeclinedEventRelation + - FieldSecurityClassification + - KnowledgeArticle + - KnowledgeArticleVersion + - KnowledgeArticleVersionHistory + - KnowledgeArticleViewStat + - KnowledgeArticleVoteStat + - OrderStatus + - PartnerRole + - RecentlyViewed + - ServiceAppointmentStatus + - ShiftStatus + - SolutionStatus + - TaskPriority + - TaskStatus + - UndecidedEventRelation + +More information on the differences between various Salesforce APIs can be found [here](https://help.salesforce.com/s/articleView?id=sf.integrate_what_is_api.htm&type=5). + +:::info Force Using Bulk API + +If you set the `Force Use Bulk API` option to `true`, the connector will ignore unsupported properties and sync Stream using BULK API. 
::: + ## Tutorials Now that you have set up the Salesforce source connector, check out the following Salesforce tutorials: From e76dfe0a6a880bf0a2ef913ef31f7bb76ef24ae7 Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Thu, 10 Aug 2023 12:44:09 -0700 Subject: [PATCH 37/46] Update salesforce.md --- docs/integrations/sources/salesforce.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index d1c4ed6643003..b16fac2c41cad 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -98,10 +98,10 @@ The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](htt The Salesforce connector supports reading both Standard Objects and Custom Objects from Salesforce. Each object is read as a separate stream. See a list of all Salesforce Standard Objects [here](https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_list.htm). -Airbyte fetches and handles all the possible and available streams dynamically based on: +Airbyte allows exporting all available Salesforce objects dynamically based on: - If the authenticated Salesforce user has the Role and Permissions to read and fetch objects -- If the stream has the queryable property set to true. Airbyte can fetch only queryable streams via the API. If you don’t see your object available via Airbyte, and it is queryable, check if it is API-accessible to the Salesforce user you authenticated with. +- If the salesforce object has the queryable property set to true. Airbyte can only fetch objects which are queryable. If you don’t see an object available via Airbyte, and it is queryable, check if it is API-accessible to the Salesforce user you authenticated with. ### A note on the BULK API vs REST API and their limitations From b206ee78f07cfe8ed89a7e63f4c9168ff1da9d95 Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Thu, 10 Aug 2023 12:59:34 -0700 Subject: [PATCH 38/46] Update docs/integrations/sources/salesforce.md --- docs/integrations/sources/salesforce.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index b16fac2c41cad..ba1cdff9b0d2f 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -107,7 +107,7 @@ Airbyte allows exporting all available Salesforce objects dynamically based on: Salesforce allows extracting data using either the [BULK API](https://developer.salesforce.com/docs/atlas.en-us.236.0.api_asynch.meta/api_asynch/asynch_api_intro.htm) or [REST API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/intro_what_is_rest_api.htm). To achieve fast performance, Salesforce recommends using the BULK API for extracting larger amounts of data (more than 2,000 records). For this reason, the Salesforce connector uses the BULK API by default to extract any Salesforce objects, unless any of the following conditions are met: -- The Salesforce object has columns which are unsupported by the BULK API, like properties in schema: `base64` or `object`-like +- The Salesforce object has columns which are unsupported by the BULK API, like columns with a `base64` or `complexvalue` type - The Salesforce object is not supported by BULK API. In this case we sync the objects via the REST API which will occasionalyl cost more of your API quota. 
This list of objects was obtained experimentally, and includes the following objects: - AcceptedEventRelation - Attachment From 0fa9a2e4231b8c1f60c875c380b22396ff3dc0cb Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Thu, 10 Aug 2023 23:15:10 +0200 Subject: [PATCH 39/46] Source Salesforce: refactor + update unit test --- .../source_salesforce/streams.py | 2 +- .../source-salesforce/unit_tests/api_test.py | 41 +++++++++++++------ .../source-salesforce/unit_tests/utils.py | 14 ------- 3 files changed, 29 insertions(+), 28 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 0d8be9092092d..71002b78c30ab 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -679,7 +679,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late max_possible_value = min(latest_record_value, slice_max_value) if current_stream_state.get(self.cursor_field): if latest_record_value > slice_max_value: - return {self.cursor_field: current_stream_state[self.cursor_field]} + return {self.cursor_field: max_possible_value.isoformat()} max_possible_value = max(latest_record_value, pendulum.parse(current_stream_state[self.cursor_field])) return {self.cursor_field: max_possible_value.isoformat()} diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 4a43958819e54..f5f10bc7ddf09 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -27,7 +27,6 @@ IncrementalRestSalesforceStream, RestSalesforceStream, ) -from utils import read_full_refresh def test_bulk_sync_creation_failed(stream_config, stream_api): @@ -91,9 +90,9 @@ def test_bulk_sync_pagination(stream_config, stream_api, requests_mock): requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)] result_uri = requests_mock.register_uri("GET", stream.path() + f"/{job_id}/results", - [{'text': "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}}, - {'text': "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}}, - {'text': "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}} + [{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}}, + {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}}, + {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}} ] ) requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}") @@ -738,21 +737,25 @@ def test_bulk_stream_slices(stream_config_date_format, stream_api): @freezegun.freeze_time("2023-04-01") -def test_bulk_stream_request_params(stream_config_date_format, stream_api, requests_mock): +def test_bulk_stream_request_params_states(stream_config_date_format, stream_api, bulk_catalog, requests_mock): """Check that request params ignore records cursor and use start date from slice ONLY""" - stream_config_date_format.update({'start_date': '2023-01-01'}) - stream: BulkIncrementalSalesforceStream = 
generate_stream("FakeBulkStream", stream_config_date_format, stream_api) + stream_config_date_format.update({"start_date": "2023-01-01"}) + stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api) + + source = SourceSalesforce() + source.streams = Mock() + source.streams.return_value = [stream] job_id_1 = "fake_job_1" requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": {"state": "JobComplete"}}]) requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}") - requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-03-15,1") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-01-15,1") requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}") job_id_2 = "fake_job_2" requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": {"state": "JobComplete"}}]) requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}") - requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2\ntest,2023-02-20,22") requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_2}") job_id_3 = "fake_job_3" @@ -763,7 +766,19 @@ def test_bulk_stream_request_params(stream_config_date_format, stream_api, reque requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}") requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3") requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}") - list(read_full_refresh(stream)) - assert "LastModifiedDate >= 2023-01-01T00:00:00.000+00:00 AND LastModifiedDate < 2023-01-31T00:00:00.000+00:00" in queries_history.request_history[0].text - assert "LastModifiedDate >= 2023-01-31T00:00:00.000+00:00 AND LastModifiedDate < 2023-03-02T00:00:00.000+00:00" in queries_history.request_history[1].text - assert "LastModifiedDate >= 2023-03-02T00:00:00.000+00:00 AND LastModifiedDate < 2023-04-01T00:00:00.000+00:00" in queries_history.request_history[2].text + + logger = logging.getLogger("airbyte") + state = {"Account": {"LastModifiedDate": "2023-01-01T10:10:10.000Z"}} + bulk_catalog.streams.pop(1) + result = [i for i in source.read(logger=logger, config=stream_config_date_format, catalog=bulk_catalog, state=state)] + + actual_state_values = [item.state.data.get("Account").get(stream.cursor_field) for item in result if item.type == Type.STATE] + # assert request params + assert "LastModifiedDate >= 2023-01-01T10:10:10.000+00:00 AND LastModifiedDate < 2023-01-31T10:10:10.000+00:00" in queries_history.request_history[0].text + assert "LastModifiedDate >= 2023-01-31T10:10:10.000+00:00 AND LastModifiedDate < 2023-03-02T10:10:10.000+00:00" in queries_history.request_history[1].text + assert "LastModifiedDate >= 2023-03-02T10:10:10.000+00:00 AND LastModifiedDate < 2023-04-01T00:00:00.000+00:00" in queries_history.request_history[2].text + + # assert states + # if connector meets record with cursor `2023-04-01` out of current slice range 2023-01-31 <> 2023-03-02, we ignore all other values and set state to slice end_date + expected_state_values = ["2023-01-15T00:00:00+00:00", "2023-03-02T10:10:10+00:00", "2023-04-01T00:00:00+00:00"] + assert actual_state_values == 
expected_state_values diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py deleted file mode 100644 index 4de4a92bef0bd..0000000000000 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/utils.py +++ /dev/null @@ -1,14 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from airbyte_cdk.sources.streams import Stream -from airbyte_protocol.models import SyncMode - - -def read_full_refresh(stream_instance: Stream): - slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh) - for _slice in slices: - records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh) - for record in records: - yield record From fa150b263dff4def4663d073c71ce1410e2e8b88 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 11 Aug 2023 09:05:43 +0200 Subject: [PATCH 40/46] retrigger ci From 143cb6750152d53bd63f0c01c52a33561a30c707 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 11 Aug 2023 11:43:41 +0200 Subject: [PATCH 41/46] retrigger ci From 039a57d41764d903aa37b22f3c507ff93c2874f2 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 11 Aug 2023 20:19:40 +0200 Subject: [PATCH 42/46] retrigger ci --- .../configured_catalog_custom.json | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json diff --git a/airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json b/airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json new file mode 100644 index 0000000000000..f82ff0d5d1588 --- /dev/null +++ b/airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json @@ -0,0 +1,16 @@ +{ + "streams": [ + { + "stream": { + "name": "custom_creative_ad_by_month", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["end_date"] + }, + "sync_mode": "incremental", + "cursor_field": ["end_date"], + "destination_sync_mode": "append" + } + ] +} From c72a74c6c1be0827b5df100fb8259ff51da46e21 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 11 Aug 2023 20:26:57 +0200 Subject: [PATCH 43/46] Revert "retrigger ci" This reverts commit 039a57d41764d903aa37b22f3c507ff93c2874f2. 
--- .../configured_catalog_custom.json | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json diff --git a/airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json b/airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json deleted file mode 100644 index f82ff0d5d1588..0000000000000 --- a/airbyte-integrations/connectors/source-linkedin-ads/integration_tests/configured_catalog_custom.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "streams": [ - { - "stream": { - "name": "custom_creative_ad_by_month", - "json_schema": {}, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["end_date"] - }, - "sync_mode": "incremental", - "cursor_field": ["end_date"], - "destination_sync_mode": "append" - } - ] -} From 4ee7bc0e34b46d20177ea2caa79fd9402d64c30d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sun, 13 Aug 2023 16:36:17 +0200 Subject: [PATCH 44/46] retrigger ci From dc45bcba83df4699b485b747ec2408640ec0d850 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sun, 13 Aug 2023 19:25:24 +0200 Subject: [PATCH 45/46] Source Zendesk Support: Fix integration test --- .../source-salesforce/integration_tests/integration_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py index c0daa413049c3..80ca2de023b53 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py @@ -71,7 +71,7 @@ def get_stream_state(): def test_update_for_deleted_record(stream): - headers = stream.authenticator.get_auth_header() + headers = stream.authenticator.token() stream_state = get_stream_state() time.sleep(1) response = create_note(stream, headers) @@ -134,7 +134,7 @@ def test_update_for_deleted_record(stream): def test_deleted_record(stream): - headers = stream.authenticator.get_auth_header() + headers = stream.authenticator.token() response = create_note(stream, headers) assert response.status_code == 201, "Note was note created" From a813d176a9df109f501637e9f1e5b14d4fa78e99 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Sun, 13 Aug 2023 22:39:15 +0200 Subject: [PATCH 46/46] Source SalesForce: fix integration tests --- .../source-salesforce/integration_tests/integration_test.py | 4 ++-- airbyte-integrations/connectors/source-salesforce/setup.py | 2 +- .../connectors/source-salesforce/source_salesforce/source.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py index 80ca2de023b53..c0daa413049c3 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/integration_test.py @@ -71,7 +71,7 @@ def get_stream_state(): def test_update_for_deleted_record(stream): - headers = stream.authenticator.token() + headers = stream.authenticator.get_auth_header() stream_state = get_stream_state() time.sleep(1) response = create_note(stream, 
headers) @@ -134,7 +134,7 @@ def test_update_for_deleted_record(stream): def test_deleted_record(stream): - headers = stream.authenticator.token() + headers = stream.authenticator.get_auth_header() response = create_note(stream, headers) assert response.status_code == 201, "Note was note created" diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index b909d444a3339..44c137056bbd4 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.46", "pandas"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.50", "pandas"] TEST_REQUIREMENTS = ["freezegun", "pytest~=6.1", "pytest-mock~=3.6", "requests-mock~=1.9.3", "pytest-timeout"] diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 744658872754d..9def53730d791 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -12,7 +12,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from airbyte_cdk.sources.utils.schema_helpers import InternalConfig from airbyte_cdk.utils.traced_exception import AirbyteTracedException from dateutil.relativedelta import relativedelta
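
Note on the state handling change in [PATCH 39/46]: the updated `get_updated_state` caps the persisted cursor at the end of the slice currently being read, so a record whose `LastModifiedDate` lies beyond the slice no longer pushes the state past data that has not been synced yet. The standalone sketch below mirrors that rule for a single cursor field; the `updated_state` helper and the sample dates are illustrative only and are not part of the connector code.

# A minimal, self-contained sketch of the cursor-capping rule from [PATCH 39/46].
# The function name `updated_state` and the literal dates are assumptions made
# for illustration; only the capping logic follows the patched stream code.
import pendulum


def updated_state(current_state: dict, latest_record: dict, slice_end: str,
                  cursor_field: str = "LastModifiedDate") -> dict:
    latest_value = pendulum.parse(latest_record[cursor_field])
    slice_max_value = pendulum.parse(slice_end)
    # Never advance the cursor past the end of the slice being read.
    max_possible_value = min(latest_value, slice_max_value)
    if current_state.get(cursor_field):
        if latest_value > slice_max_value:
            # The record is newer than the slice: persist the slice end date.
            return {cursor_field: max_possible_value.isoformat()}
        max_possible_value = max(latest_value, pendulum.parse(current_state[cursor_field]))
    return {cursor_field: max_possible_value.isoformat()}


# A record dated 2023-04-01 read while syncing the 2023-01-31..2023-03-02 slice
# leaves the state at the slice end, matching the second expected_state_values
# entry in test_bulk_stream_request_params_states above.
print(updated_state(
    {"LastModifiedDate": "2023-01-31T10:10:10+00:00"},
    {"LastModifiedDate": "2023-04-01T00:00:00+00:00"},
    slice_end="2023-03-02T10:10:10+00:00",
))
# -> {'LastModifiedDate': '2023-03-02T10:10:10+00:00'}

The comment in the updated unit test states the same rule: a record with a cursor value outside the current slice range is ignored for state purposes and the state is set to the slice end date.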