🐛 Source Salesforce: fix response encoding (#17314)
* 🐛 Source Salesforce: fix response encoding

* 🐛 Source Salesforce: fix REQUEST_LIMIT_EXCEEDED error handling

* 🐛 Source Salesforce: fix test memory

* 🐛 Source Salesforce: Bump version to 1.0.19

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
artem1205 and octavia-squidington-iii committed Sep 29, 2022
1 parent da5df45 commit f9f9fab
Showing 8 changed files with 56 additions and 48 deletions.
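The fix replaces the old per-chunk decode fallback (UTF-8 first, then ISO-8859-1) with a single encoding resolved from the HTTP response and passed through to the CSV reader, as the streams.py diff below shows. A minimal sketch of that resolution order, assuming a requests-style response object and a hypothetical DEFAULT_ENCODING constant:

import requests

DEFAULT_ENCODING = "utf-8"  # assumed fallback; the connector keeps its own default

def resolve_encoding(response: requests.Response, default: str = DEFAULT_ENCODING) -> str:
    # Prefer the charset declared in the Content-Type header, then the
    # encoding detected from the body, then the connector's default.
    return response.encoding or response.apparent_encoding or default

The downloaded file is then reopened once with that encoding instead of being decoded chunk by chunk.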
@@ -931,7 +931,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 1.0.18
dockerImageTag: 1.0.19
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
@@ -9763,7 +9763,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:1.0.18"
- dockerImage: "airbyte/source-salesforce:1.0.19"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-salesforce/Dockerfile
@@ -13,5 +13,6 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.18
LABEL io.airbyte.version=1.0.19

LABEL io.airbyte.name=airbyte/source-salesforce
@@ -11,7 +11,6 @@ tests:
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"

discovery:
- config_path: "secrets/config.json"
basic_read:
@@ -48,23 +48,6 @@ def __init__(
self.schema: Mapping[str, Any] = schema # type: ignore[assignment]
self.sobject_options = sobject_options

def decode(self, chunk):
"""
Most Salesforce instances use UTF-8, but some use ISO-8859-1.
By default, we'll decode using UTF-8, and fallback to ISO-8859-1 if it doesn't work.
See implementation considerations for more details https://developer.salesforce.com/docs/atlas.en-us.api.meta/api/implementation_considerations.htm
"""
if self.encoding == DEFAULT_ENCODING:
try:
decoded = chunk.decode(self.encoding)
return decoded
except UnicodeDecodeError as e:
self.encoding = "ISO-8859-1"
self.logger.info(f"Could not decode chunk. Falling back to {self.encoding} encoding. Error: {e}")
return self.decode(chunk)
else:
return chunk.decode(self.encoding)

@property
def name(self) -> str:
return self.stream_name
@@ -206,6 +189,11 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, error message: '{error_message}'"
)
elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
self.logger.error(
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'"
)
elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"):
self.logger.error(
f"The stream '{self.name}' is not queryable, "
@@ -281,34 +269,34 @@ def filter_null_bytes(self, b: bytes):
self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res))
return res

def download_data(self, url: str, chunk_size: float = 1024) -> os.PathLike:
def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]:
"""
Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitaions.
Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations.
@ url: string - the url of the `executed_job`
@ chunk_size: float - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes
Returns the string with file path of downloaded binary data. Saved temporarily.
@ chunk_size: int - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes
Returns a tuple containing the file path of the downloaded binary data (saved temporarily) and the file encoding.
"""
# set filepath for binary data from response
tmp_file = os.path.realpath(os.path.basename(url))
with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response:
with open(tmp_file, "wb") as data_file:
for chunk in response.iter_content(chunk_size=chunk_size):
data_file.write(self.filter_null_bytes(chunk))
with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file:
response_encoding = response.encoding or response.apparent_encoding or self.encoding
for chunk in response.iter_content(chunk_size=chunk_size):
data_file.write(self.filter_null_bytes(chunk))
# check the file exists
if os.path.isfile(tmp_file):
return tmp_file
return tmp_file, response_encoding
else:
raise TmpFileIOError(f"The IO/Error occured while verifying binary data. Stream: {self.name}, file {tmp_file} doesn't exist.")

def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]:
def read_with_chunks(self, path: str, file_encoding: str, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]:
"""
Reads the downloaded binary data, using lines chunks, set by `chunk_size`.
@ path: string - the path to the downloaded temporarily binary data.
@ file_encoding: string - encoding for binary data file according to Standard Encodings from codecs module
@ chunk_size: int - the number of lines to read at a time, default: 100 lines / time.
"""
try:
with open(path, "r", encoding=self.encoding) as data:
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix")
for chunk in chunks:
chunk = chunk.replace({nan: None}).to_dict(orient="records")
@@ -382,7 +370,7 @@ def read_records(

count = 0
record: Mapping[str, Any] = {}
for record in self.read_with_chunks(self.download_data(url=job_full_url)):
for record in self.read_with_chunks(*self.download_data(url=job_full_url)):
count += 1
yield record
self.delete_job(url=job_full_url)
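With this change download_data returns a (file path, encoding) pair and read_with_chunks consumes both, so call sites unpack the tuple as read_records does above. An illustrative call, where job_full_url and process are hypothetical placeholders:

path, encoding = stream.download_data(url=job_full_url)
for record in stream.read_with_chunks(path, encoding):
    process(record)  # equivalent to read_with_chunks(*stream.download_data(url=job_full_url))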
@@ -12,7 +12,7 @@
import pytest
import requests_mock
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type
from conftest import generate_stream
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import HTTPError
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import (
@@ -184,14 +184,29 @@ def test_download_data_filter_null_bytes(stream_config, stream_api):

with requests_mock.Mocker() as m:
m.register_uri("GET", f"{job_full_url}/results", content=b"\x00")
res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url)))
assert res == []

m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00')
res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url)))
assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": False}]


@pytest.mark.parametrize(
"chunk_size, content_type, content, expected_result",
encoding_symbols_parameters(),
ids=[f"charset: {x[1]}, chunk_size: {x[0]}" for x in encoding_symbols_parameters()],
)
def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type, content, expected_result):
job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA"
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)

with requests_mock.Mocker() as m:
m.register_uri("GET", f"{job_full_url}/results", headers={"Content-Type": f"text/html; charset={content_type}"}, content=content)
res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url, chunk_size=chunk_size)))
assert res == expected_result


@pytest.mark.parametrize(
"login_status_code, login_json_resp, discovery_status_code, discovery_resp_json, expected_error_msg",
(
@@ -415,7 +430,7 @@ def test_csv_reader_dialect_unix():

with requests_mock.Mocker() as m:
m.register_uri("GET", url + "/results", text=text)
result = [i for i in stream.read_with_chunks(stream.download_data(url))]
result = [i for i in stream.read_with_chunks(*stream.download_data(url))]
assert result == data


@@ -513,10 +528,3 @@ def test_convert_to_standard_instance(stream_config, stream_api):
bulk_stream = generate_stream("Account", stream_config, stream_api)
rest_stream = bulk_stream.get_standard_instance()
assert isinstance(rest_stream, IncrementalSalesforceStream)


def test_decoding(stream_config, stream_api):
stream_name = "AcceptedEventRelation"
stream = generate_stream(stream_name, stream_config, stream_api)
assert stream.decode(b"\xe9\x97\xb4\xe5\x8d\x95\xe7\x9a\x84\xe8\xaf\xb4 \xf0\x9f\xaa\x90") == "间单的说 🪐"
assert stream.decode(b"0\xe5") == "0å"
@@ -101,3 +101,15 @@ def stream_api_v2(stream_config):

def generate_stream(stream_name, stream_config, stream_api):
return SourceSalesforce.generate_streams(stream_config, {stream_name: None}, stream_api)[0]


def encoding_symbols_parameters():
return [(x, "ISO-8859-1", b'"\xc4"\n,"4"\n\x00,"\xca \xfc"', [{"Ä": "4"}, {"Ä": "Ê ü"}]) for x in range(1, 11)] + [
(
x,
"utf-8",
b'"\xd5\x80"\n "\xd5\xaf","\xd5\xaf"\n\x00,"\xe3\x82\x82 \xe3\x83\xa4 \xe3\x83\xa4 \xf0\x9d\x9c\xb5"',
[{"Հ": "կ"}, {"Հ": "も ヤ ヤ 𝜵"}],
)
for x in range(1, 11)
]
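encoding_symbols_parameters feeds test_encoding_symbols with one ISO-8859-1 and one UTF-8 payload at chunk sizes 1 through 10. A quick sanity check (not part of the commit, purely illustrative) of how those fixture bytes decode under the declared charsets:

assert b'"\xc4"'.decode("ISO-8859-1") == '"Ä"'
assert b'"\xca \xfc"'.decode("ISO-8859-1") == '"Ê ü"'
assert b'"\xd5\x80"'.decode("utf-8") == '"Հ"'
assert b'"\xf0\x9d\x9c\xb5"'.decode("utf-8") == '"𝜵"'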
@@ -16,8 +16,8 @@
(
(1000, 0.4, 1),
(10000, 1, 2),
(100000, 4, 7),
(200000, 7, 12),
(100000, 4, 9),
(200000, 7, 19),
),
ids=[
"1k recods",
@@ -36,7 +36,7 @@ def test_memory_download_data(stream_config, stream_api, n_records, first_size,
with requests_mock.Mocker() as m:
m.register_uri("GET", f"{job_full_url}/results", content=content)
tracemalloc.start()
for x in stream.read_with_chunks(stream.download_data(url=job_full_url)):
for x in stream.read_with_chunks(*stream.download_data(url=job_full_url)):
pass
fs, fp = tracemalloc.get_traced_memory()
first_size_in_mb, first_peak_in_mb = fs / 1024**2, fp / 1024**2
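The memory test above tracks peak allocations with tracemalloc while records are streamed, and the expected memory figures for the 100k- and 200k-record cases were raised in this commit. A minimal standalone sketch of the same measurement pattern, with hypothetical names:

import tracemalloc

def peak_memory_mb(fn) -> float:
    # Return the peak memory allocated while fn() runs, in MiB.
    tracemalloc.start()
    try:
        fn()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak / 1024 ** 2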
