Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,10 +1199,7 @@ impl RowGroupsPrunedParquetOpen {
predicate_cache_records,
baseline_metrics: prepared.baseline_metrics,
},
|mut state| async move {
let result = state.transition().await;
result.map(|r| (r, state))
},
|state| async move { state.transition().await },
)
.fuse();

Expand Down Expand Up @@ -1248,15 +1245,15 @@ impl PushDecoderStreamState {
/// fetched from the [`AsyncFileReader`] and fed back into the decoder.
/// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
/// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
async fn transition(&mut self) -> Option<Result<RecordBatch>> {
///
/// Takes `self` by value (rather than `&mut self`) so the generated future
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Makes sense to me

/// owns the state directly. This avoids a Stacked Borrows violation under
/// miri where `&mut self` creates a single opaque borrow that conflicts
/// with `unfold`'s ownership across yield points.
async fn transition(mut self) -> Option<(Result<RecordBatch>, Self)> {
loop {
match self.decoder.try_decode() {
Ok(DecodeResult::NeedsData(ranges)) => {
// IO (get_byte_ranges) and CPU (push_ranges) are still
// decoupled — they just can't live in a nested async block
// because that captures `&mut self` as one opaque borrow,
// which violates Stacked Borrows across the yield point.
// Inlining lets the compiler split the disjoint field borrows.
let data = self
.reader
.get_byte_ranges(ranges.clone())
Expand All @@ -1265,24 +1262,26 @@ impl PushDecoderStreamState {
match data {
Ok(data) => {
if let Err(e) = self.decoder.push_ranges(ranges, data) {
return Some(Err(DataFusionError::from(e)));
return Some((Err(DataFusionError::from(e)), self));
}
}
Err(e) => return Some(Err(e)),
Err(e) => return Some((Err(e), self)),
}
}
Ok(DecodeResult::Data(batch)) => {
let mut timer = self.baseline_metrics.elapsed_compute().timer();
self.copy_arrow_reader_metrics();
let result = self.project_batch(&batch);
timer.stop();
return Some(result);
// Release the borrow on baseline_metrics before moving self
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

drop(timer);
return Some((result, self));
}
Ok(DecodeResult::Finished) => {
return None;
}
Err(e) => {
return Some(Err(DataFusionError::from(e)));
return Some((Err(DataFusionError::from(e)), self));
}
}
}
Expand Down
Loading