Source Hubspot: check if it has a state on search streams #15110

Merged
Changes from 2 commits
@@ -950,9 +950,18 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
-       self.set_sync(sync_mode)
+       self.set_sync(sync_mode, stream_state)
        return [None]

    def set_sync(self, sync_mode: SyncMode, stream_state):
        self._sync_mode = sync_mode
        if self._sync_mode == SyncMode.incremental:
            if stream_state:
Collaborator

@vladimir-remar this is the main change here, right?

Could you please provide an example of the use case for this? I don't quite get the point of this change or under what conditions it would come into play.

Btw, isn't the stream_state value the same as self._state?

Contributor Author

Hi @davydov-d, thanks for the answer.

Indeed, it has the same value if the sync mode is incremental and the sync runs with a valid state.
Here is the main idea I have:

  • First sync: Incremental / no state, should use f"/crm/v3/objects/{self.entity}"
  • Following syncs: Incremental / state, should use f"/crm/v3/objects/{self.entity}/search"

I may have missed something or gotten the wrong idea about this.
Anyway, thanks for your help.
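
The two cases listed in this comment can be sketched as a small standalone function. This is only an illustration of the proposal, not the connector's code: proposed_url is a hypothetical helper, and the "contacts" entity and "updatedAt" cursor key are assumed values.

```python
from typing import Any, Mapping, Optional


def proposed_url(entity: str, stream_state: Optional[Mapping[str, Any]]) -> str:
    """Pick the endpoint for an incremental sync as proposed in the comment above.

    Hypothetical helper for illustration; it is not part of the connector.
    """
    base = f"/crm/v3/objects/{entity}"
    # No saved state: treated as the first incremental sync, so list all objects.
    if not stream_state:
        return base
    # A saved state exists: a cursor is available, so use the search endpoint.
    return f"{base}/search"


# "contacts" and "updatedAt" are assumed example values.
print(proposed_url("contacts", None))
print(proposed_url("contacts", {"updatedAt": "2022-07-01T00:00:00Z"}))
```

Under this proposal an empty state and a missing state behave the same way, since both are falsy.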

Collaborator

@vladimir-remar thanks for your time.
The reason I'm asking is that, unfortunately, this connector's code is quite complicated, and this change does not bring more clarity.

So, if the stream_state value is the same as self._state, can't it be simplified to:

if self._sync_mode == SyncMode.incremental:
    if self._state:
        if not self._state:
            self._state = self._start_date
        else:
            self._state = self._start_date = max(self._state, self._start_date)

?
If so, the "if not self._state" branch will never execute. Am I missing something?
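
The dead branch can be checked with a standalone version of the simplified snippet. This is a rough sketch under assumptions: simplified_set_state is a hypothetical function, plain integers stand in for timestamps, and the assignment to self._start_date is left out for brevity.

```python
def simplified_set_state(state, start_date):
    """Mirror the simplified snippet and report which branch ran.

    Hypothetical function for illustration; integers stand in for timestamps.
    """
    branch = "skipped"
    if state:
        if not state:
            # Unreachable: the outer check already guarantees state is truthy.
            state = start_date
            branch = "not-branch"
        else:
            state = max(state, start_date)
            branch = "else-branch"
    return state, branch


print(simplified_set_state(None, 3))  # (None, 'skipped')
print(simplified_set_state(5, 3))     # (5, 'else-branch')
print(simplified_set_state(2, 3))     # (3, 'else-branch')
```

No input reaches the "not-branch" path, and an empty state is never initialized to the start date.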

Contributor Author

@davydov-d Thank you.
I am not referring to the current value of the state, but to whether there was an initial state before the first sync. That is why my suggestion is oriented around whether it is the first sync or not: I check the initial value of stream_state and then set the initial value of self._state.

It is more about which endpoint the stream will use.

That depends on whether the state property was set before. With the current approach, the state is always set in incremental mode, so the stream always uses the /search endpoint.

Collaborator

@vladimir-remar I'm afraid you were misled by this "if self._state" statement. It's kind of a workaround with potential pitfalls. The idea is to use /search as the main URL in Incremental mode (or whenever you have filtering criteria, as @grubberr mentioned above), regardless of the state. I believe this code should be refactored 😕
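
One possible reading of the rule described in this comment, written as a standalone sketch. Everything here is assumed for illustration: the local SyncMode enum is a stand-in for the one the connector imports, and intended_url and its has_filters parameter are hypothetical names.

```python
from enum import Enum


class SyncMode(Enum):
    """Local stand-in for the SyncMode enum used by the connector."""

    full_refresh = "full_refresh"
    incremental = "incremental"


def intended_url(entity: str, sync_mode: SyncMode, has_filters: bool = False) -> str:
    """Choose the endpoint from the sync mode and filters only, ignoring the state.

    Hypothetical helper for illustration; it is not part of the connector.
    """
    base = f"/crm/v3/objects/{entity}"
    # Incremental syncs and filtered requests go to /search, with or without saved state.
    if sync_mode == SyncMode.incremental or has_filters:
        return f"{base}/search"
    return base


print(intended_url("contacts", SyncMode.incremental))
print(intended_url("contacts", SyncMode.full_refresh))
```

Because the state is not an input, the first and later incremental syncs hit the same endpoint, which is the difference from the contributor's proposal.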

                if not self._state:
                    self._state = self._start_date
                else:
                    self._state = self._start_date = max(self._state, self._start_date)


class CRMObjectStream(Stream):
    """Unified stream interface for CRM objects.