Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File-based CDK: make incremental syncs concurrent #34540

Merged
merged 1 commit into from
Feb 8, 2024

Conversation

clnoll
Copy link
Contributor

@clnoll clnoll commented Jan 25, 2024

Provides the ability for connector developers to make incremental syncs concurrent, by using the FileBasedConcurrentCursor.

This has largely the same logic as the DefaultFileBasedCursor, with the exception of the cursor value. In DefaultFileBasedCursor, the cursor is set to the most recently synced file. Because we sync files in order, this is an appropriate value. To avoid an edge case condition where a file with an older last-modified finishes uploading between syncs, we don't simply pick back up from this cursor value during the next sync, but look back some amount of time.

In the FileBasedConcurrentCursor, because we don't know what order the files are being synced, we have to keep track of the low water mark instead - the time before which all other files have been synced. To do this, we keep track of all pending files. When a file has been synced, we pop it off of the list of pending files. This way, to determine the low water mark, we can simply look at the oldest pending file. Note: in cases where the sync doesn't complete, this means the cursor value will not be in history (because it's not put into history until it's popped out of the pending list). However, since we still maintain the behavior where we check to see if the file is in history before deciding whether to sync it based on the cursor value, we will not skip the file.

Recommended Reading Order

  1. airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py
  2. airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
  3. airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

TODO

  • Test against S3.

@clnoll clnoll requested a review from a team as a code owner January 25, 2024 20:08
Copy link

vercel bot commented Jan 25, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Feb 8, 2024 1:15am

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit connectors/source/s3 labels Jan 25, 2024
Copy link
Contributor

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Jan 25, 2024
Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the file_based_concurrent_cursor code makes sense but I would like to check it back later with some added context

expected_cursor_value: str,
):
cursor = _make_cursor(initial_state)
cursor._pending_files = {uri: RemoteFile(uri=uri, last_modified=datetime.strptime(timestamp, DATE_TIME_FORMAT)) for uri, timestamp in pending_files}
Copy link
Contributor

@maxi297 maxi297 Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like tests that sets up internal parameters and assert them because:

  • It does not describe very well how the user would use the object and what he can expect from it as they should not use internal fields. As a reviewer, there is a relatively high number of lines added in the PR so I wanted to see how this code would be used by checking the tests and I'm still unsure as to how it should be used when I check this
  • It generates test that if they are red, it doesn't mean that the behavior that it test is not working anymore. For example, if we change the internals of cursor because we want to add a feature, this test might break even though the behavior still works fine

Me not liking this should not be a reason to change that but do the reasons make sense to you? If so, how can we change that so that it represents user usage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it address your concern if I called set_pending_files instead of setting cursor._pending_files?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it represents the user usage, yes! I would also challenge the assertions though as the user would probably not call cursor._pending_files as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also challenge the assertions though as the user would probably not call cursor._pending_files as well

True, but I think I'm okay with that in this case because the assertion is there to ensure that the internal state has been set up appropriately when add_file is called.

) -> "FileBasedStreamFacade":
"""
Create a ConcurrentStream from a FileBasedStream object.
"""
pk = get_primary_key_from_stream(stream.primary_key)
cursor_field = get_cursor_field_from_stream(stream)
stream.cursor = cursor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple of question regarding this line:

  • AbstractFileBasedStream does not seem to have a cursor field. Am I missing something?
  • Could stream.cursor be set at the init time of the stream? It feels like this makes the stream stateful which adds complexity to the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely could (should?) init AbstractFileBasedStream with a cursor, but with the way the code is now it would effectively still be None.

The problem here is that the ConnectionStateManager requires us to give it a mapping of stream name to stream (on init), but the cursor requires a connection state manager. So we can either init the stream with the cursor and set the state manager on the cursor separately, or init the cursor with the state manager and set it on the stream separately. Do you think it would be preferable to set the state manager on the cursor separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's unfortunate.

looking at how the connection state manager uses the stream name to instance mapping, I think it only needs an AirbyteStream, not the actual stream instance so I think the path forward should be

  1. Modify the state manager so it depends on AirbyteStreams instead of on Streams
  2. Instantiate the state manager before instantiating the cursor
  3. Instantiate the cursor from the state manager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girarda that doesn't quite work either because we get the AirbyteStreams from the catalog, but we don't always have a catalog - e.g. during check.

It looks like the only pieces of information the state manager actually needs are the name and namespace, which I can get off of the config. WDYT about that modification? My worry is that we might eventually want/need other info in the ConnectionStateManager.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the namespace also defined in the configured_catalog?

the check operation shouldn't need the namespace or the state manager. maybe it should only be instantiated for reads

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would mean having different behavior in the streams method depending on who the caller is, which I don't love. But that seems like a reasonable tradeoff to be able to init the stream with a cursor.

self._cursor.set_pending_partitions(pending_partitions)
if not pending_partitions:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a concurrent concern that is shared and hence should be fixed for every concurrent cases by the action item from this discussion. Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me, I'll keep an eye out for your change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be removed following #34605?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good call. Removed.

@octavia-squidington-iii octavia-squidington-iii added the area/connectors Connector related issues label Jan 29, 2024
Base automatically changed from fcdk-with-ccdk to master January 30, 2024 00:33
@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Feb 1, 2024
@clnoll clnoll force-pushed the fcdk-with-ccdk-incremental branch 2 times, most recently from a3cdfdf to 8a99006 Compare February 1, 2024 20:37
Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking the PR, I can't spot any logical issues so I think I'm fine with approving the PR as is. However, I feel we should be going in a different direction eventually. My concerns are:

  • Given the lock on set_pending_partitions and the fact that it supports only one call, it seems like the partition generation for a specific stream/cursor can't be concurrent (not on the horizon now but this would be an added blocker to enabling that)
  • There is a lot of work to maintain _ab_source_file_last_modified which I don't see much value in except from helping debug
  • The cursor basically seems like per-partition with a limit of partitions (correct me if I'm wrong). If we were to implement this in a generic manner, it feels like many sources could leverage this

def close_partition(self, partition: "FileBasedStreamPartition") -> None:
pass

def set_pending_partitions(self, partitions: Dict[str, "FileBasedStreamPartition"]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The type here does not match the parent

self._cursor.set_pending_partitions(pending_partitions)
if not pending_partitions:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be removed following #34605?

Copy link
Contributor Author

@clnoll clnoll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the lock on set_pending_partitions and the fact that it supports only one call, it seems like the partition generation for a specific stream/cursor can't be concurrent (not on the horizon now but this would be an added blocker to enabling that)
This is an interesting point, but as of right now file-based streams don't support substreams and I'm not sure what other scenario we'd want to have the partition generation be concurrent. So I'm okay punting until we need it, especially since this implementation is pretty consistent with the non-concurrent cursor.

There is a lot of work to maintain _ab_source_file_last_modified which I don't see much value in except from helping debug
This actually has another role: as the sync progresses it gets bumped up when all files before it have been synced. If history is full because we synced a bunch of newer files first, but there are a bunch of older files that haven't been synced yet, the cursor value should reflect the fact that we need to go back and sync those older files.

The cursor basically seems like per-partition with a limit of partitions (correct me if I'm wrong). If we were to implement this in a generic manner, it feels like many sources could leverage this
Interesting thought, I could see that.

self._cursor.set_pending_partitions(pending_partitions)
if not pending_partitions:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good call. Removed.

@clnoll clnoll merged commit e8910e4 into master Feb 8, 2024
22 checks passed
@clnoll clnoll deleted the fcdk-with-ccdk-incremental branch February 8, 2024 01:41
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 21, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit connectors/source/s3
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants