diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 84f30cbc8d4ef..4ee90ce8a6a15 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -1,17 +1,7 @@ -# Using alpine to remove several vulnerabilities frm slim image -# https://security-tracker.debian.org/tracker/CVE-2023-29383 -# https://security-tracker.debian.org/tracker/CVE-2023-31484 -# https://security-tracker.debian.org/tracker/CVE-2016-2781 -FROM python:3.9-alpine3.18 - - -RUN apk add --update --no-cache \ - build-base \ - openssl-dev \ - libffi-dev \ - zlib-dev \ - bzip2-dev +FROM python:3.9-slim +# Bash is installed for more convenient debugging. +RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" @@ -19,20 +9,9 @@ WORKDIR /airbyte/integration_code COPY source_salesforce ./source_salesforce COPY setup.py ./ COPY main.py ./ - -# Fixing https://nvd.nist.gov/vuln/detail/CVE-2022-40897 -# calling this twice as one upgrades the system pip /usr/local/bin/pip the -# seconf time upgrades the under for the venv /opt/.venv/bin/pip -RUN pip install --upgrade pip setuptools wheel && \ - pip install . -RUN pip install --upgrade pip setuptools - -# add default timezone settings -ENV TZ UTC -RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone - +RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=2.1.1 +LABEL io.airbyte.version=2.1.2 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml index 7e20661a2937b..c4fd1f7b78817 100644 --- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml +++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: b117307c-14b6-41aa-9422-947e34922962 - dockerImageTag: 2.1.1 + dockerImageTag: 2.1.2 dockerRepository: airbyte/source-salesforce githubIssueLabel: source-salesforce icon: salesforce.svg @@ -13,7 +13,6 @@ data: name: Salesforce registries: cloud: - dockerImageTag: 2.0.9 enabled: true oss: enabled: true diff --git a/airbyte-integrations/connectors/source-salesforce/setup.py b/airbyte-integrations/connectors/source-salesforce/setup.py index b485a84ba5ad9..44c137056bbd4 100644 --- a/airbyte-integrations/connectors/source-salesforce/setup.py +++ b/airbyte-integrations/connectors/source-salesforce/setup.py @@ -5,9 +5,9 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "vcrpy==4.1.1", "pandas"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.50", "pandas"] -TEST_REQUIREMENTS = ["requests-mock~=1.9.3", "pytest~=6.1", "pytest-mock~=3.6", "requests_mock", "pytest-timeout"] +TEST_REQUIREMENTS = ["freezegun", "pytest~=6.1", "pytest-mock~=3.6", "requests-mock~=1.9.3", "pytest-timeout"] setup( name="source_salesforce", diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py index aea3bfb0f876f..68fa3c35dc413 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py @@ -296,7 +296,7 @@ def _make_request( 
resp = self.session.post(url, headers=headers, data=body) resp.raise_for_status() except HTTPError as err: - self.logger.warn(f"http error body: {err.response.text}") + self.logger.warning(f"http error body: {err.response.text}") raise return resp diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py index 286339fcbe1cf..344a6412e0245 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/rate_limiting.py @@ -2,11 +2,10 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import logging import sys import backoff -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException from requests import codes, exceptions # type: ignore[import] @@ -18,7 +17,7 @@ exceptions.HTTPError, ) -logger = AirbyteLogger() +logger = logging.getLogger("airbyte") def default_backoff_handler(max_tries: int, factor: int, **kwargs): diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 77243ea2b57ab..9def53730d791 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -21,6 +21,8 @@ from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalRestSalesforceStream, RestSalesforceStream +logger = logging.getLogger("airbyte") + class AirbyteStopSync(AirbyteTracedException): pass @@ -59,13 +61,21 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return True, None @classmethod - def _get_api_type(cls, stream_name, properties): + def _get_api_type(cls, stream_name: str, properties: Mapping[str, Any], force_use_bulk_api: bool) -> str: # Salesforce BULK API currently does not support loading fields with data type base64 and compound data properties_not_supported_by_bulk = { key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"] } - rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk - if rest_required: + rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS + if rest_only: + logger.warning(f"BULK API is not supported for stream: {stream_name}") + return "rest" + if force_use_bulk_api and properties_not_supported_by_bulk: + logger.warning( + f"Following properties will be excluded from stream: {stream_name} due to BULK API limitations: {list(properties_not_supported_by_bulk)}" + ) + return "bulk" + if properties_not_supported_by_bulk: return "rest" return "bulk" @@ -77,7 +87,6 @@ def generate_streams( sf_object: Salesforce, ) -> List[Stream]: """ "Generates a list of stream by their names. 
It can be used for different tests too""" - logger = logging.getLogger() authenticator = TokenAuthenticator(sf_object.access_token) stream_properties = sf_object.generate_schemas(stream_objects) streams = [] @@ -85,7 +94,7 @@ def generate_streams( streams_kwargs = {"sobject_options": sobject_options} selected_properties = stream_properties.get(stream_name, {}).get("properties", {}) - api_type = cls._get_api_type(stream_name, selected_properties) + api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False)) if api_type == "rest": full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream elif api_type == "bulk": diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml index af3a7655217ce..642e180c078ba 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml @@ -44,18 +44,25 @@ connectionSpecification: start_date: title: Start Date description: >- - Enter the date in the YYYY-MM-DD format. Airbyte will replicate the data added on and after this date. If this field is blank, Airbyte will replicate the data for last two years. + Enter the date (or date-time) in the YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ format. Airbyte will replicate the data updated on and after this date. If this field is blank, Airbyte will replicate the data for the last two years. type: string pattern: >- ^([0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?)$ + pattern_descriptor: "YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ" examples: - "2021-07-25" - "2021-07-25T00:00:00Z" format: date-time order: 5 + force_use_bulk_api: + title: Force to use BULK API + type: boolean + description: Toggle to use Bulk API (this might cause empty fields for some streams) + default: false + order: 6 streams_criteria: type: array - order: 6 + order: 7 items: type: object required: @@ -81,8 +88,7 @@ connectionSpecification: title: Search value order: 2 title: Filter Salesforce Objects - description: >- - Filter streams relevant to you + description: Add filters to select only the required streams based on `SObject` name. Use this field to filter which tables are displayed by this connector. This is useful if your Salesforce account has a large number of tables (>1000), in which case you may find it easier to navigate the UI and speed up the connector's performance if you restrict the tables it displays.
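For illustration, a source config that exercises the new `force_use_bulk_api` flag and the `streams_criteria` filter defined above might look like the sketch below; the credential fields and the filter entry are placeholder values rather than part of this patch, and the exact `criteria` wording should be checked against the full spec.

```python
# Hypothetical source-salesforce config exercising the options added to spec.yaml above.
# All credential values and the filter entry are placeholders.
config = {
    "client_id": "<OAUTH_CLIENT_ID>",
    "client_secret": "<OAUTH_CLIENT_SECRET>",
    "refresh_token": "<OAUTH_REFRESH_TOKEN>",
    "start_date": "2021-07-25T00:00:00Z",  # matches the YYYY-MM-DDTHH:mm:ssZ pattern
    "force_use_bulk_api": True,  # prefer the BULK API even if unsupported columns get dropped
    "streams_criteria": [
        {"criteria": "starts with", "value": "Account"},
    ],
}
```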
advanced_auth: auth_flow_type: oauth2.0 predicate_key: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 10c19e9537ed3..71002b78c30ab 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -15,14 +15,16 @@ import pandas as pd import pendulum import requests # type: ignore[import] -from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode +from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType, SyncMode from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import Stream, StreamData from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from airbyte_cdk.utils import AirbyteTracedException from numpy import nan from pendulum import DateTime # type: ignore[attr-defined] from requests import codes, exceptions +from requests.models import PreparedRequest from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce from .availability_strategy import SalesforceAvailabilityStrategy @@ -280,7 +282,6 @@ def _fetch_next_page_for_chunk( class BulkSalesforceStream(SalesforceStream): - page_size = 15000 DEFAULT_WAIT_TIMEOUT_SECONDS = 86400 # 24-hour bulk job running time MAX_CHECK_INTERVAL_SECONDS = 2.0 MAX_RETRY_NUMBER = 3 @@ -291,8 +292,8 @@ def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str: transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization) @default_backoff_handler(max_tries=5, factor=15) - def _send_http_request(self, method: str, url: str, json: dict = None, stream: bool = False): - headers = self.authenticator.get_auth_header() + def _send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False): + headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header() response = self._session.request(method, url=url, headers=headers, json=json, stream=stream) if response.status_code not in [200, 204]: self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}") @@ -347,11 +348,16 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: f"The stream '{self.name}' is not queryable, " f"sobject options: {self.sobject_options}, error message: '{error_message}'" ) + elif ( + error.response.status_code == codes.BAD_REQUEST + and error_code == "API_ERROR" + and error_message.startswith("Implementation restriction") + ): + message = f"Unable to sync '{self.name}'. To prevent future syncs from failing, ensure the authenticated user has \"View all Data\" permissions." + raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error) elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED": - self.logger.error( - f"Cannot receive data for stream '{self.name}' ," - f"sobject options: {self.sobject_options}, error message: '{error_message}'" - ) + message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed." 
+ self.logger.error(message) else: raise error else: @@ -368,7 +374,20 @@ def wait_for_job(self, url: str) -> str: # this value was received empirically time.sleep(0.5) while pendulum.now() < expiration_time: - job_info = self._send_http_request("GET", url=url).json() + try: + job_info = self._send_http_request("GET", url=url).json() + except exceptions.HTTPError as error: + error_data = error.response.json()[0] + error_code = error_data.get("errorCode") + error_message = error_data.get("message", "") + if ( + "We can't complete the action because enabled transaction security policies took too long to complete." in error_message + and error_code == "TXN_SECURITY_METERING_ERROR" + ): + message = 'A transient authentication error occurred. To prevent future syncs from failing, assign the "Exempt from Transaction Security" user permission to the authenticated user.' + raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error) + else: + raise error job_status = job_info["state"] if job_status in ["JobComplete", "Aborted", "Failed"]: if job_status != "JobComplete": @@ -422,7 +441,7 @@ def filter_null_bytes(self, b: bytes): self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)) return res - def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: + def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dict]: """ Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations. @ url: string - the url of the `executed_job` @@ -431,13 +450,16 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: """ # set filepath for binary data from response tmp_file = os.path.realpath(os.path.basename(url)) - with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file: + with closing(self._send_http_request("GET", url, headers={"Accept-Encoding": "gzip"}, stream=True)) as response, open( + tmp_file, "wb" + ) as data_file: response_encoding = response.encoding or self.encoding + response_headers = response.headers for chunk in response.iter_content(chunk_size=chunk_size): data_file.write(self.filter_null_bytes(chunk)) # check the file exists if os.path.isfile(tmp_file): - return tmp_file, response_encoding + return tmp_file, response_encoding, response_headers else: raise TmpFileIOError(f"The IO/Error occured while verifying binary data. 
Stream: {self.name}, file {tmp_file} doesn't exist.") @@ -477,10 +499,17 @@ def availability_strategy(self) -> Optional["AvailabilityStrategy"]: return None def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: - if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - return {"next_token": f"WHERE {self.primary_key} >= '{last_record[self.primary_key]}' "} # type: ignore[index] return None + def get_query_select_fields(self) -> str: + return ", ".join( + { + key: value + for key, value in self.get_json_schema().get("properties", {}).items() + if value.get("format") != "base64" and "object" not in value["type"] + } + ) + def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: @@ -488,13 +517,11 @@ def request_params( Salesforce SOQL Query: https://developer.salesforce.com/docs/atlas.en-us.232.0.api_rest.meta/api_rest/dome_queryall.htm """ - selected_properties = self.get_json_schema().get("properties", {}) - query = f"SELECT {','.join(selected_properties.keys())} FROM {self.name} " + select_fields = self.get_query_select_fields() + query = f"SELECT {select_fields} FROM {self.name}" if next_page_token: query += next_page_token["next_token"] - if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: - query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}" return {"q": query} def read_records( @@ -507,45 +534,38 @@ def read_records( stream_state = stream_state or {} next_page_token = None - while True: - params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - job_full_url, job_status = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") - if not job_full_url: - if job_status == "Failed": - # As rule as BULK logic returns unhandled error. For instance: - # error message: 'Unexpected exception encountered in query processing. - # Please contact support with the following id: 326566388-63578 (-436445966)'" - # Thus we can try to switch to GET sync request because its response returns obvious error message - standard_instance = self.get_standard_instance() - self.logger.warning("switch to STANDARD(non-BULK) sync. Because the SalesForce BULK job has returned a failed status") - stream_is_available, error = standard_instance.check_availability(self.logger, None) - if not stream_is_available: - self.logger.warning(f"Skipped syncing stream '{standard_instance.name}' because it was unavailable. Error: {error}") - return - yield from standard_instance.read_records( - sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state - ) + params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + path = self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + job_full_url, job_status = self.execute_job(query=params["q"], url=f"{self.url_base}{path}") + if not job_full_url: + if job_status == "Failed": + # As rule as BULK logic returns unhandled error. For instance: + # error message: 'Unexpected exception encountered in query processing. 
+ # Please contact support with the following id: 326566388-63578 (-436445966)'" + # Thus we can try to switch to GET sync request because its response returns obvious error message + standard_instance = self.get_standard_instance() + self.logger.warning("switch to STANDARD(non-BULK) sync. Because the SalesForce BULK job has returned a failed status") + stream_is_available, error = standard_instance.check_availability(self.logger, None) + if not stream_is_available: + self.logger.warning(f"Skipped syncing stream '{standard_instance.name}' because it was unavailable. Error: {error}") return - raise SalesforceException(f"Job for {self.name} stream using BULK API was failed.") - - count = 0 - record: Mapping[str, Any] = {} - for record in self.read_with_chunks(*self.download_data(url=job_full_url)): - count += 1 + yield from standard_instance.read_records( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ) + return + raise SalesforceException(f"Job for {self.name} stream using BULK API was failed.") + salesforce_bulk_api_locator = None + while True: + req = PreparedRequest() + req.prepare_url(f"{job_full_url}/results", {"locator": salesforce_bulk_api_locator}) + tmp_file, response_encoding, response_headers = self.download_data(url=req.url) + for record in self.read_with_chunks(tmp_file, response_encoding): yield record - self.delete_job(url=job_full_url) - - if count < self.page_size: - # Salesforce doesn't give a next token or something to know the request was - # the last page. The connectors will sync batches in `page_size` and - # considers that batch is smaller than the `page_size` it must be the last page. - break - next_page_token = self.next_page_token(record) - if not next_page_token: - # not found a next page data. 
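The pagination introduced in this hunk replaces the old `page_size`/`next_page_token` re-querying: Bulk API 2.0 serves query results in pages addressed by a `locator` query parameter, and the `Sforce-Locator` response header carries the cursor for the next page, with the literal string `"null"` marking the last one. A minimal standalone sketch of that pattern, assuming a hypothetical `get_page` callable that performs an authenticated GET and returns a `requests.Response`:

```python
from typing import Callable, List

import requests


def iter_result_pages(job_url: str, get_page: Callable[[str], requests.Response]) -> List[str]:
    """Collect every CSV page of a Bulk API 2.0 query job by following Sforce-Locator."""
    pages: List[str] = []
    locator = None  # no locator on the first request; requests drops params whose value is None
    while True:
        req = requests.models.PreparedRequest()
        req.prepare_url(f"{job_url}/results", {"locator": locator})
        response = get_page(req.url)  # assumed helper: authenticated GET against the job results URL
        pages.append(response.text)
        locator = response.headers.get("Sforce-Locator", "null")
        if locator == "null":  # Salesforce sends the literal string "null" once the last page is served
            break
    return pages
```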
+ if response_headers.get("Sforce-Locator", "null") == "null": break + salesforce_bulk_api_locator = response_headers.get("Sforce-Locator") + self.delete_job(url=job_full_url) def get_standard_instance(self) -> SalesforceStream: """Returns a instance of standard logic(non-BULK) with same settings""" @@ -580,6 +600,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any): class IncrementalRestSalesforceStream(RestSalesforceStream, ABC): state_checkpoint_interval = 500 STREAM_SLICE_STEP = 30 + _slice = None def __init__(self, replication_key: str, start_date: Optional[str], **kwargs): super().__init__(**kwargs) @@ -604,6 +625,7 @@ def stream_slices( while not end == now: start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP) end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP)) + self._slice = {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")} yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")} slice_number = slice_number + 1 @@ -632,17 +654,14 @@ def request_params( select_fields = ",".join(property_chunk.keys()) table_name = self.name where_conditions = [] - order_by_clause = "" if start_date: where_conditions.append(f"{self.cursor_field} >= {start_date}") if end_date: where_conditions.append(f"{self.cursor_field} < {end_date}") - if self.name not in UNSUPPORTED_FILTERING_STREAMS: - order_by_clause = f"ORDER BY {self.cursor_field} ASC" where_clause = f"WHERE {' AND '.join(where_conditions)}" - query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}" + query = f"SELECT {select_fields} FROM {table_name} {where_clause}" return {"q": query} @@ -653,39 +672,33 @@ def cursor_field(self) -> str: def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state - object and returning an updated state object. + object and returning an updated state object. 
Check if latest record is IN stream slice interval => ignore if not """ - latest_benchmark = latest_record[self.cursor_field] + latest_record_value: pendulum.DateTime = pendulum.parse(latest_record[self.cursor_field]) + slice_max_value: pendulum.DateTime = pendulum.parse(self._slice.get("end_date")) + max_possible_value = min(latest_record_value, slice_max_value) if current_stream_state.get(self.cursor_field): - return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} - return {self.cursor_field: latest_benchmark} + if latest_record_value > slice_max_value: + return {self.cursor_field: max_possible_value.isoformat()} + max_possible_value = max(latest_record_value, pendulum.parse(current_stream_state[self.cursor_field])) + return {self.cursor_field: max_possible_value.isoformat()} class BulkIncrementalSalesforceStream(BulkSalesforceStream, IncrementalRestSalesforceStream): - def next_page_token(self, last_record: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: - return None + state_checkpoint_interval = None def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - start_date = max( - (stream_state or {}).get(self.cursor_field, ""), - (stream_slice or {}).get("start_date", ""), - (next_page_token or {}).get("start_date", ""), - ) + start_date = stream_slice["start_date"] end_date = stream_slice["end_date"] - select_fields = ", ".join(self.get_json_schema().get("properties", {}).keys()) + select_fields = self.get_query_select_fields() table_name = self.name where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"] - order_by_clause = "" - - if self.name not in UNSUPPORTED_FILTERING_STREAMS: - order_by_fields = ", ".join([self.cursor_field, self.primary_key] if self.primary_key else [self.cursor_field]) - order_by_clause = f"ORDER BY {order_by_fields} ASC" where_clause = f"WHERE {' AND '.join(where_conditions)}" - query = f"SELECT {select_fields} FROM {table_name} {where_clause} {order_by_clause}" + query = f"SELECT {select_fields} FROM {table_name} {where_clause}" return {"q": query} diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 49f10f73bef4e..f5f10bc7ddf09 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -10,9 +10,12 @@ from datetime import datetime from unittest.mock import Mock +import freezegun +import pendulum import pytest import requests_mock from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type +from airbyte_cdk.utils import AirbyteTracedException from conftest import encoding_symbols_parameters, generate_stream from requests.exceptions import HTTPError from source_salesforce.api import Salesforce @@ -61,7 +64,7 @@ def test_bulk_stream_fallback_to_rest(mocker, requests_mock, stream_config, stre assert list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)) == rest_stream_records -def test_stream_unsupported_by_bulk(stream_config, stream_api, caplog): +def test_stream_unsupported_by_bulk(stream_config, stream_api): """ Stream `AcceptedEventRelation` is not supported by BULK API, so that REST API stream will be used for it. 
""" @@ -80,30 +83,26 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_ap assert not isinstance(stream, BulkSalesforceStream) -@pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 3000]) -def test_bulk_sync_pagination(item_number, stream_config, stream_api): +def test_bulk_sync_pagination(stream_config, stream_api, requests_mock): stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) - test_ids = [i for i in range(1, item_number)] - pages = [test_ids[i : i + stream.page_size] for i in range(0, len(test_ids), stream.page_size)] - if not pages: - pages = [[]] - with requests_mock.Mocker() as m: - creation_responses = [] - - for page in range(len(pages)): - job_id = f"fake_job_{page}" - creation_responses.append({"json": {"id": job_id}}) - m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) - resp = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in pages[page]] - m.register_uri("GET", stream.path() + f"/{job_id}/results", text="\n".join(resp)) - m.register_uri("DELETE", stream.path() + f"/{job_id}") - m.register_uri("POST", stream.path(), creation_responses) + job_id = "fake_job" + requests_mock.register_uri("POST", stream.path(), json={"id": job_id}) + requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) + resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)] + result_uri = requests_mock.register_uri("GET", stream.path() + f"/{job_id}/results", + [{"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}}, + {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}}, + {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}} + ] + ) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}") - stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental))) - loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)] - assert not set(test_ids).symmetric_difference(set(loaded_ids)) - post_request_count = len([r for r in m.request_history if r.method == "POST"]) - assert post_request_count == len(pages) + stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental))) + loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)] + assert loaded_ids == [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4] + assert result_uri.call_count == 3 + assert result_uri.request_history[1].query == "locator=somelocator_1" + assert result_uri.request_history[2].query == "locator=somelocator_2" def _prepare_mock(m, stream): @@ -210,46 +209,51 @@ def test_stream_start_datetime_format_should_not_changed(stream_config, stream_a def test_download_data_filter_null_bytes(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b"\x00") - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b"\x00") + tmp_file, response_encoding, 
_ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [] - m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": "false"}] def test_read_with_chunks_should_return_only_object_data_type(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b'"IsDeleted","Age"\n"false",24\n') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"IsDeleted","Age"\n"false",24\n') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"IsDeleted": "false", "Age": "24"}] def test_read_with_chunks_should_return_a_string_when_a_string_with_only_digits_is_provided(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b'"ZipCode"\n"01234"\n') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"ZipCode"\n"01234"\n') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"ZipCode": "01234"}] def test_read_with_chunks_should_return_null_value_when_no_data_is_provided(stream_config, stream_api): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=b'"IsDeleted","Age","Name"\n"false",,"Airbyte"\n') - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) + m.register_uri("GET", job_full_url_results, content=b'"IsDeleted","Age","Name"\n"false",,"Airbyte"\n') + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == [{"IsDeleted": "false", "Age": None, "Name": "Airbyte"}] @@ -259,12 +263,13 @@ def 
test_read_with_chunks_should_return_null_value_when_no_data_is_provided(stre ids=[f"charset: {x[1]}, chunk_size: {x[0]}" for x in encoding_symbols_parameters()], ) def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type, content, expected_result): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", headers={"Content-Type": f"text/html; charset={content_type}"}, content=content) - res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url, chunk_size=chunk_size))) + m.register_uri("GET", job_full_url_results, headers={"Content-Type": f"text/html; charset={content_type}"}, content=content) + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + res = list(stream.read_with_chunks(tmp_file, response_encoding)) assert res == expected_result @@ -312,6 +317,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state): While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process. Next streams should not be executed. """ + stream_config.update({'start_date': '2021-10-01'}) stream_1: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) stream_2: BulkIncrementalSalesforceStream = generate_stream("Asset", stream_config, stream_api) streams = [stream_1, stream_2] @@ -335,7 +341,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state): m.register_uri("GET", stream.path() + f"/{job_id}", json={"state": "JobComplete"}) - resp = ["Field1,LastModifiedDate,Id"] + [f"test,2021-11-0{i},{i}" for i in range(1, 7)] # 6 records per page + resp = ["Field1,LastModifiedDate,Id"] + [f"test,2021-10-0{i},{i}" for i in range(1, 7)] # 6 records per page if page == 1: # Read the first page successfully @@ -358,7 +364,7 @@ def test_rate_limit_bulk(stream_config, stream_api, bulk_catalog, state): assert len(records) == 6 # stream page size: 6 state_record = [item for item in result if item.type == Type.STATE][0] - assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-11-05" # state checkpoint interval is 5. + assert state_record.state.data["Account"]["LastModifiedDate"] == "2021-10-05T00:00:00+00:00" # state checkpoint interval is 5. def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state): @@ -368,6 +374,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state): While reading `stream_1` if 403 (Rate Limit) is received, it should finish that stream with success and stop the sync process. Next streams should not be executed. 
""" + stream_config.update({'start_date': '2021-11-01'}) stream_1: IncrementalRestSalesforceStream = generate_stream("KnowledgeArticle", stream_config, stream_api) stream_2: IncrementalRestSalesforceStream = generate_stream("AcceptedEventRelation", stream_config, stream_api) @@ -426,7 +433,7 @@ def test_rate_limit_rest(stream_config, stream_api, rest_catalog, state): assert len(records) == 5 state_record = [item for item in result if item.type == Type.STATE][0] - assert state_record.state.data["KnowledgeArticle"]["LastModifiedDate"] == "2021-11-17" + assert state_record.state.data["KnowledgeArticle"]["LastModifiedDate"] == "2021-11-17T00:00:00+00:00" def test_pagination_rest(stream_config, stream_api): @@ -474,7 +481,7 @@ def test_pagination_rest(stream_config, stream_api): def test_csv_reader_dialect_unix(): stream: BulkSalesforceStream = BulkSalesforceStream(stream_name=None, sf_api=None, pk=None) - url = "https://fake-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + url_results = "https://fake-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" data = [ {"Id": "1", "Name": '"first_name" "last_name"'}, @@ -490,8 +497,9 @@ def test_csv_reader_dialect_unix(): text = csvfile.getvalue() with requests_mock.Mocker() as m: - m.register_uri("GET", url + "/results", text=text) - result = [i for i in stream.read_with_chunks(*stream.download_data(url))] + m.register_uri("GET", url_results, text=text) + tmp_file, response_encoding, _ = stream.download_data(url=url_results) + result = [i for i in stream.read_with_chunks(tmp_file, response_encoding)] assert result == data @@ -661,3 +669,116 @@ def test_stream_with_no_records_in_response(stream_config, stream_api_v2_pk_too_ ) records = list(stream.read_records(sync_mode=SyncMode.full_refresh)) assert records == [] + + +@pytest.mark.parametrize( + "status_code,response_json,log_message", + [ + (400, [{"errorCode": "INVALIDENTITY", "message": "Account is not supported by the Bulk API"}], "Account is not supported by the Bulk API"), + (403, [{"errorCode": "REQUEST_LIMIT_EXCEEDED", "message": "API limit reached"}], "API limit reached"), + (400, [{"errorCode": "API_ERROR", "message": "API does not support query"}], "The stream 'Account' is not queryable,"), + (400, [{"errorCode": "LIMIT_EXCEEDED", "message": "Max bulk v2 query jobs (10000) per 24 hrs has been reached (10021)"}], "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed.") + ] +) +def test_bulk_stream_error_in_logs_on_create_job(requests_mock, stream_config, stream_api, status_code, response_json, log_message, caplog): + """ + """ + stream = generate_stream("Account", stream_config, stream_api) + url = f"{stream.sf_api.instance_url}/services/data/{stream.sf_api.version}/jobs/query" + requests_mock.register_uri( + "POST", + url, + status_code=status_code, + json=response_json, + ) + query = "Select Id, Subject from Account" + with caplog.at_level(logging.ERROR): + assert stream.create_stream_job(query, url) is None, "this stream should be skipped" + + # check logs + assert log_message in caplog.records[-1].message + + +@pytest.mark.parametrize( + "status_code,response_json,error_message", + [ + (400, [{"errorCode": "TXN_SECURITY_METERING_ERROR", "message": "We can't complete the action because enabled transaction security policies took too long to complete."}], 'A transient authentication error occurred. 
To prevent future syncs from failing, assign the "Exempt from Transaction Security" user permission to the authenticated user.'), + ] +) +def test_bulk_stream_error_on_wait_for_job(requests_mock, stream_config, stream_api, status_code, response_json, error_message): + + stream = generate_stream("Account", stream_config, stream_api) + url = f"{stream.sf_api.instance_url}/services/data/{stream.sf_api.version}/jobs/query/queryJobId" + requests_mock.register_uri( + "GET", + url, + status_code=status_code, + json=response_json, + ) + with pytest.raises(AirbyteTracedException) as e: + stream.wait_for_job(url=url) + assert e.value.message == error_message + + +@freezegun.freeze_time("2023-01-01") +def test_bulk_stream_slices(stream_config_date_format, stream_api): + stream: BulkIncrementalSalesforceStream = generate_stream("FakeBulkStream", stream_config_date_format, stream_api) + stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh)) + expected_slices = [] + today = pendulum.today(tz="UTC") + start_date = pendulum.parse(stream.start_date, tz="UTC") + while start_date < today: + expected_slices.append({ + 'start_date': start_date.isoformat(timespec="milliseconds"), + 'end_date': min(today, start_date.add(days=stream.STREAM_SLICE_STEP)).isoformat(timespec="milliseconds") + }) + start_date = start_date.add(days=stream.STREAM_SLICE_STEP) + assert expected_slices == stream_slices + + +@freezegun.freeze_time("2023-04-01") +def test_bulk_stream_request_params_states(stream_config_date_format, stream_api, bulk_catalog, requests_mock): + """Check that request params ignore records cursor and use start date from slice ONLY""" + stream_config_date_format.update({"start_date": "2023-01-01"}) + stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api) + + source = SourceSalesforce() + source.streams = Mock() + source.streams.return_value = [stream] + + job_id_1 = "fake_job_1" + requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": {"state": "JobComplete"}}]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-01-15,1") + requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}") + + job_id_2 = "fake_job_2" + requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": {"state": "JobComplete"}}]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2\ntest,2023-02-20,22") + requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_2}") + + job_id_3 = "fake_job_3" + queries_history = requests_mock.register_uri("POST", stream.path(), [{"json": {"id": job_id_1}}, + {"json": {"id": job_id_2}}, + {"json": {"id": job_id_3}}]) + requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": {"state": "JobComplete"}}]) + requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}") + requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3") + requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}") + + logger = logging.getLogger("airbyte") + state = {"Account": {"LastModifiedDate": "2023-01-01T10:10:10.000Z"}} + bulk_catalog.streams.pop(1) + result = [i for i in source.read(logger=logger, 
config=stream_config_date_format, catalog=bulk_catalog, state=state)] + + actual_state_values = [item.state.data.get("Account").get(stream.cursor_field) for item in result if item.type == Type.STATE] + # assert request params + assert "LastModifiedDate >= 2023-01-01T10:10:10.000+00:00 AND LastModifiedDate < 2023-01-31T10:10:10.000+00:00" in queries_history.request_history[0].text + assert "LastModifiedDate >= 2023-01-31T10:10:10.000+00:00 AND LastModifiedDate < 2023-03-02T10:10:10.000+00:00" in queries_history.request_history[1].text + assert "LastModifiedDate >= 2023-03-02T10:10:10.000+00:00 AND LastModifiedDate < 2023-04-01T00:00:00.000+00:00" in queries_history.request_history[2].text + + # assert states + # if connector meets record with cursor `2023-04-01` out of current slice range 2023-01-31 <> 2023-03-02, we ignore all other values and set state to slice end_date + expected_state_values = ["2023-01-15T00:00:00+00:00", "2023-03-02T10:10:10+00:00", "2023-04-01T00:00:00+00:00"] + assert actual_state_values == expected_state_values diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py index 197566f7637aa..75780d6938220 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py @@ -27,16 +27,17 @@ ], ) def test_memory_download_data(stream_config, stream_api, n_records, first_size, first_peak): - job_full_url: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA" + job_full_url_results: str = "https://fase-account.salesforce.com/services/data/v57.0/jobs/query/7504W00000bkgnpQAA/results" stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) content = b'"Id","IsDeleted"' for _ in range(n_records): content += b'"0014W000027f6UwQAI","false"\n' with requests_mock.Mocker() as m: - m.register_uri("GET", f"{job_full_url}/results", content=content) + m.register_uri("GET", job_full_url_results, content=content) tracemalloc.start() - for x in stream.read_with_chunks(*stream.download_data(url=job_full_url)): + tmp_file, response_encoding, _ = stream.download_data(url=job_full_url_results) + for x in stream.read_with_chunks(tmp_file, response_encoding): pass fs, fp = tracemalloc.get_traced_memory() first_size_in_mb, first_peak_in_mb = fs / 1024**2, fp / 1024**2 diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index ba5d388273c42..ba1cdff9b0d2f 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -10,6 +10,13 @@ This page contains the setup guide and reference information for the Salesforce - (For Airbyte Open Source) Salesforce [OAuth](https://help.salesforce.com/s/articleView?id=sf.remoteaccess_oauth_tokens_scopes.htm&type=5) credentials + +:::tip + +To use this connector, you'll need at least the Enterprise edition of Salesforce or the Professional Edition with API access purchased as an add-on. Reference the [Salesforce docs about API access](https://help.salesforce.com/s/articleView?id=000385436&type=1) for more information. + +::: + ## Setup guide ### Step 1: (Optional, Recommended) Create a read-only Salesforce user @@ -81,7 +88,7 @@ The Salesforce source connector supports the following sync modes: ### Incremental Deletes sync -The Salesforce connector retrieves deleted records from Salesforce. 
For the streams which support it, a deleted record will be marked with the `isDeleted=true` value in the respective field. +The Salesforce connector retrieves deleted records from Salesforce. For the streams which support it, a deleted record will be marked with `isDeleted=true`. ## Performance considerations @@ -91,36 +98,48 @@ The Salesforce connector is restricted by Salesforce’s [Daily Rate Limits](htt The Salesforce connector supports reading both Standard Objects and Custom Objects from Salesforce. Each object is read as a separate stream. See a list of all Salesforce Standard Objects [here](https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_list.htm). -Airbyte fetches and handles all the possible and available streams dynamically based on: +Airbyte allows exporting all available Salesforce objects dynamically based on: - If the authenticated Salesforce user has the Role and Permissions to read and fetch objects +- If the Salesforce object has the queryable property set to true. Airbyte can only fetch objects which are queryable. If you don’t see an object available via Airbyte, and it is queryable, check if it is API-accessible to the Salesforce user you authenticated with. + +### A note on the BULK API vs REST API and their limitations + +Salesforce allows extracting data using either the [BULK API](https://developer.salesforce.com/docs/atlas.en-us.236.0.api_asynch.meta/api_asynch/asynch_api_intro.htm) or [REST API](https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/intro_what_is_rest_api.htm). To achieve fast performance, Salesforce recommends using the BULK API for extracting larger amounts of data (more than 2,000 records). For this reason, the Salesforce connector uses the BULK API by default to extract any Salesforce objects, unless any of the following conditions are met: + +- The Salesforce object has columns which are unsupported by the BULK API, like columns with a `base64` or `complexvalue` type +- The Salesforce object is not supported by the BULK API. In this case, we sync the objects via the REST API, which will occasionally cost more of your API quota. This list of objects was obtained experimentally, and includes the following objects: + - AcceptedEventRelation + - Attachment + - CaseStatus + - ContractStatus + - DeclinedEventRelation + - FieldSecurityClassification + - KnowledgeArticle + - KnowledgeArticleVersion + - KnowledgeArticleVersionHistory + - KnowledgeArticleViewStat + - KnowledgeArticleVoteStat + - OrderStatus + - PartnerRole + - RecentlyViewed + - ServiceAppointmentStatus + - ShiftStatus + - SolutionStatus + - TaskPriority + - TaskStatus + - UndecidedEventRelation + +More information on the differences between various Salesforce APIs can be found [here](https://help.salesforce.com/s/articleView?id=sf.integrate_what_is_api.htm&type=5). + +:::info Force Using Bulk API + +If you set the `Force Use Bulk API` option to `true`, the connector will ignore unsupported properties and sync the stream using the BULK API. + +::: + -- If the stream has the queryable property set to true. Airbyte can fetch only queryable streams via the API. If you don’t see your object available via Airbyte, check if it is API-accessible to the Salesforce user you authenticated with. - -**Note:** [BULK API](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_intro.htm) cannot be used to receive data from the following streams due to Salesforce API limitations.
The Salesforce connector syncs them using the REST API which will occasionally cost more of your API quota: - -- AcceptedEventRelation -- Attachment -- CaseStatus -- ContractStatus -- DeclinedEventRelation -- FieldSecurityClassification -- KnowledgeArticle -- KnowledgeArticleVersion -- KnowledgeArticleVersionHistory -- KnowledgeArticleViewStat -- KnowledgeArticleVoteStat -- OrderStatus -- PartnerRole -- RecentlyViewed -- ServiceAppointmentStatus -- ShiftStatus -- SolutionStatus -- TaskPriority -- TaskStatus -- UndecidedEventRelation - -## Salesforce tutorials +## Tutorials Now that you have set up the Salesforce source connector, check out the following Salesforce tutorials: @@ -130,7 +149,8 @@ Now that you have set up the Salesforce source connector, check out the followin ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| +| 2.1.2 | 2023-08-10 | [28781](https://github.com/airbytehq/airbyte/pull/28781) | Fix pagination for BULK API jobs; Add option to force use BULK API | | 2.1.1 | 2023-07-06 | [28021](https://github.com/airbytehq/airbyte/pull/28021) | Several Vulnerabilities Fixes; switched to use alpine instead of slim, CVE-2022-40897, CVE-2023-29383, CVE-2023-31484, CVE-2016-2781 | | 2.1.0 | 2023-06-26 | [27726](https://github.com/airbytehq/airbyte/pull/27726) | License Update: Elv2 | | 2.0.14 | 2023-05-04 | [25794](https://github.com/airbytehq/airbyte/pull/25794) | Avoid pandas inferring wrong data types by forcing all data type as object |