Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions parquet/src/arrow/push_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,15 @@ impl ParquetPushDecoder {
pub fn buffered_bytes(&self) -> u64 {
self.state.buffered_bytes()
}

/// Clear any staged byte ranges currently buffered for future decode work.
///
/// This clears byte ranges still owned by the decoder's internal
/// `PushBuffers`. It does not affect any data that has already been handed
/// off to an active [`ParquetRecordBatchReader`].
///
/// Bytes dropped here are not remembered: if a later decode step needs
/// them, `try_decode` reports the ranges as needed and they must be pushed
/// again. Calling this once decoding has finished does nothing.
pub fn clear_all_ranges(&mut self) {
self.state.clear_all_ranges();
}
}

/// Internal state machine for the [`ParquetPushDecoder`]
Expand Down Expand Up @@ -573,6 +582,20 @@ impl ParquetDecoderState {
ParquetDecoderState::Finished => 0,
}
}

/// Clear any staged ranges currently buffered in the decoder.
fn clear_all_ranges(&mut self) {
match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.clear_all_ranges(),
ParquetDecoderState::DecodingRowGroup {
record_batch_reader: _,
remaining_row_groups,
} => remaining_row_groups.clear_all_ranges(),
ParquetDecoderState::Finished => {}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -665,6 +688,55 @@ mod test {
assert_eq!(all_output, *TEST_BATCH);
}

/// Dropping the staged ranges must release the speculative buffers while
/// leaving the row group reader that is already active fully usable.
#[test]
fn test_decoder_clear_all_ranges() {
    let builder =
        ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
    let mut decoder = builder.with_batch_size(100).build().unwrap();

    // Stage the entire file in one push.
    decoder
        .push_range(test_file_range(), TEST_FILE_DATA.clone())
        .unwrap();
    assert_eq!(decoder.buffered_bytes(), test_file_len());

    // Decoding the first batch builds the reader for the current row group,
    // yet the full-file range stays staged inside the decoder.
    let first_batch = expect_data(decoder.try_decode());
    assert_eq!(first_batch, TEST_BATCH.slice(0, 100));
    assert_eq!(decoder.buffered_bytes(), test_file_len());

    // Clearing releases everything that was staged.
    decoder.clear_all_ranges();
    assert_eq!(decoder.buffered_bytes(), 0);

    // The active reader keeps its own copy of the current row group's
    // bytes, so the next batch decodes without touching PushBuffers.
    let second_batch = expect_data(decoder.try_decode());
    assert_eq!(second_batch, TEST_BATCH.slice(100, 100));
    assert_eq!(decoder.buffered_bytes(), 0);

    // With the speculative ranges gone, advancing to the next row group
    // makes the decoder request its data again.
    let needed = expect_needs_data(decoder.try_decode());
    let requested_len = needed
        .iter()
        .map(|range| range.end - range.start)
        .sum::<u64>();
    push_ranges_to_decoder(&mut decoder, needed);
    assert_eq!(decoder.buffered_bytes(), requested_len);

    // The remaining two batches decode normally, and nothing stays staged
    // after either of them.
    for offset in [200, 300] {
        let batch = expect_data(decoder.try_decode());
        assert_eq!(batch, TEST_BATCH.slice(offset, 100));
        assert_eq!(decoder.buffered_bytes(), 0);
    }

    expect_finished(decoder.try_decode());
}

/// Decode the entire file incrementally, simulating partial reads
#[test]
fn test_decoder_partial() {
Expand Down
5 changes: 5 additions & 0 deletions parquet/src/arrow/push_decoder/reader_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ impl RowGroupReaderBuilder {
self.buffers.buffered_bytes()
}

/// Clear any staged ranges currently buffered for future decode work.
///
/// Delegates to `clear_all_ranges` on this builder's `buffers`; no other
/// field of the builder is modified by this method.
pub fn clear_all_ranges(&mut self) {
self.buffers.clear_all_ranges();
}

/// take the current state, leaving None in its place.
///
/// Returns an error if the state wasn't put back after the previous
Expand Down
5 changes: 5 additions & 0 deletions parquet/src/arrow/push_decoder/remaining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ impl RemainingRowGroups {
self.row_group_reader_builder.buffered_bytes()
}

/// Clear any staged ranges currently buffered for future decode work.
///
/// Forwards the request to `row_group_reader_builder`; this type keeps no
/// additional state that the method touches.
pub fn clear_all_ranges(&mut self) {
self.row_group_reader_builder.clear_all_ranges();
}

/// returns [`ParquetRecordBatchReader`] suitable for reading the next
/// group of rows from the Parquet data, or the list of data ranges still
/// needed to proceed
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/util/push_buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ impl PushBuffers {
self.ranges = new_ranges;
self.buffers = new_buffers;
}

/// Clear all buffered ranges and their corresponding data.
///
/// Empties both the `ranges` and `buffers` collections unconditionally;
/// no other field is modified.
///
/// Only compiled when the `arrow` feature is enabled.
#[cfg(feature = "arrow")]
pub fn clear_all_ranges(&mut self) {
self.ranges.clear();
self.buffers.clear();
}
}

impl Length for PushBuffers {
Expand Down
Loading