Table Scan: Add Row Selection Filtering (apache#565)
* feat(scan): add row selection capability via PageIndexEvaluator

* test(row-selection): add first few row selection tests

* feat(scan): add more tests, fix bug where min/max args swapped

* fix: add test and fix for logic bug in PageIndexEvaluator in-clause handler

* feat: apply changes suggested in PR review
sdd authored Sep 24, 2024
1 parent b3709ba commit 420b4e2
Showing 5 changed files with 1,617 additions and 4 deletions.
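The feature is surfaced through a new ArrowReaderBuilder option. Below is a minimal usage sketch, not part of this diff: the builder constructor, the `file_io` value, and `with_row_group_filtering_enabled` are assumed from surrounding code, while `with_row_selection_enabled` and `build` are the methods shown in the changes. Row selection stays off unless explicitly enabled, and it only takes effect when the scan task carries a predicate.

```rust
// Sketch only: opting in to page-level row selection on the Arrow reader.
// `ArrowReaderBuilder::new(file_io)` and the pre-existing row-group option
// are assumed context, not introduced by this commit.
let reader = ArrowReaderBuilder::new(file_io)
    .with_row_group_filtering_enabled(true) // existing row-group pruning
    .with_row_selection_enabled(true)       // new in this commit; defaults to false
    .build();
```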
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -67,6 +67,7 @@ opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
95 changes: 92 additions & 3 deletions crates/iceberg/src/arrow/reader.rs
@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
use futures::channel::mpsc::{channel, Sender};
use futures::future::BoxFuture;
use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
-use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
@@ -56,6 +57,7 @@ pub struct ArrowReaderBuilder {
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
}

impl ArrowReaderBuilder {
@@ -68,6 +70,7 @@ impl ArrowReaderBuilder {
file_io,
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
}
}

@@ -90,13 +93,20 @@ impl ArrowReaderBuilder {
self
}

/// Determines whether to enable row selection.
pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
self.row_selection_enabled = row_selection_enabled;
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
}
}
}
@@ -111,6 +121,7 @@ pub struct ArrowReader {
concurrency_limit_data_files: usize,

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
}

impl ArrowReader {
@@ -121,6 +132,7 @@ impl ArrowReader {
let batch_size = self.batch_size;
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;

let (tx, rx) = channel(concurrency_limit_data_files);
let mut channel_for_error = tx.clone();
@@ -142,6 +154,7 @@ impl ArrowReader {
file_io,
tx,
row_group_filtering_enabled,
row_selection_enabled,
)
.await
})
@@ -168,6 +181,7 @@ impl ArrowReader {
file_io: FileIO,
mut tx: Sender<Result<RecordBatch>>,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
) -> Result<()> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
@@ -176,11 +190,12 @@ impl ArrowReader {
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

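// The Parquet page index (column and offset indexes) is only needed when a
// predicate-driven row selection will be computed further down.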
let should_load_page_index = row_selection_enabled && task.predicate().is_some();

// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
parquet_file_reader,
-// Page index will be required in upcoming row selection PR
-ArrowReaderOptions::new().with_page_index(false),
+ArrowReaderOptions::new().with_page_index(should_load_page_index),
)
.await?;

@@ -224,6 +239,19 @@ impl ArrowReader {
selected_row_groups = Some(result);
}

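// When row selection is enabled, consult the page index to build a
// RowSelection that skips rows in pages that cannot match the predicate.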
if row_selection_enabled {
let row_selection = Self::get_row_selection(
predicate,
record_batch_stream_builder.metadata(),
&selected_row_groups,
&field_id_map,
task.schema(),
)?;

record_batch_stream_builder =
record_batch_stream_builder.with_row_selection(row_selection);
}

if let Some(selected_row_groups) = selected_row_groups {
record_batch_stream_builder =
record_batch_stream_builder.with_row_groups(selected_row_groups);
@@ -377,6 +405,67 @@ impl ArrowReader {

Ok(results)
}

fn get_row_selection(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
selected_row_groups: &Option<Vec<usize>>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<RowSelection> {
let Some(column_index) = parquet_metadata.column_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain a column index",
));
};

let Some(offset_index) = parquet_metadata.offset_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain an offset index",
));
};

let mut selected_row_groups_idx = 0;

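// The column index, offset index, and row-group metadata are all keyed per
// row group, so iterate them in lockstep, using the ordinal from enumerate()
// to match against selected_row_groups.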
let page_index = column_index
.iter()
.enumerate()
.zip(offset_index)
.zip(parquet_metadata.row_groups());

let mut results = Vec::new();
for (((idx, column_index), offset_index), row_group_metadata) in page_index {
if let Some(selected_row_groups) = selected_row_groups {
// skip row groups that aren't present in selected_row_groups
if idx == selected_row_groups[selected_row_groups_idx] {
selected_row_groups_idx += 1;
} else {
continue;
}
}

let selections_for_page = PageIndexEvaluator::eval(
predicate,
column_index,
offset_index,
row_group_metadata,
field_id_map,
snapshot_schema,
)?;

results.push(selections_for_page);

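// Stop early once every selected row group has been processed.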
if let Some(selected_row_groups) = selected_row_groups {
if selected_row_groups_idx == selected_row_groups.len() {
break;
}
}
}

Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}
}
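The per-row-group selections are flattened into a single RowSelection covering the row groups that will be scanned. For intuition on the return type: a RowSelection in the parquet crate is an ordered run of select/skip spans over row positions. A standalone sketch with illustrative values:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Read the first 100 rows, skip the next 150, then read 50 more.
let selection: RowSelection = vec![
    RowSelector::select(100),
    RowSelector::skip(150),
    RowSelector::select(50),
]
.into();
```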

/// Build the map of parquet field id to Parquet column index in the schema.
1 change: 1 addition & 0 deletions crates/iceberg/src/expr/visitors/mod.rs
@@ -20,4 +20,5 @@ pub(crate) mod expression_evaluator;
pub(crate) mod inclusive_metrics_evaluator;
pub(crate) mod inclusive_projection;
pub(crate) mod manifest_evaluator;
pub(crate) mod page_index_evaluator;
pub(crate) mod row_group_metrics_evaluator;