Skip to content

Commit

Permalink
馃悰 Source Hubspot: Removed stage history from deals stream (#10707)
Browse files Browse the repository at this point in the history
* Removed stage history from Hubspot deal stream

* Fixed stream slices on CrmSearchStream

* Removed unused DealStageHistoryStream class

* Bumped connector version
  • Loading branch information
lgomezm committed Mar 5, 2022
1 parent e34c357 commit d39b636
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.1.44
dockerImageTag: 0.1.45
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3294,7 +3294,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-hubspot:0.1.44"
- dockerImage: "airbyte/source-hubspot:0.1.45"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.44
LABEL io.airbyte.version=0.1.45
LABEL io.airbyte.name=airbyte/source-hubspot
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
payload["after"] = int(response["paging"]["next"]["after"])

return {"params": params, "payload": payload}

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
return [None]


class CRMObjectStream(Stream):
Expand Down Expand Up @@ -996,62 +1001,13 @@ def request_params(
return params


class DealStageHistoryStream(Stream):
"""Deal stage history, API v1
Deal stage history is exposed by the v1 API, but not the v3 API.
The v1 endpoint requires the contacts scope.
Docs: https://legacydocs.hubspot.com/docs/methods/deals/get-all-deals
"""

url = "/deals/v1/deal/paged"
more_key = "hasMore"
data_field = "deals"
updated_at_field = "timestamp"

def _transform(self, records: Iterable) -> Iterable:
for record in super()._transform(records):
dealstage = record.get("properties", {}).get("dealstage", {})
updated_at = dealstage.get(self.updated_at_field)
if updated_at:
yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at}

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
return {"propertiesWithHistory": "dealstage"}


class Deals(CRMSearchStream):
"""Deals, API v3"""

entity = "deal"
last_modified_field = "hs_lastmodifieddate"
associations = ["contacts", "companies"]

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._stage_history = DealStageHistoryStream(**kwargs)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
history_by_id = {}
for record in self._stage_history.read_records(sync_mode):
if all(field in record for field in ("id", "dealstage")):
history_by_id[record["id"]] = record["dealstage"]

for record in super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state):
if record.get("id") and int(record["id"]) in history_by_id:
record["dealstage"] = history_by_id[int(record["id"])]
yield record


class DealPipelines(Stream):
"""Deal pipelines, API v1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,74 +244,6 @@ def test_stream_with_splitting_properties_with_new_record(self, requests_mock, c

test_stream = Deals(**common_params)

deal_stage_history_response = {
"deals": [
{
"portalId": 123,
"dealId": 111,
"isDeleted": False,
"associations": None,
"properties": {
"dealstage": {
"value": "appointmentscheduled",
"timestamp": 1610533842221,
"source": "API",
"sourceId": None,
"updatedByUserId": None,
"versions": [
{
"name": "dealstage",
"value": "appointmentscheduled",
"timestamp": 1610533842221,
"source": "API",
"sourceVid": [],
"requestId": "19f07c43-b187-4ab6-9fab-4a0f261f0a8c",
}
],
}
},
"stateChanges": [],
},
{
"portalId": 123,
"dealId": 112,
"isDeleted": False,
"associations": None,
"properties": {
"dealstage": {
"value": "appointmentscheduled",
"timestamp": 1610533911154,
"source": "API",
"sourceId": None,
"updatedByUserId": None,
"versions": [
{
"name": "dealstage",
"value": "appointmentscheduled",
"timestamp": 1610533911154,
"source": "API",
"sourceVid": [],
"requestId": "41a1eeff-569b-4193-ba80-238d3bd13f56",
}
],
}
},
"stateChanges": [],
},
]
}

requests_mock.register_uri(
"GET",
test_stream._stage_history.path(),
[
{
"json": deal_stage_history_response,
"status_code": 200,
}
],
)

ids_list = ["6043593519", "1092593519", "1092593518", "1092593517", "1092593516"]
for property_slice in parsed_properties:
record_responses = [
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ If you are using Oauth, most of the streams require the appropriate [scopes](htt

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:-----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.45 | 2022-03-04 | [10707](https://github.com/airbytehq/airbyte/pull/10707) | Remove stage history from deals stream to increase efficiency |
| 0.1.44 | 2022-02-24 | [9027](https://github.com/airbytehq/airbyte/pull/9027) | Add associations companies to deals, ticket and contact stream |
| 0.1.43 | 2022-02-24 | [10576](https://github.com/airbytehq/airbyte/pull/10576) | Cast timestamp to date/datetime|
| 0.1.42 | 2022-02-22 | [10492](https://github.com/airbytehq/airbyte/pull/10492) | Add `date-time` format to datetime fields|
Expand Down

0 comments on commit d39b636

Please sign in to comment.