Skip to content

Commit

Permalink
Source SalesForce: add logging + raise error instead of logs
Browse files Browse the repository at this point in the history
  • Loading branch information
artem1205 committed Jul 24, 2023
1 parent 4ddb464 commit bfdc7b6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Expand Up @@ -67,6 +67,7 @@ def _get_api_type(cls, stream_name: str, properties: dict, force_use_bulk_api: b
key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
}
rest_only = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
logger.info(f"{stream_name=}, object-like and binary properties {properties_not_supported_by_bulk=}")
if rest_only:
logger.warning(f"BULK API is not supported for stream: {stream_name}")
return "rest"
Expand Down Expand Up @@ -95,6 +96,7 @@ def generate_streams(
selected_properties = stream_properties.get(stream_name, {}).get("properties", {})

api_type = cls._get_api_type(stream_name, selected_properties, config.get("force_use_bulk_api", False))
logger.info(f"{stream_name=} is of {api_type=}")
if api_type == "rest":
full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream
elif api_type == "bulk":
Expand Down
Expand Up @@ -334,15 +334,15 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
f"sobject options: {self.sobject_options}, error message: '{error_message}'"
)
elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED":
self.logger.error(
raise SalesforceException(
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, error message: '{error_message}'"
)
) from error
elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
self.logger.error(
raise SalesforceException(
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'"
)
) from error
elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"):
self.logger.error(
f"The stream '{self.name}' is not queryable, "
Expand All @@ -357,7 +357,7 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
raise AirbyteTracedException(message=message, failure_type=FailureType.config_error, exception=error)
elif error.response.status_code == codes.BAD_REQUEST and error_code == "LIMIT_EXCEEDED":
message = "Your API key for Salesforce has reached its limit for the 24-hour period. We will resume replication once the limit has elapsed."
self.logger.error(message)
raise SalesforceException(message) from error
else:
raise error
else:
Expand Down Expand Up @@ -455,6 +455,7 @@ def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str, dic
) as data_file:
response_encoding = response.encoding or self.encoding
response_headers = response.headers
self.logger.info(f"job with {url=} download_data {response.headers=}")
for chunk in response.iter_content(chunk_size=chunk_size):
data_file.write(self.filter_null_bytes(chunk))
# check the file exists
Expand Down Expand Up @@ -521,7 +522,7 @@ def request_params(
query = f"SELECT {select_fields} FROM {self.name}"
if next_page_token:
query += next_page_token["next_token"]

self.logger.info(f"{query=}")
return {"q": query}

def read_records(
Expand Down

0 comments on commit bfdc7b6

Please sign in to comment.