Skip to content

Commit

Permalink
Automated Commit - Formatting Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-yermilov-gl authored and octavia-squidington-iii committed Dec 4, 2023
1 parent 20c19d9 commit 5891e59
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 34 deletions.
Expand Up @@ -147,20 +147,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
enable_experimental_streams = "enable_experimental_streams" in config and config["enable_experimental_streams"]

if enable_experimental_streams:
streams.extend([
ContactsWebAnalytics(**common_params),
CompaniesWebAnalytics(**common_params),
DealsWebAnalytics(**common_params),
TicketsWebAnalytics(**common_params),
EngagementsCallsWebAnalytics(**common_params),
EngagementsEmailsWebAnalytics(**common_params),
EngagementsMeetingsWebAnalytics(**common_params),
EngagementsNotesWebAnalytics(**common_params),
EngagementsTasksWebAnalytics(**common_params),
GoalsWebAnalytics(**common_params),
LineItemsWebAnalytics(**common_params),
ProductsWebAnalytics(**common_params),
])
streams.extend(
[
ContactsWebAnalytics(**common_params),
CompaniesWebAnalytics(**common_params),
DealsWebAnalytics(**common_params),
TicketsWebAnalytics(**common_params),
EngagementsCallsWebAnalytics(**common_params),
EngagementsEmailsWebAnalytics(**common_params),
EngagementsMeetingsWebAnalytics(**common_params),
EngagementsNotesWebAnalytics(**common_params),
EngagementsTasksWebAnalytics(**common_params),
GoalsWebAnalytics(**common_params),
LineItemsWebAnalytics(**common_params),
ProductsWebAnalytics(**common_params),
]
)

api = API(credentials=credentials)
if api.is_oauth2():
Expand All @@ -186,7 +188,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
if enable_experimental_streams:
custom_objects_web_analytics_streams = self.get_web_analytics_custom_objects_stream(
custom_object_stream_instances=self.get_custom_object_streams(api=api, common_params=common_params),
common_params=common_params
common_params=common_params,
)
available_streams.extend(custom_objects_web_analytics_streams)

Expand All @@ -202,7 +204,9 @@ def get_custom_object_streams(self, api: API, common_params: Mapping[str, Any]):
**common_params,
)

def get_web_analytics_custom_objects_stream(self, custom_object_stream_instances: List[CustomObject], common_params: Any) -> WebAnalyticsStream:
def get_web_analytics_custom_objects_stream(
self, custom_object_stream_instances: List[CustomObject], common_params: Any
) -> WebAnalyticsStream:
for custom_object_stream_instance in custom_object_stream_instances:

def __init__(self, **kwargs: Any):
Expand All @@ -211,25 +215,22 @@ def __init__(self, **kwargs: Any):
schema=custom_object_stream_instance.schema,
fully_qualified_name=custom_object_stream_instance.fully_qualified_name,
custom_properties=custom_object_stream_instance.custom_properties,
**common_params
**common_params,
)
super(self.__class__, self).__init__(parent=parent, **kwargs)

def get_json_schema(self):
raw_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"$ref": "default_event_properties.json"
"$ref": "default_event_properties.json",
}
return ResourceSchemaLoader("source_hubspot")._resolve_schema_references(raw_schema=raw_schema)

custom_web_analytics_stream_class = type(
f"{custom_object_stream_instance.name.capitalize()}WebAnalytics",
(WebAnalyticsStream,),
{
"__init__": __init__,
"get_json_schema": get_json_schema
}
{"__init__": __init__, "get_json_schema": get_json_schema},
)

yield custom_web_analytics_stream_class(**common_params)
Expand Up @@ -2080,14 +2080,18 @@ def state(self) -> MutableMapping[str, Any]:
def state(self, value: MutableMapping[str, Any]):
    """Setter: replace the stream's cached state with *value* wholesale."""
    self._state = value

def get_updated_state(
    self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
    """
    Return the state as currently tracked by the stream.

    By the time the source calls this hook, ``self._state`` has already been
    refreshed (it is recomputed for every record produced), so both arguments
    are intentionally ignored and the up-to-date state is handed back as-is.
    """
    return self.state

def get_latest_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> MutableMapping[str, Any]:
def get_latest_state(
self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
) -> MutableMapping[str, Any]:
"""
State is a composite object that keeps latest datetime of an event for each parent object:
{
Expand All @@ -2106,12 +2110,11 @@ def get_latest_state(self, current_stream_state: MutableMapping[str, Any], lates
)
else:
latest_datetime = latest_record[self.cursor_field]
return {
**self.state,
latest_record["objectId"]: {self.cursor_field: latest_datetime}
}
return {**self.state, latest_record["objectId"]: {self.cursor_field: latest_datetime}}

def client_side_filter_by_state(self, records: Iterable[Mapping[str, Any]], stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
def client_side_filter_by_state(
self, records: Iterable[Mapping[str, Any]], stream_slice: Mapping[str, Any], stream_state: Mapping[str, Any]
) -> Iterable[Mapping[str, Any]]:
"""
Filter out records that do not satisfy the state condition.
We should only care about state since records have already been filtered by self._start_date in parent class (See `Stream.read_records`)
Expand All @@ -2126,7 +2129,9 @@ def client_side_filter_by_state(self, records: Iterable[Mapping[str, Any]], stre
if object_id not in stream_state or record[self.cursor_field] > stream_state[object_id][self.cursor_field]:
yield record

def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = None, stream_state: Mapping[str, Any] | None = None) -> Iterable[Mapping[str, Any] | None]:
def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] | None = None, stream_state: Mapping[str, Any] | None = None
) -> Iterable[Mapping[str, Any] | None]:
now = pendulum.now(tz="UTC")
for parent_slice in super().stream_slices(sync_mode, cursor_field, stream_state):

Expand All @@ -2138,7 +2143,8 @@ def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = No
self._start_date,
self._field_to_datetime(self.state[object_id][self.cursor_field]),
)
if object_id in self.state else self._start_date
if object_id in self.state
else self._start_date
)

# Making slices of given slice period
Expand All @@ -2151,7 +2157,12 @@ def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] | None = No
# Once the API becomes available for testing we should ensure those time windows are still suitable for us,
# so we neither lose records nor duplicate them
while (
(to_datetime := min(from_datetime.add(days=self.slicing_period), now) if not self.is_client_side_incremental_enabled else now) <= now
(
to_datetime := min(from_datetime.add(days=self.slicing_period), now)
if not self.is_client_side_incremental_enabled
else now
)
<= now
and from_datetime != now
and from_datetime <= to_datetime
):
Expand Down Expand Up @@ -2190,7 +2201,9 @@ def get_object_id(self, value: MutableMapping[str, Any]) -> str:
def url(self) -> str:
    """Relative path of the HubSpot Web Analytics events endpoint (events API v3)."""
    endpoint = "/events/v3/events"
    return endpoint

def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
"""
Preparing the request params dictionary for the following query string:
<url>?objectType=<parent-type>
Expand All @@ -2201,7 +2214,13 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[
params = super().request_params(stream_state, stream_slice, next_page_token)
return params | stream_slice

def read_records(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Mapping[str, Any]]:
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
record_generator = super().read_records(sync_mode, cursor_field, stream_slice, stream_state)

# TODO: consider to remove this condition or turn `is_client_side_incremental_enabled` off
Expand Down

0 comments on commit 5891e59

Please sign in to comment.