Skip to content

Commit

Permalink
🐛 Source Salesforce: 400 error for non-queryable streams (#9005)
Browse files Browse the repository at this point in the history
  • Loading branch information
antixar committed Dec 23, 2021
1 parent ff5648a commit a4be492
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
"name": "Salesforce",
"dockerRepository": "airbyte/source-salesforce",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
"icon": "salesforce.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6119,7 +6119,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:0.1.9"
- dockerImage: "airbyte/source-salesforce:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/salesforce"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
import logging
from pathlib import Path
from typing import Any, Mapping

import pytest
from airbyte_cdk.sources.streams import Stream
from source_salesforce.source import SourceSalesforce

HERE = Path(__file__).parent


@pytest.fixture(name="input_config")
def parse_input_config():
    """Provide the parsed contents of ``secrets/config_bulk.json`` as ``input_config``.

    The file is looked up relative to the parent of the directory that holds
    this test module.
    """
    config_path = HERE.parent / "secrets/config_bulk.json"
    with open(config_path, "r") as config_file:
        return json.load(config_file)


def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream:
    """Return the first stream ``SourceSalesforce`` builds for ``stream_name``.

    A minimal stand-in catalog is assembled from throwaway classes, so the only
    stream it names is ``stream_name``.
    """
    # Stand-ins that expose only the ``name``, ``stream`` and ``streams`` attributes.
    fake_stream = type("a", (object,), {"name": stream_name})()
    fake_configured_stream = type("b", (object,), {"stream": fake_stream})()
    fake_catalog = type("c", (object,), {"streams": [fake_configured_stream]})()

    built_streams = SourceSalesforce().streams(input_config, fake_catalog)
    return built_streams[0]


def get_any_real_stream(input_config: Mapping[str, Any]) -> Stream:
    """Return the "Account" stream built from ``input_config``."""
    account_stream = get_stream(input_config, stream_name="Account")
    return account_stream


def test_not_queryable_stream(caplog, input_config):
    """Creating a job for a non-queryable sobject returns ``None`` and logs the reason."""
    bulk_stream = get_any_real_stream(input_config)
    jobs_url = f"{bulk_stream.sf_api.instance_url}/services/data/v52.0/jobs/query"

    # ActivityHistory serves as the example of a non-queryable BULK stream
    non_queryable_query = "Select Id, Subject from ActivityHistory"
    with caplog.at_level(logging.WARNING):
        job_id = bulk_stream.create_stream_job(non_queryable_query, jobs_url)
        assert job_id is None, "this stream should be skipped"

    # the most recent log record must mention that the stream is not queryable
    last_record = caplog.records[-1]
    assert "is not queryable" in last_record.message
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from typing import Any, List, Mapping, Optional, Tuple

import requests
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from requests.exceptions import HTTPError

from .exceptions import TypeSalesforceException
from .rate_limiting import default_backoff_handler
Expand Down Expand Up @@ -168,6 +170,7 @@


class Salesforce:
logger = AirbyteLogger()
version = "v52.0"

def __init__(
Expand Down Expand Up @@ -227,12 +230,15 @@ def get_validated_streams(self, catalog: ConfiguredAirbyteCatalog = None):
def _make_request(
    self, http_method: str, url: str, headers: dict = None, body: dict = None, stream: bool = False, params: dict = None
) -> requests.models.Response:
    """Send a GET or POST request through ``self.session`` and return the response.

    :param http_method: "GET" or "POST"; any other value raises ``ValueError``.
    :param url: target URL.
    :param headers: optional request headers.
    :param body: payload sent as ``data`` (POST only).
    :param stream: forwarded to ``session.get`` (GET only).
    :param params: query parameters (GET only).
    :return: the response, after ``raise_for_status()`` has succeeded.
    :raises ValueError: if ``http_method`` is neither "GET" nor "POST".
    :raises HTTPError: re-raised after the response body has been logged.
    """
    # Fail early with a clear message instead of hitting an unbound ``resp`` below.
    if http_method not in ("GET", "POST"):
        raise ValueError(f"Unsupported HTTP method: {http_method}")

    try:
        if http_method == "GET":
            resp = self.session.get(url, headers=headers, stream=stream, params=params)
        else:
            resp = self.session.post(url, headers=headers, data=body)
        resp.raise_for_status()
    except HTTPError as err:
        # Log the response body before propagating the error.
        self.logger.warn(f"http error body: {err.response.text}")
        raise
    return resp

def login(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def should_give_up(exc):
give_up = False

if give_up:
logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}, body: {exc.response.json()}")
logger.info(f"Giving up for returned HTTP status: {exc.response.status_code}, body: {exc.response.text}")
return give_up

return backoff.on_exception(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@


class SalesforceStream(HttpStream, ABC):

page_size = 2000

transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
Expand Down Expand Up @@ -99,7 +98,6 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:


class BulkSalesforceStream(SalesforceStream):

page_size = 30000
DEFAULT_WAIT_TIMEOUT_MINS = 10
MAX_CHECK_INTERVAL_SECONDS = 2.0
Expand Down Expand Up @@ -143,20 +141,34 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
return job_id
except exceptions.HTTPError as error:
if error.response.status_code in [codes.FORBIDDEN, codes.BAD_REQUEST]:
# Some streams can't be used with the BULK API. Every API version can have its own list of
# these sobjects, and others can be generated dynamically. That's why we can't track
# them in advance, and the only way is to catch the error and inspect its message to learn about
# their limitations. We currently know of 3 different reasons for such errors:
# 1) some Salesforce sobjects (streams) are simply not supported by the BULK API (as is).
# 2) Access to a sobject (stream) is not available.
# 3) The sobject is not queryable. It means this sobject can't be queried directly.
# It can only be retrieved as part of a response from another sobject. E.g.:
# initial query: "Select Id, Subject from ActivityHistory" -> error
# updated query: "Select Name, (Select Subject,ActivityType from ActivityHistories) from Contact"
# The second variant requires customisation for every case (ActivityHistory, ActivityHistories etc).
# And the main problem is that these subqueries don't support the CSV response format.
error_data = error.response.json()[0]
if (error_data.get("message", "") == "Selecting compound data not supported in Bulk Query") or (
error_data.get("errorCode", "") == "INVALIDENTITY"
and "is not supported by the Bulk API" in error_data.get("message", "")
error_code = error_data.get("errorCode")
error_message = error_data.get("message", "")
if error_message == "Selecting compound data not supported in Bulk Query" or (
error_code == "INVALIDENTITY" and "is not supported by the Bulk API" in error_message
):
self.logger.error(
f"Cannot receive data for stream '{self.name}' using BULK API, error message: '{error_data.get('message')}'"
)
elif error.response.status_code == codes.FORBIDDEN and not error_data.get("errorCode", "") == "REQUEST_LIMIT_EXCEEDED":
self.logger.error(f"Cannot receive data for stream '{self.name}', error message: '{error_data.get('message')}'")
self.logger.error(f"Cannot receive data for stream '{self.name}' using BULK API, error message: '{error_message}'")
elif error.response.status_code == codes.FORBIDDEN and error_code != "REQUEST_LIMIT_EXCEEDED":
self.logger.error(f"Cannot receive data for stream '{self.name}', error message: '{error_message}'")
elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"):
self.logger.error(f"The stream '{self.name}' is not queryable, error message: '{error_message}'")
else:
raise error
else:
raise error
return None

def wait_for_job(self, url: str) -> str:
# using "seconds" argument because self._wait_timeout can be changed by tests
Expand Down Expand Up @@ -197,7 +209,7 @@ def execute_job(self, query: str, url: str) -> str:
job_status = self.wait_for_job(url=job_full_url)
if job_status not in ["UploadComplete", "InProgress"]:
break
self.logger.error(f"Waiting error. Try to run this job again {i+1}/{self.MAX_RETRY_NUMBER}...")
self.logger.error(f"Waiting error. Try to run this job again {i + 1}/{self.MAX_RETRY_NUMBER}...")
self.abort_job(url=job_full_url)
job_status = "Aborted"

Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ List of available streams:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |

| 0.1.10 | 2021-12-23 | [9005](https://github.com/airbytehq/airbyte/pull/9005) | Handling 400 error when a stream is not queryable |
| 0.1.9 | 2021-12-07 | [8405](https://github.com/airbytehq/airbyte/pull/8405) | Filter 'null' byte(s) in HTTP responses |
| 0.1.8 | 2021-11-30 | [8191](https://github.com/airbytehq/airbyte/pull/8191) | Make `start_date` optional and change its format to `YYYY-MM-DD` |
| 0.1.7 | 2021-11-24 | [8206](https://github.com/airbytehq/airbyte/pull/8206) | Handling 400 error when trying to create a job for sync using Bulk API. |
Expand Down
8 changes: 7 additions & 1 deletion tools/ci_credentials/ci_credentials/secrets_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@ def __load_gsm_secrets(self) -> Mapping[Tuple[str, str], str]:
filename = DEFAULT_SECRET_FILE_WITH_EXT
log_name = f'{secret_name.split("/")[-1]}({connector_name})'
self.logger.info(f"found GSM secret: {log_name} = > {filename}")
secret_url = f"https://secretmanager.googleapis.com/v1/{secret_name}/versions/latest:access"

versions_url = f"https://secretmanager.googleapis.com/v1/{secret_name}/versions"
data = self.api.get(versions_url)
enabled_versions = [version["name"] for version in data["versions"] if version["state"] == "ENABLED"]
if len(enabled_versions) > 1:
self.logger.critical(f"{log_name} should have one enabled version at the same time!!!")

secret_url = f"https://secretmanager.googleapis.com/v1/{enabled_versions[0]}:access"
data = self.api.get(secret_url)
secret_value = data.get("payload", {}).get("data")
if not secret_value:
Expand Down
12 changes: 11 additions & 1 deletion tools/ci_credentials/tests/test_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,16 @@ def test_read(connector_name, gsm_secrets, expected_secrets):
"connector": connector_name,
}
} for i, k in enumerate(gsm_secrets)]}
matcher_secret = re.compile("https://secretmanager.googleapis.com/v1/.+/versions/latest:access")

matcher_versions = re.compile("https://secretmanager.googleapis.com/v1/.+/versions")
versions_response_list = [{"json": {
"versions": [{
"name": f"projects/<fake_id>/secrets/SECRET_{connector_name.upper()}_{i}_CREDS/versions/1",
"state": "ENABLED",
}]
}} for i in range(len(gsm_secrets))]

matcher_secret = re.compile("https://secretmanager.googleapis.com/v1/.+/1:access")
secrets_response_list = [{
"json": {"payload": {"data": base64.b64encode(json.dumps(v).encode()).decode("utf-8")}}
} for v in gsm_secrets.values()]
Expand All @@ -116,6 +125,7 @@ def test_read(connector_name, gsm_secrets, expected_secrets):
m.get(matcher_gsm_list, json=secrets_list)
m.post(matcher_gsm_list, json={"name": "<fake_name>"})
m.post(matcher_version, json={})
m.get(matcher_versions, versions_response_list)
m.get(matcher_secret, secrets_response_list)

secrets = [(*k, v.replace(" ", "")) for k, v in loader.read_from_gsm().items()]
Expand Down

0 comments on commit a4be492

Please sign in to comment.