Avoid calling shutdown after failed write of AsyncWrite #11415

Merged
merged 1 commit into from
Jul 12, 2024

Conversation

joroKr21
Contributor

in serialize_rb_stream_to_object_store

Which issue does this PR close?

Rationale for this change

Avoid "async fn resumed after completion" panics caused by trying to shut down an errored writer.
See e.g. BufWriter in object_store, which holds on to an underlying Future and doesn't transition its state on error (https://github.com/apache/arrow-rs/blob/master/object_store/src/buffered.rs#L376-L379).

I tried to figure out from the AsyncWrite documentation whether calling poll_shutdown after a failed poll_write is allowed, but I couldn't determine it, so I think it's better to be defensive here.

What changes are included in this PR?

We make the WriterType returned on error optional and omit it when the error originated from the writer itself.
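A minimal sketch of the idea, not the actual DataFusion code: it uses the synchronous `std::io::Write` as a stand-in for `AsyncWrite` so it stays self-contained, and `write_batches` and `SketchError` are hypothetical names introduced only for illustration. The point is the shape of the error side of the return type: the writer is handed back as `Some` when it was not the source of the failure, and as `None` when it was.

```rust
use std::io::{self, Write};

/// Hypothetical error type standing in for `DataFusionError`.
#[derive(Debug)]
pub enum SketchError {
    /// Producing the bytes for a batch failed; the writer is still healthy.
    Serialize(String),
    /// The writer itself failed; its internal state is unknown.
    Write(io::Error),
}

/// Writes each already-serialized batch to `writer` and counts rows.
/// On error, the writer is returned only if the error did not come from it.
pub fn write_batches<W: Write>(
    batches: Vec<Result<(Vec<u8>, u64), String>>,
    mut writer: W,
) -> Result<(W, u64), (Option<W>, SketchError)> {
    let mut rows = 0u64;
    for batch in batches {
        let (bytes, n) = match batch {
            Ok(b) => b,
            // Serializer error: hand the writer back so the caller can close it.
            Err(msg) => return Err((Some(writer), SketchError::Serialize(msg))),
        };
        if let Err(e) = writer.write_all(&bytes) {
            // Writer error: the writer is dropped here, so nobody can shut it down.
            return Err((None, SketchError::Write(e)));
        }
        rows += n;
    }
    Ok((writer, rows))
}
```

With this shape, the decision "is it safe to touch the writer again?" is encoded in the return value instead of being left to the caller to guess.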

Are these changes tested?

No. I'm not sure how to reproduce it outside of real-world conditions, let alone test it 😢

Are there any user-facing changes?

No user-facing changes.

github-actions bot added the core (Core DataFusion crate) label on Jul 11, 2024
Contributor

alamb left a comment

Thank you @joroKr21 -- this change makes sense to me

cc @devinjdangelo as I think you are the expert in this area

@@ -50,7 +50,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
     serializer: Arc<dyn BatchSerializer>,
     mut writer: WriterType,
-) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
+) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
Contributor

Not for this PR, but this signature is getting pretty gnarly. Maybe it is time to try to encapsulate some of this logic into structs 🤔

Contributor

I think the rationale would be hard to discern from the code alone. Could you please update the documentation to reflect the change in what is returned on error?

Specifically, I think it is that if there was an error on write, the writer is dropped so we don't accidentally write to it or try to close it. For any error not involving the writer, the writer is returned as well.

cc @metesynnada as I remember at some point maybe you had a use case for calling shutdown on a writer after an error 🤔 I may be misremembering
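To illustrate the caller side of the contract described in this comment, here is a hedged sketch of how a caller might react to the three possible outcomes. `MockWriter` and `finish` are hypothetical names, the mock is synchronous, and none of this is taken from the DataFusion code base; it only shows that shutdown is attempted when a writer is handed back and skipped when it is not.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical writer that records how often `shutdown` was called.
pub struct MockWriter {
    shutdown_calls: Rc<Cell<u32>>,
}

impl MockWriter {
    pub fn new(shutdown_calls: Rc<Cell<u32>>) -> Self {
        Self { shutdown_calls }
    }

    pub fn shutdown(&mut self) {
        self.shutdown_calls.set(self.shutdown_calls.get() + 1);
    }
}

/// Shuts the writer down only if one was returned, then forwards the result.
pub fn finish(
    outcome: Result<(MockWriter, u64), (Option<MockWriter>, String)>,
) -> Result<u64, String> {
    match outcome {
        // Success: the writer is healthy, close it normally.
        Ok((mut writer, rows)) => {
            writer.shutdown();
            Ok(rows)
        }
        // Error unrelated to the writer: it was handed back, so closing is safe.
        Err((Some(mut writer), err)) => {
            writer.shutdown();
            Err(err)
        }
        // Error from the writer itself: there is nothing left to shut down.
        Err((None, err)) => Err(err),
    }
}
```

Because the failed writer never reaches the caller, the "shutdown after failed write" path cannot be taken by accident.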

Contributor Author

Makes sense, I will update the docs

Contributor

Perhaps it would be good for this PR to describe in the docs what each part of the returned megatype is; later we can factor it out into a separate strict type.

Contributor

Filed #11443 to track cleaning up the megatype

Contributor

alamb commented Jul 12, 2024

Thanks again @joroKr21 and @comphead

alamb merged commit d5367f3 into apache:main on Jul 12, 2024
23 checks passed
joroKr21 deleted the shutdown-after-error branch on July 12, 2024 20:43
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jul 17, 2024
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jul 18, 2024