From a8240d9a28af38bb16db678acb9a18ad3f98b3d1 Mon Sep 17 00:00:00 2001 From: Roman Yermilov Date: Tue, 5 Dec 2023 15:25:03 +0100 Subject: [PATCH] Source Hubspot: unit tests --- .../source-hubspot/source_hubspot/streams.py | 4 +- .../source-hubspot/unit_tests/conftest.py | 10 ++++ .../source-hubspot/unit_tests/test_source.py | 23 ++++++++ .../source-hubspot/unit_tests/test_streams.py | 52 +++++++++++++++++++ 4 files changed, 88 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index e7d21fafb6d8a..516ba0fe0c62f 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -2126,7 +2126,9 @@ def client_side_filter_by_state(self, records: Iterable[Mapping[str, Any]], stre if object_id not in stream_state or record[self.cursor_field] > stream_state[object_id][self.cursor_field]: yield record - def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = None, stream_state: Mapping[str, Any] | None = None) -> Iterable[Mapping[str, Any] | None]: + def stream_slices( + self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: now = pendulum.now(tz="UTC") for parent_slice in super().stream_slices(sync_mode, cursor_field, stream_state): diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py index f5e052e9d43b5..08fe337e61c8c 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py @@ -51,6 +51,16 @@ def config_fixture(): return { "start_date": "2021-01-10T00:00:00Z", "credentials": {"credentials_title": "Private App Credentials", "access_token": "test_access_token"}, + "enable_experimental_streams": False + } + + +@pytest.fixture(name="config_experimental") +def config_eperimantal_fixture(): + return { + "start_date": "2021-01-10T00:00:00Z", + "credentials": {"credentials_title": "Private App Credentials", "access_token": "test_access_token"}, + "enable_experimental_streams": True } diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py index c984a7d7ba3a1..acaad00705897 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_source.py @@ -88,6 +88,29 @@ def test_streams(requests_mock, config): assert len(streams) == 30 +@mock.patch("source_hubspot.source.SourceHubspot.get_custom_object_streams") +def test_streams(requests_mock, config_experimental): + + streams = SourceHubspot().streams(config_experimental) + + assert len(streams) == 42 + + +def test_custom_streams(config_experimental): + custom_object_stream_instances = [ + MagicMock() + ] + streams = SourceHubspot().get_web_analytics_custom_objects_stream( + custom_object_stream_instances=custom_object_stream_instances, + common_params={ + "api": MagicMock(), + "start_date": "2021-01-01T00:00:00Z", + "credentials": config_experimental["credentials"] + } + ) + assert len(list(streams)) == 1 + + def test_check_credential_title_exception(config): config["credentials"].pop("credentials_title") diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_streams.py index dade0ccdb4ac4..c6c842e6c6da6 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_streams.py @@ -11,6 +11,7 @@ ContactLists, Contacts, ContactsMergedAudit, + ContactsWebAnalytics, CustomObject, DealPipelines, Deals, @@ -30,6 +31,7 @@ Owners, OwnersArchived, Products, + PropertyHistory, RecordUnnester, TicketPipelines, Tickets, @@ -503,3 +505,53 @@ def test_get_custom_objects_metadata_success(requests_mock, custom_object_schema def test_records_unnester(input_data, unnest_fields, expected_output): unnester = RecordUnnester(fields=unnest_fields) assert list(unnester.unnest(input_data)) == expected_output + + +def test_web_analytics_stream_slices(common_params, mocker): + parent_slicer_mock = mocker.patch("airbyte_cdk.sources.streams.http.HttpSubStream.stream_slices") + parent_slicer_mock.return_value = (_ for _ in [{"parent": {"id": 1}}]) + + stream = ContactsWebAnalytics(**common_params) + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field="occurredAt")) + + assert len(slices) == 1 + assert slices[0]["objectId"] == 1 + + +def test_web_analytics_latest_state(common_params, mocker): + parent_slicer_mock = mocker.patch("airbyte_cdk.sources.streams.http.HttpSubStream.stream_slices") + parent_slicer_mock.return_value = (_ for _ in [{"parent": {"id": "1"}}]) + + parent_slicer_mock = mocker.patch("source_hubspot.streams.Stream.read_records") + parent_slicer_mock.return_value = (_ for _ in [{"objectId": "1", "occurredAt": "2021-01-02T00:00:00Z"}]) + + stream = ContactsWebAnalytics(**common_params) + stream.state = {"1": {"occurredAt": "2021-01-01T00:00:00Z"}} + slices = list(stream.stream_slices(SyncMode.incremental, cursor_field="occurredAt")) + records = [list(stream.read_records(SyncMode.incremental, cursor_field="occurredAt", stream_slice=stream_slice)) for stream_slice in slices] + + assert len(slices) == 1 + assert len(records) == 1 + assert len(records[0]) == 1 + assert records[0][0]["objectId"] == "1" + assert stream.state["1"]["occurredAt"] == "2021-01-02T00:00:00Z" + + +def test_property_history_transform(common_params): + stream = PropertyHistory(**common_params) + versions = [ + { + "value": "Georgia", + "timestamp": 1645135236625 + } + ] + records = [ + { + "vid": 1, + "properties": { + "hs_country": {"versions": versions}, + "lastmodifieddate": {"value": 1645135236625} + } + } + ] + assert [{"vid": 1, "property": "hs_country", **version} for version in versions] == list(stream._transform(records=records))