Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Mixpanel: add handler for 402 error #27252

Merged
merged 12 commits into from
Jun 15, 2023
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.34
LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
{"stream": "funnels", "data": {"funnel_id": 8901755, "name": "Onboarding funnel", "date": "2023-06-02", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Viewed Home Page", "goal": "Viewed Home Page", "step_label": "Viewed Home Page", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Signed Up", "goal": "Signed Up", "step_label": "Signed Up", "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Set a name for your workspace\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Set a name for your workspace"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Invite your team to your workspace\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Invite your team to your workspace"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Enter the website you want to unblock\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Enter the website you want to unblock"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Onboarding - Action Performed", "goal": "Onboarding - Action Performed", "step_label": "Onboarding - Action Performed", "selector": "(string(properties[\"action\"], \"undefined\") == \"Install Dataline on your website\")", "selector_params": {"step_label": "Onboarding - Action Performed", "bool_op": "and", "property_filter_params_list": [{"filter": {"operator": "==", "operand": ["Install Dataline on your website"]}, "property": {"name": "action", "source": "properties", "type": "string"}, "selected_property_type": "string", "type": "string"}]}, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 6, "worst": 1}}, "emitted_at": 1686595087241}
{"stream": "engage", "data": {"distinct_id": "22885b19-781a-44cd-a8d9-ce46970a3fd6", "browser": "Chrome", "browser_version": "81.0.4044.138", "city": "San Francisco", "country_code": "US", "email": "john+test5@dataline.io", "first_name": "John", "last_name": "Laflur", "name": "John Laflur", "region": "California", "timezone": "America/Los_Angeles", "id": "22885b19-781a-44cd-a8d9-ce46970a3fd6", "last_seen": "2020-05-19T17:53:14"}, "emitted_at": 1686595088617}
{"stream": "cohorts", "data": {"id": 1343181, "project_id": 2117889, "name": "Users in California", "description": "Users in California description", "data_group_id": null, "count": 45, "is_visible": 1, "created": "2021-07-01 22:02:05"}, "emitted_at": 1686595090896}
{"stream": "cohort_members", "data": {"distinct_id": "44b3e80b-894c-480e-9cc0-20cb27784e48", "browser": "Chrome", "browser_version": "85.0.4183.121", "city": "Laguna Niguel", "country_code": "US", "email": "alec@brev.dev", "first_name": "Alec", "last_name": "Fong", "name": "Alec Fong", "region": "California", "timezone": "America/Los_Angeles", "id": "0e2a7bf1-47c5-4c98-b4eb-52859d98adc7", "unblocked": "true", "last_seen": "2020-10-13T22:03:01", "cohort_id": 1343181}, "emitted_at": 1686595093527}
{"stream": "revenue", "data": {"date": "2023-06-02", "amount": 0.0, "count": 121, "paid_count": 0}, "emitted_at": 1686595094576}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-13", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037955}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-10", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1} }, "emitted_at": 1684508037956}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-09", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037956}
{"stream": "engage", "data": {"distinct_id": "123@gmail.com", "email": "123@gmail.com", "name": "123", "123": "123456", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042343}
{"stream": "engage", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042345}
{"stream": "engage", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042346}
{"stream": "cohorts", "data": {"id": 1478097, "project_id": 2529987, "name": "Cohort1", "description": "", "data_group_id": null, "count": 2, "is_visible": 1, "created": "2021-09-14 15:57:43"}, "emitted_at": 1684508052373}
{"stream": "cohort_members", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059432}
{"stream": "cohort_members", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059434}
{"stream": "revenue", "data": {"date": "2023-06-11", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063120}
{"stream": "revenue", "data": {"date": "2023-06-12", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121}
{"stream": "revenue", "data": {"date": "2023-06-13", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 0.1.34
dockerImageTag: 0.1.35
dockerRepository: airbyte/source-mixpanel
githubIssueLabel: source-mixpanel
icon: mixpanel.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,6 @@
"URL": {
"type": ["null", "string"]
},
"insert_id": {
"type": ["null", "string"]
},
"mp_api_timestamp_ms": {
"type": ["null", "string"]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import BasicHttpAuthenticator, TokenAuthenticator

from .streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, FunnelsList, Revenue
from .streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, Revenue
from .testing import adapt_streams_if_testing, adapt_validate_if_testing
from .utils import read_full_refresh

Expand Down Expand Up @@ -79,16 +79,31 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
try:
config = self._validate_and_transform(config)
auth = self.get_authenticator(config)
FunnelsList.max_retries = 0
funnels = FunnelsList(authenticator=auth, **config)
funnels.reqs_per_hour_limit = 0
next(read_full_refresh(funnels), None)
except requests.HTTPError as e:
return False, e.response.json()["error"]
except Exception as e:
return False, e

return True, None
# https://github.com/airbytehq/airbyte/pull/27252#discussion_r1228356872
# temporary solution, testing access for all streams to avoid 402 error
streams = [Annotations, Cohorts, Engage, Export, Revenue]
connected = False
reason = None
for stream_class in streams:
try:
stream = stream_class(authenticator=auth, **config)
next(read_full_refresh(stream), None)
connected = True
break
except requests.HTTPError as e:
reason = e.response.json()["error"]
if e.response.status_code == 402:
logger.info(f"Stream {stream_class.__name__}: {e.response.json()['error']}")
else:
return connected, reason
except Exception as e:
return connected, e

reason = None if connected else reason
return connected, reason

@adapt_streams_if_testing
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Expand All @@ -100,20 +115,18 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")

auth = self.get_authenticator(config)
streams = [
streams = []
for stream in [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also try try reading from the streams with dynamically generated schema because they do not get filtered out with the current solution.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Annotations(authenticator=auth, **config),
Cohorts(authenticator=auth, **config),
Funnels(authenticator=auth, **config),
Revenue(authenticator=auth, **config),
]

# streams with dynamically generated schema
for stream in [
CohortMembers(authenticator=auth, **config),
Engage(authenticator=auth, **config),
Export(authenticator=auth, **config),
]:
try:
next(read_full_refresh(stream), None)
stream.get_json_schema()
except requests.HTTPError as e:
if e.response.status_code != 402:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ def backoff_time(self, response: requests.Response) -> float:
self.retries += 1
return 2**self.retries * 60

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 402:
self.logger.warning(f"Unable to perform a request. Payment Required: {response.json()['error']}")
return False
return super().should_retry(response)

def get_stream_params(self) -> Mapping[str, Any]:
"""
Fetch required parameters in a given stream. Used to create sub-streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteConnectionStatus, Status
from source_mixpanel.source import SourceMixpanel, TokenAuthenticatorBase64
from source_mixpanel.streams import FunnelsList
from source_mixpanel.streams import Annotations, Cohorts, Engage, Export, Revenue

from .utils import command_check, get_url_to_mock, setup_response

Expand All @@ -18,16 +18,15 @@
@pytest.fixture
def check_connection_url(config):
auth = TokenAuthenticatorBase64(token=config["api_secret"])
funnel_list = FunnelsList(authenticator=auth, **config)
return get_url_to_mock(funnel_list)
annotations = Annotations(authenticator=auth, **config)
return get_url_to_mock(annotations)


@pytest.mark.parametrize(
"response_code,expect_success,response_json",
[
(200, True, {}),
(400, False, {"error": "Request error"}),
(500, False, {"error": "Server error"}),
(400, False, {"error": "Request error"})
],
)
def test_check_connection(requests_mock, check_connection_url, config_raw, response_code, expect_success, response_json):
Expand All @@ -39,6 +38,28 @@ def test_check_connection(requests_mock, check_connection_url, config_raw, respo
assert error == expected_error


def test_check_connection_all_streams_402_error(requests_mock, check_connection_url, config_raw, config):
auth = TokenAuthenticatorBase64(token=config["api_secret"])
requests_mock.register_uri("GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}))
requests_mock.register_uri("GET", get_url_to_mock(Annotations(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}))
requests_mock.register_uri("POST", get_url_to_mock(Engage(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}))
requests_mock.register_uri("GET", get_url_to_mock(Export(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}))
requests_mock.register_uri("GET", get_url_to_mock(Revenue(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}))

ok, error = SourceMixpanel().check_connection(logger, config_raw)
assert ok is False and error == "Payment required"


def test_check_connection_402_error_on_first_stream(requests_mock, check_connection_url, config, config_raw):
auth = TokenAuthenticatorBase64(token=config["api_secret"])
requests_mock.register_uri("GET", get_url_to_mock(Cohorts(authenticator=auth, **config)), setup_response(200, {}))
requests_mock.register_uri("GET", get_url_to_mock(Annotations(authenticator=auth, **config)), setup_response(402, {"error": "Payment required"}))

ok, error = SourceMixpanel().check_connection(logger, config_raw)
assert ok is True
assert error is None


def test_check_connection_bad_config():
config = {}
source = SourceMixpanel()
Expand All @@ -52,24 +73,55 @@ def test_check_connection_incomplete(config_raw):


def test_streams(requests_mock, config_raw):
requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage?page_size=1000", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {"id": 123}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels", setup_response(200, {}))
requests_mock.register_uri(
"GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(200, {"funnel_id": 123, "name": "name"})
)
requests_mock.register_uri(
"GET", "https://data.mixpanel.com/api/2.0/export",
setup_response(200, {"event": "some event", "properties": {"event": 124, "time": 124124}})
)

streams = SourceMixpanel().streams(config_raw)
assert len(streams) == 7


def test_streams_string_date(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(200, {"id": 123}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {}))
requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, {"error": "Payment required"}))
requests_mock.register_uri(
"GET", "https://data.mixpanel.com/api/2.0/export",
setup_response(200, {"event": "some event", "properties": {"event": 124, "time": 124124}})
)
config = copy.deepcopy(config_raw)
config["start_date"] = "2020-01-01"
config["end_date"] = "2020-01-02"
streams = SourceMixpanel().streams(config)
assert len(streams) == 7
assert len(streams) == 6
girarda marked this conversation as resolved.
Show resolved Hide resolved


def test_streams_disabled_402(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(402, {}))
json_response = {"error": "Your plan does not allow API calls. Upgrade at mixpanel.com/pricing"}
requests_mock.register_uri("POST", "https://mixpanel.com/api/2.0/engage?page_size=1000", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/annotations", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/cohorts/list", setup_response(402, json_response))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/revenue", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/funnels/list", setup_response(402, json_response))
requests_mock.register_uri("GET", "https://data.mixpanel.com/api/2.0/export?from_date=2017-01-20&to_date=2017-02-18", setup_response(402, json_response))
streams = SourceMixpanel().streams(config_raw)
assert {s.name for s in streams} == {"funnels", "revenue", "annotations", "cohorts"}
assert {s.name for s in streams} == {'annotations', 'engage', 'revenue'}
Loading
Loading