
🐛 Source Mixpanel: reimplement backoff strategy #27752

Merged
Changes from 77 commits (109 commits in total)
92b7e1f
Connector health: source hubspot, gitlab, snapchat-marketing: fix builds
davydov-d May 10, 2023
01ee45e
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 10, 2023
4ae17ec
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 12, 2023
3fadfe3
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 12, 2023
a51c998
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 15, 2023
afd1fae
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 16, 2023
72efed0
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 17, 2023
db47568
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 17, 2023
7880cf0
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 19, 2023
d1d08dc
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 19, 2023
5e731f7
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 22, 2023
a8780f9
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 23, 2023
cac6c95
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 24, 2023
c772743
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 24, 2023
ebf235c
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 24, 2023
313db7d
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 25, 2023
779dfee
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 26, 2023
92995de
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 26, 2023
986f29e
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 29, 2023
2705474
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 30, 2023
265a5b4
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 30, 2023
2e9ed46
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d May 31, 2023
6711afb
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 1, 2023
79fe56b
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 1, 2023
e7b4177
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 1, 2023
a8968c4
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 2, 2023
9f0b93e
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 6, 2023
1eb1cb2
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 8, 2023
3d6b33b
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 8, 2023
4532c19
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 8, 2023
f3b6a62
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 9, 2023
a51e219
Add stream ShippingRates
btkcodedev Jun 9, 2023
b80f81e
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 12, 2023
dfa3579
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 12, 2023
7686f0c
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 13, 2023
2f4783a
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 14, 2023
6537212
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 15, 2023
b491aed
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 15, 2023
9d8e6ba
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 15, 2023
d10a4cb
Merge branch 'master' into stripeShippingrates
btkcodedev Jun 15, 2023
504cbb6
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 16, 2023
c4d1fd2
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 16, 2023
54923f8
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 16, 2023
5e9374b
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 16, 2023
b7ffd11
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 19, 2023
77d5b1a
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 19, 2023
9ece7e9
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 20, 2023
e30d936
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 21, 2023
bb01a74
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 21, 2023
7d2d344
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 21, 2023
92b6103
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 22, 2023
64c38fc
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 23, 2023
3fc09f4
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 23, 2023
157b7c9
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 26, 2023
7c4973c
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 26, 2023
5a06aa3
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 27, 2023
62198ed
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 27, 2023
68d330e
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 27, 2023
9466e52
#2363 source mixpanel: reimplement backoff strategy
davydov-d Jun 27, 2023
b13a78b
#2363 source mixpanel: upd changelog
davydov-d Jun 27, 2023
a38520c
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 27, 2023
823943c
#2363 source mixpanel: revert v0.1.32
davydov-d Jun 27, 2023
0915ea2
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
davydov-d Jun 29, 2023
dfaf3ab
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 29, 2023
03902f5
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 29, 2023
b987bcb
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 30, 2023
ac8ceac
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
davydov-d Jun 30, 2023
e58fcd0
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jun 30, 2023
347659d
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 3, 2023
77ea8c6
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
davydov-d Jul 3, 2023
07e2c1e
rework rate limits
davydov-d Jul 4, 2023
c7201d1
Merge branch 'ddavydov/#2363-source-mixpanel-reimplement-backoff-stra…
davydov-d Jul 4, 2023
75110cb
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
davydov-d Jul 4, 2023
88ca8f3
wait only when running CAT + increase timeouts
davydov-d Jul 4, 2023
59f7c31
#2363 return backoff time
davydov-d Jul 4, 2023
1b69ecc
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
alafanechere Jul 4, 2023
8087271
code format
davydov-d Jul 5, 2023
f3b97ca
Merge branch 'ddavydov/#2363-source-mixpanel-reimplement-backoff-stra…
davydov-d Jul 5, 2023
237ea88
use env variable instead of a hidden field in the config
davydov-d Jul 5, 2023
87bb12a
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 5, 2023
a3eff85
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 6, 2023
f9fa17d
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 7, 2023
105774a
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
davydov-d Jul 7, 2023
8833b19
review comments
davydov-d Jul 7, 2023
c7add9a
review fixes
davydov-d Jul 7, 2023
519f6eb
fix reading env var
davydov-d Jul 7, 2023
6ade362
Merge branch 'master' into stripeShippingrates
btkcodedev Jul 9, 2023
d55a3cb
Resolve conflicts, Bump version
btkcodedev Jul 9, 2023
f42a197
Merge branch 'stripeShippingrates' of https://github.com/btkcodedev/a…
davydov-d Jul 10, 2023
652571f
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 10, 2023
0da45de
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 11, 2023
ae579cc
Attempt to bump version to avoid caching
maxi297 Jul 11, 2023
8f173a6
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 12, 2023
55cdd0e
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 13, 2023
d56c5d9
Merge branch 'master' of github.com:airbytehq/airbyte
davydov-d Jul 14, 2023
4a85d7b
Merge branch 'master' into ddavydov/#2363-source-mixpanel-reimplement…
davydov-d Jul 14, 2023
d133001
split test config into to projects
davydov-d Jul 14, 2023
99c2bd6
Merge branch 'ddavydov/#2363-source-mixpanel-reimplement-backoff-stra…
davydov-d Jul 14, 2023
4f619c4
rollback version
davydov-d Jul 14, 2023
e40e7e4
undo non related changes
davydov-d Jul 14, 2023
9db16d7
remove non related changes
davydov-d Jul 14, 2023
91dd700
revert new line
davydov-d Jul 14, 2023
6438b65
fix funnel slice patching
davydov-d Jul 14, 2023
2835add
fix funnel slice patching
davydov-d Jul 14, 2023
443781e
do not use stream state for generating request params!
davydov-d Jul 17, 2023
c638de3
fix updating state
davydov-d Jul 17, 2023
2f842a2
Revert "do not use stream state for generating request params!"
davydov-d Jul 18, 2023
468f377
revert prev commits
davydov-d Jul 18, 2023
5c32696
fix timestamp filtering
davydov-d Jul 18, 2023
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-mixpanel/Dockerfile
@@ -4,14 +4,14 @@ FROM python:3.9-slim
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

WORKDIR /airbyte/integration_code
COPY source_mixpanel ./source_mixpanel
COPY main.py ./
COPY setup.py ./
RUN pip install .
COPY source_mixpanel ./source_mixpanel
COPY main.py ./

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.version=0.1.36
LABEL io.airbyte.name=airbyte/source-mixpanel
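
The Dockerfile change above moves the `COPY source_mixpanel` and `COPY main.py` instructions after `RUN pip install .`, so edits to the connector source no longer invalidate the cached dependency layer. A minimal sketch of that layer-ordering pattern (paths mirror the diff above):

```dockerfile
FROM python:3.9-slim

WORKDIR /airbyte/integration_code

# Dependencies first: this layer stays cached until setup.py changes
COPY setup.py ./
RUN pip install .

# Frequently edited source last: code edits rebuild only these layers
COPY source_mixpanel ./source_mixpanel
COPY main.py ./

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
```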
@@ -1,6 +1,9 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-mixpanel:dev
custom_environment_variables:
SLEEP_BETWEEN_CHECK_AND_DISCOVER_REQUESTS: true
ALIGN_DATE_RANGE_TO_LAST_N_DAYS: true
test_strictness_level: "high"
acceptance_tests:
spec:
@@ -10,16 +13,20 @@ acceptance_tests:
tests:
- config_path: "secrets/config_old.json"
status: "succeed"
timeout_seconds: 75
- config_path: "secrets/config_project_secret.json"
status: "succeed"
timeout_seconds: 75
- config_path: "secrets/config.json"
status: "succeed"
timeout_seconds: 75
- config_path: "integration_tests/invalid_config.json"
status: "failed"
timeout_seconds: 75
discovery:
tests:
- config_path: "secrets/config_old.json"
timeout_seconds: 60
timeout_seconds: 135
basic_read:
tests:
- config_path: "secrets/config.json"
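
The `custom_environment_variables` above inject `SLEEP_BETWEEN_CHECK_AND_DISCOVER_REQUESTS: true` into the acceptance-test container. Environment variables are strings, so a check like `os.environ.get(NAME, False)` treats any non-empty value — even the string `"false"` — as truthy. A minimal sketch of that gating (the variable name comes from the config above; the helper name is illustrative):

```python
import os

def sleep_between_requests_enabled() -> bool:
    # Env vars are strings: bool("") is False, but bool("false") is True.
    # The connector only ever sets the variable (to "true") in acceptance
    # tests, so simple truthiness is sufficient here.
    return bool(os.environ.get("SLEEP_BETWEEN_CHECK_AND_DISCOVER_REQUESTS", ""))
```

Note the pitfall this relies on: setting the variable to `"false"` would still enable the behavior, which is why the connector distinguishes only set vs. unset.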

7 files were deleted.
@@ -1,12 +1,12 @@
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-13", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037955}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-10", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1} }, "emitted_at": 1684508037956}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-09", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1684508037956}
{"stream": "engage", "data": {"distinct_id": "123@gmail.com", "email": "123@gmail.com", "name": "123", "123": "123456", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042343}
{"stream": "engage", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042345}
{"stream": "engage", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1684508042346}
{"stream": "cohorts", "data": {"id": 1478097, "project_id": 2529987, "name": "Cohort1", "description": "", "data_group_id": null, "count": 2, "is_visible": 1, "created": "2021-09-14 15:57:43"}, "emitted_at": 1684508052373}
{"stream": "cohort_members", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059432}
{"stream": "cohort_members", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1684508059434}
{"stream": "revenue", "data": {"date": "2023-06-11", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063120}
{"stream": "revenue", "data": {"date": "2023-06-12", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121}
{"stream": "revenue", "data": {"date": "2023-06-13", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1684508063121}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-25", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1687889775303}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-26", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1687889775303}
{"stream": "funnels", "data": {"funnel_id": 36152117, "name": "test", "date": "2023-06-27", "steps": [{"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "Purchase", "goal": "Purchase", "step_label": "Purchase", "overall_conv_ratio": 1, "step_conv_ratio": 1}, {"count": 0, "avg_time": null, "avg_time_from_start": null, "event": "$custom_event:1305068", "goal": "$custom_event:1305068", "step_label": "111", "custom_event": true, "custom_event_id": 1305068, "overall_conv_ratio": 0, "step_conv_ratio": 0}], "analysis": {"completion": 0, "starting_amount": 0, "steps": 2, "worst": 1}}, "emitted_at": 1687889775303}
{"stream": "engage", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1687889778985}
{"stream": "engage", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1687889778988}
{"stream": "engage", "data": {"distinct_id": "123@gmail.com", "email": "123@gmail.com", "name": "123", "123": "123456", "last_seen": "2023-01-01T00:00:00"}, "emitted_at": 1687889778988}
{"stream": "cohorts", "data": {"id": 1478097, "project_id": 2529987, "name": "Cohort1", "description": "", "data_group_id": null, "count": 2, "is_visible": 1, "created": "2021-09-14 15:57:43"}, "emitted_at": 1687889787689}
{"stream": "cohort_members", "data": {"distinct_id": "integration-test.db4415.mp-service-account", "name": "test", "test": "test", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1687889914154}
{"stream": "cohort_members", "data": {"distinct_id": "integration-test@airbyte.io", "name": "Integration Test1", "test": "test", "email": "integration-test@airbyte.io", "last_seen": "2023-01-01T00:00:00", "cohort_id": 1478097}, "emitted_at": 1687889914156}
{"stream": "revenue", "data": {"date": "2023-06-25", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1687889918052}
{"stream": "revenue", "data": {"date": "2023-06-26", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1687889918052}
{"stream": "revenue", "data": {"date": "2023-06-27", "amount": 0.0, "count": 3, "paid_count": 0}, "emitted_at": 1687889918052}
@@ -6,7 +6,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerImageTag: 0.1.35
dockerImageTag: 0.1.36
dockerRepository: airbyte/source-mixpanel
githubIssueLabel: source-mixpanel
icon: mixpanel.svg
@@ -3,7 +3,9 @@
#

import base64
import json
import logging
import os
from typing import Any, List, Mapping, Tuple

import pendulum
@@ -82,28 +84,27 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
except Exception as e:
return False, e

# https://github.com/airbytehq/airbyte/pull/27252#discussion_r1228356872
# temporary solution, testing access for all streams to avoid 402 error
streams = [Annotations, Cohorts, Engage, Export, Revenue]
connected = False
reason = None
for stream_class in streams:
# https://github.com/airbytehq/airbyte/pull/27252
# https://github.com/airbytehq/oncall/issues/2363
# On one hand there's a number of APIs that is limited by the account plan, so we should probably try to connect to each stream.
# On the other hand, we have a timeout for this operation and connecting to each stream may take up to one minute.
# That's why we're validating connectivity by only reading from the stream we definitely know is available independent of a plan.

try:
stream_kwargs = {"authenticator": auth, **config}
if not os.environ.get("SLEEP_BETWEEN_CHECK_AND_DISCOVER_REQUESTS", False):
# We preserve sleeping between requests in case this is a running acceptance test.
# Otherwise, we do not want to wait to not time out
stream_kwargs["reqs_per_hour_limit"] = 0
stream = Export(**stream_kwargs)
next(read_full_refresh(stream), None)
except Exception as e:
try:
stream = stream_class(authenticator=auth, **config)
next(read_full_refresh(stream), None)
connected = True
break
except requests.HTTPError as e:
reason = e.response.json()["error"]
if e.response.status_code == 402:
logger.info(f"Stream {stream_class.__name__}: {e.response.json()['error']}")
else:
return connected, reason
except Exception as e:
return connected, e

reason = None if connected else reason
return connected, reason
return False, reason
except json.decoder.JSONDecodeError:
return False, e
return True, None
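
The rewritten `check_connection` above probes a single stream (`Export`) that is available on every Mixpanel plan, instead of looping over all stream classes, and maps HTTP 402 and JSON-decode failures to a `(False, reason)` result. A simplified sketch of that shape (the exception class and reader callable are stand-ins, not the connector's real types):

```python
import json
from typing import Callable, Optional, Tuple

class PaymentRequired(Exception):
    """Stand-in for an HTTP 402 error raised by the API client."""

def check_connection(read_one: Callable[[], None]) -> Tuple[bool, Optional[str]]:
    # `read_one` is any callable that pulls the first record from the
    # cheapest stream known to exist regardless of account plan.
    try:
        read_one()
    except PaymentRequired:
        return False, "402 Payment Required"
    except json.JSONDecodeError as e:
        # A non-JSON error body still means the check failed
        return False, str(e)
    except Exception as e:
        return False, str(e)
    return True, None
```

Probing one guaranteed stream keeps the check inside the operation timeout; probing every stream, as before, could take up to a minute per stream.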

@adapt_streams_if_testing
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
@@ -115,24 +116,40 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")

auth = self.get_authenticator(config)
streams = []
for stream in [
Annotations(authenticator=auth, **config),
Cohorts(authenticator=auth, **config),
Funnels(authenticator=auth, **config),
Revenue(authenticator=auth, **config),
CohortMembers(authenticator=auth, **config),
Engage(authenticator=auth, **config),
streams = [
Export(authenticator=auth, **config),
]:
]

# we only make calls to one stream of each API type to save time, based on the assumption that
# if one stream of the API type is available, all others are available as well and vice versa

streams_by_api_types = [
[
Annotations,
],
[Engage, Cohorts, CohortMembers, Funnels, Revenue],
]
stream_kwargs = {"authenticator": auth, **config}
if not os.environ.get("SLEEP_BETWEEN_CHECK_AND_DISCOVER_REQUESTS", False):
# set reqs_per_hour_limit = 0 to save time for discovery
# We preserve sleeping between requests in case this is a running acceptance test.
# Otherwise, we do not want to wait to not time out
stream_kwargs["reqs_per_hour_limit"] = 0
for stream_set in streams_by_api_types:
current_stream_set = [stream(**stream_kwargs) for stream in stream_set]
test_stream, *_ = current_stream_set
try:
next(read_full_refresh(stream), None)
stream.get_json_schema()
next(read_full_refresh(test_stream), None)
except requests.HTTPError as e:
if e.response.status_code != 402:
raise e
logger.warning("Stream '%s' - is disabled, reason: 402 Payment Required", stream.name)
logger.warning(
f"Streams {', '.join([stream.name for stream in current_stream_set])} are disabled, reason: 402 Payment Required"
)
else:
streams.append(stream)
streams.extend(current_stream_set)

for stream in streams:
# roll back to default value
stream.reqs_per_hour_limit = stream.DEFAULT_REQS_PER_HOUR_LIMIT
return streams
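
The loop above probes one representative stream per API group and drops the whole group on HTTP 402, on the stated assumption that plan entitlements apply per API type rather than per stream. A minimal sketch of that selection logic (group contents mirror the diff; the probe callable and exception are stand-ins):

```python
from typing import Callable, List

class PaymentRequired(Exception):
    """Stand-in for requests.HTTPError with status code 402."""

def select_available(groups: List[List[str]],
                     probe: Callable[[str], None]) -> List[str]:
    """Keep every group whose first member can be read; drop groups
    whose representative raises the 402 stand-in."""
    available: List[str] = []
    for group in groups:
        representative = group[0]
        try:
            # e.g. next(read_full_refresh(stream), None) in the connector
            probe(representative)
        except PaymentRequired:
            continue  # entire API group assumed unavailable on this plan
        available.extend(group)
    return available
```

This trades accuracy for speed: one request decides the fate of five streams, which is why the diff groups `Engage`, `Cohorts`, `CohortMembers`, `Funnels`, and `Revenue` together.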
@@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import time
from abc import ABC
from datetime import timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
@@ -20,14 +21,21 @@ class MixpanelStream(HttpStream, ABC):
60 queries per hour.
"""

DEFAULT_REQS_PER_HOUR_LIMIT = 60

@property
def url_base(self):
prefix = "eu." if self.region == "EU" else ""
return f"https://{prefix}mixpanel.com/api/2.0/"

# https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
reqs_per_hour_limit: int = 60 # 1 query per minute
retries: int = 0
@property
def reqs_per_hour_limit(self):
# https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
return self._reqs_per_hour_limit

@reqs_per_hour_limit.setter
def reqs_per_hour_limit(self, value):
self._reqs_per_hour_limit = value

def __init__(
self,
@@ -40,6 +48,7 @@ def __init__(
attribution_window: int = 0, # in days
select_properties_by_default: bool = True,
project_id: int = None,
reqs_per_hour_limit: int = DEFAULT_REQS_PER_HOUR_LIMIT,
**kwargs,
):
self.start_date = start_date
@@ -50,6 +59,8 @@ def __init__(
self.region = region
self.project_timezone = project_timezone
self.project_id = project_id
self.retries = 0
self._reqs_per_hour_limit = reqs_per_hour_limit

super().__init__(authenticator=authenticator)

@@ -62,15 +73,6 @@ def request_headers(
) -> Mapping[str, Any]:
return {"Accept": "application/json"}

def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
try:
return super()._send_request(request, request_kwargs)
except requests.exceptions.HTTPError as e:
error_message = e.response.text
if error_message:
self.logger.error(f"Stream {self.name}: {e.response.status_code} {e.response.reason} - {error_message}")
raise e

def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
json_response = response.json()
if self.data_field is not None:
@@ -93,6 +95,12 @@ def parse_response(
# parse the whole response
yield from self.process_response(response, stream_state=stream_state, **kwargs)

if self.reqs_per_hour_limit > 0:
# we skip this block, if self.reqs_per_hour_limit = 0,
# in all other cases wait for X seconds to match API limitations
self.logger.info("Sleep for %s seconds to match API limitations", 3600 / self.reqs_per_hour_limit)
time.sleep(3600 / self.reqs_per_hour_limit)

def backoff_time(self, response: requests.Response) -> float:
"""
Some API endpoints do not return "Retry-After" header.
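
Two rate-limit mechanisms appear in the hunk above: a proactive sleep of `3600 / reqs_per_hour_limit` seconds after each parsed page, skipped when the limit is set to 0, and a reactive `backoff_time` for endpoints that omit a `Retry-After` header. A condensed sketch of both (the fallback value is illustrative, not the connector's actual constant):

```python
import time

DEFAULT_REQS_PER_HOUR_LIMIT = 60  # value from the diff above: 1 query/minute

def throttle(reqs_per_hour_limit: int, sleep=time.sleep) -> float:
    """Proactive pacing: wait between requests unless the limit is 0
    (the connector sets 0 during check/discover to avoid timeouts)."""
    if reqs_per_hour_limit <= 0:
        return 0.0
    delay = 3600 / reqs_per_hour_limit  # 60/hour -> one request per minute
    sleep(delay)
    return delay

def backoff_time(headers: dict, fallback: float = 60.0) -> float:
    """Reactive backoff: honor Retry-After when the API sends it,
    otherwise fall back to a fixed wait (illustrative value)."""
    retry_after = headers.get("Retry-After")
    return float(retry_after) if retry_after is not None else fallback
```

Making `reqs_per_hour_limit` a settable property, as the diff does, lets `check`/`discover` zero it out and then roll each stream back to `DEFAULT_REQS_PER_HOUR_LIMIT` before reading.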
@@ -29,14 +29,11 @@ def stream_slices(
if sync_mode == SyncMode.incremental:
self.set_cursor(cursor_field)

stream_slices = []
# full refresh is needed because even though some cohorts might already have been read
# they can still have new members added
cohorts = Cohorts(**self.get_stream_params()).read_records(SyncMode.full_refresh)
for cohort in cohorts:
stream_slices.append({"id": cohort["id"]})

return stream_slices
yield {"id": cohort["id"]}

def process_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
records = super().process_response(response, **kwargs)
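
The change above turns `stream_slices` from a list-builder into a generator, yielding one `{"id": ...}` slice per cohort as records arrive instead of materializing the whole list first. A minimal before/after sketch (the cohort source is a stand-in for the `Cohorts` stream read):

```python
from typing import Any, Dict, Iterable, Iterator, List

def stream_slices_list(cohorts: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Old shape: accumulate every slice before returning
    slices = []
    for cohort in cohorts:
        slices.append({"id": cohort["id"]})
    return slices

def stream_slices_gen(cohorts: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # New shape: lazily yield each slice; downstream reading can start
    # before all cohorts have been fetched
    for cohort in cohorts:
        yield {"id": cohort["id"]}
```

Both produce the same slices; the generator just avoids holding them all in memory and lets the first slice be processed sooner.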