From e3a4b86309999b68db92e1ba8f02194c2517d6ef Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Mon, 12 Jun 2023 20:30:55 +0300 Subject: [PATCH 01/11] added handler for 402 error --- .../connectors/source-mixpanel/Dockerfile | 2 +- .../connectors/source-mixpanel/metadata.yaml | 2 +- .../source_mixpanel/streams/base.py | 6 ++++++ .../source-mixpanel/unit_tests/test_streams.py | 17 +++++++++++++++++ docs/integrations/sources/mixpanel.md | 3 ++- 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile index 9ca98101d3c54..a845db65bbfc7 100644 --- a/airbyte-integrations/connectors/source-mixpanel/Dockerfile +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.34 +LABEL io.airbyte.version=0.1.35 LABEL io.airbyte.name=airbyte/source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml index 2e743964fc948..b98749378efc8 100644 --- a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml +++ b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml @@ -6,7 +6,7 @@ data: connectorSubtype: api connectorType: source definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a - dockerImageTag: 0.1.34 + dockerImageTag: 0.1.35 dockerRepository: airbyte/source-mixpanel githubIssueLabel: source-mixpanel icon: mixpanel.svg diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 8028948fdab21..19f3b802a4ec0 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -106,6 +106,12 @@ def backoff_time(self, response: requests.Response) -> float: self.retries += 1 return 2**self.retries * 60 + def should_retry(self, response: requests.Response) -> bool: + if response.status_code == 402: + self.logger.warning(f"Unable to perform a request. Payment Required: {response.text}") + return False + return super().should_retry(response) + def get_stream_params(self) -> Mapping[str, Any]: """ Fetch required parameters in a given stream. Used to create sub-streams diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index a44bd95af0134..77b9fee771b33 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -468,3 +468,20 @@ def test_export_iter_dicts(config): assert list(stream.iter_dicts([record_string, record_string[:2], record_string[2:], record_string])) == [record, record, record] # drop record parts because they are not standing nearby assert list(stream.iter_dicts([record_string, record_string[:2], record_string, record_string[2:]])) == [record, record] + + +@pytest.mark.parametrize( + ("http_status_code", "should_retry", "log_message"), + [ + (402, False, "Unable to perform a request. Payment Required: "), + ], +) +def test_should_retry_payment_required(http_status_code, should_retry, log_message, config, caplog): + response_mock = MagicMock() + response_mock.status_code = http_status_code + response_mock.text = "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing" + streams = [Annotations, CohortMembers, Cohorts, Engage, EngageSchema, Export, ExportSchema, Funnels, FunnelsList, Revenue] + for stream_class in streams: + stream = stream_class(authenticator=MagicMock(), **config) + assert stream.should_retry(response_mock) == should_retry + assert log_message in caplog.text diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 3ef7e5b45ccf9..bfd9d4d9d3083 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -50,7 +50,8 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------| -| 0.1.34 | 2022-05-15 | [21837](https://github.com/airbytehq/airbyte/pull/21837) | Add "insert_id" field to "export" stream schema | +| 0.1.35 | 2022-06-12 | | Add should_retry for 402 error | +| 0.1.34 | 2022-05-15 | [21837](https://github.com/airbytehq/airbyte/pull/21837) | Add "insert_id" field to "export" stream schema | | 0.1.33 | 2023-04-25 | [25543](https://github.com/airbytehq/airbyte/pull/25543) | Set should_retry for 104 error in stream export | | 0.1.32 | 2023-04-11 | [25056](https://github.com/airbytehq/airbyte/pull/25056) | Set HttpAvailabilityStrategy, add exponential backoff, streams export and annotations add undeclared fields | | 0.1.31 | 2023-02-13 | [22936](https://github.com/airbytehq/airbyte/pull/22936) | Specified date formatting in specification | From 6bf5aadd15f8c0b27e1efb1bd8daaddf049e4e84 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Tue, 13 Jun 2023 17:33:10 +0300 Subject: [PATCH 02/11] added changelog --- docs/integrations/sources/mixpanel.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index bfd9d4d9d3083..7a91bb172522c 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -50,7 +50,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------| -| 0.1.35 | 2022-06-12 | | Add should_retry for 402 error | +| 0.1.35 | 2022-06-12 | [27252](https://github.com/airbytehq/airbyte/pull/27252) | Add should_retry False for 402 error | | 0.1.34 | 2022-05-15 | [21837](https://github.com/airbytehq/airbyte/pull/21837) | Add "insert_id" field to "export" stream schema | | 0.1.33 | 2023-04-25 | [25543](https://github.com/airbytehq/airbyte/pull/25543) | Set should_retry for 104 error in stream export | | 0.1.32 | 2023-04-11 | [25056](https://github.com/airbytehq/airbyte/pull/25056) | Set HttpAvailabilityStrategy, add exponential backoff, streams export and annotations add undeclared fields | From d124b193926fec1001b7a3742035981cf309a0bc Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Tue, 13 Jun 2023 17:34:37 +0300 Subject: [PATCH 03/11] fixed CAT, updated logging --- .../integration_tests/expected_records.jsonl | 14 +++++++------- .../source_mixpanel/schemas/export.json | 18 +++++++++++++++--- .../source_mixpanel/streams/base.py | 2 +- .../source-mixpanel/unit_tests/test_streams.py | 2 +- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl index 7daf145e9d95e..3043c905590d9 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl @@ -1,16 +1,16 @@ -{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-05-29", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1 }, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0 } ], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1 } }, "emitted_at": 1684508037955} -{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-05-27", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1 }, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0 } ], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1 } }, "emitted_at": 1684508037956} -{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-05-28", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1 }, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0 } ], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1 } }, "emitted_at": 1684508037956} +{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-13", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037955} +{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-10", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1} }, "emitted_at": 1684508037956} +{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-09", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037956} {"stream": "engage", "data": {"distinct_id": "123@gmail.com", "email": "123@gmail.com", "name": "123", "123": "123456", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042343} {"stream": "engage", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042345} {"stream": "engage", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042346} {"stream": "annotations", "data": {"date": "2023-01-15T12:00:00+01:00", "description": "test", "id": 1138193, "project_id": 2529987, "user": {"id": 3440095, "first_name": "", "last_name": ""}}, "emitted_at": 1684508044902} {"stream": "annotations", "data": {"date": "2023-01-13T12:00:00+01:00", "description": "test123", "id": 1138196, "project_id": 2529987, "user": {"id": 3440095, "first_name": "", "last_name": ""}}, "emitted_at": 1684508044904} {"stream": "annotations", "data": {"date": "2023-01-13T12:00:00+01:00", "description": "test121233", "id": 1138197, "project_id": 2529987, "user": {"id": 3440095, "first_name": "", "last_name": ""}}, "emitted_at": 1684508044904} -{"stream": "export", "data": {"event": "Signed up", "import": "True", "insert_id": "29fc2962-6d9c-455d-95ad-95b84f09b9e4", "mp_api_endpoint": "api.mixpanel.com", "mp_api_timestamp_ms": "1685362729000", "distinct_id": "91304156-cafc-4673-a237-623d1129c801", "mp_processing_time_ms": "1685362729276", "time": "2023-05-29T14:17:52Z"}, "emitted_at": 1685362954820} +{"stream": "export", "data": {"event": "Browse", "browser": "Chrome", "created": "2020-04-24T06:59:41", "email": "Ray.Rodriguez@hotmailx.com", "first_name": "Ray", "import": "True", "initial_referrer": "bing.com", "insert_id": "bBquCestadcpFdhz", "last_name": "Rodriguez", "mp_api_endpoint": "api.mixpanel.com", "mp_api_timestamp_ms": "1686653210771", "os": "Windows", "Abandon Cart Count": "2", "Account Created Count": "2", "Add To Cart Count": "3", "Affiliate": "Amazon", "Browse Count": "2", "Browse Filter": "['Under $1000', 'Books', 'Microsoft']", "Campaign Name": "Super Sale", "Campaign Source": "Facebook", "Card Type": "Visa", "Cart Items": "Lord of the Rings Novels", "Cart Size": "7", "Cart Size (# of Items)": "10", "Cart Value": "577", "Complete Purchase Count": "3", "Coupon": "First Purchase", "Coupon Count Used": "1", "Date of Last Item Detail View": "2020-03-17T16:24:40", "Delivery Day": "Ground", "Delivery Fee": "7", "Delivery Fees": "7", "Delivery Method": "Ground", "Delivery Method Added Count": "1", "Gender": "Female", "Item Category": "Electronics", "Item Cost": "651", "Item Detail Page Count": "7", "Item Name": "Bose Speaker", "Item Rating": "5", "Items in Browse": "49", "Landing Page Loaded Count": "2", "Last Cart Abandonment": "2020-03-17T16:28:15", "Last Event": "Add To Cart", "Last Purchase": "2020-03-17T16:24:21", "Last Search": "2020-04-01T00:39:55", "Marketing A/B Test": "Control", "Misc Fee": "34", "Misc Fees": "17", "Number of Cards Added": "2", "Number of Cart Abandons": "7", "Number of Item Details Viewed": "22", "Number of Purchases": "8", "Number of Searches": "14", "Page Version": "B", "Payment Method Added Count": "1", "Platform": "Web", "Registration Date": "2020-03-17T15:49:09", "Registration Method": "Twitter", "Review Payment Count": "4", "Search Count": "3", "Search Page": "Homepage", "Search Results Count": "3", "Search Term": "TVs", "Suggested Item": "True", "Total Charge": "664", "UTM_Medium": "Organic", "UTM_Term": "Novels", "UTM_source": "Google", "Within Checkout Process": "True", "distinct_id": "1c28216e-b971-4f68-b879-4898a4624256", "mp_lib": "web", "mp_processing_time_ms": "1686653211817", "time": "2023-06-13T12:34:07Z"}, "emitted_at": 1686655046334} {"stream": "cohorts", "data": {"id": 1478097, "project_id": 2529987, "name": "Cohort1", "description": "", "data_group_id": null, "count": 2, "is_visible": 1, "created": "2021-09-14 15:57:43"}, "emitted_at": 1684508052373} {"stream": "cohort_members", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059432} {"stream": "cohort_members", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059434} -{"stream": "revenue", "data": {"date": "2023-05-27", "amount": 0.0, "count": 3, "paid_count": 0 }, "emitted_at": 1684508063120} -{"stream": "revenue", "data": {"date": "2023-05-28", "amount": 0.0, "count": 3, "paid_count": 0 }, "emitted_at": 1684508063121} -{"stream": "revenue", "data": {"date": "2023-05-29", "amount": 0.0, "count": 3, "paid_count": 0 }, "emitted_at": 1684508063121} +{"stream": "revenue", "data": {"date": "2023-06-11", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063120} +{"stream": "revenue", "data": {"date": "2023-06-12", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121} +{"stream": "revenue", "data": {"date": "2023-06-13", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121} diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json index cc158dece5dc5..492728e7d6c3d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/schemas/export.json @@ -237,9 +237,6 @@ "URL": { "type": ["null", "string"] }, - "insert_id": { - "type": ["null", "string"] - }, "mp_api_timestamp_ms": { "type": ["null", "string"] }, @@ -248,6 +245,21 @@ }, "mp_processing_time_ms": { "type": ["null", "string"] + }, + "region": { + "type": ["null", "string"] + }, + "mp_country_code": { + "type": ["null", "string"] + }, + "Share Item Count": { + "type": ["null", "string"] + }, + "city": { + "type": ["null", "string"] + }, + "Last Share": { + "type": ["null", "string"] } } } diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 19f3b802a4ec0..32e0044906d8f 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -108,7 +108,7 @@ def backoff_time(self, response: requests.Response) -> float: def should_retry(self, response: requests.Response) -> bool: if response.status_code == 402: - self.logger.warning(f"Unable to perform a request. Payment Required: {response.text}") + self.logger.warning(f"Unable to perform a request. Payment Required: {response.json()['error']}") return False return super().should_retry(response) diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 77b9fee771b33..f6cfe7f2e26e2 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -479,7 +479,7 @@ def test_export_iter_dicts(config): def test_should_retry_payment_required(http_status_code, should_retry, log_message, config, caplog): response_mock = MagicMock() response_mock.status_code = http_status_code - response_mock.text = "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing" + response_mock.json = MagicMock(return_value={"error": "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing"}) streams = [Annotations, CohortMembers, Cohorts, Engage, EngageSchema, Export, ExportSchema, Funnels, FunnelsList, Revenue] for stream_class in streams: stream = stream_class(authenticator=MagicMock(), **config) From 239963a95e365b09678a371ade4a499465e2181a Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Tue, 13 Jun 2023 17:42:49 +0300 Subject: [PATCH 04/11] changed stream in check_connection --- .../connectors/source-mixpanel/source_mixpanel/source.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index 6d5063ead948b..4a6bfd6a2e9a9 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -79,10 +79,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> try: config = self._validate_and_transform(config) auth = self.get_authenticator(config) - FunnelsList.max_retries = 0 - funnels = FunnelsList(authenticator=auth, **config) - funnels.reqs_per_hour_limit = 0 - next(read_full_refresh(funnels), None) + annotations = Annotations(authenticator=auth, **config) + annotations.reqs_per_hour_limit = 0 + next(read_full_refresh(annotations), None) except requests.HTTPError as e: return False, e.response.json()["error"] except Exception as e: From 5c2348c0349d62dc9bdaa6b25379448503f0789b Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Tue, 13 Jun 2023 18:28:25 +0300 Subject: [PATCH 05/11] refactored tests --- .../source-mixpanel/source_mixpanel/source.py | 2 +- .../source-mixpanel/unit_tests/test_source.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index 4a6bfd6a2e9a9..18fbabfa1b565 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -13,7 +13,7 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.auth import BasicHttpAuthenticator, TokenAuthenticator -from .streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, FunnelsList, Revenue +from .streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, Revenue from .testing import adapt_streams_if_testing, adapt_validate_if_testing from .utils import read_full_refresh diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index c2ad5b3aad076..3ffd6f324df1b 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -8,7 +8,7 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus, Status from source_mixpanel.source import SourceMixpanel, TokenAuthenticatorBase64 -from source_mixpanel.streams import FunnelsList +from source_mixpanel.streams import Annotations from .utils import command_check, get_url_to_mock, setup_response @@ -18,16 +18,15 @@ @pytest.fixture def check_connection_url(config): auth = TokenAuthenticatorBase64(token=config["api_secret"]) - funnel_list = FunnelsList(authenticator=auth, **config) - return get_url_to_mock(funnel_list) + annotations = Annotations(authenticator=auth, **config) + return get_url_to_mock(annotations) @pytest.mark.parametrize( "response_code,expect_success,response_json", [ (200, True, {}), - (400, False, {"error": "Request error"}), - (500, False, {"error": "Server error"}), + (400, False, {"error": "Request error"}) ], ) def test_check_connection(requests_mock, check_connection_url, config_raw, response_code, expect_success, response_json): @@ -69,7 +68,8 @@ def test_streams_string_date(requests_mock, config_raw): def test_streams_disabled_402(requests_mock, config_raw): - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(402, {})) + json_response = {"error": "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing"} + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, json_response)) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(402, json_response)) streams = SourceMixpanel().streams(config_raw) assert {s.name for s in streams} == {"funnels", "revenue", "annotations", "cohorts"} From 368e2d24e5f4fbc8840c150c87926f3dd72ac8cd Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Tue, 13 Jun 2023 20:58:11 +0300 Subject: [PATCH 06/11] added check for all streams to avoid 402 error --- .../source-mixpanel/source_mixpanel/source.py | 22 ++++++++++++++----- .../source-mixpanel/unit_tests/test_source.py | 12 +++++++++- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index 18fbabfa1b565..e09bf75cff105 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -79,14 +79,26 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> try: config = self._validate_and_transform(config) auth = self.get_authenticator(config) - annotations = Annotations(authenticator=auth, **config) - annotations.reqs_per_hour_limit = 0 - next(read_full_refresh(annotations), None) - except requests.HTTPError as e: - return False, e.response.json()["error"] except Exception as e: return False, e + # https://github.com/airbytehq/airbyte/pull/27252#discussion_r1228356872 + # temporary solution, testing access for all streams to avoid 402 error + streams = [Annotations, Cohorts, Engage, Export, Revenue] + for stream_class in streams: + try: + stream = stream_class(authenticator=auth, **config) + stream.reqs_per_hour_limit = 0 + next(read_full_refresh(stream), None) + break + except requests.HTTPError as e: + if e.response.status_code == 402: + logger.info(f"Stream {stream_class.__name__}: {e.response.json()['error']}") + else: + return False, e.response.json()["error"] + except Exception as e: + return False, e + return True, None @adapt_streams_if_testing diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index 3ffd6f324df1b..d1aa2ea8ca3ec 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -8,7 +8,7 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus, Status from source_mixpanel.source import SourceMixpanel, TokenAuthenticatorBase64 -from source_mixpanel.streams import Annotations +from source_mixpanel.streams import Annotations, Cohorts from .utils import command_check, get_url_to_mock, setup_response @@ -38,6 +38,16 @@ def test_check_connection(requests_mock, check_connection_url, config_raw, respo assert error == expected_error +def test_check_connection_402_error_on_first_stream(requests_mock, check_connection_url, config, config_raw): + auth = TokenAuthenticatorBase64(token=config["api_secret"]) + requests_mock.register_uri("GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(200, {})) + requests_mock.register_uri("GET", get_url_to_mock(Annotations(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"})) + + ok, error = SourceMixpanel().check_connection(logger, config_raw) + assert ok is True + assert error is None + + def test_check_connection_bad_config(): config = {} source = SourceMixpanel() From d47eea61257ee85d3cbd2606dedd594339785662 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Tue, 13 Jun 2023 21:49:47 +0300 Subject: [PATCH 07/11] added check for all stream for right permissions --- .../source-mixpanel/source_mixpanel/source.py | 14 +++++++++-- .../source-mixpanel/unit_tests/test_source.py | 23 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index e09bf75cff105..dc2103d61eb14 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -111,12 +111,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}") auth = self.get_authenticator(config) - streams = [ + streams = [] + for stream in [ Annotations(authenticator=auth, **config), Cohorts(authenticator=auth, **config), Funnels(authenticator=auth, **config), Revenue(authenticator=auth, **config), - ] + ]: + try: + next(read_full_refresh(stream), None) + except requests.HTTPError as e: + if e.response.status_code != 402: + raise e + logger.warning("Stream '%s' - is disabled, reason: 402 Payment Required", stream.name) + else: + streams.append(stream) + # streams with dynamically generated schema for stream in [ diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index d1aa2ea8ca3ec..56be7678d9ba5 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -63,23 +63,36 @@ def test_check_connection_incomplete(config_raw): def test_streams(requests_mock, config_raw): requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, {"error": "Payment required"})) streams = SourceMixpanel().streams(config_raw) - assert len(streams) == 7 + assert len(streams) == 6 def test_streams_string_date(requests_mock, config_raw): requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, {"error": "Payment required"})) config = copy.deepcopy(config_raw) config["start_date"] = "2020-01-01" config["end_date"] = "2020-01-02" streams = SourceMixpanel().streams(config) - assert len(streams) == 7 + assert len(streams) == 6 def test_streams_disabled_402(requests_mock, config_raw): json_response = {"error": "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing"} - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, json_response)) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(402, json_response)) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(402, json_response)) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(402, json_response)) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, json_response)) streams = SourceMixpanel().streams(config_raw) - assert {s.name for s in streams} == {"funnels", "revenue", "annotations", "cohorts"} + assert {s.name for s in streams} == {'annotations', 'export', 'engage', 'cohort_members'} From b87c523a4944d14a9da3f013d5fcf136649c35b0 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Wed, 14 Jun 2023 16:33:01 +0300 Subject: [PATCH 08/11] upadated expected records --- .../integration_tests/expected_records.jsonl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl index d5db2a3f9bb16..fca816c5b6088 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/expected_records.jsonl @@ -1,5 +1,12 @@ -{"stream": "funnels", "data": {"funnel_id": 8901755, "name": "Onboarding funnel", "date": "2023-06-02", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Viewed Home Page", "goal": "Viewed Home Page", "step_label": "Viewed Home Page", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Signed Up", "goal": "Signed Up", "step_label": "Signed Up", "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Set a name for your workspace\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Set a name for your workspace"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Invite your team to your workspace\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Invite your team to your workspace"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Enter the website you want to unblock\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Enter the website you want to unblock"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Install Dataline on your website\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Install Dataline on your website"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 6, "worst": 1}}, "emitted_at": 1686595087241} -{"stream": "engage", "data": {"distinct_id": "22885b19-781a-44cd-a8d9-ce46970a3fd6", "browser": "Chrome", "browser_version": "81.0.4044.138", "city": "San Francisco", "country_code": "US", "email": "john+test5@dataline.io", "first_name": "John", "last_name": "Laflur", "name": "John Laflur", "region": "California", "timezone": "America/Los_Angeles", "id": "22885b19-781a-44cd-a8d9-ce46970a3fd6", "last_seen": "2020-05-19T17:53:14"}, "emitted_at": 1686595088617} -{"stream": "cohorts", "data": {"id": 1343181, "project_id": 2117889, "name": "Users in California", "description": "Users in California description", "data_group_id": null, "count": 45, "is_visible": 1, "created": "2021-07-01 22:02:05"}, "emitted_at": 1686595090896} -{"stream": "cohort_members", "data": {"distinct_id": "44b3e80b-894c-480e-9cc0-20cb27784e48", "browser": "Chrome", "browser_version": "85.0.4183.121", "city": "Laguna Niguel", "country_code": "US", "email": "alec@brev.dev", "first_name": "Alec", "last_name": "Fong", "name": "Alec Fong", "region": "California", "timezone": "America/Los_Angeles", "id": "0e2a7bf1-47c5-4c98-b4eb-52859d98adc7", "unblocked": "true", "last_seen": "2020-10-13T22:03:01", "cohort_id": 1343181}, "emitted_at": 1686595093527} -{"stream": "revenue", "data": {"date": "2023-06-02", "amount": 0.0, "count": 121, "paid_count": 0}, "emitted_at": 1686595094576} +{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-13", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037955} +{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-10", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1} }, "emitted_at": 1684508037956} +{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-09", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037956} +{"stream": "engage", "data": {"distinct_id": "123@gmail.com", "email": "123@gmail.com", "name": "123", "123": "123456", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042343} +{"stream": "engage", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042345} +{"stream": "engage", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042346} +{"stream": "cohorts", "data": {"id": 1478097, "project_id": 2529987, "name": "Cohort1", "description": "", "data_group_id": null, "count": 2, "is_visible": 1, "created": "2021-09-14 15:57:43"}, "emitted_at": 1684508052373} +{"stream": "cohort_members", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059432} +{"stream": "cohort_members", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059434} +{"stream": "revenue", "data": {"date": "2023-06-11", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063120} +{"stream": "revenue", "data": {"date": "2023-06-12", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121} +{"stream": "revenue", "data": {"date": "2023-06-13", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121} \ No newline at end of file From e46af16957ee42fb4281f11bd27c44e1218b4fb3 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Wed, 14 Jun 2023 19:45:59 +0300 Subject: [PATCH 09/11] updated streams method --- .../source-mixpanel/source_mixpanel/source.py | 15 +-------- .../source-mixpanel/unit_tests/test_source.py | 31 ++++++++++++++----- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index dc2103d61eb14..a59dda525ffc1 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -117,25 +117,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Cohorts(authenticator=auth, **config), Funnels(authenticator=auth, **config), Revenue(authenticator=auth, **config), - ]: - try: - next(read_full_refresh(stream), None) - except requests.HTTPError as e: - if e.response.status_code != 402: - raise e - logger.warning("Stream '%s' - is disabled, reason: 402 Payment Required", stream.name) - else: - streams.append(stream) - - - # streams with dynamically generated schema - for stream in [ CohortMembers(authenticator=auth, **config), Engage(authenticator=auth, **config), Export(authenticator=auth, **config), ]: try: - stream.get_json_schema() + next(read_full_refresh(stream), None) except requests.HTTPError as e: if e.response.status_code != 402: raise e diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index 56be7678d9ba5..978f1f43c09ca 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -61,23 +61,38 @@ def test_check_connection_incomplete(config_raw): def test_streams(requests_mock, config_raw): + requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage?page_size=1000", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {"id": 123})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, {"error": "Payment required"})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels", setup_response(200, {})) + requests_mock.register_uri( + "GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(200, {"funnel_id": 123, "name": "name"}) + ) + requests_mock.register_uri( + "GET", "https://data.mixpanel.com/api/2.0/export", + setup_response(200, {"event": "some event", "properties": {"event": 124, "time": 124124}}) + ) + streams = SourceMixpanel().streams(config_raw) - assert len(streams) == 6 + assert len(streams) == 7 def test_streams_string_date(requests_mock, config_raw): - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {"id": 123})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {})) + requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, {"error": "Payment required"})) + requests_mock.register_uri( + "GET", "https://data.mixpanel.com/api/2.0/export", + setup_response(200, {"event": "some event", "properties": {"event": 124, "time": 124124}}) + ) config = copy.deepcopy(config_raw) config["start_date"] = "2020-01-01" config["end_date"] = "2020-01-02" @@ -87,12 +102,14 @@ def test_streams_string_date(requests_mock, config_raw): def test_streams_disabled_402(requests_mock, config_raw): json_response = {"error": "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing"} + requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage?page_size=1000", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(402, json_response)) - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(402, json_response)) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, json_response)) + requests_mock.register_uri("GET", "https://data.mixpanel.com/api/2.0/export?from_date=2017-01-20&to_date=2017-02-18", setup_response(402, json_response)) streams = SourceMixpanel().streams(config_raw) - assert {s.name for s in streams} == {'annotations', 'export', 'engage', 'cohort_members'} + assert {s.name for s in streams} == {'annotations', 'engage', 'revenue'} From ca330e7993e2ee2fb8e267d870d14df6a212d458 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Thu, 15 Jun 2023 11:34:23 +0300 Subject: [PATCH 10/11] added get_json_schema to streams method --- .../connectors/source-mixpanel/source_mixpanel/source.py | 1 + .../connectors/source-mixpanel/unit_tests/test_source.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index a59dda525ffc1..6ff593935e9dc 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -123,6 +123,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ]: try: next(read_full_refresh(stream), None) + stream.get_json_schema() except requests.HTTPError as e: if e.response.status_code != 402: raise e diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index 978f1f43c09ca..18beba82e6d79 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -82,7 +82,7 @@ def test_streams(requests_mock, config_raw): def test_streams_string_date(requests_mock, config_raw): - requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {"id": 123})) From 30dd9202484d309302acbcaefd9970140034f5a3 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko Date: Thu, 15 Jun 2023 16:48:21 +0300 Subject: [PATCH 11/11] refactored check_connection method --- .../source-mixpanel/source_mixpanel/source.py | 12 ++++++++---- .../source-mixpanel/unit_tests/test_source.py | 14 +++++++++++++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index 6ff593935e9dc..ff9890f8a39a8 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -85,21 +85,25 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> # https://github.com/airbytehq/airbyte/pull/27252#discussion_r1228356872 # temporary solution, testing access for all streams to avoid 402 error streams = [Annotations, Cohorts, Engage, Export, Revenue] + connected = False + reason = None for stream_class in streams: try: stream = stream_class(authenticator=auth, **config) - stream.reqs_per_hour_limit = 0 next(read_full_refresh(stream), None) + connected = True break except requests.HTTPError as e: + reason = e.response.json()["error"] if e.response.status_code == 402: logger.info(f"Stream {stream_class.__name__}: {e.response.json()['error']}") else: - return False, e.response.json()["error"] + return connected, reason except Exception as e: - return False, e + return connected, e - return True, None + reason = None if connected else reason + return connected, reason @adapt_streams_if_testing def streams(self, config: Mapping[str, Any]) -> List[Stream]: diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index 18beba82e6d79..e238294420507 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -8,7 +8,7 @@ from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus, Status from source_mixpanel.source import SourceMixpanel, TokenAuthenticatorBase64 -from source_mixpanel.streams import Annotations, Cohorts +from source_mixpanel.streams import Annotations, Cohorts, Engage, Export, Revenue from .utils import command_check, get_url_to_mock, setup_response @@ -38,6 +38,18 @@ def test_check_connection(requests_mock, check_connection_url, config_raw, respo assert error == expected_error +def test_check_connection_all_streams_402_error(requests_mock, check_connection_url, config_raw, config): + auth = TokenAuthenticatorBase64(token=config["api_secret"]) + requests_mock.register_uri("GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"})) + requests_mock.register_uri("GET", get_url_to_mock(Annotations(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"})) + requests_mock.register_uri("POST", get_url_to_mock(Engage(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"})) + requests_mock.register_uri("GET", get_url_to_mock(Export(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"})) + requests_mock.register_uri("GET", get_url_to_mock(Revenue(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"})) + + ok, error = SourceMixpanel().check_connection(logger, config_raw) + assert ok is False and error == "Payment required" + + def test_check_connection_402_error_on_first_stream(requests_mock, check_connection_url, config, config_raw): auth = TokenAuthenticatorBase64(token=config["api_secret"]) requests_mock.register_uri("GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(200, {}))