Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFR for API Sources] New Python interfaces to support resumable full refresh #37429

Merged
merged 26 commits into from May 6, 2024

Conversation

brianjlai
Copy link
Contributor

@brianjlai brianjlai commented Apr 19, 2024

What

Primarily updates how the Stream class performs a read. The big change is around how we resolve what the next partition of records to retrieve is. Because resumable full refresh operates under the paradigm of an unbounded set of pages (unlike incremental partitioned time windows), we need to change how we determine the next slice.

The primary changes to the flow at a high level are:

  • Consolidates a lot of the branching logic to be agnostic of the incoming catalog's sync mode which incorporated parts of Ella's changes in Synchronous python CDK: make sync mode agnostic #36999
  • Instead of stream_slices() being the mechanism for determining the next partition of records to retrieve. I've introduced the concept of a CheckpointReader whose type is instantiated based on the Stream's implementation
  • The reader interprets and passed state back to the ConnectorStateManager by observing the stream's current state.
  • The CDK is now "state aware" meaning that it actually reads stream state instead of just passing it back to the platform like a block box. I'll go into this in more detail below.

How

Some of the major design changes in the review are:

  • The new CheckpointReader class which is now the main way a stream determines the next partition of values to read. For incremental this continues to be partitions like time windows. For RFR, this is the next page of records. And for RFR this can be parent records for substreams or a single slice {}.
  • Deprecating IncrementalMixin in favor of StateMixin since state is used by RFR streams which are not incremental. This is a better name, but I kept the old one for backwards compatibility reasons
  • Changing the default Stream.stream_slices() implementation from [None] to [{}]. None is now the indicator to stop iterating over slices
  • Adding the supports_checkpointing field to streams. It’s needed for two areas. We need to surface this value to the catalog. AND we need this to be overridable because declarative low-code sources delineation for checkpointing differs from python sources.

This PR does not implement the work for a substream that requires resumable full refresh. I have a sketched out interface to see if its possible which it does appear to be. But substream state management for RFR becomes quite convoluted due to the issue I'll go into below.

Making the CDK and Stream class read directly from the connector managed state

This is arguably the most controversial DX and design change compared to before RFR. It has some impact on connector developers. In order for RFR to function using the current read_records() method, we need some way of communicating state from the the specific connector implementation back to the CDK. To do so, we expect the developer to manage Stream.state. And with RFR we now read the input to decide what to do next. For example:

if state:
{ "pageNumber": 23 }
then,
continue syncing.

if state:
{}
then,
stop syncing no more pages.

This is a relatively simple example, but it does illustrate that the developer needs to know to emit {} to stop RFR paging. In hubspot updating state is done with self.state = self.next_page_token(response) or {}. But it feels like not an ideal precedent that a developer needs to have a general awareness of how the checkpoint reader works in order to successfully implement RFR. And this is coupled with the CheckpointReader being mostly an internal CDK implementation detail that developers shouldn't need to think of.

As mentioned earlier, substream RFR streams which would need per-partition cursors requires much more careful reading of the state object for specific structure which is indicative that state as an unstructured map is not the right data type.

This is just for connectors that implement the legacy Python CDK, and since state is managed internally by low-code connectors, I think its fair to also say from the 80:20 rule, we are aiming for a more ideal interface for low-code which makes up a majority of our connectors vs legacy Python.

Alternative:

I did also look into incremental vs. RFR streams having different versions of the read_records() method. For incremental/full refresh, they would have the normal read_records() -> iterable. And for RFR read_records() -> iterable + updated stream state. This however felt like a step in the wrong direction because we're moving back into two different runtime flows depending on sync type.

A structured state class:

This feels like the more appropriate long term solution. If we have a state interface that handles interpreting state and explicitly communicate what to do with state and what the CDK should emit back to the platform. This is well outside what we can do for the project, but something that I thought about.

Review guide

  1. checkpoint_reader.py
  2. core.py
  3. declarative_stream.py
  4. abstract_source.py
  5. test files

User Impact

noop

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Copy link

vercel bot commented Apr 19, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview May 6, 2024 9:35pm

@@ -45,10 +52,10 @@ def package_name_from_class(cls: object) -> str:
raise ValueError(f"Could not find package name for class {cls}")


class IncrementalMixin(ABC):
"""Mixin to make stream incremental.
class StateMixin(ABC): # Rename to CheckpointMixin?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this mixin because state setter/getter are no longer specific to Incremental streams. But to retain backwards compatibility I left the old IncrementalMixin but it just inherits from this mixin

self._state = new_state

def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
return self._state or {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we call get_checkpoint() and the end of the last slice/page and at the end of the sync, we end up emitting the same final state twice. We can potentially insert more fields to track state internally within the reader, but i don't think its worth the hassle

def state(self, value: MutableMapping[str, Any]) -> None:
self._state = value

def read_records(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the implementation to get the 2 RFR streams working for Hubspot within the new interfaces to illustrate how to uptake the changes in the CDK.

But before merging I'll remove these from the PR

@brianjlai brianjlai marked this pull request as ready for review April 24, 2024 09:24
@brianjlai brianjlai requested review from lazebnyi, oustynova and a team as code owners April 24, 2024 09:24
@octavia-squidington-iv octavia-squidington-iv requested a review from a team April 24, 2024 09:25
@octavia-squidington-iii octavia-squidington-iii added the area/documentation Improvements or additions to documentation label Apr 30, 2024
@@ -0,0 +1,77 @@
# Resumable Full Refresh Streams
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girarda fyi i added a docs for RFR instead of inlining comments for certain methods etc. I still need to proof grammar and clean it up a little but just a heads up since some PR comments called this out


except AttributeError:
state_manager.update_state_for_stream(self.name, self.namespace, stream_state)
# todo: This can be consolidated into one ConnectorStateManager.update_and_create_state_message() method, but I want
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a follow up issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abstractmethod
def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
"""
Retrieves the current state value of the stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment explaining that None means we stop reading?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you meant this for the checkpointReader.next(), since None in that case stops parsing, but I'll also comment here that we don't emit state messages if return is None either.

@octavia-squidington-iv octavia-squidington-iv requested a review from a team May 1, 2024 00:32
Copy link
Contributor

@girarda girarda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is great! approved pending reverting the changes to hubspot

@brianjlai brianjlai merged commit d74125b into master May 6, 2024
32 checks passed
@brianjlai brianjlai deleted the resumable_full_refresh_python_cdk_new_interfaces branch May 6, 2024 22:41
@brianjlai
Copy link
Contributor Author

ran a few local connectors-ci runs of hubspot, a few low-code connectors to see that tests passed before the merge

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/documentation Improvements or additions to documentation CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants