Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::error::{
use crate::proto::stream::Kind;
use crate::schema::DataType;
use crate::stripe::Stripe;
use crate::RowSelection;

use self::decimal::new_decimal_decoder;
use self::list::ListArrayDecoder;
Expand Down Expand Up @@ -238,18 +239,68 @@ pub struct NaiveStripeDecoder {
index: usize,
batch_size: usize,
number_of_rows: usize,
row_selection: Option<RowSelection>,
selection_index: usize,
}

impl Iterator for NaiveStripeDecoder {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
if self.index < self.number_of_rows {
let record = self
.decode_next_batch(self.number_of_rows - self.index)
.transpose()?;
self.index += self.batch_size;
Some(record)
// Handle row selection if present
if self.row_selection.is_some() {
// Process selectors until we find rows to select or exhaust the selection
loop {
let (is_skip, row_count) = {
// Safety: this has been checked above
let selectors = self.row_selection.as_ref().unwrap().selectors();
if self.selection_index >= selectors.len() {
return None;
}
let selector = selectors[self.selection_index];
(selector.skip, selector.row_count)
};

if is_skip {
// Skip these rows by advancing the index
self.index += row_count;
self.selection_index += 1;

// Decode and discard the skipped rows to advance the internal decoders
if let Err(e) = self.skip_rows(row_count) {
return Some(Err(e));
}
} else {
// Select these rows
let rows_to_read = row_count.min(self.batch_size);
let remaining = self.number_of_rows - self.index;
let actual_rows = rows_to_read.min(remaining);

if actual_rows == 0 {
self.selection_index += 1;
continue;
}

let record = self.decode_next_batch(actual_rows).transpose()?;
self.index += actual_rows;

// Update selector to track progress
if actual_rows >= row_count {
self.selection_index += 1;
}

return Some(record);
}
}
} else {
// No row selection - decode normally
let record = self
.decode_next_batch(self.number_of_rows - self.index)
.transpose()?;
self.index += self.batch_size;
Some(record)
}
} else {
None
}
Expand Down Expand Up @@ -433,6 +484,15 @@ impl NaiveStripeDecoder {
}

pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
Self::new_with_selection(stripe, schema_ref, batch_size, None)
}

pub fn new_with_selection(
stripe: Stripe,
schema_ref: SchemaRef,
batch_size: usize,
row_selection: Option<RowSelection>,
) -> Result<Self> {
let number_of_rows = stripe.number_of_rows();
let decoders = stripe
.columns()
Expand All @@ -448,6 +508,20 @@ impl NaiveStripeDecoder {
index: 0,
batch_size,
number_of_rows,
row_selection,
selection_index: 0,
})
}

/// Skip the specified number of rows by decoding and discarding them
fn skip_rows(&mut self, count: usize) -> Result<()> {
// Decode in batches to avoid large memory allocations
let mut remaining = count;
while remaining > 0 {
let chunk = self.batch_size.min(remaining);
let _ = self.inner_decode_next_batch(chunk)?;
remaining -= chunk;
}
Ok(())
}
}
50 changes: 48 additions & 2 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::error::Result;
use crate::projection::ProjectionMask;
use crate::reader::metadata::{read_metadata, FileMetadata};
use crate::reader::ChunkReader;
use crate::row_selection::RowSelection;
use crate::schema::RootDataType;
use crate::stripe::{Stripe, StripeMetadata};

Expand All @@ -40,6 +41,7 @@ pub struct ArrowReaderBuilder<R> {
pub(crate) projection: ProjectionMask,
pub(crate) schema_ref: Option<SchemaRef>,
pub(crate) file_byte_range: Option<Range<usize>>,
pub(crate) row_selection: Option<RowSelection>,
}

impl<R> ArrowReaderBuilder<R> {
Expand All @@ -51,6 +53,7 @@ impl<R> ArrowReaderBuilder<R> {
projection: ProjectionMask::all(),
schema_ref: None,
file_byte_range: None,
row_selection: None,
}
}

Expand Down Expand Up @@ -79,6 +82,33 @@ impl<R> ArrowReaderBuilder<R> {
self
}

/// Set a [`RowSelection`] to filter rows
///
/// The [`RowSelection`] specifies which rows should be decoded from the ORC file.
/// This can be used to skip rows that don't match predicates, reducing I/O and
/// improving query performance.
///
/// # Example
///
/// ```no_run
/// # use std::fs::File;
/// # use orc_rust::arrow_reader::ArrowReaderBuilder;
/// # use orc_rust::row_selection::{RowSelection, RowSelector};
/// let file = File::open("data.orc").unwrap();
/// let selection = vec![
/// RowSelector::skip(100),
/// RowSelector::select(50),
/// ].into();
/// let reader = ArrowReaderBuilder::try_new(file)
/// .unwrap()
/// .with_row_selection(selection)
/// .build();
/// ```
pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
self.row_selection = Some(row_selection);
self
}

/// Returns the currently computed schema
///
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
Expand Down Expand Up @@ -124,6 +154,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
schema_ref,
current_stripe: None,
batch_size: self.batch_size,
row_selection: self.row_selection,
}
}
}
Expand All @@ -133,6 +164,7 @@ pub struct ArrowReader<R> {
schema_ref: SchemaRef,
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
batch_size: usize,
row_selection: Option<RowSelection>,
}

impl<R> ArrowReader<R> {
Expand All @@ -146,8 +178,22 @@ impl<R: ChunkReader> ArrowReader<R> {
let stripe = self.cursor.next().transpose()?;
match stripe {
Some(stripe) => {
let decoder =
NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)?;
// Split off the row selection for this stripe
let stripe_rows = stripe.number_of_rows();
let selection = self.row_selection.as_mut().and_then(|s| {
if s.row_count() > 0 {
Some(s.split_off(stripe_rows))
} else {
None
}
});

let decoder = NaiveStripeDecoder::new_with_selection(
stripe,
self.schema_ref.clone(),
self.batch_size,
selection,
)?;
self.current_stripe = Some(Box::new(decoder));
self.next().transpose()
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod projection;
#[allow(dead_code)]
mod proto;
pub mod reader;
pub mod row_selection;
pub mod schema;
pub mod statistics;
pub mod stripe;
Expand All @@ -70,3 +71,4 @@ pub use arrow_reader::{ArrowReader, ArrowReaderBuilder};
pub use arrow_writer::{ArrowWriter, ArrowWriterBuilder};
#[cfg(feature = "async")]
pub use async_arrow_reader::ArrowStreamReader;
pub use row_selection::{RowSelection, RowSelector};
Loading