From b505d92d608cfb559dc48b4b52f054b3b12c5078 Mon Sep 17 00:00:00 2001 From: James Truty Date: Thu, 30 Nov 2023 09:41:46 -0600 Subject: [PATCH 1/2] adding custom fields to close schema --- .../connectors/source-close-com/Dockerfile | 2 +- .../source_close_com/__init__.py | 2 +- .../source-close-com/source_close_com/source.py | 17 +++++++++++++++-- docs/integrations/sources/close-com.md | 5 +++-- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-close-com/Dockerfile b/airbyte-integrations/connectors/source-close-com/Dockerfile index e77535415cb07..44603bb80be52 100644 --- a/airbyte-integrations/connectors/source-close-com/Dockerfile +++ b/airbyte-integrations/connectors/source-close-com/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.4.3 +LABEL io.airbyte.version=0.5.0 LABEL io.airbyte.name=airbyte/source-close-com diff --git a/airbyte-integrations/connectors/source-close-com/source_close_com/__init__.py b/airbyte-integrations/connectors/source-close-com/source_close_com/__init__.py index 290f7d5f74e05..26f244576b385 100644 --- a/airbyte-integrations/connectors/source-close-com/source_close_com/__init__.py +++ b/airbyte-integrations/connectors/source-close-com/source_close_com/__init__.py @@ -22,6 +22,6 @@ from .datetime_incremental_sync import CustomDatetimeIncrementalSync -from .source_lc import SourceCloseCom +from .source import SourceCloseCom __all__ = ["SourceCloseCom", "CustomDatetimeIncrementalSync"] diff --git a/airbyte-integrations/connectors/source-close-com/source_close_com/source.py b/airbyte-integrations/connectors/source-close-com/source_close_com/source.py index 2fe9c2f85ab8f..88f1e888372ba 100644 --- a/airbyte-integrations/connectors/source-close-com/source_close_com/source.py +++ b/airbyte-integrations/connectors/source-close-com/source_close_com/source.py @@ -58,7 +58,6 @@ def request_params( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: - params = {} if self.number_of_items_per_page: params.update({"_limit": self.number_of_items_per_page}) @@ -69,6 +68,21 @@ def request_params( return params + def get_custom_field_schema(self) -> Mapping[str, Any]: + """Get custom field schema if it exists.""" + if self.path() not in {"lead", "contact", "opportunity", "activity"}: + return {} + resp = requests.request("GET", url=f"{self.url_base}/custom_field/{self.path()}/", headers=self.authenticator.get_auth_header()) + resp.raise_for_status() + resp_json: Mapping[str, Any] = resp.json()["data"] + return {f"custom.{data['id']}": {"type": ["null", "string", "number", "boolean"]} for data in resp_json} + + def get_json_schema(self): + """Override default get_json_schema method to add custom fields to schema.""" + schema = super().get_json_schema() + schema["properties"].update(self.get_custom_field_schema()) + return schema + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: yield from response.json()["data"] @@ -88,7 +102,6 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: class IncrementalCloseComStream(CloseComStream): - cursor_field = "date_updated" def get_updated_state( diff --git a/docs/integrations/sources/close-com.md b/docs/integrations/sources/close-com.md index bedfdc1169496..d152f845fe9ce 100644 --- a/docs/integrations/sources/close-com.md +++ b/docs/integrations/sources/close-com.md @@ -105,8 +105,9 @@ The Close.com connector is subject to rate limits. For more information on this | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------| -| 0.4.3 | 2023-10-28 | [31534](https://github.com/airbytehq/airbyte/pull/31534) | Fixed Email Activities Stream Pagination | -| 0.4.2 | 2023-08-08 | [29206](https://github.com/airbytehq/airbyte/pull/29206) | Fixed the issue with `DatePicker` format for `start date` | +| 0.5.0 | 2023-11-30 | [32984](https://github.com/airbytehq/airbyte/pull/32984) | Add support for custom fields | +| 0.4.3 | 2023-10-28 | [31534](https://github.com/airbytehq/airbyte/pull/31534) | Fixed Email Activities Stream Pagination | +| 0.4.2 | 2023-08-08 | [29206](https://github.com/airbytehq/airbyte/pull/29206) | Fixed the issue with `DatePicker` format for `start date` | | 0.4.1 | 2023-07-04 | [27950](https://github.com/airbytehq/airbyte/pull/27950) | Add human readable titles to API Key and Start Date fields | | 0.4.0 | 2023-06-27 | [27776](https://github.com/airbytehq/airbyte/pull/27776) | Update the `Email Followup Tasks` stream schema | | 0.3.0 | 2023-05-12 | [26024](https://github.com/airbytehq/airbyte/pull/26024) | Update the `Email sequences` stream schema | From 2ee08e4fcaa3b9f6a1d04da6d98f259b1ce4c4ac Mon Sep 17 00:00:00 2001 From: James Truty Date: Sun, 10 Dec 2023 09:18:33 -0600 Subject: [PATCH 2/2] use child class --- .../source_close_com/source.py | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/airbyte-integrations/connectors/source-close-com/source_close_com/source.py b/airbyte-integrations/connectors/source-close-com/source_close_com/source.py index 88f1e888372ba..9754c114bbd8c 100644 --- a/airbyte-integrations/connectors/source-close-com/source_close_com/source.py +++ b/airbyte-integrations/connectors/source-close-com/source_close_com/source.py @@ -68,21 +68,6 @@ def request_params( return params - def get_custom_field_schema(self) -> Mapping[str, Any]: - """Get custom field schema if it exists.""" - if self.path() not in {"lead", "contact", "opportunity", "activity"}: - return {} - resp = requests.request("GET", url=f"{self.url_base}/custom_field/{self.path()}/", headers=self.authenticator.get_auth_header()) - resp.raise_for_status() - resp_json: Mapping[str, Any] = resp.json()["data"] - return {f"custom.{data['id']}": {"type": ["null", "string", "number", "boolean"]} for data in resp_json} - - def get_json_schema(self): - """Override default get_json_schema method to add custom fields to schema.""" - schema = super().get_json_schema() - schema["properties"].update(self.get_custom_field_schema()) - return schema - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: yield from response.json()["data"] @@ -101,6 +86,23 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: return backoff_time +class CloseComStreamCustomFields(CloseComStream): + """Class to get custom fields for close objects that support them.""" + + def get_custom_field_schema(self) -> Mapping[str, Any]: + """Get custom field schema if it exists.""" + resp = requests.request("GET", url=f"{self.url_base}/custom_field/{self.path()}/", headers=self.authenticator.get_auth_header()) + resp.raise_for_status() + resp_json: Mapping[str, Any] = resp.json()["data"] + return {f"custom.{data['id']}": {"type": ["null", "string", "number", "boolean"]} for data in resp_json} + + def get_json_schema(self): + """Override default get_json_schema method to add custom fields to schema.""" + schema = super().get_json_schema() + schema["properties"].update(self.get_custom_field_schema()) + return schema + + class IncrementalCloseComStream(CloseComStream): cursor_field = "date_updated" @@ -118,6 +120,10 @@ def get_updated_state( return {self.cursor_field: max(latest_record.get(self.cursor_field, ""), current_stream_state.get(self.cursor_field, ""))} +class IncrementalCloseComStreamCustomFields(CloseComStreamCustomFields, IncrementalCloseComStream): + """Class to get custom fields for close objects using incremental stream.""" + + class CloseComActivitiesStream(IncrementalCloseComStream): """ General class for activities. Define request params based on cursor_field value. @@ -246,7 +252,7 @@ def request_params(self, stream_state=None, **kwargs): return params -class Leads(IncrementalCloseComStream): +class Leads(IncrementalCloseComStreamCustomFields): """ Get leads on a specific date API Docs: https://developer.close.com/#leads @@ -417,7 +423,7 @@ def path(self, **kwargs) -> str: return "user" -class Contacts(CloseComStream): +class Contacts(CloseComStreamCustomFields): """ Get contacts for Close.com account organization API Docs: https://developer.close.com/#contacts @@ -429,7 +435,7 @@ def path(self, **kwargs) -> str: return "contact" -class Opportunities(IncrementalCloseComStream): +class Opportunities(IncrementalCloseComStreamCustomFields): """ Get opportunities on a specific date API Docs: https://developer.close.com/#opportunities