🚨🚨 Source Hubspot - fixed incremental for engagement stream #27161

Merged · 27 commits · Jun 29, 2023

Changes from 17 commits

Commits
2f9f689
fixed incremental for engagement stream
midavadim Jun 8, 2023
3542f7f
added comments
midavadim Jun 8, 2023
85815f7
updated docs
midavadim Jun 8, 2023
4cb52b0
updated schema for email_events
midavadim Jun 8, 2023
857f2c8
updated expected records and increased major connector version
midavadim Jun 9, 2023
1db2e4c
added backward_compatibility_tests_config
midavadim Jun 9, 2023
80a9b49
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 9, 2023
3284504
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 12, 2023
cb48802
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 12, 2023
eeac803
added email_event record with value in browser.version field
midavadim Jun 20, 2023
a398b44
Merge remote-tracking branch 'origin/midavadim/26365-hubspot-fix-incr…
midavadim Jun 20, 2023
3b86d90
🤖 Auto format source-hubspot code [skip ci]
octavia-squidington-iii Jun 20, 2023
cb335c3
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 20, 2023
6ac4ffc
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 21, 2023
9c38682
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 22, 2023
b1c3ddf
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 22, 2023
798fed7
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 22, 2023
335c609
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 29, 2023
9338c20
Merge remote-tracking branch 'origin/midavadim/26365-hubspot-fix-incr…
midavadim Jun 29, 2023
b887d91
fixed expected records
midavadim Jun 29, 2023
7ab2e20
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 29, 2023
575e1f5
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 29, 2023
7b10190
updated version of docker file
midavadim Jun 29, 2023
072ada4
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 29, 2023
87f8431
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 29, 2023
8597d53
Merge branch 'master' into midavadim/26365-hubspot-fix-incremental-sync
midavadim Jun 29, 2023
3f21afa
fix expected_records.jsonl
midavadim Jun 29, 2023
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.8.4
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/source-hubspot
@@ -19,6 +19,8 @@ acceptance_tests:
discovery:
tests:
- config_path: secrets/config_oauth.json
backward_compatibility_tests_config:
disable_for_version: 0.8.4
basic_read:
tests:
- config_path: secrets/config_oauth.json
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


@@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerImageTag: 0.8.4
dockerImageTag: 1.0.0
dockerRepository: airbyte/source-hubspot
githubIssueLabel: source-hubspot
icon: hubspot.svg
@@ -1,3 +1,6 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from .source import SourceHubspot

__all__ = ["SourceHubspot"]
@@ -36,7 +36,10 @@
"type": ["null", "string"]
},
"version": {
"type": ["null", "string"]
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
}
}
},
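The schema change above is the breaking part of this release: the browser.version field in the email_events schema becomes an array of strings instead of a single string, which is why backward-compatibility tests are disabled for 0.8.4 and the connector takes a major version bump to 1.0.0. A minimal sketch of what the new type accepts, using the jsonschema package (the sample values are hypothetical, not taken from the PR):

from jsonschema import ValidationError, validate

old_type = {"type": ["null", "string"]}
new_type = {"type": ["null", "array"], "items": {"type": ["null", "string"]}}

validate(instance="11.0", schema=old_type)       # old schema: a plain string
validate(instance=["11", "0"], schema=new_type)  # new schema: an array of strings
validate(instance=None, schema=new_type)         # null is still allowed

try:
    validate(instance="11.0", schema=new_type)   # a plain string no longer validates
except ValidationError:
    pass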
@@ -7,6 +7,7 @@
import sys
import time
from abc import ABC, abstractmethod
from datetime import timedelta
from functools import cached_property, lru_cache
from http import HTTPStatus
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union
@@ -1392,25 +1393,13 @@ class EmailEvents(IncrementalStream):
scopes = {"content"}


class Engagements(IncrementalStream):
"""Engagements, API v1
Docs: https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements
https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
"""

url = "/engagements/v1/engagements/paged"
class EngagementsABC(Stream, ABC):
more_key = "hasMore"
updated_at_field = "lastUpdated"
created_at_field = "createdAt"
primary_key = "id"
scopes = {"crm.objects.companies.read", "crm.objects.contacts.read", "crm.objects.deals.read", "tickets", "e-commerce"}

@property
def url(self):
if self.state:
return "/engagements/v1/engagements/recent/modified"
return "/engagements/v1/engagements/paged"

def _transform(self, records: Iterable) -> Iterable:
yield from super()._transform({**record.pop("engagement"), **record} for record in records)

@@ -1423,57 +1412,150 @@ def request_params(
params = {"count": 250}
if next_page_token:
params["offset"] = next_page_token["offset"]
if self.state:
params.update({"since": int(self._state.timestamp() * 1000), "count": 100})
return params


class EngagementsAll(EngagementsABC):
"""All Engagements API:
https://legacydocs.hubspot.com/docs/methods/engagements/get-all-engagements

Note: Returns all engagements records ordered by 'createdAt' (not 'lastUpdated') field
"""

@property
def url(self):
return "/engagements/v1/engagements/paged"


class EngagementsRecentError(Exception):
pass


class EngagementsRecent(EngagementsABC):
"""Recent Engagements API:
https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements

Get the most recently created or updated engagements in a portal, sorted by when they were last updated,
with the most recently updated engagements first.

Important: This endpoint returns only the 10k most recently updated records from the last 30 days.
"""

total_records_limit = 10000
last_days_limit = 29

@property
def url(self):
return "/engagements/v1/engagements/recent/modified"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._start_date < pendulum.now() - timedelta(days=self.last_days_limit):
raise EngagementsRecentError(
'"Recent engagements" API returns records updated in the last 30 days only. '
f'Start date {self._start_date} is older so "All engagements" API should be used'
)

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
params.update(
{
"since": int(self._start_date.timestamp() * 1000),
"count": 100,
}
)
return params

def parse_response(
self,
response: requests.Response,
*,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
# Check if "Recent engagements" API is applicable for use
response_info = response.json()
if response_info:
total = response_info.get("total")
if total > self.total_records_limit:
yield from []
raise EngagementsRecentError(
'"Recent engagements" API returns only 10k most recently updated records. '
'API response indicates that there are more records so "All engagements" API should be used'
)
yield from super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)


class Engagements(EngagementsABC, IncrementalStream):
"""Engagements stream does not send requests directly, instead it uses:
- EngagementsRecent, if the start date/state is within the last 30 days and the API can return all records (<10k), or
- EngagementsAll, which extracts all records and filters them on the connector side
"""

@property
def url(self):
return "/engagements/v1/engagements/paged"

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
self.set_sync(sync_mode)
return [None]

def process_records(self, records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
"""Process each record to find latest cursor value"""
for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
self.latest_cursor = max(cursor, self.latest_cursor) if self.latest_cursor else cursor
yield record

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
pagination_complete = False

next_page_token = None
latest_cursor = None

while not pagination_complete:
response = self.handle_request(stream_slice=stream_slice, stream_state=stream_state, next_page_token=next_page_token)
records = self._transform(self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice))

if self.filter_old_records:
records = self._filter_old_records(records)
self.latest_cursor = None

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record

next_page_token = self.next_page_token(response)
if self.state and next_page_token and next_page_token["offset"] >= 10000:
# As per Hubspot documentation, the recent engagements endpoint will only return the 10K
# most recently updated engagements. Since they are returned sorted by `lastUpdated` in
# descending order, we stop getting records if we have already reached 10,000. Attempting
# to get more than 10K will result in a HTTP 400 error.
# https://legacydocs.hubspot.com/docs/methods/engagements/get-recent-engagements
next_page_token = None

if not next_page_token:
pagination_complete = True
# The date we need records since
since_date = self._start_date
if stream_state:
since_date_timestamp = stream_state.get(self.updated_at_field)
if since_date_timestamp:
since_date = pendulum.from_timestamp(int(since_date_timestamp) / 1000)
Collaborator commented on lines +1529 to +1533:

More pythonic way)

since_date_timestamp = stream_state.get(self.updated_at_field) if stream_state else stream_state
since_date = pendulum.from_timestamp(int(since_date_timestamp) / 1000) if since_date_timestamp else self._start_date


stream_params = {
"api": self._api,
"start_date": since_date,
"credentials": self._credentials,
}

# Always return an empty generator just in case no records were ever yielded
yield from []
try:
# Try 'Recent' API first, since it is more efficient
records = EngagementsRecent(**stream_params).read_records(sync_mode.full_refresh, cursor_field)
yield from self.process_records(records)
except EngagementsRecentError as e:
# if the 'Recent' API is not applicable and raises the error,
# then use the 'All' API, which returns all records and filters them on the connector side
self.logger.info(e)
records = EngagementsAll(**stream_params).read_records(sync_mode.full_refresh, cursor_field)
yield from self.process_records(records)

# State should be updated only once at the end of the sync
# because records are not ordered in ascending order by the 'lastUpdated' field
self._update_state(latest_cursor=self.latest_cursor, is_last_record=True)

self._update_state(latest_cursor=latest_cursor, is_last_record=True)
def _transform(self, records: Iterable) -> Iterable:
# transformation is not needed, because it was done in a substream
yield from records


class Forms(ClientSideIncrementalStream):
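To summarize the control flow introduced above: the incremental Engagements stream first tries the 'Recent Engagements' endpoint, which accepts a server-side since filter but covers only the 10k most recently updated records from roughly the last 30 days, and falls back to the 'All Engagements' endpoint with client-side filtering when that limit applies. A standalone sketch of the selection pattern, using hypothetical names (fetch_recent, fetch_all, RecentLimitError) rather than the connector's classes:

from datetime import datetime, timedelta, timezone

RECENT_WINDOW_DAYS = 30       # the recent endpoint only covers ~30 days
RECENT_RECORD_LIMIT = 10_000  # and at most the 10k most recently updated records


class RecentLimitError(Exception):
    """Raised when the recent endpoint cannot cover the requested window."""


def fetch_recent(since: datetime) -> list:
    if since < datetime.now(timezone.utc) - timedelta(days=RECENT_WINDOW_DAYS):
        raise RecentLimitError("start date is older than the recent-API window")
    # ... call /engagements/v1/engagements/recent/modified with ?since=... here,
    # raising RecentLimitError if the response reports total > RECENT_RECORD_LIMIT ...
    return []


def fetch_all() -> list:
    # ... call /engagements/v1/engagements/paged and drop old records client-side ...
    return []


def read_engagements(since: datetime) -> list:
    try:
        return fetch_recent(since)  # cheaper: server-side 'since' filter
    except RecentLimitError:
        return fetch_all()          # exhaustive fallback, filtered on the connector side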
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
@@ -4,6 +4,7 @@


import logging
from datetime import timedelta
from http import HTTPStatus
from unittest.mock import MagicMock

@@ -525,36 +526,95 @@ def test_engagements_stream_pagination_works(requests_mock, common_params):
test_stream = Engagements(**common_params)
records, _ = read_incremental(test_stream, {})
# The stream should handle pagination correctly and output 250 records.
assert len(records) == 250
assert len(records) == 100
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)


def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list):
def test_engagements_stream_since_old_date(requests_mock, common_params, fake_properties_list):
"""
If there are more than 10,000 engagements that would be returned by the Hubspot recent engagements endpoint,
the Engagements instance should stop at the 10Kth record.
Connector should use 'All Engagements' API for old dates (more than 30 days)
"""

old_date = 1614038400000 # Tuesday, 23 February 2021, 00:00:00
responses = [
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": 1641234595252}} for y in range(100)],
"hasMore": True,
"offset": x * 100,
"results": [{"engagement": {"id": f"{y}", "lastUpdated": old_date}} for y in range(100)],
"hasMore": False,
"offset": 0,
"total": 100
},
"status_code": 200,
}
for x in range(1, 102)
]

# Create test_stream instance with some state
test_stream = Engagements(**common_params)
test_stream.state = {"lastUpdated": 1641234595251}
test_stream.state = {"lastUpdated": old_date}
# Mocking Request
requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?count=100", responses)
requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?count=250", responses)
records, _ = read_incremental(test_stream, {})
# The stream should not attempt to get more than 10K records.
assert len(records) == 10000
assert len(records) == 100
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)


def test_engagements_stream_since_recent_date(requests_mock, common_params, fake_properties_list):
"""
Connector should use 'Recent Engagements' API for recent dates (less than 30 days)
"""
recent_date = pendulum.now() - timedelta(days=10) # 10 days ago
recent_date = int(recent_date.timestamp() * 1000)
responses = [
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": recent_date}} for y in range(100)],
"hasMore": False,
"offset": 0,
"total": 100
},
"status_code": 200,
}
]

# Create test_stream instance with some state
test_stream = Engagements(**common_params)
test_stream.state = {"lastUpdated": recent_date}
# Mocking Request
requests_mock.register_uri("GET", f"/engagements/v1/engagements/recent/modified?count=100&since={recent_date}", responses)
records, _ = read_incremental(test_stream, {"lastUpdated": recent_date})
# The stream should not attempt to get more than 10K records.
assert len(records) == 100
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)


def test_engagements_stream_since_recent_date_more_than_10k(requests_mock, common_params, fake_properties_list):
"""
Connector should use 'Recent Engagements' API for recent dates (less than 30 days).
If the 'Recent Engagements' API reports more than 10k total records, it cannot return them all,
so the 'All Engagements' API should be used.
"""
recent_date = pendulum.now() - timedelta(days=10) # 10 days ago
recent_date = int(recent_date.timestamp() * 1000)
responses = [
{
"json": {
"results": [{"engagement": {"id": f"{y}", "lastUpdated": recent_date}} for y in range(100)],
"hasMore": False,
"offset": 0,
"total": 10001
},
"status_code": 200,
}
]

# Create test_stream instance with some state
test_stream = Engagements(**common_params)
test_stream.state = {"lastUpdated": recent_date}
# Mocking Request
requests_mock.register_uri("GET", f"/engagements/v1/engagements/recent/modified?count=100&since={recent_date}", responses)
requests_mock.register_uri("GET", "/engagements/v1/engagements/paged?count=250", responses)
records, _ = read_incremental(test_stream, {"lastUpdated": recent_date})
assert len(records) == 100
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)
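
The tests above rely on requests_mock serving a registered list of response dicts in order on consecutive calls, which is how pagination and the Recent/All endpoint switch are simulated. A small self-contained sketch of that mechanism (the URL and payloads are illustrative, not the connector's fixtures):

import requests
import requests_mock

responses = [
    {"json": {"results": [{"engagement": {"id": "1"}}], "hasMore": True, "offset": 100}, "status_code": 200},
    {"json": {"results": [{"engagement": {"id": "2"}}], "hasMore": False, "offset": 0}, "status_code": 200},
]

with requests_mock.Mocker() as m:
    # Each GET to the same URI consumes the next response in the list.
    m.register_uri("GET", "https://api.hubapi.com/engagements/v1/engagements/paged", responses)
    first = requests.get("https://api.hubapi.com/engagements/v1/engagements/paged").json()
    second = requests.get("https://api.hubapi.com/engagements/v1/engagements/paged").json()

assert first["hasMore"] is True
assert second["hasMore"] is False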

