diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index f11ff7818dd5c..2aecc56717a79 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -931,7 +931,7 @@
 - name: Salesforce
   sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
   dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 1.0.18
+  dockerImageTag: 1.0.19
   documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
   icon: salesforce.svg
   sourceType: api
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index 5f579e691d2a9..cbd3be27634ae 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -9763,7 +9763,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:1.0.18"
+- dockerImage: "airbyte/source-salesforce:1.0.19"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
     connectionSpecification:
diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile
index 19b51b0c83a85..6eef8e2bcd352 100644
--- a/airbyte-integrations/connectors/source-salesforce/Dockerfile
+++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile
@@ -13,5 +13,6 @@ RUN pip install .
 
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=1.0.18
+LABEL io.airbyte.version=1.0.19
+
 LABEL io.airbyte.name=airbyte/source-salesforce
diff --git a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml
index 95fde01f89995..87ae3d60ce8b7 100644
--- a/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml
@@ -11,7 +11,6 @@ tests:
       status: "succeed"
     - config_path: "integration_tests/invalid_config.json"
       status: "failed"
-
   discovery:
     - config_path: "secrets/config.json"
   basic_read:
diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
index b3a9363938636..c22a79e12f1f7 100644
--- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
+++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py
@@ -48,23 +48,6 @@ def __init__(
         self.schema: Mapping[str, Any] = schema  # type: ignore[assignment]
         self.sobject_options = sobject_options
 
-    def decode(self, chunk):
-        """
-        Most Salesforce instances use UTF-8, but some use ISO-8859-1.
-        By default, we'll decode using UTF-8, and fallback to ISO-8859-1 if it doesn't work.
-        See implementation considerations for more details https://developer.salesforce.com/docs/atlas.en-us.api.meta/api/implementation_considerations.htm
-        """
-        if self.encoding == DEFAULT_ENCODING:
-            try:
-                decoded = chunk.decode(self.encoding)
-                return decoded
-            except UnicodeDecodeError as e:
-                self.encoding = "ISO-8859-1"
Error: {e}") - return self.decode(chunk) - else: - return chunk.decode(self.encoding) - @property def name(self) -> str: return self.stream_name @@ -206,6 +189,11 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]: f"Cannot receive data for stream '{self.name}' ," f"sobject options: {self.sobject_options}, error message: '{error_message}'" ) + elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED": + self.logger.error( + f"Cannot receive data for stream '{self.name}' ," + f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'" + ) elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"): self.logger.error( f"The stream '{self.name}' is not queryable, " @@ -281,34 +269,34 @@ def filter_null_bytes(self, b: bytes): self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)) return res - def download_data(self, url: str, chunk_size: float = 1024) -> os.PathLike: + def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]: """ - Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitaions. + Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations. @ url: string - the url of the `executed_job` - @ chunk_size: float - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes - - Returns the string with file path of downloaded binary data. Saved temporarily. + @ chunk_size: int - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes + Return the tuple containing string with file path of downloaded binary data (Saved temporarily) and file encoding. """ # set filepath for binary data from response tmp_file = os.path.realpath(os.path.basename(url)) - with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response: - with open(tmp_file, "wb") as data_file: - for chunk in response.iter_content(chunk_size=chunk_size): - data_file.write(self.filter_null_bytes(chunk)) + with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file: + response_encoding = response.encoding or response.apparent_encoding or self.encoding + for chunk in response.iter_content(chunk_size=chunk_size): + data_file.write(self.filter_null_bytes(chunk)) # check the file exists if os.path.isfile(tmp_file): - return tmp_file + return tmp_file, response_encoding else: raise TmpFileIOError(f"The IO/Error occured while verifying binary data. Stream: {self.name}, file {tmp_file} doesn't exist.") - def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]: + def read_with_chunks(self, path: str, file_encoding: str, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]: """ Reads the downloaded binary data, using lines chunks, set by `chunk_size`. @ path: string - the path to the downloaded temporarily binary data. + @ file_encoding: string - encoding for binary data file according to Standard Encodings from codecs module @ chunk_size: int - the number of lines to read at a time, default: 100 lines / time. 
""" try: - with open(path, "r", encoding=self.encoding) as data: + with open(path, "r", encoding=file_encoding) as data: chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix") for chunk in chunks: chunk = chunk.replace({nan: None}).to_dict(orient="records") @@ -382,7 +370,7 @@ def read_records( count = 0 record: Mapping[str, Any] = {} - for record in self.read_with_chunks(self.download_data(url=job_full_url)): + for record in self.read_with_chunks(*self.download_data(url=job_full_url)): count += 1 yield record self.delete_job(url=job_full_url) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 083b85e812f82..2334c4945ba0e 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -12,7 +12,7 @@ import pytest import requests_mock from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type -from conftest import generate_stream +from conftest import encoding_symbols_parameters, generate_stream from requests.exceptions import HTTPError from source_salesforce.source import SourceSalesforce from source_salesforce.streams import ( @@ -184,14 +184,29 @@ def test_download_data_filter_null_bytes(stream_config, stream_api): with requests_mock.Mocker() as m: m.register_uri("GET", f"{job_full_url}/results", content=b"\x00") - res = list(stream.read_with_chunks(stream.download_data(url=job_full_url))) + res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) assert res == [] m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00') - res = list(stream.read_with_chunks(stream.download_data(url=job_full_url))) + res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url))) assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": False}] +@pytest.mark.parametrize( + "chunk_size, content_type, content, expected_result", + encoding_symbols_parameters(), + ids=[f"charset: {x[1]}, chunk_size: {x[0]}" for x in encoding_symbols_parameters()], +) +def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type, content, expected_result): + job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA" + stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api) + + with requests_mock.Mocker() as m: + m.register_uri("GET", f"{job_full_url}/results", headers={"Content-Type": f"text/html; charset={content_type}"}, content=content) + res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url, chunk_size=chunk_size))) + assert res == expected_result + + @pytest.mark.parametrize( "login_status_code, login_json_resp, discovery_status_code, discovery_resp_json, expected_error_msg", ( @@ -415,7 +430,7 @@ def test_csv_reader_dialect_unix(): with requests_mock.Mocker() as m: m.register_uri("GET", url + "/results", text=text) - result = [i for i in stream.read_with_chunks(stream.download_data(url))] + result = [i for i in stream.read_with_chunks(*stream.download_data(url))] assert result == data @@ -513,10 +528,3 @@ def test_convert_to_standard_instance(stream_config, stream_api): bulk_stream = generate_stream("Account", stream_config, stream_api) rest_stream = bulk_stream.get_standard_instance() 
     assert isinstance(rest_stream, IncrementalSalesforceStream)
-
-
-def test_decoding(stream_config, stream_api):
-    stream_name = "AcceptedEventRelation"
-    stream = generate_stream(stream_name, stream_config, stream_api)
-    assert stream.decode(b"\xe9\x97\xb4\xe5\x8d\x95\xe7\x9a\x84\xe8\xaf\xb4 \xf0\x9f\xaa\x90") == "间单的说 🪐"
-    assert stream.decode(b"0\xe5") == "0å"
diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
index e5996e13284c6..e0de6179f98f3 100644
--- a/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
+++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/conftest.py
@@ -101,3 +101,15 @@ def stream_api_v2(stream_config):
 
 def generate_stream(stream_name, stream_config, stream_api):
     return SourceSalesforce.generate_streams(stream_config, {stream_name: None}, stream_api)[0]
+
+
+def encoding_symbols_parameters():
+    return [(x, "ISO-8859-1", b'"\xc4"\n,"4"\n\x00,"\xca \xfc"', [{"Ä": "4"}, {"Ä": "Ê ü"}]) for x in range(1, 11)] + [
+        (
+            x,
+            "utf-8",
+            b'"\xd5\x80"\n "\xd5\xaf","\xd5\xaf"\n\x00,"\xe3\x82\x82 \xe3\x83\xa4 \xe3\x83\xa4 \xf0\x9d\x9c\xb5"',
+            [{"Հ": "կ"}, {"Հ": "も ヤ ヤ 𝜵"}],
+        )
+        for x in range(1, 11)
+    ]
diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py
index fcd05203dbbc9..47546c34517f8 100644
--- a/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py
+++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py
@@ -16,8 +16,8 @@
     (
         (1000, 0.4, 1),
         (10000, 1, 2),
-        (100000, 4, 7),
-        (200000, 7, 12),
+        (100000, 4, 9),
+        (200000, 7, 19),
     ),
     ids=[
         "1k recods",
@@ -36,7 +36,7 @@ def test_memory_download_data(stream_config, stream_api, n_records, first_size,
     with requests_mock.Mocker() as m:
         m.register_uri("GET", f"{job_full_url}/results", content=content)
         tracemalloc.start()
-        for x in stream.read_with_chunks(stream.download_data(url=job_full_url)):
+        for x in stream.read_with_chunks(*stream.download_data(url=job_full_url)):
             pass
         fs, fp = tracemalloc.get_traced_memory()
         first_size_in_mb, first_peak_in_mb = fs / 1024**2, fp / 1024**2
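
Note (not part of the diff above): this change drops the old decode() fallback (try UTF-8, then ISO-8859-1) and instead takes the file encoding from the HTTP response while streaming the job result to disk, then hands that encoding to read_with_chunks. A minimal sketch of that pattern follows, assuming a plain requests call outside the connector; the URL, file name, and function name are illustrative only, not connector code.

import requests

DEFAULT_ENCODING = "utf-8"  # used only if the response declares nothing and no guess is available


def download_csv(url: str, path: str, chunk_size: int = 1024) -> tuple[str, str]:
    """Stream a CSV result to disk and return the path plus the encoding to read it back with."""
    with requests.get(url, stream=True) as response:
        # Prefer the charset requests derives from the Content-Type header, then the
        # encoding guessed from the body, then a default; this mirrors
        # `response.encoding or response.apparent_encoding or self.encoding` in download_data.
        encoding = response.encoding or response.apparent_encoding or DEFAULT_ENCODING
        with open(path, "wb") as data_file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                data_file.write(chunk.replace(b"\x00", b""))  # drop null bytes, as filter_null_bytes does
    return path, encoding


# Usage: open the temporary file with the encoding detected during download.
path, encoding = download_csv("https://example.com/jobs/query/123/results", "results.csv")
with open(path, "r", encoding=encoding) as data:
    print(data.readline())

Carrying the detected encoding alongside the file path (rather than mutating a shared self.encoding, as the removed decode() did) keeps the reader's charset tied to the specific download it belongs to.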