diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 19794f6cae10..d23602c4bfd7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use bytes::{BufMut, BytesMut};
 use datafusion_common::DataFusionError;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
@@ -52,12 +53,14 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 #[derive(Debug)]
 pub struct ParquetFormat {
     enable_pruning: bool,
+    metadata_size_hint: Option<usize>,
 }
 
 impl Default for ParquetFormat {
     fn default() -> Self {
         Self {
             enable_pruning: true,
+            metadata_size_hint: None,
         }
     }
 }
@@ -69,10 +72,24 @@ impl ParquetFormat {
         self.enable_pruning = enable;
         self
     }
+
+    /// Provide a hint as to the size of the file metadata. If a hint is provided,
+    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
+    /// Without a hint, two reads are required: one read to fetch the 8-byte parquet footer and then
+    /// another read to fetch the metadata length encoded in the footer.
+    pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
+        self.metadata_size_hint = Some(size_hint);
+        self
+    }
     /// Return true if pruning is enabled
     pub fn enable_pruning(&self) -> bool {
         self.enable_pruning
     }
+
+    /// Return the metadata size hint if set
+    pub fn metadata_size_hint(&self) -> Option<usize> {
+        self.metadata_size_hint
+    }
 }
 
 #[async_trait]
@@ -88,7 +105,8 @@ impl FileFormat for ParquetFormat {
     ) -> Result<SchemaRef> {
         let mut schemas = Vec::with_capacity(objects.len());
         for object in objects {
-            let schema = fetch_schema(store.as_ref(), object).await?;
+            let schema =
+                fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
             schemas.push(schema)
         }
         let schema = Schema::try_merge(schemas)?;
@@ -101,7 +119,13 @@ impl FileFormat for ParquetFormat {
         table_schema: SchemaRef,
         object: &ObjectMeta,
     ) -> Result<Statistics> {
-        let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
+        let stats = fetch_statistics(
+            store.as_ref(),
+            table_schema,
+            object,
+            self.metadata_size_hint,
+        )
+        .await?;
         Ok(stats)
     }
 
@@ -119,7 +143,11 @@ impl FileFormat for ParquetFormat {
             None
         };
 
-        Ok(Arc::new(ParquetExec::new(conf, predicate)))
+        Ok(Arc::new(ParquetExec::new(
+            conf,
+            predicate,
+            self.metadata_size_hint(),
+        )))
     }
 }
 
@@ -290,6 +318,7 @@ fn summarize_min_max(
 pub(crate) async fn fetch_parquet_metadata(
     store: &dyn ObjectStore,
     meta: &ObjectMeta,
+    size_hint: Option<usize>,
 ) -> Result<ParquetMetaData> {
     if meta.size < 8 {
         return Err(DataFusionError::Execution(format!(
@@ -298,13 +327,22 @@ pub(crate) async fn fetch_parquet_metadata(
         )));
     }
 
-    let footer_start = meta.size - 8;
+    // If a size hint is provided, read more than the minimum size
+    // to try and avoid a second fetch.
+    let footer_start = if let Some(size_hint) = size_hint {
+        meta.size.saturating_sub(size_hint)
+    } else {
+        meta.size - 8
+    };
+
     let suffix = store
         .get_range(&meta.location, footer_start..meta.size)
         .await?;
 
+    let suffix_len = suffix.len();
+
     let mut footer = [0; 8];
-    footer.copy_from_slice(suffix.as_ref());
+    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
     let length = decode_footer(&footer)?;
 
@@ -316,17 +354,35 @@ pub(crate) async fn fetch_parquet_metadata(
         )));
     }
 
-    let metadata_start = meta.size - length - 8;
-    let metadata = store
-        .get_range(&meta.location, metadata_start..footer_start)
-        .await?;
+    // Did not fetch the entire file metadata in the initial read; need to make a second request
+    if length > suffix_len - 8 {
+        let metadata_start = meta.size - length - 8;
+        let remaining_metadata = store
+            .get_range(&meta.location, metadata_start..footer_start)
+            .await?;
+
+        let mut metadata = BytesMut::with_capacity(length);
 
-    Ok(decode_metadata(metadata.as_ref())?)
+        metadata.put(remaining_metadata.as_ref());
+        metadata.put(&suffix[..suffix_len - 8]);
+
+        Ok(decode_metadata(metadata.as_ref())?)
+    } else {
+        let metadata_start = meta.size - length - 8;
+
+        Ok(decode_metadata(
+            &suffix[metadata_start - footer_start..suffix_len - 8],
+        )?)
+    }
 }
 
 /// Read and parse the schema of the Parquet file at location `path`
-async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
-    let metadata = fetch_parquet_metadata(store, file).await?;
+async fn fetch_schema(
+    store: &dyn ObjectStore,
+    file: &ObjectMeta,
+    metadata_size_hint: Option<usize>,
+) -> Result<Schema> {
+    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
     let file_metadata = metadata.file_metadata();
     let schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
@@ -340,8 +396,9 @@ async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
     file: &ObjectMeta,
+    metadata_size_hint: Option<usize>,
 ) -> Result<Statistics> {
-    let metadata = fetch_parquet_metadata(store, file).await?;
+    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
     let file_metadata = metadata.file_metadata();
 
     let file_schema = parquet_to_arrow_schema(
@@ -458,6 +515,9 @@ pub(crate) mod test_util {
 mod tests {
     use super::super::test_util::scan_format;
     use crate::physical_plan::collect;
+    use std::fmt::{Display, Formatter};
+    use std::ops::Range;
+    use std::sync::atomic::{AtomicUsize, Ordering};
 
     use super::*;
 
@@ -469,9 +529,14 @@ mod tests {
         StringArray, TimestampNanosecondArray,
     };
     use arrow::record_batch::RecordBatch;
+    use async_trait::async_trait;
+    use bytes::Bytes;
     use datafusion_common::ScalarValue;
+    use futures::stream::BoxStream;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
+    use object_store::{GetResult, ListResult};
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
@@ -489,7 +554,8 @@ mod tests {
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
-        let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
+        let stats =
+            fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
 
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -497,7 +563,7 @@ mod tests {
         assert_eq!(c1_stats.null_count, Some(1));
         assert_eq!(c2_stats.null_count, Some(3));
 
-        let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
+        let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
         let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -509,6 +575,172 @@ mod tests {
         Ok(())
     }
 
+    #[derive(Debug)]
+    struct RequestCountingObjectStore {
+        inner: Arc<dyn ObjectStore>,
+        request_count: AtomicUsize,
+    }
+
+    impl Display for RequestCountingObjectStore {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            write!(f, "RequestCounting({})", self.inner)
+        }
+    }
+
+    impl RequestCountingObjectStore {
+        pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
+            Self {
+                inner,
+                request_count: Default::default(),
+            }
+        }
+
+        pub fn request_count(&self) -> usize {
+            self.request_count.load(Ordering::SeqCst)
+        }
+
+        pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
+            self.clone()
+        }
+    }
+
+    #[async_trait]
+    impl ObjectStore for RequestCountingObjectStore {
+        async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn get_range(
+            &self,
+            location: &Path,
+            range: Range<usize>,
+        ) -> object_store::Result<Bytes> {
+            self.request_count.fetch_add(1, Ordering::SeqCst);
+            self.inner.get_range(location, range).await
+        }
+
+        async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn delete(&self, _location: &Path) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn list(
+            &self,
+            _prefix: Option<&Path>,
+        ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
+        {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn list_with_delimiter(
+            &self,
+            _prefix: Option<&Path>,
+        ) -> object_store::Result<ListResult> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn copy_if_not_exists(
+            &self,
+            _from: &Path,
+            _to: &Path,
+        ) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+    }
+
+    #[tokio::test]
+    async fn fetch_metadata_with_size_hint() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
+        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+
+        // Use a size hint larger than the parquet footer but smaller than the actual metadata,
+        // requiring a second fetch for the remaining metadata
+        fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
+            .await
+            .expect("error reading metadata with hint");
+
+        assert_eq!(store.request_count(), 2);
+
+        let format = ParquetFormat::default().with_metadata_size_hint(9);
+        let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
+
+        let stats =
+            fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
+                .await?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));
+
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
+
+        // Use the file size as the hint so we can get the full metadata from the first fetch
+        let size_hint = meta[0].size;
+
+        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+            .await
+            .expect("error reading metadata with hint");
+
+        // Ensure the requests were coalesced into a single request
+        assert_eq!(store.request_count(), 1);
+
+        let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
+        let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
+        let stats = fetch_statistics(
+            store.upcast().as_ref(),
+            schema.clone(),
+            &meta[0],
+            Some(size_hint),
+        )
+        .await?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));
+
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
+
+        // Use a size hint larger than the file size to make sure we don't panic
+        let size_hint = meta[0].size + 100;
+
+        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+            .await
+            .expect("error reading metadata with hint");
+
+        assert_eq!(store.request_count(), 1);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index e9e14abf6394..ec2a1355ff26 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -271,6 +271,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             None,
+            None,
         ))
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index cae92ddbb49c..b2239876f75a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -75,6 +75,8 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for pruning row groups
     pruning_predicate: Option<PruningPredicate>,
+    /// Optional hint for the size of the parquet metadata
+    metadata_size_hint: Option<usize>,
 }
 
 /// Stores metrics about the parquet execution for a particular parquet file
@@ -90,7 +92,11 @@ struct ParquetFileMetrics {
 
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and schema.
-    pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
+    pub fn new(
+        base_config: FileScanConfig,
+        predicate: Option<Expr>,
+        metadata_size_hint: Option<usize>,
+    ) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
 
@@ -120,6 +126,7 @@ impl ParquetExec {
             projected_statistics,
             metrics,
             pruning_predicate,
+            metadata_size_hint,
         }
     }
 
@@ -212,6 +219,7 @@ impl ExecutionPlan for ParquetExec {
             batch_size: context.session_config().batch_size(),
             pruning_predicate: self.pruning_predicate.clone(),
             table_schema: self.base_config.file_schema.clone(),
+            metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
         };
 
@@ -266,6 +274,7 @@ struct ParquetOpener {
     batch_size: usize,
     pruning_predicate: Option<PruningPredicate>,
     table_schema: SchemaRef,
+    metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
 }
 
@@ -285,6 +294,7 @@ impl FormatReader for ParquetOpener {
         let reader = ParquetFileReader {
             store,
             meta,
+            metadata_size_hint: self.metadata_size_hint,
             metrics: metrics.clone(),
         };
 
@@ -331,6 +341,7 @@ impl FormatReader for ParquetOpener {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
+    metadata_size_hint: Option<usize>,
     metrics: ParquetFileMetrics,
 }
 
@@ -353,14 +364,18 @@ impl AsyncFileReader for ParquetFileReader {
         &mut self,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let metadata = fetch_parquet_metadata(self.store.as_ref(), &self.meta)
-                .await
-                .map_err(|e| {
-                    ParquetError::General(format!(
-                        "AsyncChunkReader::get_metadata error: {}",
-                        e
-                    ))
-                })?;
+            let metadata = fetch_parquet_metadata(
+                self.store.as_ref(),
+                &self.meta,
+                self.metadata_size_hint,
+            )
+            .await
+            .map_err(|e| {
+                ParquetError::General(format!(
+                    "AsyncChunkReader::get_metadata error: {}",
+                    e
+                ))
+            })?;
             Ok(Arc::new(metadata))
         })
     }
@@ -618,6 +633,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             predicate,
+            None,
         );
 
         let session_ctx = SessionContext::new();
@@ -1004,6 +1020,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             None,
+            None,
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
         let results = parquet_exec.execute(0, task_ctx)?.next().await;
@@ -1103,6 +1120,7 @@ mod tests {
                 ],
             },
             None,
+            None,
         );
 
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
@@ -1159,6 +1177,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             None,
+            None,
         );
 
         let mut results = parquet_exec.execute(0, task_ctx)?;
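Reviewer note: the sketch below is illustrative and not part of the patch; the helper name `initial_fetch_range` is hypothetical. It mirrors the suffix-read arithmetic that `fetch_parquet_metadata` now performs: without a hint only the 8-byte footer is fetched and a second request follows for the metadata, while a hint at least as large as footer plus metadata lets a single suffix read cover both, and an oversized hint is clamped to the file size.

```rust
use std::ops::Range;

/// Hypothetical helper mirroring the suffix-read logic added to
/// `fetch_parquet_metadata`: pick the byte range of the first request.
fn initial_fetch_range(file_size: usize, size_hint: Option<usize>) -> Range<usize> {
    // With a hint, optimistically read the last `size_hint` bytes,
    // clamped so an oversized hint cannot underflow past the file start.
    let footer_start = match size_hint {
        Some(hint) => file_size.saturating_sub(hint),
        None => file_size - 8, // just the 8-byte footer
    };
    footer_start..file_size
}

fn main() {
    // No hint: fetch only the footer; the metadata needs a second request.
    assert_eq!(initial_fetch_range(1024, None), 1016..1024);
    // A hint that covers the metadata lets one request suffice.
    assert_eq!(initial_fetch_range(1024, Some(256)), 768..1024);
    // A hint larger than the file is clamped instead of panicking.
    assert_eq!(initial_fetch_range(100, Some(1024)), 0..100);
}
```

In practice a generously sized hint only costs extra bytes on the first read, while an undersized hint simply falls back to the second fetch, which is the behavior exercised by the `fetch_metadata_with_size_hint` test above.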