diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py index d7eece982d5df..6710527ed8d78 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py @@ -147,20 +147,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: enable_experimental_streams = "enable_experimental_streams" in config and config["enable_experimental_streams"] if enable_experimental_streams: - streams.extend([ - ContactsWebAnalytics(**common_params), - CompaniesWebAnalytics(**common_params), - DealsWebAnalytics(**common_params), - TicketsWebAnalytics(**common_params), - EngagementsCallsWebAnalytics(**common_params), - EngagementsEmailsWebAnalytics(**common_params), - EngagementsMeetingsWebAnalytics(**common_params), - EngagementsNotesWebAnalytics(**common_params), - EngagementsTasksWebAnalytics(**common_params), - GoalsWebAnalytics(**common_params), - LineItemsWebAnalytics(**common_params), - ProductsWebAnalytics(**common_params), - ]) + streams.extend( + [ + ContactsWebAnalytics(**common_params), + CompaniesWebAnalytics(**common_params), + DealsWebAnalytics(**common_params), + TicketsWebAnalytics(**common_params), + EngagementsCallsWebAnalytics(**common_params), + EngagementsEmailsWebAnalytics(**common_params), + EngagementsMeetingsWebAnalytics(**common_params), + EngagementsNotesWebAnalytics(**common_params), + EngagementsTasksWebAnalytics(**common_params), + GoalsWebAnalytics(**common_params), + LineItemsWebAnalytics(**common_params), + ProductsWebAnalytics(**common_params), + ] + ) api = API(credentials=credentials) if api.is_oauth2(): @@ -186,7 +188,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: if enable_experimental_streams: custom_objects_web_analytics_streams = self.get_web_analytics_custom_objects_stream( custom_object_stream_instances=self.get_custom_object_streams(api=api, common_params=common_params), - common_params=common_params + common_params=common_params, ) available_streams.extend(custom_objects_web_analytics_streams) @@ -202,7 +204,9 @@ def get_custom_object_streams(self, api: API, common_params: Mapping[str, Any]): **common_params, ) - def get_web_analytics_custom_objects_stream(self, custom_object_stream_instances: List[CustomObject], common_params: Any) -> WebAnalyticsStream: + def get_web_analytics_custom_objects_stream( + self, custom_object_stream_instances: List[CustomObject], common_params: Any + ) -> WebAnalyticsStream: for custom_object_stream_instance in custom_object_stream_instances: def __init__(self, **kwargs: Any): @@ -211,7 +215,7 @@ def __init__(self, **kwargs: Any): schema=custom_object_stream_instance.schema, fully_qualified_name=custom_object_stream_instance.fully_qualified_name, custom_properties=custom_object_stream_instance.custom_properties, - **common_params + **common_params, ) super(self.__class__, self).__init__(parent=parent, **kwargs) @@ -219,17 +223,14 @@ def get_json_schema(self): raw_schema = { "$schema": "http://json-schema.org/draft-07/schema#", "type": ["null", "object"], - "$ref": "default_event_properties.json" + "$ref": "default_event_properties.json", } return ResourceSchemaLoader("source_hubspot")._resolve_schema_references(raw_schema=raw_schema) custom_web_analytics_stream_class = type( f"{custom_object_stream_instance.name.capitalize()}WebAnalytics", (WebAnalyticsStream,), - { - "__init__": __init__, - "get_json_schema": get_json_schema - } + {"__init__": __init__, "get_json_schema": get_json_schema}, ) yield custom_web_analytics_stream_class(**common_params) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index e7d21fafb6d8a..b52e101cb4664 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -2080,14 +2080,18 @@ def state(self) -> MutableMapping[str, Any]: def state(self, value: MutableMapping[str, Any]): self._state = value - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]: + def get_updated_state( + self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any] + ) -> MutableMapping[str, Any]: """ Returns current state. At the moment when this method is called by sources we already have updated state stored in self._state, because it is calculated each time we produce new record """ return self.state - def get_latest_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]: + def get_latest_state( + self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any] + ) -> MutableMapping[str, Any]: """ State is a composite object that keeps latest datetime of an event for each parent object: { @@ -2106,12 +2110,11 @@ def get_latest_state(self, current_stream_state: MutableMapping[str, Any], lates ) else: latest_datetime = latest_record[self.cursor_field] - return { - **self.state, - latest_record["objectId"]: {self.cursor_field: latest_datetime} - } + return {**self.state, latest_record["objectId"]: {self.cursor_field: latest_datetime}} - def client_side_filter_by_state(self, records: Iterable[Mapping[str, Any]], stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + def client_side_filter_by_state( + self, records: Iterable[Mapping[str, Any]], stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any] + ) -> Iterable[Mapping[str, Any]]: """ Filter out records that not saticfy state condition. We should only care about state since records have already been filtered by self._start_date in parent class (See `Stream.read_records`) @@ -2126,7 +2129,9 @@ def client_side_filter_by_state(self, records: Iterable[Mapping[str, Any]], stre if object_id not in stream_state or record[self.cursor_field] > stream_state[object_id][self.cursor_field]: yield record - def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = None, stream_state: Mapping[str, Any] | None = None) -> Iterable[Mapping[str, Any] | None]: + def stream_slices( + self, sync_mode: SyncMode, cursor_field: List[str] | None = None, stream_state: Mapping[str, Any] | None = None + ) -> Iterable[Mapping[str, Any] | None]: now = pendulum.now(tz="UTC") for parent_slice in super().stream_slices(sync_mode, cursor_field, stream_state): @@ -2138,7 +2143,8 @@ def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = No self._start_date, self._field_to_datetime(self.state[object_id][self.cursor_field]), ) - if object_id in self.state else self._start_date + if object_id in self.state + else self._start_date ) # Making slices of given slice period @@ -2151,7 +2157,12 @@ def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = No # Once API become available for testing we should ensure those time windows are still suitable for us, # so we neither loose records nor duplicate them while ( - (to_datetime := min(from_datetime.add(days=self.slicing_period), now) if not self.is_client_side_incremental_enabled else now) <= now + ( + to_datetime := min(from_datetime.add(days=self.slicing_period), now) + if not self.is_client_side_incremental_enabled + else now + ) + <= now and from_datetime != now and from_datetime <= to_datetime ): @@ -2190,7 +2201,9 @@ def get_object_id(self, value: MutableMapping[str, Any]) -> str: def url(self) -> str: return "/events/v3/events" - def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]: + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: """ Preparing the request params dictionary for the following query string: ?objectType= @@ -2201,7 +2214,13 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[ params = super().request_params(stream_state, stream_slice, next_page_token) return params | stream_slice - def read_records(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Mapping[str, Any]]: + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: record_generator = super().read_records(sync_mode, cursor_field, stream_slice, stream_state) # TODO: consider to remove this condition or turn `is_client_side_incremental_enabled` off