🎉 New Source: Amplitude #3664
/test connector=source-amplitude
MAIN_REQUIREMENTS = [
    "airbyte-cdk~=0.1",
]
Are all requirements included here? E.g., pendulum and requests are imported in api.py.
These libraries are installed together with airbyte-cdk
# Unable to use 'state_path' because Amplitude returns an error when specifying a date in the future.
# state_path: "integration_tests/abnormal_state.json"
cursor_paths:
  events: [ "event_time" ]
There are two more incremental streams. Should cursor path be set for them?
In general, it is not necessary to specify "cursor_paths" here if it is specified in the directory. However, there is a bug, and we must specify the cursor for at least one stream for the test to work.
        yield from respose_data.get(self.name, [])

    def path(self, **kwargs) -> str:
        return f"/{self.api_version}/{self.name}"
I think the version should be part of the base URL.
This is inconvenient because different versions are used.
        return f"/{self.api_version}/{self.name}"

    @property
    def url_base(self) -> str:
        return f"https://amplitude.com/api/{self.api_version}/"
WDYT about this?
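A minimal sketch of how the suggestion above could look when each stream carries its own api_version. The class AmplitudeStreamSketch, the helper full_url, and the name-derived paths are hypothetical stand-ins for illustration; they are not the connector's actual classes or Amplitude's real endpoint paths.

```python
class AmplitudeStreamSketch:
    """Hypothetical stand-in for the connector's base stream class."""

    api_version = 2  # assumed default; subclasses override it

    @property
    def name(self) -> str:
        # Assumption for this sketch: the stream name is the lowercased class name.
        return type(self).__name__.lower()

    @property
    def url_base(self) -> str:
        # The version lives in the base URL, so path() no longer repeats it.
        return f"https://amplitude.com/api/{self.api_version}/"

    def path(self, **kwargs) -> str:
        return self.name


class Cohorts(AmplitudeStreamSketch):
    api_version = 3


class Events(AmplitudeStreamSketch):
    api_version = 2


def full_url(stream: AmplitudeStreamSketch) -> str:
    """Join the per-stream base URL and path the way an HTTP client would."""
    return stream.url_base + stream.path()
```

Because url_base is a property rather than a class constant, streams that use different API versions only need to override api_version.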
class Cohorts(AmplitudeStream):
    primary_key = "id"
    api_version = 3
Why are several versions used?
I followed the Amplitude documentation, which uses different API versions for different streams:
- All Cohorts - version 3
- Active User Counts - version 2
- Export Events - version 2
I removed the Events stream from the tests because it has too much data. I tried running it locally, and the test only passes if start_date is no more than 1 or 2 days ago.
Otherwise, the test crashes due to a timeout or insufficient memory (since it stores the data for comparison).
/test connector=source-amplitude
LGTM, only a small change.
/test connector=source-amplitude
…/new-amplitude-connector
Conflicts:
    docs/integrations/connector-health.md
    tools/bin/ci_credentials.sh
/test connector=source-amplitude
LGTM
Looks great! A few questions.
        return params


class Events(IncrementalAmplitudeStream):
Are events returned in ascending order of the cursor field? If so, why don't we checkpoint using state_checkpoint_interval? This would allow the connector to resume where it left off even if it failed halfway through the sync.
Same question for the other streams, BTW.
Also, this would happen automatically if we override the stream_slices method. Because you are doing time slicing here, it's a good fit for slicing, which keeps track of state automatically.
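A rough sketch of the time slicing such a stream_slices override could yield, written as a standalone generator so it runs without the CDK. The function name daily_slices, the "start"/"end" keys, and the %Y%m%d date format are assumptions made for illustration, not the connector's actual code.

```python
from datetime import date, timedelta
from typing import Iterator, Mapping


def daily_slices(start: date, end: date, step_days: int = 1) -> Iterator[Mapping[str, str]]:
    """Yield consecutive, non-overlapping date windows covering start..end inclusive."""
    if step_days < 1:
        raise ValueError("step_days must be at least 1")
    cursor = start
    while cursor <= end:
        # Clamp the last window so it never runs past the requested end date.
        window_end = min(cursor + timedelta(days=step_days - 1), end)
        yield {
            "start": cursor.strftime("%Y%m%d"),  # assumed date format
            "end": window_end.strftime("%Y%m%d"),
        }
        cursor = window_end + timedelta(days=1)
```

Each yielded window would map to one request, so a failed sync can resume from the last completed window instead of from the original start date.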
I used state_checkpoint_interval. It was 100 for all incremental streams, but I have now adjusted it: 10 for all streams, and 1000 for Events (since it has quite a lot of entries).
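To illustrate what an interval-based checkpoint implies, here is a simplified sketch that emits a state message after every N records. The function read_with_checkpoints and the ("record" / "state") tuples are hypothetical; the CDK's own read loop is more involved. The sketch is only safe when records arrive in ascending cursor order, which is the question raised above.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping, Tuple


def read_with_checkpoints(
    records: Iterable[Mapping[str, Any]], cursor_field: str, interval: int
) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield ("record", r) for each record and ("state", s) every `interval` records."""
    if interval < 1:
        raise ValueError("interval must be at least 1")
    state: Dict[str, Any] = {}
    count = 0
    for count, record in enumerate(records, start=1):
        yield "record", dict(record)
        latest = record[cursor_field]
        # Keep the highest cursor value seen so far.
        if cursor_field not in state or latest > state[cursor_field]:
            state[cursor_field] = latest
        if count % interval == 0:
            yield "state", dict(state)
    # Emit a final state unless one was just emitted for the last record.
    if state and count % interval != 0:
        yield "state", dict(state)
```

A smaller interval means less repeated work after a failure but more state messages; a larger one, such as the 1000 chosen for Events, trades the other way.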
    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        response_data = response.json().get("data", [])
        if response_data:
            series = response_data["series"][0]
Why are we taking the first element of the series list? Can you add a comment explaining it? I don't think it's obvious from the API docs that there will be only one record: https://developers.amplitude.com/docs/dashboard-rest-api#average-session-length
Added comment
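For readers without the diff at hand, a hedged sketch of what such a commented parse step might look like. It assumes the payload holds a single inner list under "series" that lines up with a list of dates under "xValues"; that shape, the "xValues" key, the output field names, and the function name are assumptions for illustration and should be checked against the linked Amplitude docs.

```python
from typing import Any, Dict, Iterator, Mapping


def parse_average_session_length(payload: Mapping[str, Any]) -> Iterator[Dict[str, Any]]:
    """Turn one assumed Amplitude-style payload into per-day records."""
    data = payload.get("data") or {}
    series_list = data.get("series") or []
    if not series_list:
        return
    # Assumption: "series" is a list with exactly one inner list, holding one
    # value per day, so only index 0 is read. Any extra series are ignored.
    series = series_list[0]
    for day, value in zip(data.get("xValues", []), series):
        yield {"date": day, "length": value}
```

Using zip rather than indexing means a length mismatch between the dates and the values truncates quietly instead of raising an IndexError; whether that is preferable depends on how strictly the connector should validate responses.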
…/new-amplitude-connector
Conflicts:
    airbyte-integrations/builds.md
    docs/SUMMARY.md
/test connector=source-amplitude
…/new-amplitude-connector
/publish connector=connectors/source-amplitude
/test connector=source-amplitude
What
closes #1457