
🐛 [airbyte-cdk] Fix bug where substreams depending on an RFR parent stream don't paginate or use existing state #40671

Merged
merged 14 commits on Jul 11, 2024

Conversation

brianjlai (Contributor) commented Jul 2, 2024

What

We identified a bug related to RFR that was blocking low-code migrations and error fixes. The specific case was for a substream that depended on a parent stream that was configured to use RFR. The two issues were:

  • The parent RFR stream did not paginate beyond one page because paging was moved out of read_records() and into read(). This led to missing records because the substream only saw the first page of parent records.
  • When a parent stream was already synced or shared between substreams, the up-to-date state would cause no new records to be retrieved.

How

There are two aspects to how we're solving the above problems. To solve the pagination issue, for both the low-code and Python CDKs, we need a new entrypoint for triggering a read that reuses the more complex pagination logic originally implemented for RFR streams: read_stateless(), which is mostly a convenience method so that every place we invoke read() for substreams doesn't require creating a dummy configured catalog and connector state manager.

As for the shared state issue, we've made a decision to push for all substreams to instantiate independent parent streams to avoid state collisions. This also should help us in a concurrent world where streams should be as independent as possible. This PR fix doesn't strictly address this aspect. See this PR for an example of independent substreams: https://github.com/airbytehq/airbyte/pull/39559/files
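To illustrate the shape of the entrypoint described above, here is a minimal, self-contained sketch. DummyStateManager and ParentStream are toy stand-ins for illustration only, not the actual CDK classes or signatures:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional


@dataclass
class DummyStateManager:
    """Toy stand-in for a connector state manager with no prior state."""
    state: Dict[str, Any] = field(default_factory=dict)


class ParentStream:
    """Toy stream: read() is the full entrypoint that handles pagination;
    read_stateless() builds the dummy collaborators so callers don't have to."""

    def __init__(self, pages: List[List[Dict[str, Any]]]):
        self._pages = pages

    def read(self, state_manager: Optional[DummyStateManager]) -> Iterable[Dict[str, Any]]:
        # In the real CDK, this path contains the RFR pagination logic,
        # so it iterates across *all* pages rather than just the first.
        for page in self._pages:
            yield from page

    def read_stateless(self) -> Iterable[Dict[str, Any]]:
        # Convenience entrypoint: no configured catalog or connector state
        # manager needs to be constructed at the call site.
        yield from self.read(state_manager=DummyStateManager())


parent = ParentStream(pages=[[{"id": 1}, {"id": 2}], [{"id": 3}]])
records = list(parent.read_stateless())
```

The point of the convenience wrapper is that every substream call site gets all parent pages without repeating the dummy-collaborator setup.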

Tested against source-jira using RFR streams and substreams depending on RFR streams. The original CDK w/ the bug resulted in substreams that only had parent records from the first page. I've confirmed that each of the substreams had child records across the entire parent stream:

  • boards
  • board_issues
  • filters
  • filter_sharing
  • users
  • user_groups_detailed

Review guide

User Impact

Ideally none

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

…stead of using stream_slices() + read_records() which doesn't work with RFR

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Jul 2, 2024
@brianjlai brianjlai changed the title [airbyte-cdk] Fix bug where substreams depending on an RFR parent stream don't paginate or use existing state 🐛 [airbyte-cdk] Fix bug where substreams depending on an RFR parent stream don't paginate or use existing state Jul 2, 2024
if incremental_dependency:
self._parent_state[parent_stream.name] = parent_stream.state
# update the parent state, as parent stream read all record for current slice and state is already updated
if incremental_dependency:
brianjlai (Contributor Author):
One drawback is that we no longer checkpoint per slice like we used to, although I'm not sure this is something we must retain, given we lose some context going from read_records() to read().

Contributor:
this seems fairly undesirable to me. Will we only checkpoint at the end of the sync?

naive question: could we instead keep the iteration on the stream_slices and expose a method to read a single, but complete slice?

Contributor:
It feels like in order to do this, we would have to track in the model_to_component_factory which stream is a parent and if it is a parent, avoid instantiating ResumableFullRefreshCursor. It feels possible to me but it would require passing a new parameter all the way to _merge_stream_slicers. This seems fair to me though

Contributor:
We should definitely yield state after every slice. Otherwise, we risk having stuck syncs, where some transient error will result in a failed sync without any progress.

brianjlai (Contributor Author):
Yep, Alex and I discussed a bit on Wednesday. It's a bit hacky, but one way we can do this is summarized in the above comment: by inspecting the associated_slice, we can tell we've moved onto the next slice when it changes, and checkpoint + emit the current set of records as mentioned in the new comment above.

If this approach I implemented seems too crazy or prone to failure, then @maxi297's suggestion to just not use RFR might be reasonable. Although the drawback is that we'd effectively have two different implementations of the parent stream at runtime. Since incremental_dependency will switch out the cursor for the substream, it's a small gotcha.
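The slice-change detection idea can be sketched as follows. Slice, Record, and group_and_checkpoint are hypothetical stand-ins for illustration, not the CDK's actual types:

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Slice:
    """Toy slice: a hashable, comparable partition key."""
    partition: Tuple[Tuple[str, Any], ...]


@dataclass
class Record:
    data: Dict[str, Any]
    associated_slice: Slice


def group_and_checkpoint(records: List[Record]):
    """Buffer records until associated_slice changes, then 'checkpoint' the
    finished slice. Relies on records arriving grouped by slice, in order."""
    checkpoints: List[Slice] = []
    grouped: List[List[Record]] = []
    current: Optional[Slice] = None
    buffer: List[Record] = []
    for record in records:
        if current is not None and record.associated_slice != current:
            # Slice boundary detected: the previous slice is fully consumed.
            checkpoints.append(current)
            grouped.append(buffer)
            buffer = []
        current = record.associated_slice
        buffer.append(record)
    if buffer:
        checkpoints.append(current)
        grouped.append(buffer)
    return grouped, checkpoints


s1 = Slice(partition=(("id", 1),))
s2 = Slice(partition=(("id", 2),))
recs = [Record({"a": 1}, s1), Record({"a": 2}, s1), Record({"b": 1}, s2)]
grouped, checkpoints = group_and_checkpoint(recs)
```

Note the ordering assumption: this only works if all records for one slice are emitted before any record of the next, which is exactly the concurrency constraint discussed later in this thread.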

airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
yield airbyte_state_message

def read_stateless(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
    self,
    connector_state_manager=None,
brianjlai (Contributor Author) commented Jul 2, 2024:
This is something of a frustrating pattern we have. I can't actually instantiate a ConnectorStateManager instance within this method because I can't import it, due to a circular dependency between ConnectorStateManager and the Stream class.

This is starting to poke at us a bit and becoming an annoying pattern, so we may want to see if we can refactor or adjust the code in connector_state_manager.py to not reference the Stream class. But for now, I'm not sure of a good way to avoid this parameter.

The alternative is we pass in None with the existing code and make sure not to attempt any connector_state_manager operations within core.py if state_manager is None, as the code currently does.
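The workaround described here, accepting an optional collaborator and guarding every use instead of importing it (which would be circular), can be sketched with toy classes. StreamCore and RecordingManager are hypothetical names for illustration:

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple


class StreamCore:
    """Toy core stream: the state manager is injected rather than imported,
    sidestepping a circular dependency between the two modules."""

    def read(self, state_manager: Optional[Any] = None) -> Iterable[Dict[str, Any]]:
        records = [{"id": 1}, {"id": 2}]
        for record in records:
            yield record
            if state_manager is not None:
                # Only touch the manager when the caller supplied one;
                # the stateless path simply skips checkpointing.
                state_manager.update_state_for_stream("my_stream", {"last_id": record["id"]})


class RecordingManager:
    """Toy manager that records checkpoint calls for inspection."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def update_state_for_stream(self, name: str, state: Dict[str, Any]) -> None:
        self.calls.append((name, state))


no_manager = list(StreamCore().read())        # stateless path: no checkpoints attempted
mgr = RecordingManager()
with_manager = list(StreamCore().read(mgr))   # stateful path: checkpoints recorded
```

The records are identical either way; only the checkpointing side effects differ, which is the behavior the PR relies on in core.py.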

Contributor:
Can you move or copy this comment into the code so we have a trace? That's a pretty big gotcha.

@brianjlai brianjlai requested review from girarda and a team July 3, 2024 00:03
@brianjlai brianjlai marked this pull request as ready for review July 3, 2024 00:03
@brianjlai brianjlai requested a review from a team as a code owner July 3, 2024 00:03
@brianjlai brianjlai requested a review from maxi297 July 3, 2024 00:06
@girarda girarda requested a review from lazebnyi July 3, 2024 03:18
airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
yield airbyte_state_message

if internal_config.is_limit_reached(record_counter):
break
self._observe_state(checkpoint_reader)
checkpoint_state = checkpoint_reader.get_checkpoint()
if checkpoint_state is not None:
if state_manager and checkpoint_state is not None:
Contributor:
nit: we could refactor this predicate in a method _should_checkpoint to ensure the logic stays aligned with line 215

brianjlai (Contributor Author):
Given the simplicity of the predicate, how about we just define a variable at the beginning called should_checkpoint and pass that in? A subtle difference from what we have, but it still conveys the same idea that each condition is based on a single predicate defined at the beginning.
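A toy version of that suggestion, evaluating the predicate once up front instead of repeating it at each call site. read_with_checkpoints is a hypothetical function for illustration, not CDK code:

```python
from typing import Any, List, Optional


def read_with_checkpoints(checkpoint_states: List[Optional[dict]], state_manager: Any) -> List[dict]:
    """Collect the checkpoint states that would actually be emitted.

    The predicate 'do we have a state manager?' is computed once as
    should_checkpoint, so every later condition stays aligned with it."""
    should_checkpoint = state_manager is not None
    emitted = []
    for checkpoint_state in checkpoint_states:
        # Each emission site combines the shared predicate with its own check.
        if should_checkpoint and checkpoint_state is not None:
            emitted.append(checkpoint_state)
    return emitted


with_manager = read_with_checkpoints([{"a": 1}, None, {"b": 2}], state_manager=object())
without_manager = read_with_checkpoints([{"a": 1}], state_manager=None)
```

Compared with a _should_checkpoint method, a local variable keeps the logic in one place at the cost of being recomputed per call rather than shared across methods.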


ChristoGrab (Contributor) left a comment:
@brianjlai I won't merge yet in case there are any last changes still to be made, but approving to unblock you when you're ready. I was able to test the behavior using the builder server locally; substreams that were previously stuck on the first parent page are now returning records from multiple parent slices 🙌

brianjlai (Contributor Author):
@ChristoGrab awesome thanks for confirming!

girarda (Contributor) left a comment:
looks great @brianjlai ! :shipit:

stream_slices_for_parent.append(
StreamSlice(partition={partition_field: partition_value, "parent_slice": parent_partition}, cursor_slice={})
continue
elif isinstance(parent_record, Record):
Contributor:
should we raise an exception if parent_record is neither an AirbyteMessage nor a Record?

brianjlai (Contributor Author):
Since we are effectively calling the top-level read() command, which allows for StreamData that could be a Mapping, I think we also need to account for that type. But if it's not one of those three (Record, AirbyteMessage, Mapping), then we can throw the error. Will add.
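A sketch of that three-way dispatch. The real Record, AirbyteMessage, and StreamData types live in the CDK; these are minimal stand-ins to show the shape of the check:

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class Record:
    """Toy record with the data payload."""
    data: Dict[str, Any]


@dataclass
class AirbyteMessage:
    """Toy message wrapper around a record payload."""
    type: str
    record: Dict[str, Any]


StreamData = Union[Record, AirbyteMessage, Mapping]


def extract_record_data(parent_record: StreamData) -> Dict[str, Any]:
    """Handle all three StreamData shapes; anything else is an error."""
    if isinstance(parent_record, AirbyteMessage):
        return parent_record.record
    elif isinstance(parent_record, Record):
        return parent_record.data
    elif isinstance(parent_record, Mapping):
        return dict(parent_record)
    raise ValueError(f"Unexpected parent record type: {type(parent_record)}")
```

A plain Mapping carries no associated slice, which is why (as discussed below) the parent_slice can't be populated in that case.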

Contributor:
how will we map the record to the slice when parent_record is a Mapping?

brianjlai (Contributor Author):
We can't, in the end. Granted, like we discussed above, it's only really for custom components that don't return the right Record interface, but for a majority of our cases this should not be an issue.

maxi297 (Contributor) left a comment:
I have a couple of questions but I think there shouldn't be much left before this gets approved


stream_slices_for_parent = []
previous_associated_slice = None
for parent_record in parent_stream.read_stateless():
Contributor:
This implementation has the constraint that the parent stream can't be concurrent as this code assumes the slices are consumed one after the other. I think that is a fair constraint in order for us to unblock a couple of things we are working on but I think we need to acknowledge that this might introduce debt that we will need to pay once we move to concurrent. For me, this is worth it as this will unblock a couple of sources to update to the latest CDK version.

brianjlai (Contributor Author):
Agree with everything here. We're putting a dependency on slices being ordered, but isn't this a fundamental problem that already exists in the legacy CDK? Since concurrent doesn't support substream parallelization, we're not worse off than we were before the fix.

Contributor:
Since concurrent doesn't support substream parallelization, we're not worse off than we were before the fix

It doesn't support it but I don't think there is code in the concurrent part that assumes that. It's just that we haven't implemented a PartitionGenerator that consumes a parent concurrently.

The point I want to make is that if we start having streams where stream.read_stateless() is concurrent, we will need to:

  • Implement the concurrent part in read_stateless
  • Know where the code assumes read_stateless is not concurrent and fix it

The first part is fine: whatever we do, we will need to do it. However, the second part is a bit more challenging as it is not explicit, and I'm not sure we have tests to signal that this will break if it is concurrent, so we rely solely on the fact that we will hopefully think about it. This feels dangerous to me, although I don't have anything better to propose, so what I'm trying to do is socialize this problem so that you can also raise the flag if this happens.

brianjlai (Contributor Author):
Ah, I see what you mean. It's not a solution, but I can at least add some comments in the code to make readers aware that concurrent substreams will require additional investigation to find a solution.

)

# iterate over all parent stream_slices
for stream_slice in parent_stream_slices:
Contributor:
I still see cases where some sources use stream_slices and read_records afterward (like this). Are we fine with those because they don't rely on low-code, so they shouldn't be affected by the RFR cursor?

brianjlai (Contributor Author):
Good point, and that's correct. This bug was only for low-code, because all non-incremental, non-substreams in low-code automatically turned on RFR, whereas for Python-based sources, streams currently opt in via a code change. I think we can live with this for now, but as we look into implementing auto-RFR for concurrent/Python, we need to be aware.

@@ -232,7 +232,7 @@ def test_substream_without_input_state():

stream_instance = test_source.streams({})[1]

stream_slice = StreamSlice(partition={"parent_id": "1"},
stream_slice = StreamSlice(partition={},
Contributor:
This seems like a breaking change. Why has this changed?

brianjlai (Contributor Author) commented Jul 9, 2024:
@maxi297 ah yes, sorry, I should've added a PR comment ahead of time for this. It's a bit of a tale.

We mocked output for the parent stream's read_records() method (it overwrites _read_pages(), but basically equates to the response used by the SubstreamPartitionRouter's call to parent_stream.read_records()).

The problem is that this stream_slice is actually defined incorrectly. I dug into the code, and in the dependent incremental parent stream, calling stream_slices() returns StreamSlices with a time window in cursor_slice and no partition, which is expected since it's just a plain incremental stream.

This only went uncaught because in the old implementation of SubstreamPartitionRouter.stream_slices(), we assigned the parent_slice of the resulting StreamSlices by calling parent_stream.stream_slices() and extracting parent_stream_slice.partition. This correctly returned:

StreamSlice(partition={}, cursor_slice={"start_time": "2022-01-01", "end_time": "2022-01-31"})

and this would get added to the resulting final slices as parent_slice.

However, with the change where we no longer get parent records by calling parent_stream.stream_slices() + parent_stream.read_records() in favor of the higher-level read_stateless(), we now have to inspect the record's associated_slice to populate parent_slice:

Record({"id": "1", CURSOR_FIELD: "2022-01-15"}, StreamSlice(partition={"parent_id": 1},
                               cursor_slice={"start_time": "2022-01-01", "end_time": "2022-01-31"}))

In the mocked parent record output, by accidentally populating partition in the slice, we were breaking this test, because the new SubstreamPartitionRouter.stream_slices() relies on inspecting record.associated_slice.

TLDR: we were incorrectly populating the partition field in the mocked records, which caused the test to fail. So I fixed the incorrect mocked input, which only became apparent with this new change.
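To illustrate the approach of populating parent_slice from record.associated_slice, here is a simplified sketch. StreamSlice, Record, and slices_for_substream are toy stand-ins modeled loosely on the names in this thread, not the actual CDK implementation:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StreamSlice:
    """Toy slice with a partition and a cursor window."""
    partition: Dict[str, Any] = field(default_factory=dict)
    cursor_slice: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Record:
    data: Dict[str, Any]
    associated_slice: Optional[StreamSlice] = None


def slices_for_substream(parent_records: List[Record], partition_field: str, parent_key: str) -> List[StreamSlice]:
    """Build substream slices by inspecting each record's associated_slice
    instead of calling parent_stream.stream_slices() separately."""
    slices = []
    for record in parent_records:
        parent_partition = record.associated_slice.partition if record.associated_slice else {}
        slices.append(
            StreamSlice(
                partition={partition_field: record.data[parent_key], "parent_slice": parent_partition},
                cursor_slice={},
            )
        )
    return slices


# A plain incremental parent yields slices with an empty partition and a time window.
parent_slice = StreamSlice(partition={}, cursor_slice={"start_time": "2022-01-01", "end_time": "2022-01-31"})
records = [Record({"id": "1"}, parent_slice)]
result = slices_for_substream(records, partition_field="parent_id", parent_key="id")
```

This is why a mocked record with a non-empty partition breaks the test: the partition flows straight through into parent_slice.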

Contributor:
What a journey! Thanks to your detailed comment, I get it now. I think the test setup confused me a bit even though it is appropriate. Can we add a comment over the patch.object to make it explicit that it will mock the parent stream's Rates HTTP responses? Or rename stream_slice to parent_stream_slice? If you see any way to make the test more explicit as well, feel free to chime in!

brianjlai (Contributor Author):
Yep, I'll add a comment. It'll definitely be helpful, given even I had to refresh my own memory yesterday on why I did this a week ago.

)

expected_counter = 0
for actual_slice in partition_router.stream_slices():
Contributor:
nit: This seems a bit weird to me. As a user, it means that I can't call list(partition_router.stream_slices()), else it would update the state even though the slice has not been consumed. Should we document somewhere that we assume the processing of a slice is expected to be done before the user calls next when incremental_dependency = True?
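The gotcha can be demonstrated with a toy generator that mutates state as a side effect of yielding, which is purely illustrative (stream_slices here is not the CDK's implementation):

```python
from typing import Any, Dict, Iterable, List


def stream_slices(slices: List[str], state: Dict[str, Any]) -> Iterable[str]:
    """Toy generator mirroring the gotcha: state advances the moment a slice
    is yielded, not when the caller finishes processing it."""
    for s in slices:
        state["last_slice"] = s  # side effect happens on yield
        yield s


state: Dict[str, Any] = {}
gen = stream_slices(["a", "b", "c"], state)

first = next(gen)           # only one slice consumed so far
after_first = dict(state)   # snapshot: state reflects just that slice

rest = list(gen)            # draining the generator fast-forwards state
```

So list(stream_slices()) advances state past slices nobody has processed, which is exactly why the "process each slice before calling next" assumption deserves documentation.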

maxi297 (Contributor) left a comment:
Thanks @brianjlai for diligently answering all my questions and educating me. I'm good with this change!

@brianjlai brianjlai merged commit 9e23b3f into master Jul 11, 2024
32 checks passed
@brianjlai brianjlai deleted the brian/rfr_fix_substream_depends_on_rfr_parent branch July 11, 2024 06:53
xiaohansong pushed a commit that referenced this pull request Jul 12, 2024
Labels
CDK Connector Development Kit
Development

Successfully merging this pull request may close these issues.

[connector-builder] Substream Only Fetches Records from the First Page of Parent Stream
6 participants