Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 127 additions & 2 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,23 @@ impl ParquetObjectReader {
}
}

/// Controls whether the Column Index is fetched as part of [`Self::get_metadata`].
///
/// Note: [`ArrowReaderOptions`] `page_index_policy` can override this setting.
/// A policy of `Optional` or `Required` takes precedence over this preload
/// flag; with `Skip` (the default) this flag is the one that applies.
pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
    // Take ownership, flip the single flag, and hand the reader back so the
    // call can be chained builder-style.
    let mut reader = self;
    reader.preload_column_index = preload_column_index;
    reader
}

/// Whether to load the Offset Index as part of [`Self::get_metadata`]
///
/// Note: This setting may be overridden by [`ArrowReaderOptions`] `page_index_policy`.
/// If `page_index_policy` is `Optional` or `Required`, it will take precedence
/// over this preload flag. When it is `Skip` (default), this flag is used.
pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
Self {
preload_offset_index,
Expand Down Expand Up @@ -213,6 +221,16 @@ impl AsyncFileReader for ParquetObjectReader {
);
}

// Override page index policies from ArrowReaderOptions if specified and not Skip.
// When page_index_policy is Skip (default), use the reader's preload flags.
// When page_index_policy is Optional or Required, override the preload flags
// to ensure the specified policy takes precedence.
if let Some(options) = options {
if options.page_index_policy != PageIndexPolicy::Skip {
metadata = metadata.with_page_index_policy(options.page_index_policy);
}
}

let metadata = if let Some(file_size) = self.file_size {
metadata.load_and_finish(self, file_size).await?
} else {
Expand All @@ -226,6 +244,8 @@ impl AsyncFileReader for ParquetObjectReader {

#[cfg(test)]
mod tests {
use crate::arrow::async_reader::ArrowReaderOptions;
use crate::file::metadata::PageIndexPolicy;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -254,6 +274,18 @@ mod tests {
(meta, Arc::new(store) as Arc<dyn ObjectStore>)
}

/// Test fixture: opens the parquet test-data directory as a local object store
/// and returns the [`ObjectMeta`] of `alltypes_tiny_pages_plain.parquet`
/// together with the store.
///
/// The page-index tests in this module rely on that file containing page
/// indexes. Panics if the store cannot be created or the file is missing.
async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
    let local_fs = LocalFileSystem::new_with_prefix(parquet_test_data()).unwrap();

    let location = Path::from("alltypes_tiny_pages_plain.parquet");
    let object_meta = local_fs.head(&location).await.unwrap();

    let shared_store: Arc<dyn ObjectStore> = Arc::new(local_fs);
    (object_meta, shared_store)
}

#[tokio::test]
async fn test_simple() {
let (meta, store) = get_meta_store().await;
Expand Down Expand Up @@ -382,4 +414,97 @@ mod tests {

assert!(err.to_string().contains("was cancelled"));
}

/// With `PageIndexPolicy::Skip` passed explicitly in the options, the reader's
/// own preload flags decide whether the page index is loaded.
#[tokio::test]
async fn test_page_index_policy_skip_uses_preload_true() {
    let (object_meta, object_store) = get_meta_store_with_page_index().await;

    // Options that explicitly carry the default `Skip` policy.
    let mut skip_options = ArrowReaderOptions::new();
    skip_options.page_index_policy = PageIndexPolicy::Skip;

    // Reader that asks for both indexes through its preload flags.
    let mut preloading_reader = ParquetObjectReader::new(object_store, object_meta.location)
        .with_file_size(object_meta.size)
        .with_preload_column_index(true)
        .with_preload_offset_index(true);

    // `Skip` defers to the preload flags (true), so the load is attempted.
    let loaded = preloading_reader
        .get_metadata(Some(&skip_options))
        .await
        .unwrap();

    // The fixture file carries page indexes, so the column index must be present.
    assert!(loaded.column_index().is_some());
}

/// A `PageIndexPolicy::Optional` in the options wins over preload flags that
/// were set to `false` on the reader.
#[tokio::test]
async fn test_page_index_policy_optional_overrides_preload_false() {
    let (object_meta, object_store) = get_meta_store_with_page_index().await;

    // Options requesting a best-effort page index load.
    let mut optional_options = ArrowReaderOptions::new();
    optional_options.page_index_policy = PageIndexPolicy::Optional;

    // Reader that, on its own, would not preload either index.
    let mut lazy_reader = ParquetObjectReader::new(object_store, object_meta.location)
        .with_file_size(object_meta.size)
        .with_preload_column_index(false)
        .with_preload_offset_index(false);

    // `Optional` overrides the preload flags and tries to load the indexes.
    let loaded = lazy_reader
        .get_metadata(Some(&optional_options))
        .await
        .unwrap();

    // `Optional` would tolerate missing indexes, but the fixture file has
    // them, so the column index must be present.
    assert!(loaded.column_index().is_some());
}

/// Contrasts `Skip` and `Optional` on readers whose preload flags are both
/// `false`: `Skip` leaves the column index unloaded, `Optional` loads it.
#[tokio::test]
async fn test_page_index_policy_optional_vs_skip() {
    let (meta, store) = get_meta_store_with_page_index().await;

    // Fetch metadata once per policy, each time with a fresh reader whose
    // preload flags are disabled. Index 0 holds the `Skip` result, index 1
    // the `Optional` result.
    let mut loaded = Vec::with_capacity(2);
    for policy in [PageIndexPolicy::Skip, PageIndexPolicy::Optional] {
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let mut options = ArrowReaderOptions::new();
        options.page_index_policy = policy;

        loaded.push(reader.get_metadata(Some(&options)).await.unwrap());
    }

    // Both fetches succeeded (no panic/error) by the time we get here.
    // Skip     -> falls back to preload=false -> nothing loaded.
    // Optional -> overrides preload=false     -> column index loaded.
    assert!(loaded[0].column_index().is_none());
    assert!(loaded[1].column_index().is_some());
}

/// When `get_metadata` is called without any options, the reader's preload
/// flags alone decide which page indexes are loaded.
#[tokio::test]
async fn test_page_index_policy_no_options_uses_preload() {
    let (meta, store) = get_meta_store_with_page_index().await;

    // Create reader with both preload flags set to true
    let mut reader = ParquetObjectReader::new(store, meta.location)
        .with_file_size(meta.size)
        .with_preload_column_index(true)
        .with_preload_offset_index(true);

    // Get metadata without options - should use reader's preload flags
    let metadata = reader.get_metadata(None).await.unwrap();

    // With no options provided, the preload flags (true) should be respected.
    // The test file has page indexes, so both must be present. The previous
    // assertion checked the column index twice and never looked at the
    // offset index, so a regression in offset-index preloading went unnoticed.
    assert!(metadata.column_index().is_some());
    assert!(metadata.offset_index().is_some());
}
}
Loading