diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
index af9361e312519..721788683a955 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py
@@ -67,6 +67,7 @@ def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: b
             key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
         }
         rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
+        logger.info(f"{stream_name=}, object-like and binary properties {properties_not_supported_by_bulk=}")
         if rest_only:
             logger.warning(f"BULK API is not supported for stream: {stream_name}")
             return "rest"
@@ -95,6 +96,7 @@ def generate_streams(
             selected_properties = stream_properties.get(stream_name, {}).get("properties", {})

             api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False))
+            logger.info(f"{stream_name=} is of {api_type=}")
             if api_type == "rest":
                 full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream
             elif api_type == "bulk":
diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
index 2d015a28d29dd..23aaa800db36f 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
@@ -334,15 +334,15 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
                         f"sobject options: {self.sobject_options}, error message: '{error_message}'"
                     )
                 elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED":
-                    self.logger.error(
+                    raise SalesforceException(
                         f"Cannot receive data for stream '{self.name}' ,"
                         f"sobject options: {self.sobject_options}, error message: '{error_message}'"
-                    )
+                    ) from error
                 elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
-                    self.logger.error(
+                    raise SalesforceException(
                         f"Cannot receive data for stream '{self.name}' ,"
                         f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'"
-                    )
+                    ) from error
                 elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"):
                     self.logger.error(
                         f"The stream '{self.name}' is not queryable, "
@@ -357,7 +357,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
                     raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)
                 elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED":
                     message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed."
-                    self.logger.error(message)
+                    raise SalesforceException(message) from error
                 else:
                     raise error
             else:
@@ -455,6 +455,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dic
         ) as data_file:
             response_encoding = response.encoding or self.encoding
             response_headers = response.headers
+            self.logger.info(f"job with {url=} download_data {response.headers=}")
             for chunk in response.iter_content(chunk_size=chunk_size):
                 data_file.write(self.filter_null_bytes(chunk))
         # check the file exists
@@ -521,7 +522,7 @@ def request_params(
         query = f"SELECT {select_fields} FROM {self.name}"
         if next_page_token:
             query += next_page_token["next_token"]
-
+        self.logger.info(f"{query=}")
         return {"q": query}

     def read_records(
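
A note on the added log lines: they all use Python 3.8+ self-documenting f-string expressions (f"{name=}"), which render the expression text together with the repr of its value, so each log line identifies both the variable and its content. A standalone illustration (the values here are made up):

# f"{expr=}" expands to the expression text plus the repr of its value.
stream_name = "Account"
api_type = "bulk"
print(f"{stream_name=} is of {api_type=}")
# prints: stream_name='Account' is of api_type='bulk'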
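The streams.py changes turn three previously log-and-continue branches in create_stream_job (the two FORBIDDEN cases and the LIMIT_EXCEEDED case) into hard failures. Below is a minimal sketch of the new contract, assuming a stubbed SalesforceException and a fake response object; the stub classes and the error code/message values are illustrative stand-ins, not the connector's real types:

import requests
from requests import exceptions


class SalesforceException(Exception):
    """Stand-in for the connector's SalesforceException."""


class FakeResponse:
    """Minimal stub carrying only the status code the branch inspects."""

    status_code = requests.codes.FORBIDDEN


def raise_for_forbidden(error: exceptions.HTTPError, error_code: str, error_message: str) -> None:
    # Mirrors the patched branch: a FORBIDDEN response whose error code is not
    # REQUEST_LIMIT_EXCEEDED now raises instead of being logged and swallowed.
    if error.response.status_code == requests.codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED":
        # `raise ... from error` chains the HTTPError as __cause__, keeping the
        # transport-level failure visible in the traceback.
        raise SalesforceException(f"Cannot receive data, error message: '{error_message}'") from error


http_error = exceptions.HTTPError(response=FakeResponse())
try:
    raise_for_forbidden(http_error, error_code="API_DISABLED_FOR_ORG", error_message="API is disabled")
except SalesforceException as exc:
    assert exc.__cause__ is http_error  # cause chain preserved by `from error`
    print(f"raised: {exc}")

The practical effect is that a failed job creation now fails the sync attempt (and surfaces in the platform's error reporting) instead of quietly logging and yielding no records for the stream.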