Skip to content

Commit

Permalink
🐛 Source Hubspot: Fix issue with getting 414 HTTP error for streams (#…
Browse files Browse the repository at this point in the history
…6954)

* Source Hubspot: Fix issue with getting 414 HTTP error for streams

* update code and schemas

* bump version
  • Loading branch information
yevhenii-ldv committed Oct 26, 2021
1 parent 722f82d commit 928aa62
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 2,476 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "36c891d9-4bd9-43ac-bad2-10e12756272c",
"name": "Hubspot",
"dockerRepository": "airbyte/source-hubspot",
"dockerImageTag": "0.1.18",
"dockerImageTag": "0.1.19",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/hubspot",
"icon": "hubspot.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
- sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
name: Hubspot
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.1.18
dockerImageTag: 0.1.19
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ RUN pip install .

ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/source-hubspot
144 changes: 96 additions & 48 deletions airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from functools import lru_cache, partial
from http import HTTPStatus
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union

import backoff
import pendulum as pendulum
Expand All @@ -17,6 +17,10 @@
from base_python.entrypoint import logger
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout

# This value was obtained experimentally: Hubspot accepts URLs up to ~16300 characters long,
# so the length of the `properties` query parameter is capped at 15000 characters.
PROPERTIES_PARAM_MAX_LENGTH = 15000

# we got this when provided API Token has incorrect format
CLOUDFLARE_ORIGIN_DNS_ERROR = 530

Expand Down Expand Up @@ -49,6 +53,22 @@
CUSTOM_FIELD_VALUE_TO_TYPE = {v: k for k, v in CUSTOM_FIELD_TYPE_TO_VALUE.items()}


def split_properties(properties_list: List[str], max_length: Optional[int] = None) -> Iterator[List[str]]:
    """Split property names into chunks that each fit into one `properties` query parameter.

    The chunks are meant to be comma-joined by the caller, so the length budget
    accounts for the separating commas as well as the names themselves.

    :param properties_list: property names to distribute over requests, in order.
    :param max_length: maximum length of one comma-joined chunk; when omitted,
        PROPERTIES_PARAM_MAX_LENGTH is used.
    :return: iterator over non-empty lists of property names; input order is preserved.
    """
    limit = PROPERTIES_PARAM_MAX_LENGTH if max_length is None else max_length
    chunk: List[str] = []
    joined_length = 0
    for property_ in properties_list:
        # +1 accounts for the comma that precedes every name except the first one in a chunk.
        candidate_length = joined_length + 1 + len(property_) if chunk else len(property_)
        if chunk and candidate_length > limit:
            yield chunk
            chunk = []
            candidate_length = len(property_)

        # A single name longer than the limit cannot be split any further; it is
        # emitted alone instead of being preceded by an empty chunk.
        chunk.append(property_)
        joined_length = candidate_length

    if chunk:
        yield chunk


def retry_connection_handler(**kwargs):
"""Retry helper, log each attempt"""

Expand Down Expand Up @@ -177,6 +197,7 @@ class Stream(ABC):
page_field = "offset"
limit_field = "limit"
limit = 100
offset = 0

@property
@abstractmethod
Expand Down Expand Up @@ -302,58 +323,85 @@ def _filter_old_records(self, records: Iterable) -> Iterable:
yield record

def _read(self, getter: Callable, params: MutableMapping[str, Any] = None) -> Iterator:
next_page_token = None
while True:
response = getter(params=params)
if isinstance(response, Mapping):
if response.get("status", None) == "error":
"""
When the API Key doen't have the permissions to access the endpoint,
we break the read, skip this stream and log warning message for the user.
Example:
response.json() = {
'status': 'error',
'message': 'This hapikey (....) does not have proper permissions! (requires any of [automation-access])',
'correlationId': '111111-2222-3333-4444-55555555555'}
"""
logger.warn(f"Stream `{self.data_field}` cannot be procced. {response.get('message')}")
break

if response.get(self.data_field) is None:
"""
When the response doen't have the stream's data, raise an exception.
"""
raise RuntimeError("Unexpected API response: {} not in {}".format(self.data_field, response.keys()))

yield from response[self.data_field]

# pagination
if "paging" in response: # APIv3 pagination
if "next" in response["paging"]:
params["after"] = response["paging"]["next"]["after"]
else:
break
else:
if not response.get(self.more_key, False):
break
if self.page_field in response:
params[self.page_filter] = response[self.page_field]
if next_page_token:
params.update(next_page_token)

properties_list = list(self.properties.keys())
if properties_list:
# TODO: Additional processing was added due to the fact that users receive 414 errors while syncing their streams (issues #3977 and #5835).
# We will need to fix this code when the Hubspot developers add the ability to use a special parameter to get all properties for an entity.
# According to Hubspot Community (https://community.hubspot.com/t5/APIs-Integrations/Get-all-contact-properties-without-explicitly-listing-them/m-p/447950)
# and the official documentation, this does not exist at the moment.
stream_records = {}

for properties in split_properties(properties_list):
params.update({"properties": ",".join(properties)})
response = getter(params=params)
for record in self._transform(self.parse_response(response)):
if record["id"] not in stream_records:
stream_records[record["id"]] = record
elif stream_records[record["id"]].get("properties"):
stream_records[record["id"]]["properties"].update(record.get("properties", {}))

yield from [value for key, value in stream_records.items()]
else:
response = list(response)
yield from response
response = getter(params=params)
yield from self._transform(self.parse_response(response))

# pagination
if len(response) < self.limit:
break
else:
params[self.page_filter] = params.get(self.page_filter, 0) + self.limit
next_page_token = self.next_page_token(response)
if not next_page_token:
break

def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
default_params = {self.limit_field: self.limit, "properties": ",".join(self.properties.keys())}
default_params = {self.limit_field: self.limit}
params = {**default_params, **params} if params else {**default_params}
yield from self._filter_dynamic_fields(self._filter_old_records(self._read(getter, params)))

def parse_response(self, response: Union[Mapping[str, Any], List[dict]]) -> Iterator:
    """Yield the records contained in a single API response.

    :param response: either a mapping that keeps the records under `self.data_field`,
        or a plain iterable of records.
    :return: iterator over the records; empty when the API reported an error status.
    :raises RuntimeError: if a mapping response has no data under `self.data_field`.
    """
    if not isinstance(response, Mapping):
        # Non-mapping responses are materialized first and then emitted as they are.
        yield from list(response)
        return

    if response.get("status", None) == "error":
        # When the API key doesn't have the permissions to access the endpoint,
        # stop reading this response, skip the stream and log a warning for the user.
        # Example:
        # response.json() = {
        #     'status': 'error',
        #     'message': 'This hapikey (....) does not have proper permissions! (requires any of [automation-access])',
        #     'correlationId': '111111-2222-3333-4444-55555555555'}
        logger.warn(f"Stream `{self.entity}` cannot be procced. {response.get('message')}")
        return

    if response.get(self.data_field) is None:
        # The response doesn't contain the stream's data at all.
        raise RuntimeError("Unexpected API response: {} not in {}".format(self.data_field, response.keys()))

    yield from response[self.data_field]

def next_page_token(self, response: Union[Mapping[str, Any], List[dict]]) -> Optional[Mapping[str, Union[int, str]]]:
    """Build the query parameters that request the page following `response`.

    :param response: the raw response of the last request, either a mapping or a list of records.
    :return: parameters to merge into the next request, or None when there is no next page.
    """
    if isinstance(response, Mapping):
        if "paging" in response:  # APIv3 pagination
            if "next" in response["paging"]:
                return {"after": response["paging"]["next"]["after"]}
            return None

        # Pagination driven by a "more" flag plus a page field in the response body.
        if not response.get(self.more_key, False):
            return None
        if self.page_field in response:
            return {self.page_filter: response[self.page_field]}
        return None

    # List responses carry no paging metadata: a full page means there may be more
    # records, so the offset kept on the instance is advanced by one page.
    if len(response) >= self.limit:
        self.offset += self.limit
        return {self.page_filter: self.offset}

    # A short page ends the pagination. Reset the offset so that a later read on
    # the same instance does not continue from a stale position.
    self.offset = 0
    return None

@staticmethod
def _get_field_props(field_type: str) -> Mapping[str, List[str]]:
Expand Down Expand Up @@ -639,13 +687,13 @@ def _transform(self, records: Iterable) -> Iterable:


class FormStream(Stream):
"""Marketing Forms, API v2
"""Marketing Forms, API v3
by default non-marketing forms are filtered out of this endpoint
Docs: https://developers.hubspot.com/docs/api/marketing/forms
"""

entity = "form"
url = "/forms/v2/forms"
url = "/marketing/v3/forms"
updated_at_field = "updatedAt"
created_at_field = "createdAt"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,25 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"properties": {
"portalId": {
"type": ["null", "integer"]
},
"companyId": {
"type": ["null", "integer"]
},
"isDeleted": {
"type": ["null", "boolean"]
"id": {
"type": ["null", "string"]
},
"stateChanges": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"]
}
"createdAt": {
"type": ["null", "string"],
"format": "date-time"
},
"additionalDomains": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
"updatedAt": {
"type": ["null", "string"],
"format": "date-time"
},
"mergeAudits": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"mergedCompanyId": {
"type": ["null", "integer"]
},
"canonicalCompanyId": {
"type": ["null", "integer"]
},
"sourceId": {
"type": ["null", "string"]
},
"entityId": {
"type": ["null", "string"]
},
"mergedCompanyName": {
"type": ["null", "string"]
},
"movedProperties": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
}
}
}
"archived": {
"type": ["null", "boolean"]
},
"contacts": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
"type": "string"
}
},
"createdAt": {
"type": ["null", "string"]
},
"updatedAt": {
"type": ["null", "string"]
}
}
}

0 comments on commit 928aa62

Please sign in to comment.