From 7ca6cc24e2c4106e503db6e7ad5b42d2dbc20847 Mon Sep 17 00:00:00 2001 From: Socrates Date: Sat, 4 Oct 2025 13:31:29 +0800 Subject: [PATCH 1/6] init mvp --- examples/row_selection_example.rs | 147 ++++++++ src/array_decoder/mod.rs | 85 ++++- src/arrow_reader.rs | 50 ++- src/lib.rs | 2 + src/row_selection.rs | 559 ++++++++++++++++++++++++++++++ 5 files changed, 836 insertions(+), 7 deletions(-) create mode 100644 examples/row_selection_example.rs create mode 100644 src/row_selection.rs diff --git a/examples/row_selection_example.rs b/examples/row_selection_example.rs new file mode 100644 index 0000000..73324f2 --- /dev/null +++ b/examples/row_selection_example.rs @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Example demonstrating how to use RowSelection to skip rows when reading ORC files + +use std::fs::File; +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use orc_rust::arrow_reader::ArrowReaderBuilder; +use orc_rust::arrow_writer::ArrowWriterBuilder; +use orc_rust::row_selection::{RowSelection, RowSelector}; + +fn main() -> Result<(), Box> { + // Step 1: Create a sample ORC file with 100 rows + println!("Creating sample ORC file..."); + let file_path = "/tmp/row_selection_example.orc"; + create_sample_orc_file(file_path)?; + + // Step 2: Read the file without row selection (baseline) + println!("\n=== Reading all rows (no selection) ==="); + let file = File::open(file_path)?; + let reader = ArrowReaderBuilder::try_new(file)?.build(); + let mut total_rows = 0; + for batch in reader { + let batch = batch?; + total_rows += batch.num_rows(); + } + println!("Total rows read: {}", total_rows); + + // Step 3: Read with row selection - skip first 30 rows, select next 40, skip rest + println!("\n=== Reading with row selection ==="); + let file = File::open(file_path)?; + + // Create a selection: skip 30, select 40, skip 30 + let selection = vec![ + RowSelector::skip(30), + RowSelector::select(40), + RowSelector::skip(30), + ] + .into(); + + let reader = ArrowReaderBuilder::try_new(file)? + .with_row_selection(selection) + .build(); + + let mut selected_rows = 0; + let mut batches = Vec::new(); + for batch in reader { + let batch = batch?; + selected_rows += batch.num_rows(); + batches.push(batch); + } + + println!("Total rows selected: {}", selected_rows); + println!("Expected: 40, Actual: {}", selected_rows); + + // Display some of the selected data + if let Some(first_batch) = batches.first() { + let id_col = first_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let name_col = first_batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + println!("\nFirst 5 selected rows:"); + for i in 0..5.min(first_batch.num_rows()) { + println!( + " id: {}, name: {}", + id_col.value(i), + name_col.value(i) + ); + } + } + + // Step 4: Read with multiple non-consecutive selections + println!("\n=== Reading with multiple selections ==="); + let file = File::open(file_path)?; + + // Select rows 10-20 and 60-70 + let selection = RowSelection::from_consecutive_ranges( + vec![10..20, 60..70].into_iter(), + 100, + ); + + let reader = ArrowReaderBuilder::try_new(file)? + .with_row_selection(selection) + .build(); + + let mut selected_rows = 0; + for batch in reader { + let batch = batch?; + selected_rows += batch.num_rows(); + } + + println!("Total rows selected: {}", selected_rows); + println!("Expected: 20 (10 from each range)"); + + println!("\n✓ Row selection example completed successfully!"); + + Ok(()) +} + +fn create_sample_orc_file(path: &str) -> Result<(), Box> { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + // Create 100 rows + let ids: ArrayRef = Arc::new(Int32Array::from((0..100).collect::>())); + let names: ArrayRef = Arc::new(StringArray::from( + (0..100) + .map(|i| format!("name_{}", i)) + .collect::>(), + )); + + let batch = RecordBatch::try_new(schema.clone(), vec![ids, names])?; + + let file = File::create(path)?; + let mut writer = ArrowWriterBuilder::new(file, schema).try_build()?; + writer.write(&batch)?; + writer.close()?; + + Ok(()) +} + diff --git a/src/array_decoder/mod.rs b/src/array_decoder/mod.rs index 00db5ff..ff3f2ce 100644 --- a/src/array_decoder/mod.rs +++ b/src/array_decoder/mod.rs @@ -238,6 +238,8 @@ pub struct NaiveStripeDecoder { index: usize, batch_size: usize, number_of_rows: usize, + row_selection: Option, + selection_index: usize, } impl Iterator for NaiveStripeDecoder { @@ -245,11 +247,61 @@ impl Iterator for NaiveStripeDecoder { fn next(&mut self) -> Option { if self.index < self.number_of_rows { - let record = self - .decode_next_batch(self.number_of_rows - self.index) - .transpose()?; - self.index += self.batch_size; - Some(record) + // Handle row selection if present + if self.row_selection.is_some() { + // Process selectors until we find rows to select or exhaust the selection + loop { + let selector_info = { + let selection = self.row_selection.as_ref().unwrap(); + let selectors = selection.selectors(); + if self.selection_index >= selectors.len() { + return None; + } + let selector = selectors[self.selection_index]; + (selector.skip, selector.row_count) + }; + + let (is_skip, row_count) = selector_info; + + if is_skip { + // Skip these rows by advancing the index + self.index += row_count; + self.selection_index += 1; + + // Decode and discard the skipped rows to advance the internal decoders + if let Err(e) = self.skip_rows(row_count) { + return Some(Err(e)); + } + } else { + // Select these rows + let rows_to_read = row_count.min(self.batch_size); + let remaining = self.number_of_rows - self.index; + let actual_rows = rows_to_read.min(remaining); + + if actual_rows == 0 { + self.selection_index += 1; + continue; + } + + let record = self.decode_next_batch(actual_rows).transpose()?; + self.index += actual_rows; + + // Update selector to track progress + if actual_rows >= row_count { + self.selection_index += 1; + } + + return Some(record); + } + } + } else { + // No row selection - decode normally + let record = self + .decode_next_batch(self.number_of_rows - self.index) + .transpose()?; + self.index += self.batch_size; + Some(record) + } } else { None } @@ -433,6 +485,15 @@ impl NaiveStripeDecoder { } pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result { + Self::new_with_selection(stripe, schema_ref, batch_size, None) + } + + pub fn new_with_selection( + stripe: Stripe, + schema_ref: SchemaRef, + batch_size: usize, + row_selection: Option, + ) -> Result { let number_of_rows = stripe.number_of_rows(); let decoders = stripe .columns() @@ -448,6 +509,20 @@ impl NaiveStripeDecoder { index: 0, batch_size, number_of_rows, + row_selection, + selection_index: 0, }) } + + /// Skip the specified number of rows by decoding and discarding them + fn skip_rows(&mut self, count: usize) -> Result<()> { + // Decode in batches to avoid large memory allocations + let mut remaining = count; + while remaining > 0 { + let chunk = self.batch_size.min(remaining); + let _ = self.inner_decode_next_batch(chunk)?; + remaining -= chunk; + } + Ok(()) + } } diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index fd0eb53..b51db47 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -28,6 +28,7 @@ use crate::error::Result; use crate::projection::ProjectionMask; use crate::reader::metadata::{read_metadata, FileMetadata}; use crate::reader::ChunkReader; +use crate::row_selection::RowSelection; use crate::schema::RootDataType; use crate::stripe::{Stripe, StripeMetadata}; @@ -40,6 +41,7 @@ pub struct ArrowReaderBuilder { pub(crate) projection: ProjectionMask, pub(crate) schema_ref: Option, pub(crate) file_byte_range: Option>, + pub(crate) row_selection: Option, } impl ArrowReaderBuilder { @@ -51,6 +53,7 @@ impl ArrowReaderBuilder { projection: ProjectionMask::all(), schema_ref: None, file_byte_range: None, + row_selection: None, } } @@ -79,6 +82,33 @@ impl ArrowReaderBuilder { self } + /// Set a [`RowSelection`] to filter rows + /// + /// The [`RowSelection`] specifies which rows should be decoded from the ORC file. + /// This can be used to skip rows that don't match predicates, reducing I/O and + /// improving query performance. + /// + /// # Example + /// + /// ```no_run + /// # use std::fs::File; + /// # use orc_rust::arrow_reader::ArrowReaderBuilder; + /// # use orc_rust::row_selection::{RowSelection, RowSelector}; + /// let file = File::open("data.orc").unwrap(); + /// let selection = vec![ + /// RowSelector::skip(100), + /// RowSelector::select(50), + /// ].into(); + /// let reader = ArrowReaderBuilder::try_new(file) + /// .unwrap() + /// .with_row_selection(selection) + /// .build(); + /// ``` + pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self { + self.row_selection = Some(row_selection); + self + } + /// Returns the currently computed schema /// /// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically @@ -124,6 +154,7 @@ impl ArrowReaderBuilder { schema_ref, current_stripe: None, batch_size: self.batch_size, + row_selection: self.row_selection, } } } @@ -133,6 +164,7 @@ pub struct ArrowReader { schema_ref: SchemaRef, current_stripe: Option> + Send>>, batch_size: usize, + row_selection: Option, } impl ArrowReader { @@ -146,8 +178,22 @@ impl ArrowReader { let stripe = self.cursor.next().transpose()?; match stripe { Some(stripe) => { - let decoder = - NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)?; + // Split off the row selection for this stripe + let stripe_rows = stripe.number_of_rows(); + let selection = self.row_selection.as_mut().and_then(|s| { + if s.row_count() > 0 { + Some(s.split_off(stripe_rows)) + } else { + None + } + }); + + let decoder = NaiveStripeDecoder::new_with_selection( + stripe, + self.schema_ref.clone(), + self.batch_size, + selection, + )?; self.current_stripe = Some(Box::new(decoder)); self.next().transpose() } diff --git a/src/lib.rs b/src/lib.rs index 224ba2f..4933fec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,7 @@ pub mod projection; #[allow(dead_code)] mod proto; pub mod reader; +pub mod row_selection; pub mod schema; pub mod statistics; pub mod stripe; @@ -70,3 +71,4 @@ pub use arrow_reader::{ArrowReader, ArrowReaderBuilder}; pub use arrow_writer::{ArrowWriter, ArrowWriterBuilder}; #[cfg(feature = "async")] pub use async_arrow_reader::ArrowStreamReader; +pub use row_selection::{RowSelection, RowSelector}; diff --git a/src/row_selection.rs b/src/row_selection.rs new file mode 100644 index 0000000..0f8abf1 --- /dev/null +++ b/src/row_selection.rs @@ -0,0 +1,559 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row selection for ORC files +//! +//! This module provides [`RowSelection`] and [`RowSelector`] types for +//! efficiently skipping rows when scanning ORC files. + +use arrow::array::{Array, BooleanArray}; +use std::cmp::Ordering; +use std::ops::Range; + +/// [`RowSelector`] represents a consecutive range of rows to either select or skip +/// when scanning an ORC file. +/// +/// A [`RowSelector`] is a building block of [`RowSelection`]. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct RowSelector { + /// The number of rows + pub row_count: usize, + + /// If true, skip `row_count` rows; otherwise select them + pub skip: bool, +} + +impl RowSelector { + /// Create a selector to select `row_count` rows + pub fn select(row_count: usize) -> Self { + Self { + row_count, + skip: false, + } + } + + /// Create a selector to skip `row_count` rows + pub fn skip(row_count: usize) -> Self { + Self { + row_count, + skip: true, + } + } +} + +/// [`RowSelection`] allows selecting or skipping rows when scanning an ORC file. +/// +/// This is applied prior to reading column data, and can therefore be used to +/// skip IO to fetch data into memory, improving query performance. +/// +/// A typical use-case would be using ORC stripe statistics or file-level +/// indexes to filter out rows that don't satisfy a predicate. +/// +/// # Example +/// +/// ``` +/// use orc_rust::row_selection::{RowSelection, RowSelector}; +/// +/// // Create selectors: skip 100 rows, select 50, skip 200 +/// let selectors = vec![ +/// RowSelector::skip(100), +/// RowSelector::select(50), +/// RowSelector::skip(200), +/// ]; +/// +/// let selection: RowSelection = selectors.into(); +/// +/// // Query properties +/// assert_eq!(selection.row_count(), 350); +/// assert_eq!(selection.selects_any(), true); +/// ``` +/// +/// A [`RowSelection`] maintains the following invariants: +/// +/// * It contains no [`RowSelector`] with 0 rows +/// * Consecutive [`RowSelector`]s alternate between skipping and selecting rows +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct RowSelection { + selectors: Vec, +} + +impl RowSelection { + /// Create a new empty [`RowSelection`] + pub fn new() -> Self { + Self::default() + } + + /// Create a [`RowSelection`] from a slice of [`BooleanArray`] + /// + /// # Panics + /// + /// Panics if any of the [`BooleanArray`] contain nulls + pub fn from_filters(filters: &[BooleanArray]) -> Self { + let mut next_offset = 0; + let total_rows = filters.iter().map(|x| x.len()).sum(); + + let iter = filters.iter().flat_map(|filter| { + let offset = next_offset; + next_offset += filter.len(); + assert_eq!(filter.null_count(), 0, "filter arrays must not contain nulls"); + + // Find consecutive ranges of true values + let mut ranges = vec![]; + let mut start = None; + for (idx, value) in filter.iter().enumerate() { + match (value, start) { + (Some(true), None) => start = Some(idx), + (Some(false), Some(s)) | (None, Some(s)) => { + ranges.push(s + offset..idx + offset); + start = None; + } + _ => {} + } + } + if let Some(s) = start { + ranges.push(s + offset..filter.len() + offset); + } + ranges + }); + + Self::from_consecutive_ranges(iter, total_rows) + } + + /// Create a [`RowSelection`] from an iterator of consecutive ranges to keep + /// + /// # Arguments + /// + /// * `ranges` - Iterator of consecutive ranges (e.g., `10..20`, `30..40`) + /// * `total_rows` - Total number of rows in the stripe/file + /// + /// # Example + /// + /// ``` + /// use orc_rust::row_selection::RowSelection; + /// + /// // Select rows 10-19 and 30-39 out of 50 total rows + /// let selection = RowSelection::from_consecutive_ranges( + /// vec![10..20, 30..40].into_iter(), + /// 50 + /// ); + /// ``` + pub fn from_consecutive_ranges>>( + ranges: I, + total_rows: usize, + ) -> Self { + let mut selectors: Vec = Vec::with_capacity(ranges.size_hint().0); + let mut last_end = 0; + + for range in ranges { + let len = range.end - range.start; + if len == 0 { + continue; + } + + match range.start.cmp(&last_end) { + Ordering::Equal => { + // Extend the last selector + match selectors.last_mut() { + Some(last) if !last.skip => { + last.row_count = last.row_count.checked_add(len).unwrap() + } + _ => selectors.push(RowSelector::select(len)), + } + } + Ordering::Greater => { + // Add a skip selector for the gap, then a select selector + selectors.push(RowSelector::skip(range.start - last_end)); + selectors.push(RowSelector::select(len)); + } + Ordering::Less => { + panic!("ranges must be provided in order and must not overlap") + } + } + last_end = range.end; + } + + // Add final skip if we didn't cover all rows + if last_end < total_rows { + selectors.push(RowSelector::skip(total_rows - last_end)); + } + + Self { selectors } + } + + /// Create a [`RowSelection`] that selects all `row_count` rows + pub fn select_all(row_count: usize) -> Self { + if row_count == 0 { + return Self::default(); + } + Self { + selectors: vec![RowSelector::select(row_count)], + } + } + + /// Create a [`RowSelection`] that skips all `row_count` rows + pub fn skip_all(row_count: usize) -> Self { + if row_count == 0 { + return Self::default(); + } + Self { + selectors: vec![RowSelector::skip(row_count)], + } + } + + /// Returns the total number of rows (selected + skipped) + pub fn row_count(&self) -> usize { + self.selectors.iter().map(|s| s.row_count).sum() + } + + /// Returns the number of selected rows + pub fn selected_row_count(&self) -> usize { + self.selectors + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum() + } + + /// Returns the number of skipped rows + pub fn skipped_row_count(&self) -> usize { + self.selectors + .iter() + .filter(|s| s.skip) + .map(|s| s.row_count) + .sum() + } + + /// Returns true if this selection selects any rows + pub fn selects_any(&self) -> bool { + self.selectors.iter().any(|s| !s.skip) + } + + /// Returns an iterator over the [`RowSelector`]s + pub fn iter(&self) -> impl Iterator { + self.selectors.iter() + } + + /// Returns a slice of the underlying [`RowSelector`]s + pub fn selectors(&self) -> &[RowSelector] { + &self.selectors + } + + /// Splits off the first `row_count` rows from this [`RowSelection`] + /// + /// Returns a new [`RowSelection`] containing the first `row_count` rows, + /// and updates `self` to contain the remaining rows. + /// + /// # Example + /// + /// ``` + /// use orc_rust::row_selection::{RowSelection, RowSelector}; + /// + /// let mut selection = RowSelection::from_consecutive_ranges( + /// vec![10..20, 30..40].into_iter(), + /// 50 + /// ); + /// + /// let first = selection.split_off(25); + /// assert_eq!(first.row_count(), 25); + /// assert_eq!(selection.row_count(), 25); + /// ``` + pub fn split_off(&mut self, row_count: usize) -> Self { + let mut total_count = 0; + + // Find the index where the selector exceeds the row count + let find = self.selectors.iter().position(|selector| { + total_count += selector.row_count; + total_count > row_count + }); + + let split_idx = match find { + Some(idx) => idx, + None => { + // Return all selectors if row_count exceeds total + let selectors = std::mem::take(&mut self.selectors); + return Self { selectors }; + } + }; + + let mut remaining = self.selectors.split_off(split_idx); + + // Split the selector that crosses the boundary + let next = remaining.first_mut().unwrap(); + let overflow = total_count - row_count; + + if next.row_count != overflow { + self.selectors.push(RowSelector { + row_count: next.row_count - overflow, + skip: next.skip, + }); + } + next.row_count = overflow; + + std::mem::swap(&mut remaining, &mut self.selectors); + Self { + selectors: remaining, + } + } + + /// Combine two [`RowSelection`]s using logical AND + /// + /// Returns a new [`RowSelection`] representing rows that are selected + /// in both input selections. + /// + /// # Panics + /// + /// Panics if `other` does not have a length equal to the number of rows + /// selected by this RowSelection + pub fn and_then(&self, other: &Self) -> Self { + let mut selectors = vec![]; + let mut first = self.selectors.iter().cloned().peekable(); + let mut second = other.selectors.iter().cloned().peekable(); + + let mut to_skip = 0; + while let Some(b) = second.peek_mut() { + let a = first + .peek_mut() + .expect("selection exceeds the number of selected rows"); + + if b.row_count == 0 { + second.next().unwrap(); + continue; + } + + if a.row_count == 0 { + first.next().unwrap(); + continue; + } + + if a.skip { + // Records were skipped when producing second + to_skip += a.row_count; + first.next().unwrap(); + continue; + } + + let skip = b.skip; + let to_process = a.row_count.min(b.row_count); + + a.row_count -= to_process; + b.row_count -= to_process; + + match skip { + true => to_skip += to_process, + false => { + if to_skip != 0 { + selectors.push(RowSelector::skip(to_skip)); + to_skip = 0; + } + selectors.push(RowSelector::select(to_process)); + } + } + } + + // Process any remaining selectors from first (should all be skip) + for v in first { + if v.row_count != 0 { + assert!( + v.skip, + "selection contains less than the number of selected rows" + ); + to_skip += v.row_count; + } + } + + if to_skip != 0 { + selectors.push(RowSelector::skip(to_skip)); + } + + Self { selectors } + } +} + +impl From> for RowSelection { + fn from(selectors: Vec) -> Self { + let mut result: Vec = Vec::new(); + for selector in selectors { + if selector.row_count == 0 { + continue; + } + match result.last_mut() { + Some(last) if last.skip == selector.skip => { + last.row_count += selector.row_count; + } + _ => result.push(selector), + } + } + Self { selectors: result } + } +} + +impl From for Vec { + fn from(selection: RowSelection) -> Self { + selection.selectors + } +} + +impl FromIterator for RowSelection { + fn from_iter>(iter: T) -> Self { + iter.into_iter().collect::>().into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_row_selector_select() { + let selector = RowSelector::select(100); + assert_eq!(selector.row_count, 100); + assert!(!selector.skip); + } + + #[test] + fn test_row_selector_skip() { + let selector = RowSelector::skip(50); + assert_eq!(selector.row_count, 50); + assert!(selector.skip); + } + + #[test] + fn test_row_selection_from_consecutive_ranges() { + let selection = RowSelection::from_consecutive_ranges(vec![5..10, 15..20].into_iter(), 25); + + let expected = vec![ + RowSelector::skip(5), + RowSelector::select(5), + RowSelector::skip(5), + RowSelector::select(5), + RowSelector::skip(5), + ]; + + assert_eq!(selection.selectors, expected); + assert_eq!(selection.row_count(), 25); + assert_eq!(selection.selected_row_count(), 10); + assert_eq!(selection.skipped_row_count(), 15); + } + + #[test] + fn test_row_selection_consolidation() { + let selectors = vec![ + RowSelector::skip(5), + RowSelector::skip(5), + RowSelector::select(10), + RowSelector::select(5), + ]; + + let selection: RowSelection = selectors.into(); + + let expected = vec![RowSelector::skip(10), RowSelector::select(15)]; + + assert_eq!(selection.selectors, expected); + } + + #[test] + fn test_row_selection_select_all() { + let selection = RowSelection::select_all(100); + assert_eq!(selection.row_count(), 100); + assert_eq!(selection.selected_row_count(), 100); + assert_eq!(selection.skipped_row_count(), 0); + assert!(selection.selects_any()); + } + + #[test] + fn test_row_selection_skip_all() { + let selection = RowSelection::skip_all(100); + assert_eq!(selection.row_count(), 100); + assert_eq!(selection.selected_row_count(), 0); + assert_eq!(selection.skipped_row_count(), 100); + assert!(!selection.selects_any()); + } + + #[test] + fn test_row_selection_split_off() { + let mut selection = + RowSelection::from_consecutive_ranges(vec![10..30, 40..60].into_iter(), 100); + + let first = selection.split_off(35); + + assert_eq!(first.row_count(), 35); + assert_eq!(selection.row_count(), 65); + + // First should have: skip(10) + select(20) + skip(5) + assert_eq!(first.selected_row_count(), 20); + + // Remaining should have: skip(5) + select(20) + skip(40) + assert_eq!(selection.selected_row_count(), 20); + } + + #[test] + fn test_row_selection_and_then() { + // First selection: skip 5, select 10, skip 5 + let first = + RowSelection::from_consecutive_ranges(vec![5..15].into_iter(), 20); + + // Second selection (operates on the 10 selected rows): skip 2, select 5, skip 3 + let second = + RowSelection::from_consecutive_ranges(vec![2..7].into_iter(), 10); + + let result = first.and_then(&second); + + // Should skip first 5, then skip 2 more (= 7), then select 5, then skip rest + assert_eq!(result.row_count(), 20); + assert_eq!(result.selected_row_count(), 5); + + let expected = vec![ + RowSelector::skip(7), + RowSelector::select(5), + RowSelector::skip(8), + ]; + assert_eq!(result.selectors, expected); + } + + #[test] + fn test_row_selection_from_filters() { + use arrow::array::BooleanArray; + + // Create a boolean filter: [false, false, true, true, false] + let filter = BooleanArray::from(vec![false, false, true, true, false]); + + let selection = RowSelection::from_filters(&[filter]); + + let expected = vec![ + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(1), + ]; + + assert_eq!(selection.selectors, expected); + } + + #[test] + fn test_row_selection_empty() { + let selection = RowSelection::new(); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.selected_row_count(), 0); + assert!(!selection.selects_any()); + } + + #[test] + #[should_panic(expected = "ranges must be provided in order")] + fn test_row_selection_out_of_order() { + RowSelection::from_consecutive_ranges(vec![10..20, 5..15].into_iter(), 25); + } +} + From 352a387dd357be4f186c2e52fe61e8dcee2b366c Mon Sep 17 00:00:00 2001 From: Socrates Date: Sat, 4 Oct 2025 14:03:35 +0800 Subject: [PATCH 2/6] add regression-test --- tests/row_selection/main.rs | 397 ++++++++++++++++++++++++++++++++++++ 1 file changed, 397 insertions(+) create mode 100644 tests/row_selection/main.rs diff --git a/tests/row_selection/main.rs b/tests/row_selection/main.rs new file mode 100644 index 0000000..bcb5b64 --- /dev/null +++ b/tests/row_selection/main.rs @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Regression tests for RowSelection functionality + +use std::fs::File; + +use arrow::record_batch::RecordBatch; +use arrow::util::pretty; +use orc_rust::arrow_reader::ArrowReaderBuilder; +use orc_rust::projection::ProjectionMask; +use orc_rust::row_selection::{RowSelection, RowSelector}; + +fn basic_path(path: &str) -> String { + let dir = env!("CARGO_MANIFEST_DIR"); + format!("{dir}/tests/basic/data/{path}") +} + +// Helper function to compare batches with expected output +fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { + let formatted = pretty::pretty_format_batches(batches).unwrap().to_string(); + let actual_lines: Vec<_> = formatted.trim().lines().collect(); + assert_eq!( + &actual_lines, expected_lines, + "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n" + ); +} + +#[test] +fn test_row_selection_skip_first_select_middle() { + // Skip first 2 rows, select next 2 rows, skip rest + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + let selection = vec![ + RowSelector::skip(2), + RowSelector::select(2), + RowSelector::skip(1), + ] + .into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should only read 2 rows (rows index 2 and 3 from the file) + assert_eq!(total_rows, 2); + + // Verify data content - should be rows 2 and 3 (0-indexed) + let expected = [ + "+-----+------+------------+-----+----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+---------------------+-------------+----------------+", + "| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple | tinyint_simple |", + "+-----+------+------------+-----+----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+---------------------+-------------+----------------+", + "| | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 | 1 |", + "| 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 | 127 |", + "+-----+------+------------+-----+----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+---------------------+-------------+----------------+", + ]; + assert_batches_eq(&batches, &expected); +} + +#[test] +fn test_row_selection_select_all() { + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + // Select all 5 rows + let selection = RowSelection::select_all(5); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 5); +} + +#[test] +fn test_row_selection_skip_all() { + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + // Skip all 5 rows + let selection = RowSelection::skip_all(5); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should read 0 rows + assert_eq!(total_rows, 0); +} + +#[test] +fn test_row_selection_select_first_only() { + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + // Select only first row + let selection = vec![RowSelector::select(1), RowSelector::skip(4)].into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 1); + + let expected = [ + "+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+----------------+", + "| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple | tinyint_simple |", + "+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+----------------+", + "| 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 | -1 |", + "+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+----------------+", + ]; + assert_batches_eq(&batches, &expected); +} + +#[test] +fn test_row_selection_select_last_only() { + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + // Skip first 4 rows, select last row + let selection = vec![RowSelector::skip(4), RowSelector::select(1)].into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 1); + + let expected = [ + "+-----+-------+------------+-----+---+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+---------------------+-------------+----------------+", + "| a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple | tinyint_simple |", + "+-----+-------+------------+-----+---+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+---------------------+-------------+----------------+", + "| 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 | -127 |", + "+-----+-------+------------+-----+---+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+---------------------+-------------+----------------+", + ]; + assert_batches_eq(&batches, &expected); +} + +#[test] +fn test_row_selection_with_consecutive_ranges() { + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + // Select rows at indices 0-1 and 3-4 (skip row 2) + let selection = RowSelection::from_consecutive_ranges(vec![0..2, 3..5].into_iter(), 5); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Should read 4 rows (skip the middle one) + assert_eq!(total_rows, 4); +} + +#[test] +fn test_row_selection_with_projection() { + // Test that row selection works with column projection + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + let builder = ArrowReaderBuilder::try_new(f).unwrap(); + let projection = + ProjectionMask::named_roots(builder.file_metadata().root_data_type(), &["a", "b"]); + + let selection = vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(2), + ] + .into(); + + let reader = builder + .with_projection(projection) + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 2); + assert_eq!(batches[0].num_columns(), 2); // Only 2 columns projected + + let expected = [ + "+-----+-------+", + "| a | b |", + "+-----+-------+", + "| 2.0 | false |", + "| | |", + "+-----+-------+", + ]; + assert_batches_eq(&batches, &expected); +} + +#[test] +fn test_row_selection_with_nested_struct() { + let path = basic_path("nested_struct.orc"); + let f = File::open(path).expect("no file found"); + + // Select first 2 rows and last row + let selection = vec![ + RowSelector::select(2), + RowSelector::skip(2), + RowSelector::select(1), + ] + .into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 3); + + let expected = [ + "+-------------------+", + "| nest |", + "+-------------------+", + "| {a: 1.0, b: true} |", + "| {a: 3.0, b: } |", + "| {a: -3.0, b: } |", + "+-------------------+", + ]; + assert_batches_eq(&batches, &expected); +} + +#[test] +fn test_row_selection_with_nested_array() { + let path = basic_path("nested_array.orc"); + let f = File::open(path).expect("no file found"); + + // Select middle rows (index 1-2) + let selection = vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(2), + ] + .into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 2); + + let expected = [ + "+--------------------+", + "| value |", + "+--------------------+", + "| [5, , 32, 4, 15] |", + "| [16, , 3, 4, 5, 6] |", + "+--------------------+", + ]; + assert_batches_eq(&batches, &expected); +} + +#[test] +fn test_row_selection_with_large_file() { + // Test with a larger file that spans multiple stripes + let path = basic_path("string_long_long.orc"); + let f = File::open(path).expect("no file found"); + + // Skip first 1000 rows, select next 500, skip rest + let selection = vec![ + RowSelector::skip(1000), + RowSelector::select(500), + RowSelector::skip(8500), + ] + .into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 500); +} + +#[test] +fn test_row_selection_empty_selection() { + let path = basic_path("test.orc"); + let f = File::open(path).expect("no file found"); + + // Empty selection - skip all rows + let selection = RowSelection::skip_all(5); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + // Empty selection should read 0 rows + assert_eq!(total_rows, 0); +} + +#[test] +fn test_row_selection_with_compression() { + // Test that row selection works with compressed files + let path = basic_path("string_dict_gzip.orc"); + let f = File::open(path).expect("no file found"); + + let selection = vec![ + RowSelector::skip(10), + RowSelector::select(20), + RowSelector::skip(34), + ] + .into(); + + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_row_selection(selection) + .build(); + + let batches = reader.collect::, _>>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 20); +} + +// TODO: Async version doesn't support row_selection yet +// Need to update async_arrow_reader.rs to pass row_selection to NaiveStripeDecoder +// #[cfg(feature = "async")] +// #[tokio::test] +// async fn test_row_selection_async() { +// let path = basic_path("test.orc"); +// let f = tokio::fs::File::open(path).await.unwrap(); +// +// let selection = vec![ +// RowSelector::skip(1), +// RowSelector::select(3), +// RowSelector::skip(1), +// ] +// .into(); +// +// let reader = ArrowReaderBuilder::try_new_async(f) +// .await +// .unwrap() +// .with_row_selection(selection) +// .build_async(); +// +// let batches = reader.try_collect::>().await.unwrap(); +// let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); +// +// assert_eq!(total_rows, 3); +// } + From 033910435435454f287103609cb1998062ead8f2 Mon Sep 17 00:00:00 2001 From: Socrates Date: Sat, 4 Oct 2025 14:07:05 +0800 Subject: [PATCH 3/6] clean --- examples/row_selection_example.rs | 147 ------------------------------ 1 file changed, 147 deletions(-) delete mode 100644 examples/row_selection_example.rs diff --git a/examples/row_selection_example.rs b/examples/row_selection_example.rs deleted file mode 100644 index 73324f2..0000000 --- a/examples/row_selection_example.rs +++ /dev/null @@ -1,147 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Example demonstrating how to use RowSelection to skip rows when reading ORC files - -use std::fs::File; -use std::sync::Arc; - -use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; -use arrow::datatypes::{DataType, Field, Schema}; -use orc_rust::arrow_reader::ArrowReaderBuilder; -use orc_rust::arrow_writer::ArrowWriterBuilder; -use orc_rust::row_selection::{RowSelection, RowSelector}; - -fn main() -> Result<(), Box> { - // Step 1: Create a sample ORC file with 100 rows - println!("Creating sample ORC file..."); - let file_path = "/tmp/row_selection_example.orc"; - create_sample_orc_file(file_path)?; - - // Step 2: Read the file without row selection (baseline) - println!("\n=== Reading all rows (no selection) ==="); - let file = File::open(file_path)?; - let reader = ArrowReaderBuilder::try_new(file)?.build(); - let mut total_rows = 0; - for batch in reader { - let batch = batch?; - total_rows += batch.num_rows(); - } - println!("Total rows read: {}", total_rows); - - // Step 3: Read with row selection - skip first 30 rows, select next 40, skip rest - println!("\n=== Reading with row selection ==="); - let file = File::open(file_path)?; - - // Create a selection: skip 30, select 40, skip 30 - let selection = vec![ - RowSelector::skip(30), - RowSelector::select(40), - RowSelector::skip(30), - ] - .into(); - - let reader = ArrowReaderBuilder::try_new(file)? - .with_row_selection(selection) - .build(); - - let mut selected_rows = 0; - let mut batches = Vec::new(); - for batch in reader { - let batch = batch?; - selected_rows += batch.num_rows(); - batches.push(batch); - } - - println!("Total rows selected: {}", selected_rows); - println!("Expected: 40, Actual: {}", selected_rows); - - // Display some of the selected data - if let Some(first_batch) = batches.first() { - let id_col = first_batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let name_col = first_batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - println!("\nFirst 5 selected rows:"); - for i in 0..5.min(first_batch.num_rows()) { - println!( - " id: {}, name: {}", - id_col.value(i), - name_col.value(i) - ); - } - } - - // Step 4: Read with multiple non-consecutive selections - println!("\n=== Reading with multiple selections ==="); - let file = File::open(file_path)?; - - // Select rows 10-20 and 60-70 - let selection = RowSelection::from_consecutive_ranges( - vec![10..20, 60..70].into_iter(), - 100, - ); - - let reader = ArrowReaderBuilder::try_new(file)? - .with_row_selection(selection) - .build(); - - let mut selected_rows = 0; - for batch in reader { - let batch = batch?; - selected_rows += batch.num_rows(); - } - - println!("Total rows selected: {}", selected_rows); - println!("Expected: 20 (10 from each range)"); - - println!("\n✓ Row selection example completed successfully!"); - - Ok(()) -} - -fn create_sample_orc_file(path: &str) -> Result<(), Box> { - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - ])); - - // Create 100 rows - let ids: ArrayRef = Arc::new(Int32Array::from((0..100).collect::>())); - let names: ArrayRef = Arc::new(StringArray::from( - (0..100) - .map(|i| format!("name_{}", i)) - .collect::>(), - )); - - let batch = RecordBatch::try_new(schema.clone(), vec![ids, names])?; - - let file = File::create(path)?; - let mut writer = ArrowWriterBuilder::new(file, schema).try_build()?; - writer.write(&batch)?; - writer.close()?; - - Ok(()) -} - From cdb3c0baa97e2282fc81de44d2b4183ac2621fd1 Mon Sep 17 00:00:00 2001 From: Socrates Date: Sat, 4 Oct 2025 14:17:26 +0800 Subject: [PATCH 4/6] fmt --- src/array_decoder/mod.rs | 14 +++++++------- src/row_selection.rs | 15 ++++++++------- tests/row_selection/main.rs | 1 - 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/array_decoder/mod.rs b/src/array_decoder/mod.rs index ff3f2ce..9ed0c6a 100644 --- a/src/array_decoder/mod.rs +++ b/src/array_decoder/mod.rs @@ -260,14 +260,14 @@ impl Iterator for NaiveStripeDecoder { let selector = selectors[self.selection_index]; (selector.skip, selector.row_count) }; - + let (is_skip, row_count) = selector_info; - + if is_skip { // Skip these rows by advancing the index self.index += row_count; self.selection_index += 1; - + // Decode and discard the skipped rows to advance the internal decoders if let Err(e) = self.skip_rows(row_count) { return Some(Err(e)); @@ -277,20 +277,20 @@ impl Iterator for NaiveStripeDecoder { let rows_to_read = row_count.min(self.batch_size); let remaining = self.number_of_rows - self.index; let actual_rows = rows_to_read.min(remaining); - + if actual_rows == 0 { self.selection_index += 1; continue; } - + let record = self.decode_next_batch(actual_rows).transpose()?; self.index += actual_rows; - + // Update selector to track progress if actual_rows >= row_count { self.selection_index += 1; } - + return Some(record); } } diff --git a/src/row_selection.rs b/src/row_selection.rs index 0f8abf1..f09f49a 100644 --- a/src/row_selection.rs +++ b/src/row_selection.rs @@ -109,8 +109,12 @@ impl RowSelection { let iter = filters.iter().flat_map(|filter| { let offset = next_offset; next_offset += filter.len(); - assert_eq!(filter.null_count(), 0, "filter arrays must not contain nulls"); - + assert_eq!( + filter.null_count(), + 0, + "filter arrays must not contain nulls" + ); + // Find consecutive ranges of true values let mut ranges = vec![]; let mut start = None; @@ -503,12 +507,10 @@ mod tests { #[test] fn test_row_selection_and_then() { // First selection: skip 5, select 10, skip 5 - let first = - RowSelection::from_consecutive_ranges(vec![5..15].into_iter(), 20); + let first = RowSelection::from_consecutive_ranges(vec![5..15].into_iter(), 20); // Second selection (operates on the 10 selected rows): skip 2, select 5, skip 3 - let second = - RowSelection::from_consecutive_ranges(vec![2..7].into_iter(), 10); + let second = RowSelection::from_consecutive_ranges(vec![2..7].into_iter(), 10); let result = first.and_then(&second); @@ -556,4 +558,3 @@ mod tests { RowSelection::from_consecutive_ranges(vec![10..20, 5..15].into_iter(), 25); } } - diff --git a/tests/row_selection/main.rs b/tests/row_selection/main.rs index bcb5b64..ca04ed5 100644 --- a/tests/row_selection/main.rs +++ b/tests/row_selection/main.rs @@ -394,4 +394,3 @@ fn test_row_selection_with_compression() { // // assert_eq!(total_rows, 3); // } - From 399a445877029bfc0b734b634655ca55bd103907 Mon Sep 17 00:00:00 2001 From: Socrates Date: Sat, 4 Oct 2025 14:54:00 +0800 Subject: [PATCH 5/6] fix clippy --- src/row_selection.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/row_selection.rs b/src/row_selection.rs index f09f49a..9e1c72d 100644 --- a/src/row_selection.rs +++ b/src/row_selection.rs @@ -507,10 +507,10 @@ mod tests { #[test] fn test_row_selection_and_then() { // First selection: skip 5, select 10, skip 5 - let first = RowSelection::from_consecutive_ranges(vec![5..15].into_iter(), 20); + let first = RowSelection::from_consecutive_ranges(std::iter::once(5..15), 20); // Second selection (operates on the 10 selected rows): skip 2, select 5, skip 3 - let second = RowSelection::from_consecutive_ranges(vec![2..7].into_iter(), 10); + let second = RowSelection::from_consecutive_ranges(std::iter::once(2..7), 10); let result = first.and_then(&second); From 2cec1aa1fbef95ce59e954647b7480b324b15731 Mon Sep 17 00:00:00 2001 From: Socrates Date: Sun, 5 Oct 2025 17:25:38 +0800 Subject: [PATCH 6/6] fix --- src/array_decoder/mod.rs | 13 ++++++------- tests/row_selection/main.rs | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/array_decoder/mod.rs b/src/array_decoder/mod.rs index 9ed0c6a..afbdaa0 100644 --- a/src/array_decoder/mod.rs +++ b/src/array_decoder/mod.rs @@ -40,6 +40,7 @@ use crate::error::{ use crate::proto::stream::Kind; use crate::schema::DataType; use crate::stripe::Stripe; +use crate::RowSelection; use self::decimal::new_decimal_decoder; use self::list::ListArrayDecoder; @@ -238,7 +239,7 @@ pub struct NaiveStripeDecoder { index: usize, batch_size: usize, number_of_rows: usize, - row_selection: Option, + row_selection: Option, selection_index: usize, } @@ -251,9 +252,9 @@ impl Iterator for NaiveStripeDecoder { if self.row_selection.is_some() { // Process selectors until we find rows to select or exhaust the selection loop { - let selector_info = { - let selection = self.row_selection.as_ref().unwrap(); - let selectors = selection.selectors(); + let (is_skip, row_count) = { + // Safety: this has been checked above + let selectors = self.row_selection.as_ref().unwrap().selectors(); if self.selection_index >= selectors.len() { return None; } @@ -261,8 +262,6 @@ impl Iterator for NaiveStripeDecoder { (selector.skip, selector.row_count) }; - let (is_skip, row_count) = selector_info; - if is_skip { // Skip these rows by advancing the index self.index += row_count; @@ -492,7 +491,7 @@ impl NaiveStripeDecoder { stripe: Stripe, schema_ref: SchemaRef, batch_size: usize, - row_selection: Option, + row_selection: Option, ) -> Result { let number_of_rows = stripe.number_of_rows(); let decoders = stripe diff --git a/tests/row_selection/main.rs b/tests/row_selection/main.rs index ca04ed5..7a580f9 100644 --- a/tests/row_selection/main.rs +++ b/tests/row_selection/main.rs @@ -8,7 +8,7 @@ // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, +// Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the