diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs
index 44475d0e92..3f77335444 100644
--- a/crates/core/src/operations/convert_to_delta.rs
+++ b/crates/core/src/operations/convert_to_delta.rs
@@ -1,6 +1,7 @@
 //! Command for converting a Parquet table to a Delta table in place
 // https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
+use crate::operations::write::get_num_idx_cols_and_stats_columns;
 use crate::{
     kernel::{Add, DataType, Schema, StructField},
     logstore::{LogStore, LogStoreRef},
@@ -8,6 +9,7 @@ use crate::{
     protocol::SaveMode,
     table::builder::ensure_table_uri,
     table::config::DeltaConfigKey,
+    writer::stats::stats_from_parquet_metadata,
     DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
 };
 use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
@@ -15,6 +17,7 @@ use futures::{
     future::{self, BoxFuture},
     TryStreamExt,
 };
+use indexmap::IndexMap;
 use parquet::{
     arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
     errors::ParquetError,
@@ -284,6 +287,10 @@ impl ConvertToDeltaBuilder {
         // A vector of StructField of all unique partition columns in a Parquet table
         let mut partition_schema_fields = HashMap::new();
 
+        // Obtain settings on which columns to skip collecting stats on if any
+        let (num_indexed_cols, stats_columns) =
+            get_num_idx_cols_and_stats_columns(None, self.configuration.clone());
+
         for file in files {
             // A HashMap from partition column to value for this parquet file only
             let mut partition_values = HashMap::new();
@@ -328,6 +335,24 @@ impl ConvertToDeltaBuilder {
                 subpath = iter.next();
             }
 
+            let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
+                object_store.clone(),
+                file.clone(),
+            ))
+            .await?;
+
+            // Fetch the stats
+            let parquet_metadata = batch_builder.metadata();
+            let stats = stats_from_parquet_metadata(
+                &IndexMap::from_iter(partition_values.clone().into_iter()),
+                parquet_metadata.as_ref(),
+                num_indexed_cols,
+                &stats_columns,
+            )
+            .map_err(|e| Error::DeltaTable(e.into()))?;
+            let stats_string =
+                serde_json::to_string(&stats).map_err(|e| Error::DeltaTable(e.into()))?;
+
             actions.push(
                 Add {
                     path: percent_decode_str(file.location.as_ref())
@@ -349,19 +374,13 @@ impl ConvertToDeltaBuilder {
                         .collect(),
                     modification_time: file.last_modified.timestamp_millis(),
                     data_change: true,
+                    stats: Some(stats_string),
                     ..Default::default()
                 }
                 .into(),
            );
 
-            let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
-                object_store.clone(),
-                file,
-            ))
-            .await?
-            .schema()
-            .as_ref()
-            .clone();
+            let mut arrow_schema = batch_builder.schema().as_ref().clone();
 
             // Arrow schema of Parquet files may have conflicting metatdata
             // Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap
@@ -584,6 +603,15 @@ mod tests {
             "part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet"
         );
 
+        let Some(Scalar::Struct(min_values, _)) = action.min_values() else {
+            panic!("Missing min values");
+        };
+        assert_eq!(min_values, vec![Scalar::Date(18628), Scalar::Integer(1)]);
+        let Some(Scalar::Struct(max_values, _)) = action.max_values() else {
+            panic!("Missing max values");
+        };
+        assert_eq!(max_values, vec![Scalar::Date(18632), Scalar::Integer(5)]);
+
         assert_delta_table(
             table,
             path,
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index 849179a973..2bf7dd2abd 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -4,6 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, ops::AddAssign};
 
 use indexmap::IndexMap;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::format::FileMetaData;
 use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{basic::LogicalType, errors::ParquetError};
@@ -66,6 +67,32 @@ pub fn create_add(
     })
 }
 
+// As opposed to `stats_from_file_metadata` which operates on `parquet::format::FileMetaData`,
+// this function produces the stats by reading the metadata from already written out files.
+//
+// Note that the file metadata used here is actually `parquet::file::metadata::FileMetaData`
+// which is a thrift decoding of the `parquet::format::FileMetaData` which is typically obtained
+// when flushing the write.
+pub(crate) fn stats_from_parquet_metadata(
+    partition_values: &IndexMap<String, Scalar>,
+    parquet_metadata: &ParquetMetaData,
+    num_indexed_cols: i32,
+    stats_columns: &Option<Vec<String>>,
+) -> Result<Stats, DeltaWriterError> {
+    let num_rows = parquet_metadata.file_metadata().num_rows();
+    let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
+    let row_group_metadata = parquet_metadata.row_groups().to_vec();
+
+    stats_from_metadata(
+        partition_values,
+        schema_descriptor,
+        row_group_metadata,
+        num_rows,
+        num_indexed_cols,
+        stats_columns,
+    )
+}
+
 fn stats_from_file_metadata(
     partition_values: &IndexMap<String, Scalar>,
     file_metadata: &FileMetaData,
@@ -75,27 +102,46 @@ fn stats_from_file_metadata(
     let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
     let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;
 
-    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
-
-    let row_group_metadata: Result<Vec<RowGroupMetaData>, ParquetError> = file_metadata
+    let row_group_metadata: Vec<RowGroupMetaData> = file_metadata
         .row_groups
         .iter()
         .map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone()))
-        .collect();
-    let row_group_metadata = row_group_metadata?;
-    let schema_cols = file_metadata
-        .schema
-        .iter()
-        .map(|v| &v.name)
-        .collect::<Vec<_>>();
+        .collect::<Result<Vec<RowGroupMetaData>, ParquetError>>()?;
+
+    stats_from_metadata(
+        partition_values,
+        schema_descriptor,
+        row_group_metadata,
+        file_metadata.num_rows,
+        num_indexed_cols,
+        stats_columns,
+    )
+}
+
+fn stats_from_metadata(
+    partition_values: &IndexMap<String, Scalar>,
+    schema_descriptor: Arc<SchemaDescriptor>,
+    row_group_metadata: Vec<RowGroupMetaData>,
+    num_rows: i64,
+    num_indexed_cols: i32,
+    stats_columns: &Option<Vec<String>>,
+) -> Result<Stats, DeltaWriterError> {
+    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
+    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
+    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
 
     let idx_to_iterate = if let Some(stats_cols) = stats_columns {
-        stats_cols
-            .iter()
-            .map(|col| schema_cols[1..].iter().position(|value| *value == col))
-            .flatten()
+        schema_descriptor
+            .columns()
+            .into_iter()
+            .enumerate()
+            .filter_map(|(index, col)| {
+                if stats_cols.contains(&col.name().to_string()) {
+                    Some(index)
+                } else {
+                    None
+                }
+            })
             .collect()
     } else if num_indexed_cols == -1 {
         (0..schema_descriptor.num_columns()).collect::<Vec<_>>()
@@ -149,7 +195,7 @@ fn stats_from_file_metadata(
     Ok(Stats {
         min_values,
         max_values,
-        num_records: file_metadata.num_rows,
+        num_records: num_rows,
         null_count,
     })
 }
@@ -262,18 +308,8 @@ impl StatsScalar {
                     v.max_bytes()
                 };
 
-                let val = if val.len() <= 4 {
-                    let mut bytes = [0; 4];
-                    bytes[..val.len()].copy_from_slice(val);
-                    i32::from_be_bytes(bytes) as f64
-                } else if val.len() <= 8 {
-                    let mut bytes = [0; 8];
-                    bytes[..val.len()].copy_from_slice(val);
-                    i64::from_be_bytes(bytes) as f64
-                } else if val.len() <= 16 {
-                    let mut bytes = [0; 16];
-                    bytes[..val.len()].copy_from_slice(val);
-                    i128::from_be_bytes(bytes) as f64
+                let val = if val.len() <= 16 {
+                    i128::from_be_bytes(sign_extend_be(val)) as f64
                 } else {
                     return Err(DeltaWriterError::StatsParsingFailed {
                         debug_value: format!("{val:?}"),
@@ -315,6 +351,19 @@ impl StatsScalar {
     }
 }
 
+/// Performs big endian sign extension
+/// Copied from arrow-rs repo/parquet crate:
+/// https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/buffer/bit_util.rs#L54
+pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
+    assert!(b.len() <= N, "Array too large, expected less than {N}");
+    let is_negative = (b[0] & 128u8) == 128u8;
+    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
+    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
+        *d = *s;
+    }
+    result
+}
+
 impl From<StatsScalar> for serde_json::Value {
     fn from(scalar: StatsScalar) -> Self {
         match scalar {
@@ -653,6 +702,17 @@ mod tests {
                 }),
                 Value::from(1243124142314.423),
             ),
+            (
+                simple_parquet_stat!(
+                    Statistics::FixedLenByteArray,
+                    FixedLenByteArray::from(vec![0, 39, 16])
+                ),
+                Some(LogicalType::Decimal {
+                    scale: 3,
+                    precision: 5,
+                }),
+                Value::from(10.0),
+            ),
             (
                 simple_parquet_stat!(
                     Statistics::FixedLenByteArray,
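
For context on the convert_to_delta.rs changes: each Parquet file's footer is now decoded once via `ParquetRecordBatchStreamBuilder`, and the resulting `ParquetMetaData` is handed, together with the file's partition values, to the new `stats_from_parquet_metadata` helper; the same `batch_builder` is then reused for the Arrow schema, replacing the second reader the removed block used to construct, so each footer is fetched only once per file. Below is a minimal sketch of that footer-read pattern in isolation, assuming the parquet crate's `object_store` integration; the function name `footer_num_rows` is illustrative and not part of this change.

```rust
use std::sync::Arc;

use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::errors::ParquetError;

/// Decode only the Parquet footer of `file` and return its row count.
/// No row groups are fetched; the builder reads just the metadata, which is
/// the same `ParquetMetaData` the convert path feeds into stats collection.
async fn footer_num_rows(
    store: Arc<dyn ObjectStore>,
    file: ObjectMeta,
) -> Result<i64, ParquetError> {
    let builder =
        ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(store, file)).await?;
    Ok(builder.metadata().file_metadata().num_rows())
}
```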
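
On the writer/stats.rs side, the width-specific zero-padding branches for decimal statistics are collapsed into a single conversion through `sign_extend_be`. Left-aligning a short big-endian encoding in a wider buffer multiplies the decoded integer by 256 for every padding byte, so the 3-byte encoding of 10.000 in the new test case previously decoded to 2560.0; right-aligning instead requires filling the high bytes from the sign bit so negative values stay intact. A standalone sketch of that behavior follows, reusing the function body added above; the negative value is chosen here purely for illustration.

```rust
/// Big-endian sign extension, same body as the helper added in `writer/stats.rs`
/// (itself copied from the arrow-rs parquet crate).
fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
    assert!(b.len() <= N, "Array too large, expected less than {N}");
    let is_negative = (b[0] & 128u8) == 128u8;
    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
        *d = *s;
    }
    result
}

fn main() {
    // [0x00, 0x27, 0x10] is the new test case: 10_000, i.e. 10.000 at scale 3.
    // The old left-aligned path decoded this as 2_560_000 (2560.0 after scaling).
    assert_eq!(i128::from_be_bytes(sign_extend_be(&[0x00, 0x27, 0x10])), 10_000);

    // [0xFF, 0xD8, 0xF0] is -10_000 as a signed 24-bit big-endian value; the high
    // bytes are filled with 0xFF so the widened i128 keeps the sign and magnitude.
    assert_eq!(i128::from_be_bytes(sign_extend_be(&[0xFF, 0xD8, 0xF0])), -10_000);
}
```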