Skip to content
10 changes: 10 additions & 0 deletions src/array_decoder/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl ArrayBatchDecoder for DecimalArrayDecoder {
let array = Arc::new(array) as ArrayRef;
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
self.inner.skip_values(n, parent_present)
}
}

/// This iter fixes the scales of the varints decoded as scale is specified on a per
Expand All @@ -112,6 +116,12 @@ struct DecimalScaleRepairDecoder {
}

impl PrimitiveValueDecoder<i128> for DecimalScaleRepairDecoder {
fn skip(&mut self, n: usize) -> Result<()> {
self.varint_iter.skip(n)?;
self.scale_iter.skip(n)?;
Ok(())
}

fn decode(&mut self, out: &mut [i128]) -> Result<()> {
// TODO: can probably optimize, reuse buffers?
let mut varint = vec![0; out.len()];
Expand Down
17 changes: 17 additions & 0 deletions src/array_decoder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,21 @@ impl ArrayBatchDecoder for ListArrayDecoder {
let array = Arc::new(array);
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
use super::skip_present_and_get_non_null_count;

let non_null_count =
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;

// Decode lengths to determine how many child values to skip
let mut lengths = vec![0; non_null_count];
self.lengths.decode(&mut lengths)?;
let total_length: i64 = lengths.iter().sum();

// Skip the child values (children don't have parent_present from list)
self.inner.skip_values(total_length as usize, None)?;

Ok(())
}
}
18 changes: 18 additions & 0 deletions src/array_decoder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,22 @@ impl ArrayBatchDecoder for MapArrayDecoder {
let array = Arc::new(array);
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
use super::skip_present_and_get_non_null_count;

let non_null_count =
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;

// Decode lengths to determine how many entries to skip
let mut lengths = vec![0; non_null_count];
self.lengths.decode(&mut lengths)?;
let total_length: i64 = lengths.iter().sum();

// Skip both keys and values (they don't have parent_present from map)
self.keys.skip_values(total_length as usize, None)?;
self.values.skip_values(total_length as usize, None)?;

Ok(())
}
}
178 changes: 128 additions & 50 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ pub trait ArrayBatchDecoder: Send {
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef>;

/// Skip the next `n` values without decoding them, failing if it cannot skip the enough values.
/// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
/// then the child doesn't have a value (similar to other nullability). So we need
/// to take care to insert these null values as Arrow requires the child to hold
/// data in the null slot of the child.
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()>;
}

struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
Expand Down Expand Up @@ -123,6 +130,12 @@ impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
let array = Arc::new(array) as ArrayRef;
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
let non_null_count =
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
self.iter.skip(non_null_count)
}
}

type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
Expand Down Expand Up @@ -168,6 +181,12 @@ impl ArrayBatchDecoder for BooleanArrayDecoder {
};
Ok(Arc::new(array))
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
let non_null_count =
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
self.iter.skip(non_null_count)
}
}

struct PresentDecoder {
Expand Down Expand Up @@ -232,6 +251,42 @@ fn derive_present_vec(
}
}

/// Skip n values and return the non-null count for the data stream
fn skip_present_and_get_non_null_count(
present: &mut Option<PresentDecoder>,
parent_present: Option<&NullBuffer>,
n: usize,
) -> Result<usize> {
match (present, parent_present) {
(Some(present), Some(parent_present)) => {
// Parent has nulls, so we need to decode parent present to know how many
// of our present values to skip
let non_null_in_parent = parent_present.len() - parent_present.null_count();

// Skip our present values for non-null parents and count non-nulls
let mut our_present = vec![false; non_null_in_parent];
present.inner.decode(&mut our_present)?;
let our_non_null_count = our_present.iter().filter(|&&v| v).count();

Ok(our_non_null_count)
}
(Some(present), None) => {
// No parent present, skip n values and count non-nulls
let mut present_values = vec![false; n];
present.inner.decode(&mut present_values)?;
Ok(present_values.iter().filter(|&&v| v).count())
}
(None, Some(parent_present)) => {
// No our present stream, all non-null parents have data
Ok(parent_present.len() - parent_present.null_count())
}
(None, None) => {
// No nulls at all, all n values have data
Ok(n)
}
}
}

pub struct NaiveStripeDecoder {
stripe: Stripe,
schema_ref: SchemaRef,
Expand All @@ -243,56 +298,81 @@ pub struct NaiveStripeDecoder {
selection_index: usize,
}

impl NaiveStripeDecoder {
/// Advance according to the configured row selection and return the next batch, if any.
///
/// Behavior:
/// - Iterates `RowSelection` segments (skip/select) starting at `selection_index`.
/// - For skip segments: clamp to remaining rows in this stripe, advance decoders via
/// `skip_rows(actual_skip)`, and advance `index`. If the segment is fully consumed,
/// increment `selection_index`.
/// - For select segments: decode up to `min(row_count, batch_size, remaining_in_stripe)`,
/// advance `index`, update `selection_index` if fully consumed, and return the batch.
/// - If a segment requests rows beyond the end of the stripe, it is skipped (advancing
/// `selection_index`) without touching decoders.
fn next_with_row_selection(&mut self) -> Option<Result<RecordBatch>> {
// Process selectors until we produce a batch or exhaust selection
loop {
let (is_skip, row_count) = {
let selectors = self.row_selection.as_ref().unwrap().selectors();
if self.selection_index >= selectors.len() {
return None;
}
let selector = selectors[self.selection_index];
(selector.skip, selector.row_count)
};

if is_skip {
let remaining = self.number_of_rows - self.index;
let actual_skip = row_count.min(remaining);

if actual_skip == 0 {
// Nothing to skip in this stripe; try next selector
self.selection_index += 1;
continue;
}

// Keep decoders in sync by skipping values per column
if let Err(e) = self.skip_rows(actual_skip) {
return Some(Err(e));
}
self.index += actual_skip;

if actual_skip >= row_count {
self.selection_index += 1;
}
} else {
let rows_to_read = row_count.min(self.batch_size);
let remaining = self.number_of_rows - self.index;
let actual_rows = rows_to_read.min(remaining);

if actual_rows == 0 {
// Nothing to read from this selector in this stripe; advance selector
self.selection_index += 1;
continue;
}

let record = self.decode_next_batch(actual_rows).transpose()?;
self.index += actual_rows;

if actual_rows >= row_count {
self.selection_index += 1;
}
return Some(record);
}
}
}
}

impl Iterator for NaiveStripeDecoder {
type Item = Result<RecordBatch>;

// TODO: check if we can make this more efficient
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.number_of_rows {
// Handle row selection if present
if self.row_selection.is_some() {
// Process selectors until we find rows to select or exhaust the selection
loop {
let (is_skip, row_count) = {
// Safety: this has been checked above
let selectors = self.row_selection.as_ref().unwrap().selectors();
if self.selection_index >= selectors.len() {
return None;
}
let selector = selectors[self.selection_index];
(selector.skip, selector.row_count)
};

if is_skip {
// Skip these rows by advancing the index
self.index += row_count;
self.selection_index += 1;

// Decode and discard the skipped rows to advance the internal decoders
if let Err(e) = self.skip_rows(row_count) {
return Some(Err(e));
}
} else {
// Select these rows
let rows_to_read = row_count.min(self.batch_size);
let remaining = self.number_of_rows - self.index;
let actual_rows = rows_to_read.min(remaining);

if actual_rows == 0 {
self.selection_index += 1;
continue;
}

let record = self.decode_next_batch(actual_rows).transpose()?;
self.index += actual_rows;

// Update selector to track progress
if actual_rows >= row_count {
self.selection_index += 1;
}

return Some(record);
}
}
self.next_with_row_selection()
} else {
// No row selection - decode normally
let record = self
Expand Down Expand Up @@ -513,14 +593,12 @@ impl NaiveStripeDecoder {
})
}

/// Skip the specified number of rows by decoding and discarding them
/// Skip the specified number of rows by calling skip_values on each decoder
fn skip_rows(&mut self, count: usize) -> Result<()> {
// Decode in batches to avoid large memory allocations
let mut remaining = count;
while remaining > 0 {
let chunk = self.batch_size.min(remaining);
let _ = self.inner_decode_next_batch(chunk)?;
remaining -= chunk;
// Call skip_values on each decoder to efficiently skip rows
// Top-level decoders don't have parent_present
for decoder in &mut self.decoders {
decoder.skip_values(count, None)?;
}
Ok(())
}
Expand Down
26 changes: 26 additions & 0 deletions src/array_decoder/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,28 @@ impl<T: ByteArrayType> ArrayBatchDecoder for GenericByteArrayDecoder<T> {
let array = Arc::new(array) as ArrayRef;
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
use crate::array_decoder::skip_present_and_get_non_null_count;

let non_null_count =
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;

// Decode lengths to determine how many bytes to skip
let mut lengths = vec![0; non_null_count];
self.lengths.decode(&mut lengths)?;
let total_bytes: i64 = lengths.iter().sum();

// Skip the data bytes
// TODO: can we use the decompressor to skip the bytes?
std::io::copy(
&mut self.bytes.by_ref().take(total_bytes as u64),
&mut std::io::sink(),
)
.context(IoSnafu)?;

Ok(())
}
}

pub struct DictionaryStringArrayDecoder {
Expand Down Expand Up @@ -192,4 +214,8 @@ impl ArrayBatchDecoder for DictionaryStringArrayDecoder {
let array = Arc::new(array);
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
self.indexes.skip_values(n, parent_present)
}
}
15 changes: 15 additions & 0 deletions src/array_decoder/struct_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,19 @@ impl ArrayBatchDecoder for StructArrayDecoder {
let array = Arc::new(array);
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
use super::derive_present_vec;

// Derive the combined present buffer like in next_batch
let present = derive_present_vec(&mut self.present, parent_present, n).transpose()?;

// Skip values in all child decoders
// Pass the present buffer to children so they know which values to skip
for decoder in &mut self.decoders {
decoder.skip_values(n, present.as_ref())?;
}

Ok(())
}
}
13 changes: 13 additions & 0 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampOffsetArrayDecoder<T>
let array = Arc::new(array) as ArrayRef;
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
self.inner.skip_values(n, parent_present)
}
}

/// Wrapper around PrimitiveArrayDecoder to allow specifying the timezone of the output
Expand All @@ -286,6 +290,10 @@ impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampInstantArrayDecoder<T
let array = Arc::new(array) as ArrayRef;
Ok(array)
}

fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
self.0.skip_values(n, parent_present)
}
}

struct TimestampNanosecondAsDecimalWithTzDecoder(TimestampNanosecondAsDecimalDecoder, Tz);
Expand All @@ -308,6 +316,11 @@ impl TimestampNanosecondAsDecimalWithTzDecoder {
}

impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalWithTzDecoder {
fn skip(&mut self, n: usize) -> Result<()> {
self.0.skip(n)?;
Ok(())
}

fn decode(&mut self, out: &mut [i128]) -> Result<()> {
self.0.decode(out)?;
for x in out.iter_mut() {
Expand Down
Loading