✨ Source Salesforce: Adding bulk stream mock server tests (#37749)
maxi297 authored May 6, 2024
1 parent d3864c2 commit 5c6024b
Showing 7 changed files with 307 additions and 73 deletions.
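The new bulk-stream tests follow the CDK mock-server style rather than the per-test requests_mock fixtures shown below, and the added test module itself is among the changed files that are not rendered in this view. The sketch below is therefore only an illustration of that style, assuming the Airbyte CDK HTTP mocking helpers (airbyte_cdk.test.mock_http); the base URL and the _read_bulk_stream() harness are hypothetical, and the committed tests may differ.

    # Illustrative sketch only, assuming the Airbyte CDK mock-server helpers;
    # the committed mock-server tests are not shown in this diff view.
    import json
    from unittest import TestCase

    from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
    from salesforce_job_response_builder import JobInfoResponseBuilder

    _BASE_URL = "https://fake-instance.salesforce.com/services/data/v57.0"  # hypothetical instance URL


    class BulkStreamTest(TestCase):
        @HttpMocker()
        def test_read_one_result_page(self, http_mocker: HttpMocker) -> None:
            # Bulk API 2.0 lifecycle: create the query job...
            http_mocker.post(
                HttpRequest(f"{_BASE_URL}/jobs/query"),
                HttpResponse(json.dumps({"id": "a_job_id"}), 200),
            )
            # ...poll job info until it reports JobComplete...
            http_mocker.get(
                HttpRequest(f"{_BASE_URL}/jobs/query/a_job_id"),
                HttpResponse(json.dumps(JobInfoResponseBuilder().with_id("a_job_id").with_state("JobComplete").get_response()), 200),
            )
            # ...then download the CSV results.
            http_mocker.get(
                HttpRequest(f"{_BASE_URL}/jobs/query/a_job_id/results"),
                HttpResponse("Field1,LastModifiedDate,ID\ntest,2021-11-16,1", 200),
            )

            output = _read_bulk_stream("Account")  # hypothetical harness running a full source read
            assert len(output.records) == 1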
@@ -10,7 +10,7 @@ data:
  connectorSubtype: api
  connectorType: source
  definitionId: b117307c-14b6-41aa-9422-947e34922962
-  dockerImageTag: 2.5.8
+  dockerImageTag: 2.5.9
  dockerRepository: airbyte/source-salesforce
  documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
  githubIssueLabel: source-salesforce
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.8"
version = "2.5.9"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -32,7 +32,7 @@
from airbyte_cdk.utils import AirbyteTracedException
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import ChunkedEncodingError, HTTPError
-from salesforce_job_response_builder import SalesforceJobResponseBuilder
+from salesforce_job_response_builder import JobInfoResponseBuilder
from source_salesforce.api import Salesforce
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
from source_salesforce.source import SourceSalesforce
@@ -47,7 +47,7 @@

_A_CHUNKED_RESPONSE = [b"first chunk", b"second chunk"]
_A_JSON_RESPONSE = {"id": "any id"}
-_A_SUCCESSFUL_JOB_CREATION_RESPONSE = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
+_A_SUCCESSFUL_JOB_CREATION_RESPONSE = JobInfoResponseBuilder().with_state("JobComplete").get_response()
_A_PK = "a_pk"
_A_STREAM_NAME = "a_stream_name"
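The renamed builder used above (SalesforceJobResponseBuilder is now JobInfoResponseBuilder) lives in salesforce_job_response_builder.py, which is not rendered in this view. A hypothetical reconstruction, inferred only from its call sites in this file (with_id, with_state, get_response), might look like the following; the real builder likely carries more job-info fields.

    # Hypothetical reconstruction of JobInfoResponseBuilder, inferred from its call sites in this diff.
    from typing import Any, Dict


    class JobInfoResponseBuilder:
        def __init__(self) -> None:
            # Minimal Bulk API 2.0 job-info payload; the default field values are assumptions.
            self._response: Dict[str, Any] = {"id": "any_job_id", "state": "InProgress"}

        def with_id(self, job_id: str) -> "JobInfoResponseBuilder":
            self._response["id"] = job_id
            return self

        def with_state(self, state: str) -> "JobInfoResponseBuilder":
            # e.g. "InProgress", "JobComplete", "Failed", "Aborted"
            self._response["state"] = state
            return self

        def get_response(self) -> Dict[str, Any]:
            return self._response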

@@ -179,31 +179,6 @@ def test_stream_contains_unsupported_properties_by_bulk(stream_config, stream_ap
    assert not isinstance(stream, BulkSalesforceStream)


-def test_bulk_sync_pagination(stream_config, stream_api, requests_mock):
-    stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
-    job_id = "fake_job"
-    requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
-    requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
-    resp_text = ["Field1,LastModifiedDate,ID"] + [f"test,2021-11-16,{i}" for i in range(5)]
-    result_uri = requests_mock.register_uri(
-        "GET",
-        stream.path() + f"/{job_id}/results",
-        [
-            {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_1"}},
-            {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "somelocator_2"}},
-            {"text": "\n".join(resp_text), "headers": {"Sforce-Locator": "null"}},
-        ],
-    )
-    requests_mock.register_uri("DELETE", stream.path() + f"/{job_id}")
-
-    stream_slices = next(iter(stream.stream_slices(sync_mode=SyncMode.incremental)))
-    loaded_ids = [int(record["ID"]) for record in stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices)]
-    assert loaded_ids == [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
-    assert result_uri.call_count == 3
-    assert result_uri.request_history[1].query == "locator=somelocator_1"
-    assert result_uri.request_history[2].query == "locator=somelocator_2"
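The removed test above covered Bulk API result pagination driven by the Sforce-Locator response header; that coverage presumably moves to the new mock-server tests. As a reminder of the mechanism the mocks encode, here is a minimal client-side sketch (not the connector's actual download logic) of following the locator until the API returns "null":

    # Minimal sketch of Sforce-Locator paging as encoded by the mocks in the removed test;
    # not the connector's actual implementation.
    from typing import List, Optional

    import requests


    def download_all_result_pages(session: requests.Session, results_url: str) -> List[str]:
        pages: List[str] = []
        locator: Optional[str] = None
        while True:
            params = {"locator": locator} if locator else {}
            response = session.get(results_url, params=params)
            response.raise_for_status()
            pages.append(response.text)
            locator = response.headers.get("Sforce-Locator")
            if not locator or locator == "null":  # "null" (or a missing header) marks the last page
                return pages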


def _prepare_mock(m, stream):
job_id = "fake_job_1"
m.register_uri("POST", stream.path(), json={"id": job_id})
@@ -481,7 +456,7 @@ def test_given_retryable_error_when_download_data_then_retry(send_http_request_p
@patch("source_salesforce.source.BulkSalesforceStream._non_retryable_send_http_request")
def test_given_first_download_fail_when_download_data_then_retry_job_only_once(send_http_request_patch):
    sf_api = Mock()
-    sf_api.generate_schema.return_value = SalesforceJobResponseBuilder().with_state("JobComplete").get_response()
+    sf_api.generate_schema.return_value = JobInfoResponseBuilder().with_state("JobComplete").get_response()
    sf_api.instance_url = "http://test_given_first_download_fail_when_download_data_then_retry_job.com"
    job_creation_return_values = [_A_JSON_RESPONSE, _A_SUCCESSFUL_JOB_CREATION_RESPONSE]
    send_http_request_patch.return_value.json.side_effect = job_creation_return_values * 2
@@ -869,13 +844,13 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
    stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config_date_format, stream_api, state=state, legacy=True)

    job_id_1 = "fake_job_1"
-    requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
+    requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}", [{"json": JobInfoResponseBuilder().with_id(job_id_1).with_state("JobComplete").get_response()}])
    requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_1}")
    requests_mock.register_uri("GET", stream.path() + f"/{job_id_1}/results", text="Field1,LastModifiedDate,ID\ntest,2023-01-15,1")
    requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_1}")

    job_id_2 = "fake_job_2"
-    requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
+    requests_mock.register_uri("GET", stream.path() + f"/{job_id_2}", [{"json": JobInfoResponseBuilder().with_id(job_id_2).with_state("JobComplete").get_response()}])
    requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_2}")
    requests_mock.register_uri(
        "GET", stream.path() + f"/{job_id_2}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,2\ntest,2023-02-20,22"
@@ -886,7 +861,7 @@ def test_bulk_stream_request_params_states(stream_config_date_format, stream_api
    queries_history = requests_mock.register_uri(
        "POST", stream.path(), [{"json": {"id": job_id_1}}, {"json": {"id": job_id_2}}, {"json": {"id": job_id_3}}]
    )
-    requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": SalesforceJobResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
+    requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}", [{"json": JobInfoResponseBuilder().with_id(job_id_3).with_state("JobComplete").get_response()}])
    requests_mock.register_uri("DELETE", stream.path() + f"/{job_id_3}")
    requests_mock.register_uri("GET", stream.path() + f"/{job_id_3}/results", text="Field1,LastModifiedDate,ID\ntest,2023-04-01,3")
    requests_mock.register_uri("PATCH", stream.path() + f"/{job_id_3}")
@@ -945,7 +920,7 @@ def test_stream_slices_for_substream(stream_config, stream_api, requests_mock):

job_id = "fake_job"
requests_mock.register_uri("POST", stream.path(), json={"id": job_id})
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=SalesforceJobResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
requests_mock.register_uri("GET", stream.path() + f"/{job_id}", json=JobInfoResponseBuilder().with_id(job_id).with_state("JobComplete").get_response())
requests_mock.register_uri(
"GET",
stream.path() + f"/{job_id}/results",