Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Mixpanel: skip stream when credentials are expired #30090

Merged
merged 9 commits into from
Sep 26, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.39
LABEL io.airbyte.version=0.1.40
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,9 @@
"source_defined_primary_key": [["funnel_id"], ["date"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "export",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["time"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
"destination_sync_mode": "append",
"cursor_field": ["date"],
"primary_key": [["funnel_id"], ["date"]]
},
{
"stream": {
Expand All @@ -33,7 +24,9 @@
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
"destination_sync_mode": "append",
"cursor_field": ["created"],
"primary_key": [["id"]]
},
{
"stream": {
Expand All @@ -46,7 +39,22 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["last_seen"]
"cursor_field": ["last_seen"],
"primary_key": [["distinct_id"]]
},
{
"stream": {
"name": "revenue",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["date"],
"source_defined_primary_key": [["date"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["date"],
"primary_key": [["date"]]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 0.1.39
dockerImageTag: 0.1.40
dockerRepository: airbyte/source-mixpanel
githubIssueLabel: source-mixpanel
icon: mixpanel.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

import pendulum
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from airbyte_cdk.utils import AirbyteTracedException
from pendulum import Date


Expand Down Expand Up @@ -61,7 +63,6 @@ def __init__(
self.project_id = project_id
self.retries = 0
self._reqs_per_hour_limit = reqs_per_hour_limit

super().__init__(authenticator=authenticator)

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand Down Expand Up @@ -91,7 +92,6 @@ def parse_response(
stream_state: Mapping[str, Any],
**kwargs,
) -> Iterable[Mapping]:

# parse the whole response
yield from self.process_response(response, stream_state=stream_state, **kwargs)

Expand Down Expand Up @@ -119,6 +119,12 @@ def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 402:
self.logger.warning(f"Unable to perform a request. Payment Required: {response.json()['error']}")
return False
if response.status_code == 400 and "Unable to authenticate request" in response.text:
message = (
f"Your credentials might have expired. Please update your config with valid credentials."
f" See more details: {response.text}"
)
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
return super().should_retry(response)

def get_stream_params(self) -> Mapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.utils import AirbyteTracedException
from source_mixpanel.streams import (
Annotations,
CohortMembers,
Expand Down Expand Up @@ -493,3 +494,20 @@ def test_should_retry_payment_required(http_status_code, should_retry, log_messa
stream = stream_class(authenticator=MagicMock(), **config)
assert stream.should_retry(response_mock) == should_retry
assert log_message in caplog.text


def test_raise_config_error_on_creds_expiration(config, caplog, requests_mock):
streams = []
for cls in [Annotations, CohortMembers, Cohorts, Engage, EngageSchema, Export, ExportSchema, Funnels, FunnelsList, Revenue]:
stream = cls(authenticator=MagicMock(), **config)
requests_mock.register_uri(stream.http_method, get_url_to_mock(stream), status_code=400, text="Unable to authenticate request")
streams.append(stream)

for stream in streams:
records = []
with pytest.raises(AirbyteTracedException) as e:
for slice_ in stream.stream_slices(sync_mode="full_refresh"):
records.extend(stream.read_records("full_refresh", stream_slice=slice_))
assert records == []
assert str(e.value) == "Your credentials might have expired. Please update your config with valid credentials. " \
"See more details: Unable to authenticate request"
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------|
| 0.1.40 | 2022-09-20 | [30090](https://github.com/airbytehq/airbyte/pull/30090) | Handle 400 error when the credentials become expired |
| 0.1.39 | 2023-09-15 | [30469](https://github.com/airbytehq/airbyte/pull/30469) | Add default primary key `distinct_id` to `Export` stream |
| 0.1.38 | 2023-08-31 | [30028](https://github.com/airbytehq/airbyte/pull/30028) | Handle gracefully project timezone mismatch |
| 0.1.37 | 2023-07-20 | [27932](https://github.com/airbytehq/airbyte/pull/27932) | Fix spec: change start/end date format to `date` |
Expand Down