Source hubspot: add incremental streams #24711
Conversation
/test connector=connectors/source-hubspot
Build Failed
/test connector=connectors/source-hubspot
Build Failed
record_value = (
    pendulum.parse(record.get(self.cursor_field)).int_timestamp
    if isinstance(record.get(self.cursor_field), str)
    else record.get(self.cursor_field) // 1000
)
why 1000?
Refactored. Date fields are stored in milliseconds.
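For context, the `// 1000` is needed because the cursor value may arrive either as an ISO-8601 string or as an epoch timestamp in milliseconds. A minimal sketch of the normalization, using the stdlib `datetime` in place of `pendulum` and a hypothetical helper name:

```python
from datetime import datetime

def to_epoch_seconds(value):
    """Normalize a cursor value to epoch seconds.

    The API may return either an ISO-8601 string or an integer
    timestamp in *milliseconds*, hence the // 1000.
    """
    if isinstance(value, str):
        # ISO-8601 string, e.g. "2023-03-29T12:00:00+00:00"
        return int(datetime.fromisoformat(value).timestamp())
    # integer epoch milliseconds -> seconds
    return value // 1000
```

Both branches then yield a comparable integer, so state comparisons work regardless of the wire format.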
/test connector=connectors/source-hubspot
Build Failed
/test connector=connectors/source-hubspot
Build Failed
…9265 (Conflicts: airbyte-integrations/connectors/source-hubspot/Dockerfile, airbyte-integrations/connectors/source-hubspot/unit_tests/test_streams.py, docs/integrations/sources/hubspot.md)
/test connector=connectors/source-hubspot
Build Failed
/test connector=connectors/source-hubspot
Build Passed
…d abstract method
/test connector=connectors/source-hubspot
Build Failed
    ],
)
- def test_streams_read(stream, endpoint, requests_mock, common_params, fake_properties_list):
+ def test_streams_read(stream, endpoint, cursor_value, requests_mock, common_params, fake_properties_list):
Does this test that the filtering works properly at the SemiIncrementalStream level? I think no, right?
This is the test fix for full refresh, as the cursor value is not the same for all streams.
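The per-stream cursor value mentioned above can be supplied through parametrization; a sketch with hypothetical stream names and cursor fields (not the actual HubSpot test data):

```python
import pytest

# Hypothetical example data; the real test mocks the HubSpot endpoint
# with requests_mock and asserts on the records the stream emits.
@pytest.mark.parametrize(
    "stream_name, endpoint, cursor_value",
    [
        ("companies", "company", {"updatedAt": "2022-02-25T16:43:11Z"}),
        ("deals", "deal", {"hs_lastmodifieddate": 1645807391000}),
    ],
)
def test_streams_read(stream_name, endpoint, cursor_value):
    # each stream carries its own cursor field, so the expected
    # cursor value must be parametrized rather than shared
    assert cursor_value  # placeholder for the real read-and-compare logic
```

Parametrizing the cursor keeps a single test body while letting each stream assert against its own cursor field and format.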
@@ -664,6 +666,57 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]:
        yield record


class SemiIncrementalStream(Stream, IncrementalMixin):
nit: a better name for this pattern is ClientSideIncrementalStream
renamed
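The pattern behind the renamed class can be sketched as follows (simplified, hypothetical names and integer cursors; the real connector inherits Airbyte's `Stream` and `IncrementalMixin`):

```python
class ClientSideIncrementalStream:
    """Sketch: the API returns every record, so the connector itself
    drops records at or below the saved cursor (client-side filtering)."""

    cursor_field = "updatedAt"

    def __init__(self, start_timestamp: int = 0):
        self._cursor = start_timestamp

    @property
    def state(self):
        # state exposed in the shape Airbyte persists between syncs
        return {self.cursor_field: self._cursor}

    def read_records(self, api_records):
        for record in api_records:
            # skip records not newer than the saved cursor
            if record[self.cursor_field] > self._cursor:
                yield record
        # advance the cursor to the newest record seen
        if api_records:
            newest = max(r[self.cursor_field] for r in api_records)
            self._cursor = max(self._cursor, newest)
```

This contrasts with server-side incremental streams, where the API itself accepts a "modified since" filter and the connector never sees old records.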
docs/integrations/sources/hubspot.md (Outdated)
Please add a note to this doc that there are two kinds of incremental sync: server-side incremental sync (i.e. where the HubSpot API returns only the data updated or generated since the last sync) and client-side incremental sync (where the HubSpot API returns all the data / doesn't allow filtering, and the connector filters only the records which were updated), and update the Supported Streams list to indicate which sync types they support.
docs updated
record_value, datetime_format = (
    (pendulum.parse(record.get(self.cursor_field)), "YYYY-MM-DDTHH:mm:ss.SSSSSSZ")
    if isinstance(record.get(self.cursor_field), str)
    else (pendulum.from_format(str(record.get(self.cursor_field)), "x"), "x")
)
This is a smaller comment, but FWIW this should be done using inversion of control, i.e. the superclass should not know what the subclasses do. Instead, the subclasses should have a way to provide the datetime object given their knowledge of their own date format.
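The suggested inversion of control can be sketched like this (hypothetical class names, stdlib `datetime` in place of `pendulum`): the base class only asks for a datetime, and each subclass supplies the parsing for its own wire format.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone

class BaseIncrementalStream(ABC):
    @abstractmethod
    def _field_to_datetime(self, value) -> datetime:
        """Subclasses know their own date format."""

    def is_newer(self, record_value, state_value) -> bool:
        # the base class compares datetimes without knowing the format
        return self._field_to_datetime(record_value) > self._field_to_datetime(state_value)

class IsoStringStream(BaseIncrementalStream):
    def _field_to_datetime(self, value):
        # cursor arrives as an ISO-8601 string
        return datetime.fromisoformat(value)

class EpochMillisStream(BaseIncrementalStream):
    def _field_to_datetime(self, value):
        # cursor arrives as epoch milliseconds
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
```

The superclass's comparison logic then stays format-agnostic, and adding a stream with a new date format only means overriding one method.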
/test connector=connectors/source-hubspot
Build Passed
@sherifnada, agreed with the inversion of control principle. I refactored and added
/test connector=connectors/source-hubspot
Build Passed
Reviewing from the connector ops side:
- Changes to acceptance-test-config.yml and sample files used in it
- Version bumping, semver, changelog
airbyte-integrations/connectors/source-hubspot/sample_files/incremental_catalog.json
"stream": {
  "name": "form_submissions",
  "json_schema": {},
  "supported_sync_modes": ["full_refresh"]
Why not include this stream in the incremental catalog example/acceptance test?
This stream is empty, while for incremental tests all streams should produce at least 1 record.
And we don't have empty_streams for incremental tests, got it. Thanks for the explanation!
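For reference, a sketch of where such streams are declared in an acceptance-test-config.yml (paths and layout illustrative): `empty_streams` applies to the basic read test, while incremental tests expect every stream in the catalog to yield records.

```yaml
tests:
  basic_read:
    - config_path: "secrets/config.json"
      # streams allowed to return no records during a read
      empty_streams: ["form_submissions"]
  incremental:
    - config_path: "secrets/config.json"
      configured_catalog_path: "sample_files/incremental_catalog.json"
      # no empty_streams here: each stream must produce at least 1 record
```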
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

- LABEL io.airbyte.version=0.4.0
+ LABEL io.airbyte.version=0.5.0
Backwards compatible new features: minor release 👌🏻
/publish connector=connectors/source-hubspot
If you have connectors that successfully published but failed definition generation, follow step 4 here.
…remental-19265' into artem1205/source-hubspot-add-incremental-19265
…9265 (Conflicts: connectors.md)
…9265 (Conflicts: airbyte-config/init/src/main/resources/seed/source_definitions.yaml, airbyte-config/init/src/main/resources/seed/source_specs.yaml)
What
Resolving #19265
How
Add cursors with filtering by date.
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
No breaking changes.
Pre-merge Checklist

Updating a connector

Community member or Airbyter

- Grant edit access to maintainers (instructions)
- Secrets in the connector's spec are annotated with `airbyte_secret`
- Unit & integration tests added and passing. Community members, please provide proof of success locally, e.g. screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For Java connectors run `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`.
- Code reviews completed
- Connector version has been incremented
  - `Dockerfile` has updated version
- Documentation updated
  - `README.md`
  - `bootstrap.md`. See description and examples
  - `docs/integrations/<source or destination>/<name>.md` with an entry for the new version. See changelog example
- PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

- `/test connector=connectors/<name>` command is passing
- `/publish` command described here