From c3620c8ae2f5f0030be831732200dbf82c515977 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 8 May 2024 17:34:00 +0200
Subject: [PATCH 1/4] Add stats to convert-to-delta operation

---
 .../core/src/operations/convert_to_delta.rs  | 36 ++++++++++---
 crates/core/src/writer/stats.rs              | 52 ++++++++++++++++---
 2 files changed, 72 insertions(+), 16 deletions(-)

diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs
index 44475d0e92..7124dede43 100644
--- a/crates/core/src/operations/convert_to_delta.rs
+++ b/crates/core/src/operations/convert_to_delta.rs
@@ -8,6 +8,7 @@ use crate::{
     protocol::SaveMode,
     table::builder::ensure_table_uri,
     table::config::DeltaConfigKey,
+    writer::stats::stats_from_parquet_metadata,
     DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
 };
 use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
@@ -15,6 +16,7 @@ use futures::{
     future::{self, BoxFuture},
     TryStreamExt,
 };
+use indexmap::IndexMap;
 use parquet::{
     arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
     errors::ParquetError,
@@ -328,6 +330,21 @@ impl ConvertToDeltaBuilder {
                 subpath = iter.next();
             }
 
+            let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
+                object_store.clone(),
+                file.clone(),
+            ))
+            .await?;
+
+            // Fetch the stats
+            let parquet_metadata = batch_builder.metadata();
+            let stats = stats_from_parquet_metadata(
+                &IndexMap::from_iter(partition_values.clone().into_iter()),
+                parquet_metadata.as_ref(),
+            )
+            .unwrap();
+            let stats_string = serde_json::to_string(&stats).unwrap();
+
             actions.push(
                 Add {
                     path: percent_decode_str(file.location.as_ref())
@@ -349,19 +366,13 @@ impl ConvertToDeltaBuilder {
                         .collect(),
                     modification_time: file.last_modified.timestamp_millis(),
                     data_change: true,
+                    stats: Some(stats_string),
                     ..Default::default()
                 }
                 .into(),
             );
 
-            let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
-                object_store.clone(),
-                file,
-            ))
-            .await?
-            .schema()
-            .as_ref()
-            .clone();
+            let mut arrow_schema = batch_builder.schema().as_ref().clone();
 
             // Arrow schema of Parquet files may have conflicting metatdata
             // Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap
@@ -584,6 +595,15 @@ mod tests {
             "part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet"
         );
 
+        let Some(Scalar::Struct(min_values, _)) = action.min_values() else {
+            panic!("Missing min values");
+        };
+        assert_eq!(min_values, vec![Scalar::Date(18628), Scalar::Integer(1)]);
+        let Some(Scalar::Struct(max_values, _)) = action.max_values() else {
+            panic!("Missing max values");
+        };
+        assert_eq!(max_values, vec![Scalar::Date(18632), Scalar::Integer(5)]);
+
         assert_delta_table(
             table,
             path,
diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index 312de6f9e3..ceee5cbdb5 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -3,6 +3,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, ops::AddAssign};
 
 use indexmap::IndexMap;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::format::FileMetaData;
 use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{basic::LogicalType, errors::ParquetError};
@@ -58,6 +59,28 @@ pub fn create_add(
     })
 }
 
+// As opposed to `stats_from_file_metadata` which operates on `parquet::format::FileMetaData`,
+// this function produces the stats by reading the metadata from already written out files.
+//
+// Note that the file metadata used here is actually `parquet::file::metadata::FileMetaData`
+// which is a thrift decoding of the `parquet::format::FileMetaData` which is typically obtained
+// when flushing the write.
+pub(crate) fn stats_from_parquet_metadata(
+    partition_values: &IndexMap<String, Scalar>,
+    parquet_metadata: &ParquetMetaData,
+) -> Result<Stats, DeltaWriterError> {
+    let num_rows = parquet_metadata.file_metadata().num_rows();
+    let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
+    let row_group_metadata = parquet_metadata.row_groups().to_vec();
+
+    stats_from_metadata(
+        partition_values,
+        schema_descriptor,
+        row_group_metadata,
+        num_rows,
+    )
+}
+
 fn stats_from_file_metadata(
     partition_values: &IndexMap<String, Scalar>,
     file_metadata: &FileMetaData,
@@ -65,16 +88,29 @@ fn stats_from_file_metadata(
     let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
     let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;
 
-    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
-
-    let row_group_metadata: Result<Vec<RowGroupMetaData>, ParquetError> = file_metadata
+    let row_group_metadata: Vec<RowGroupMetaData> = file_metadata
         .row_groups
         .iter()
        .map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone()))
-        .collect();
-    let row_group_metadata = row_group_metadata?;
+        .collect::<Result<Vec<RowGroupMetaData>, ParquetError>>()?;
+
+    stats_from_metadata(
+        partition_values,
+        schema_descriptor,
+        row_group_metadata,
+        file_metadata.num_rows,
+    )
+}
+
+fn stats_from_metadata(
+    partition_values: &IndexMap<String, Scalar>,
+    schema_descriptor: Arc<SchemaDescriptor>,
+    row_group_metadata: Vec<RowGroupMetaData>,
+    num_rows: i64,
+) -> Result<Stats, DeltaWriterError> {
+    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
+    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
+    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
 
     for i in 0..schema_descriptor.num_columns() {
         let column_descr = schema_descriptor.column(i);
@@ -118,7 +154,7 @@ fn stats_from_file_metadata(
     Ok(Stats {
         min_values,
         max_values,
-        num_records: file_metadata.num_rows,
+        num_records: num_rows,
         null_count,
     })
 }

From 3eccdef7940b4cb5c46617feeb649ffb5d6d591e Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 8 May 2024 21:59:11 +0200
Subject: [PATCH 2/4] Replace unwraps with error maps

---
 crates/core/src/operations/convert_to_delta.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs
index 7124dede43..cf4c013585 100644
--- a/crates/core/src/operations/convert_to_delta.rs
+++ b/crates/core/src/operations/convert_to_delta.rs
@@ -342,8 +342,9 @@ impl ConvertToDeltaBuilder {
                 &IndexMap::from_iter(partition_values.clone().into_iter()),
                 parquet_metadata.as_ref(),
             )
-            .unwrap();
-            let stats_string = serde_json::to_string(&stats).unwrap();
+            .map_err(|e| Error::DeltaTable(e.into()))?;
+            let stats_string =
+                serde_json::to_string(&stats).map_err(|e| Error::DeltaTable(e.into()))?;
 
             actions.push(
                 Add {

From 5972aab07723fe11243c017f1938b96b70d45810 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Thu, 9 May 2024 12:14:58 +0200
Subject: [PATCH 3/4] Fix Decimal stats parsing error from FixedLenByteArray

---
 crates/core/src/writer/stats.rs | 38 ++++++++++++++++++++++-----------
 1 file changed, 26 insertions(+), 12 deletions(-)

diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs
index ceee5cbdb5..d3bad69f65 100644
--- a/crates/core/src/writer/stats.rs
+++ b/crates/core/src/writer/stats.rs
@@ -267,18 +267,8 @@ impl StatsScalar {
                     v.max_bytes()
                 };
 
-                let val = if val.len() <= 4 {
-                    let mut bytes = [0; 4];
-                    bytes[..val.len()].copy_from_slice(val);
-                    i32::from_be_bytes(bytes) as f64
-                } else if val.len() <= 8 {
-                    let mut bytes = [0; 8];
-                    bytes[..val.len()].copy_from_slice(val);
-                    i64::from_be_bytes(bytes) as f64
-                } else if val.len() <= 16 {
-                    let mut bytes = [0; 16];
-                    bytes[..val.len()].copy_from_slice(val);
-                    i128::from_be_bytes(bytes) as f64
+                let val = if val.len() <= 16 {
+                    i128::from_be_bytes(sign_extend_be(val)) as f64
                 } else {
                     return Err(DeltaWriterError::StatsParsingFailed {
                         debug_value: format!("{val:?}"),
@@ -320,6 +310,19 @@ impl StatsScalar {
     }
 }
 
+/// Performs big endian sign extension
+/// Copied from arrow-rs repo/parquet crate:
+/// https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/buffer/bit_util.rs#L54
+pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
+    assert!(b.len() <= N, "Array too large, expected less than {N}");
+    let is_negative = (b[0] & 128u8) == 128u8;
+    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
+    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
+        *d = *s;
+    }
+    result
+}
+
 impl From<StatsScalar> for serde_json::Value {
     fn from(scalar: StatsScalar) -> Self {
         match scalar {
@@ -658,6 +661,17 @@ mod tests {
                }),
                Value::from(1243124142314.423),
            ),
+            (
+                simple_parquet_stat!(
+                    Statistics::FixedLenByteArray,
+                    FixedLenByteArray::from(vec![0, 39, 16])
+                ),
+                Some(LogicalType::Decimal {
+                    scale: 3,
+                    precision: 5,
+                }),
+                Value::from(10.0),
+            ),
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,

From 1191aa50e0a84cb7cb63fa4f1f9b9dce748c152c Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Tue, 14 May 2024 18:31:52 +0200
Subject: [PATCH 4/4] Pick up config options on which columns to skip

---
 crates/core/src/operations/convert_to_delta.rs | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs
index 19b27268e0..3f77335444 100644
--- a/crates/core/src/operations/convert_to_delta.rs
+++ b/crates/core/src/operations/convert_to_delta.rs
@@ -1,6 +1,7 @@
 //! Command for converting a Parquet table to a Delta table in place
 // https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
 
+use crate::operations::write::get_num_idx_cols_and_stats_columns;
 use crate::{
     kernel::{Add, DataType, Schema, StructField},
     logstore::{LogStore, LogStoreRef},
@@ -286,6 +287,10 @@ impl ConvertToDeltaBuilder {
         // A vector of StructField of all unique partition columns in a Parquet table
         let mut partition_schema_fields = HashMap::new();
 
+        // Obtain settings on which columns to skip collecting stats on if any
+        let (num_indexed_cols, stats_columns) =
+            get_num_idx_cols_and_stats_columns(None, self.configuration.clone());
+
         for file in files {
             // A HashMap from partition column to value for this parquet file only
             let mut partition_values = HashMap::new();
@@ -341,8 +346,8 @@ impl ConvertToDeltaBuilder {
             let stats = stats_from_parquet_metadata(
                 &IndexMap::from_iter(partition_values.clone().into_iter()),
                 parquet_metadata.as_ref(),
-                -1,
-                &None,
+                num_indexed_cols,
+                &stats_columns,
             )
             .map_err(|e| Error::DeltaTable(e.into()))?;
             let stats_string =