diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs index 9744f0f7a6b..4c926d0687c 100644 --- a/parquet/src/file/metadata/thrift/encryption.rs +++ b/parquet/src/file/metadata/thrift/encryption.rs @@ -144,10 +144,18 @@ fn row_group_from_encrypted_thrift( } Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => { let column_name = crypto_metadata.path_in_schema.join("."); - decryptor.get_column_metadata_decryptor( + // Try to get the decryptor - if it fails, we don't have the key + match decryptor.get_column_metadata_decryptor( column_name.as_str(), crypto_metadata.key_metadata.as_deref(), - )? + ) { + Ok(dec) => dec, + Err(_) => { + // We don't have the key for this column, so we can't decrypt its metadata. + columns.push(c); + continue; + } + } } Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => { decryptor.get_footer_decryptor()? diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 14774910961..a469ec680b1 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -1197,13 +1197,30 @@ pub(super) fn serialize_column_meta_data( if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset { last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?; } - // PageStatistics is the same as thrift Statistics, but writable - let stats = page_stats_to_thrift(column_chunk.statistics()); - if let Some(stats) = stats { - last_field_id = stats.write_thrift_field(w, 12, last_field_id)?; + + // Only write statistics to plaintext footer if column is not encrypted + + #[cfg(feature = "encryption")] + if column_chunk.crypto_metadata().is_none() { + // PageStatistics is the same as thrift Statistics, but writable + let stats = page_stats_to_thrift(column_chunk.statistics()); + if let Some(stats) = stats { + last_field_id = stats.write_thrift_field(w, 12, last_field_id)?; + } + 
if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() { + last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?; + } } - if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() { - last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?; + #[cfg(not(feature = "encryption"))] + { + // PageStatistics is the same as thrift Statistics, but writable + let stats = page_stats_to_thrift(column_chunk.statistics()); + if let Some(stats) = stats { + last_field_id = stats.write_thrift_field(w, 12, last_field_id)?; + } + if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() { + last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?; + } } if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset { last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?; @@ -1459,12 +1476,11 @@ impl WriteThrift for ColumnChunkMetaData { #[cfg(feature = "encryption")] { - // only write the ColumnMetaData if we haven't already encrypted it - if self.encrypted_column_metadata.is_none() { - writer.write_field_begin(FieldType::Struct, 3, last_field_id)?; - serialize_column_meta_data(self, writer)?; - last_field_id = 3; - } + // Always write the ColumnMetaData struct + // Statistics are conditionally excluded based on crypto_metadata in serialize_column_meta_data + writer.write_field_begin(FieldType::Struct, 3, last_field_id)?; + serialize_column_meta_data(self, writer)?; + last_field_id = 3; } #[cfg(not(feature = "encryption"))] { diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 124bc11bddc..4d32df556bb 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -758,13 +758,42 @@ impl MetadataObjectWriter { match column_chunk.column_crypto_metadata.as_deref() { None => {} Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => { + use 
crate::file::metadata::thrift::serialize_column_meta_data; // When uniform encryption is used the footer is already encrypted, // so the column chunk does not need additional encryption. + // Except if we're in plaintext footer mode, then we need to encrypt + // the column metadata here. + let is_footer_encrypted = file_encryptor.properties().encrypt_footer(); + + if !is_footer_encrypted { + // Temporarily clear crypto_metadata so statistics get included in encrypted blob + let crypto_metadata = column_chunk.column_crypto_metadata.take(); + let mut encryptor = file_encryptor.get_footer_encryptor()?; + let aad = create_module_aad( + file_encryptor.file_aad(), + ModuleType::ColumnMetaData, + row_group_index, + column_index, + None, + )?; + // create temp ColumnMetaData that we can encrypt + let mut buffer: Vec<u8> = vec![]; + { + let mut prot = ThriftCompactOutputProtocol::new(&mut buffer); + serialize_column_meta_data(&column_chunk, &mut prot)?; + } + let ciphertext = encryptor.encrypt(&buffer, &aad)?; + + column_chunk.column_crypto_metadata = crypto_metadata; + column_chunk.encrypted_column_metadata = Some(ciphertext); + } + } Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => { use crate::file::metadata::thrift::serialize_column_meta_data; let column_path = col_key.path_in_schema.join("."); + // Temporarily clear crypto_metadata so statistics get included in encrypted blob + let crypto_metadata = column_chunk.column_crypto_metadata.take(); let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?; let aad = create_module_aad( file_encryptor.file_aad(), @@ -781,6 +810,7 @@ impl MetadataObjectWriter { } let ciphertext = column_encryptor.encrypt(&buffer, &aad)?; + column_chunk.column_crypto_metadata = crypto_metadata; column_chunk.encrypted_column_metadata = Some(ciphertext); } } diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index f999abab95d..6679dc3b9b2 100644 --- 
a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -34,7 +34,7 @@ use parquet::data_type::{ByteArray, ByteArrayType}; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::errors::ParquetError; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::writer::SerializedFileWriter; use parquet::schema::parser::parse_message_type; @@ -719,6 +719,132 @@ fn test_write_uniform_encryption_plaintext_footer() { ); } +#[test] +pub fn test_non_uniform_plaintext_encryption_behaviour() { + let footer_key = b"0123456789012345".to_vec(); // 128bit/16 + let column_key = b"1234567890123450".to_vec(); + + let encryption_properties = FileEncryptionProperties::builder(footer_key.clone()) + .with_plaintext_footer(true) + .with_column_key("x", column_key.clone()) + .with_column_key("y", column_key.clone()) + .build() + .unwrap(); + + let encryption_properties_footer_key = FileEncryptionProperties::builder(footer_key.clone()) + .with_plaintext_footer(true) + .build() + .unwrap(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key.clone()) + .with_column_key("x", column_key.clone()) + .build() + .unwrap(); + + let decryption_properties_footer_key = FileDecryptionProperties::builder(footer_key.clone()) + .build() + .unwrap(); + + let props = WriterProperties::builder() + .with_file_encryption_properties(encryption_properties) + .build(); + + // Write partly encrypted data with plaintext footer + let values = Int32Array::from(vec![8, 3, 4, 19, 5]); + let values = Arc::new(values); + let schema = Arc::new(Schema::new(vec![ + Field::new("x", values.data_type().clone(), true), + Field::new("y", values.data_type().clone(), true), + Field::new("z", values.data_type().clone(), true), + ])); + let record_batches = vec![ + 
RecordBatch::try_new( + schema.clone(), + vec![values.clone(), values.clone(), values.clone()], + ) + .unwrap(), + ]; + + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, schema.clone(), Some(props)).unwrap(); + for batch in record_batches.clone() { + writer.write(&batch).unwrap(); + } + let metadata = writer.close().unwrap(); + + let expected_min = 3i32.to_le_bytes(); + let expected_max = 19i32.to_le_bytes(); + + let check_column_stats = |column: &ColumnChunkMetaData, has_stats: bool| { + if has_stats { + assert!(column.page_encoding_stats().is_some()); + assert!(column.statistics().is_some()); + let column_stats = column.statistics().unwrap(); + assert_eq!(column_stats.min_bytes_opt(), Some(expected_min.as_slice())); + assert_eq!(column_stats.max_bytes_opt(), Some(expected_max.as_slice())); + } else { + assert!(column.statistics().is_none()); + assert!(column.page_encoding_stats().is_none()); + } + }; + + // Check column statistics produced at write time are available in full + let row_group = metadata.row_group(0); + check_column_stats(row_group.column(0), true); + check_column_stats(row_group.column(1), true); + check_column_stats(row_group.column(2), true); + + // Check column statistics are read given plaintext footer and available decryption properties + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); + let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap(); + let metadata = reader_metadata.metadata(); + let row_group = metadata.row_group(0); + // Reader can read plaintext from the unencrypted column + // and column x for which the key is provided, but not column y + // for which no key is provided. 
+ check_column_stats(row_group.column(0), true); + check_column_stats(row_group.column(1), false); + check_column_stats(row_group.column(2), true); + + let options = ArrowReaderOptions::default(); + let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap(); + let metadata = reader_metadata.metadata(); + let row_group = metadata.row_group(0); + // Reader can only read plaintext from the unencrypted column if no key is provided + check_column_stats(row_group.column(0), false); + check_column_stats(row_group.column(1), false); + check_column_stats(row_group.column(2), true); + + // Check for the uniform encryption case + let props = WriterProperties::builder() + .with_file_encryption_properties(encryption_properties_footer_key) + .build(); + + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, schema, Some(props)).unwrap(); + for batch in record_batches.clone() { + writer.write(&batch).unwrap(); + } + let metadata = writer.close().unwrap(); + + // Check column statistics produced at write time are available in full + check_column_stats(metadata.row_group(0).column(0), true); + + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties_footer_key); + let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap(); + let metadata = reader_metadata.metadata(); + // Reader can read stats from plaintext footer metadata if a footer key is provided + check_column_stats(metadata.row_group(0).column(0), true); + + let options = ArrowReaderOptions::default(); + let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap(); + let metadata = reader_metadata.metadata(); + // Reader can not read stats from plaintext footer metadata if no key is provided + check_column_stats(metadata.row_group(0).column(0), false); +} + #[test] fn test_write_uniform_encryption() { let testdata = arrow::util::test_util::parquet_test_data();