Skip to content
12 changes: 10 additions & 2 deletions parquet/src/file/metadata/thrift/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,18 @@ fn row_group_from_encrypted_thrift(
}
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
let column_name = crypto_metadata.path_in_schema.join(".");
decryptor.get_column_metadata_decryptor(
// Try to get the decryptor - if it fails, we don't have the key
match decryptor.get_column_metadata_decryptor(
column_name.as_str(),
crypto_metadata.key_metadata.as_deref(),
)?
) {
Ok(dec) => dec,
Err(_) => {
// We don't have the key for this column, so we can't decrypt its metadata.
columns.push(c);
continue;
}
}
}
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
decryptor.get_footer_decryptor()?
Expand Down
40 changes: 28 additions & 12 deletions parquet/src/file/metadata/thrift/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,13 +1197,30 @@ pub(super) fn serialize_column_meta_data<W: Write>(
if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
}
// PageStatistics is the same as thrift Statistics, but writable
let stats = page_stats_to_thrift(column_chunk.statistics());
if let Some(stats) = stats {
last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;

// Only write statistics to plaintext footer if column is not encrypted

#[cfg(feature = "encryption")]
if column_chunk.crypto_metadata().is_none() {
Copy link
Contributor

@adamreeve adamreeve Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contents of these two blocks is the same, so we could tidy this up by adding a new function to decide whether to write stats and remove the inline #[cfg()] here, eg:

#[cfg(feature = "encryption")]
fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
    // If there is encrypted column metadata present,
    // the column is encrypted with a different key to the footer or a plaintext footer is used,
    // so the statistics are sensitive and shouldn't be written.
    column_chunk.encrypted_column_metadata.is_none()
}

#[cfg(not(feature = "encryption"))]
fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
    true
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After refactoring the tests and adding some extra test cases, I realised this logic isn't correct in the case of uniform encryption with an encrypted footer. In that scenario, the column chunk has crypto_metadata present but no encrypted_column_metadata, and we do need to write the statistics here.

I think the fix is just to check for encrypted_column_metadata instead of crypto_metadata, like the code was doing previously. I've edited my comment above to address that.

// PageStatistics is the same as thrift Statistics, but writable
let stats = page_stats_to_thrift(column_chunk.statistics());
if let Some(stats) = stats {
last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
}
if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
}
}
if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
#[cfg(not(feature = "encryption"))]
{
// PageStatistics is the same as thrift Statistics, but writable
let stats = page_stats_to_thrift(column_chunk.statistics());
if let Some(stats) = stats {
last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
}
if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
}
}
if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
Expand Down Expand Up @@ -1459,12 +1476,11 @@ impl WriteThrift for ColumnChunkMetaData {

#[cfg(feature = "encryption")]
{
// only write the ColumnMetaData if we haven't already encrypted it
if self.encrypted_column_metadata.is_none() {
writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
serialize_column_meta_data(self, writer)?;
last_field_id = 3;
}
// Always write the ColumnMetaData struct
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now duplicates the block below so these can be merged and the #[cfg... removed

// Statistics are conditionally excluded based on crypto_metadata in serialize_column_meta_data
writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
serialize_column_meta_data(self, writer)?;
last_field_id = 3;
}
#[cfg(not(feature = "encryption"))]
{
Expand Down
30 changes: 30 additions & 0 deletions parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,13 +758,42 @@ impl MetadataObjectWriter {
match column_chunk.column_crypto_metadata.as_deref() {
None => {}
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
use crate::file::metadata::thrift::serialize_column_meta_data;
// When uniform encryption is used the footer is already encrypted,
// so the column chunk does not need additional encryption.
// Except if we're in plaintext footer mode, then we need to encrypt
// the column metadata here.
let is_footer_encrypted = file_encryptor.properties().encrypt_footer();

if !is_footer_encrypted {
// Temporarily clear crypto_metadata so statistics get included in encrypted blob
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this step if we check for encrypted_column_metadata instead of crypto_metadata in serialize_column_meta_data (https://github.com/apache/arrow-rs/pull/8305/files#r2543948275)

let crypto_metadata = column_chunk.column_crypto_metadata.take();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nearly identical to the block below except for the different encryptors used. This should be able to be refactored to avoid this, eg. by changing the match to return an Option<Box<dyn BlockEncryptor>> and then have a separate match on that result that does the encryption.

let mut encryptor = file_encryptor.get_footer_encryptor()?;
let aad = create_module_aad(
file_encryptor.file_aad(),
ModuleType::ColumnMetaData,
row_group_index,
column_index,
None,
)?;
// create temp ColumnMetaData that we can encrypt
let mut buffer: Vec<u8> = vec![];
{
let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
serialize_column_meta_data(&column_chunk, &mut prot)?;
}
let ciphertext = encryptor.encrypt(&buffer, &aad)?;

column_chunk.column_crypto_metadata = crypto_metadata;
column_chunk.encrypted_column_metadata = Some(ciphertext);
}
}
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
use crate::file::metadata::thrift::serialize_column_meta_data;

let column_path = col_key.path_in_schema.join(".");
// Temporarily clear crypto_metadata so statistics get included in encrypted blob
let crypto_metadata = column_chunk.column_crypto_metadata.take();
let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
let aad = create_module_aad(
file_encryptor.file_aad(),
Expand All @@ -781,6 +810,7 @@ impl MetadataObjectWriter {
}
let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;

column_chunk.column_crypto_metadata = crypto_metadata;
column_chunk.encrypted_column_metadata = Some(ciphertext);
}
}
Expand Down
128 changes: 127 additions & 1 deletion parquet/tests/encryption/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use parquet::data_type::{ByteArray, ByteArrayType};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
Expand Down Expand Up @@ -719,6 +719,132 @@ fn test_write_uniform_encryption_plaintext_footer() {
);
}

#[test]
pub fn test_non_uniform_plaintext_encryption_behaviour() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this test a bit hard to follow, I'll see if I can suggest a better structure and make a PR to your branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR here: rok#6

let footer_key = b"0123456789012345".to_vec(); // 128bit/16
let column_key = b"1234567890123450".to_vec();

let encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
.with_plaintext_footer(true)
.with_column_key("x", column_key.clone())
.with_column_key("y", column_key.clone())
.build()
.unwrap();

let encryption_properties_footer_key = FileEncryptionProperties::builder(footer_key.clone())
.with_plaintext_footer(true)
.build()
.unwrap();

let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
.with_column_key("x", column_key.clone())
.build()
.unwrap();

let decryption_properties_footer_key = FileDecryptionProperties::builder(footer_key.clone())
.build()
.unwrap();

let props = WriterProperties::builder()
.with_file_encryption_properties(encryption_properties)
.build();

// Write partly encrypted data with plaintext footer
let values = Int32Array::from(vec![8, 3, 4, 19, 5]);
let values = Arc::new(values);
let schema = Arc::new(Schema::new(vec![
Field::new("x", values.data_type().clone(), true),
Field::new("y", values.data_type().clone(), true),
Field::new("z", values.data_type().clone(), true),
]));
let record_batches = vec![
RecordBatch::try_new(
schema.clone(),
vec![values.clone(), values.clone(), values.clone()],
)
.unwrap(),
];

let temp_file = tempfile::tempfile().unwrap();
let mut writer = ArrowWriter::try_new(&temp_file, schema.clone(), Some(props)).unwrap();
for batch in record_batches.clone() {
writer.write(&batch).unwrap();
}
let metadata = writer.close().unwrap();

let expected_min = 3i32.to_le_bytes();
let expected_max = 19i32.to_le_bytes();

let check_column_stats = |column: &ColumnChunkMetaData, has_stats: bool| {
if has_stats {
assert!(column.page_encoding_stats().is_some());
assert!(column.statistics().is_some());
let column_stats = column.statistics().unwrap();
assert_eq!(column_stats.min_bytes_opt(), Some(expected_min.as_slice()));
assert_eq!(column_stats.max_bytes_opt(), Some(expected_max.as_slice()));
} else {
assert!(column.statistics().is_none());
assert!(column.page_encoding_stats().is_none());
}
};

// Check column statistics produced at write time are available in full
let row_group = metadata.row_group(0);
check_column_stats(row_group.column(0), true);
check_column_stats(row_group.column(1), true);
check_column_stats(row_group.column(2), true);

// Check column statistics are read given plaintext footer and available decryption properties
let options =
ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap();
let metadata = reader_metadata.metadata();
let row_group = metadata.row_group(0);
// Reader can read plaintext from the unencrypted column
// and column x for which the key is provided, but not column y
// for which no key is provided.
check_column_stats(row_group.column(0), true);
check_column_stats(row_group.column(1), false);
check_column_stats(row_group.column(2), true);

let options = ArrowReaderOptions::default();
let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap();
let metadata = reader_metadata.metadata();
let row_group = metadata.row_group(0);
// Reader can only read plaintext from the unencrypted column if no key is provided
check_column_stats(row_group.column(0), false);
check_column_stats(row_group.column(1), false);
check_column_stats(row_group.column(2), true);

// Check for the uniform encryption case
let props = WriterProperties::builder()
.with_file_encryption_properties(encryption_properties_footer_key)
.build();

let temp_file = tempfile::tempfile().unwrap();
let mut writer = ArrowWriter::try_new(&temp_file, schema, Some(props)).unwrap();
for batch in record_batches.clone() {
writer.write(&batch).unwrap();
}
let metadata = writer.close().unwrap();

// Check column statistics produced at write time are available in full
check_column_stats(metadata.row_group(0).column(0), true);

let options = ArrowReaderOptions::default()
.with_file_decryption_properties(decryption_properties_footer_key);
let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap();
let metadata = reader_metadata.metadata();
// Reader can read stats from plaintext footer metadata if a footer key is provided
check_column_stats(metadata.row_group(0).column(0), true);

let options = ArrowReaderOptions::default();
let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap();
let metadata = reader_metadata.metadata();
// Reader can not read stats from plaintext footer metadata if no key is provided
check_column_stats(metadata.row_group(0).column(0), false);
}

#[test]
fn test_write_uniform_encryption() {
let testdata = arrow::util::test_util::parquet_test_data();
Expand Down
Loading