🎉 Source Hubspot: added custom availability strategy (#26418)
* Added custom availability strategy which catches permission errors from parent streams

* added PR number to doc

* Automated Change

* fixed expected records

* updated version in metadata

* Automated Change

* merged the latest changes

* updated doc

* updated doc

* Automated Change

---------

Co-authored-by: midavadim <midavadim@users.noreply.github.com>
midavadim and midavadim committed May 31, 2023
1 parent 431b7d1 commit 812aa3a
Showing 7 changed files with 89 additions and 6 deletions.


2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.8.1
+LABEL io.airbyte.version=0.8.2
 LABEL io.airbyte.name=airbyte/source-hubspot
@@ -24,6 +24,7 @@ acceptance_tests:
       - config_path: secrets/config_oauth.json
         expect_records:
           path: integration_tests/expected_records.jsonl
+          extra_records: yes
         timeout_seconds: 3600
         empty_streams:
           - name: form_submissions
@@ -59,9 +60,29 @@ acceptance_tests:
           deals:
             - name: properties/hs_time_*
               bypass_reason: Hubspot time depend on current time
+            - name: properties/hs_acv
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_arr
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_mrr
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_tcv
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_num_of_associated_line_items
+              bypass_reason: value can be an integer or float
           deals_archived:
             - name: properties/hs_time_*
               bypass_reason: Hubspot time depend on current time
+            - name: properties/hs_acv
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_arr
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_mrr
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_tcv
+              bypass_reason: value can be an integer or float
+            - name: properties/hs_num_of_associated_line_items
+              bypass_reason: value can be an integer or float
           tickets:
             - name: properties/hs_time_*
               bypass_reason: Hubspot time depend on current time
@@ -5,7 +5,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
-  dockerImageTag: 0.8.1
+  dockerImageTag: 0.8.2
   dockerRepository: airbyte/source-hubspot
   githubIssueLabel: source-hubspot
   icon: hubspot.svg
@@ -3,6 +3,7 @@
 #

 import json
+import logging
 import sys
 import time
 from abc import ABC, abstractmethod
@@ -15,12 +16,15 @@
 import requests
 from airbyte_cdk.entrypoint import logger
 from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.streams import IncrementalMixin
+from airbyte_cdk.sources import Source
+from airbyte_cdk.sources.streams import IncrementalMixin, Stream
+from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 from airbyte_cdk.sources.streams.core import StreamData
 from airbyte_cdk.sources.streams.http import HttpStream
+from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
 from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
-from requests import codes
+from requests import HTTPError, codes
 from source_hubspot.constants import OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS
 from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout, InvalidStartDateConfigError
 from source_hubspot.helpers import APIv1Property, APIv3Property, GroupByKey, IRecordPostProcessor, IURLPropertyRepresentation, StoreAsIs
@@ -105,6 +109,18 @@ def log_giveup(_details):
     )


+class HubspotAvailabilityStrategy(HttpAvailabilityStrategy):
+    def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
+        """Catch HTTPError thrown from parent stream which is called by get_first_stream_slice"""
+        try:
+            return super().check_availability(stream, logger, source)
+        except HTTPError as error:
+            is_available, reason = self.handle_http_error(stream, logger, source, error)
+            if reason:
+                reason = f"Unable to sync stream '{stream.name}' because of permission error in parent stream. {reason}"
+            return is_available, reason
+
+
 class API:
     """HubSpot API interface, authorize, retrieve and post, supports backoff logic"""

@@ -714,6 +730,10 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta
                 record[name.replace(" ", "_")] = [row["id"] for row in association.get("results", [])]
             yield record

+    @property
+    def availability_strategy(self) -> Optional[AvailabilityStrategy]:
+        return HubspotAvailabilityStrategy()
+

 class ClientSideIncrementalStream(Stream, IncrementalMixin):
     _cursor_value = ""
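
The pattern introduced by `HubspotAvailabilityStrategy` can be illustrated without the CDK. The sketch below is a minimal, self-contained approximation: `ParentPermissionError`, `BaseAvailabilityStrategy`, `handle_error`, `ChildStream`, and `first_slice` are hypothetical stand-ins invented for illustration, not the Airbyte CDK API. It only assumes what the diff above shows, namely that computing the first slice of a child stream can raise an error originating in the parent stream, and that the override converts that error into an `(is_available, reason)` tuple.

```python
import logging
from typing import Optional, Tuple


class ParentPermissionError(Exception):
    """Hypothetical stand-in for the HTTPError raised by a parent stream on HTTP 403."""


class BaseAvailabilityStrategy:
    """Hypothetical stand-in for a base availability strategy (not the CDK class)."""

    def check_availability(self, stream, logger) -> Tuple[bool, Optional[str]]:
        # Fetching the first slice of a child stream reads the parent stream,
        # so a permission error on the parent escapes from this call.
        stream.first_slice()
        return True, None

    def handle_error(self, stream, logger, error) -> Tuple[bool, Optional[str]]:
        # Turn the error into an "unavailable" result with a readable reason.
        return False, f"The endpoint for stream '{stream.name}' rejected the request: {error}"


class ParentAwareAvailabilityStrategy(BaseAvailabilityStrategy):
    """Catches the parent-stream error instead of letting the check crash."""

    def check_availability(self, stream, logger) -> Tuple[bool, Optional[str]]:
        try:
            return super().check_availability(stream, logger)
        except ParentPermissionError as error:
            is_available, reason = self.handle_error(stream, logger, error)
            if reason:
                reason = f"Unable to sync stream '{stream.name}' because of permission error in parent stream. {reason}"
            return is_available, reason


class ChildStream:
    """Toy child stream whose slices depend on a parent endpoint."""

    def __init__(self, name: str, parent_forbidden: bool) -> None:
        self.name = name
        self.parent_forbidden = parent_forbidden

    def first_slice(self) -> dict:
        if self.parent_forbidden:
            raise ParentPermissionError("403 Forbidden")
        return {"parent_id": 1}


logger = logging.getLogger("availability_sketch")
strategy = ParentAwareAvailabilityStrategy()
forbidden_result = strategy.check_availability(ChildStream("form_submissions", True), logger)
allowed_result = strategy.check_availability(ChildStream("form_submissions", False), logger)
```

In this sketch the forbidden case yields `False` together with a reason that names the stream, while the allowed case passes through the base result unchanged. Wrapping the base call rather than reimplementing it keeps the default behaviour for every other kind of failure.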
@@ -174,6 +174,46 @@ def test_stream_forbidden(requests_mock, config, caplog):
     assert not records


+def test_parent_stream_forbidden(requests_mock, config, caplog, fake_properties_list):
+    json = {
+        "status": "error",
+        "message": "This access_token does not have proper permissions!",
+    }
+    requests_mock.get("https://api.hubapi.com/marketing/v3/forms", json=json, status_code=403)
+    properties_response = [
+        {
+            "json": [
+                {"name": property_name, "type": "string", "updatedAt": 1571085954360, "createdAt": 1565059306048}
+                for property_name in fake_properties_list
+            ],
+            "status_code": 200,
+        }
+    ]
+    requests_mock.get("https://api.hubapi.com/properties/v2/form/properties", properties_response)
+    requests_mock.get("https://api.hubapi.com/crm/v3/schemas", json=json, status_code=403)
+
+    catalog = ConfiguredAirbyteCatalog.parse_obj(
+        {
+            "streams": [
+                {
+                    "stream": {
+                        "name": "form_submissions",
+                        "json_schema": {},
+                        "supported_sync_modes": ["full_refresh"],
+                    },
+                    "sync_mode": "full_refresh",
+                    "destination_sync_mode": "overwrite",
+                }
+            ]
+        }
+    )
+
+    records = list(SourceHubspot().read(logger, config, catalog, {}))
+    assert json["message"] in caplog.text
+    records = [r for r in records if r.type == Type.RECORD]
+    assert not records
+
+
 class TestSplittingPropertiesFunctionality:
     BASE_OBJECT_BODY = {
         "createdAt": "2020-12-10T07:58:09.554Z",
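
What `test_parent_stream_forbidden` asserts (the permission message is logged and no records are emitted) can be reproduced in miniature with the standard library alone. The following is a rough sketch under stated assumptions: `read_available`, `ListHandler`, and the warning text are hypothetical and stand in for the connector's read loop and pytest's `caplog`; the real CDK may log and skip streams differently.

```python
import logging
from typing import Callable, Dict, Iterator, List, Optional, Tuple

Availability = Tuple[bool, Optional[str]]


class ListHandler(logging.Handler):
    """Collects formatted log messages, playing the role of pytest's caplog."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def read_available(
    checks: Dict[str, Callable[[], Availability]],
    data: Dict[str, List[dict]],
    logger: logging.Logger,
) -> Iterator[dict]:
    """Hypothetical read loop: skip unavailable streams and log the reason."""
    for name, check in checks.items():
        is_available, reason = check()
        if not is_available:
            logger.warning("Skipped syncing stream '%s' because it was unavailable. %s", name, reason)
            continue
        yield from data.get(name, [])


message = "This access_token does not have proper permissions!"

handler = ListHandler()
logger = logging.getLogger("parent_forbidden_sketch")
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
logger.propagate = False

# The availability check reports the stream as unavailable, as the custom
# strategy would after catching the parent's 403.
checks = {"form_submissions": lambda: (False, message)}
records = list(read_available(checks, {"form_submissions": [{"id": 1}]}, logger))
```

As in the real test, the interesting outcome is that the run finishes without raising: the reason ends up in the captured log and `records` stays empty even though data exists for the stream.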
5 changes: 3 additions & 2 deletions docs/integrations/sources/hubspot.md
@@ -146,10 +146,11 @@ Now that you have set up the Hubspot source connector, check out the following H
 ## Changelog
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------|
-| 0.8.1 | 2023-05-29 | [26719](https://github.com/airbytehq/airbyte/pull/26719) | Handle issue when `state` value is literaly `"" (empty str)`
+| 0.8.2 | 2023-05-16 | [26418](https://github.com/airbytehq/airbyte/pull/26418) | Added custom availability strategy which catches permission errors from parent streams |
+| 0.8.1 | 2023-05-29 | [26719](https://github.com/airbytehq/airbyte/pull/26719) | Handle issue when `state` value is literally `"" (empty str)` |
 | 0.8.0 | 2023-04-10 | [16032](https://github.com/airbytehq/airbyte/pull/16032) | Add new stream `Custom Object` |
 | 0.7.0 | 2023-04-10 | [24450](https://github.com/airbytehq/airbyte/pull/24450) | Add new stream `Goals` |
-| 0.6.2 | 2023-04-28 | [25667](https://github.com/airbytehq/airbyte/pull/25667) | Fixed bug with `Invalid Date` like `2000-00-00T00:00:00Z` while settip up the connector |
+| 0.6.2 | 2023-04-28 | [25667](https://github.com/airbytehq/airbyte/pull/25667) | Fixed bug with `Invalid Date` like `2000-00-00T00:00:00Z` while settip up the connector |
 | 0.6.1 | 2023-04-10 | [21423](https://github.com/airbytehq/airbyte/pull/21423) | Update scope for `DealPipelines` stream to only `crm.objects.contacts.read` |
 | 0.6.0 | 2023-04-07 | [24980](https://github.com/airbytehq/airbyte/pull/24980) | Add new stream `DealsArchived` |
 | 0.5.2 | 2023-04-07 | [24915](https://github.com/airbytehq/airbyte/pull/24915) | Fix field key parsing (replace whitespace with uderscore) |
