From 7caee7c7a10f6b87c1de48492dfbcc6b34482933 Mon Sep 17 00:00:00 2001 From: Ye Yuan Date: Sat, 18 May 2024 23:25:43 +0800 Subject: [PATCH] support def_level=1 but non-null column in reader --- parquet_derive/src/parquet_field.rs | 56 +++++++++++++++++++++------- parquet_derive_test/src/lib.rs | 58 +++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 13 deletions(-) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 9fff76c42d1d..ee1fa49ddaa2 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -248,7 +248,13 @@ impl Field { let write_batch_expr = quote! { let mut vals = Vec::new(); if let #column_reader(mut typed) = column_reader { - typed.read_records(num_records, None, None, &mut vals)?; + let mut definition_levels = Vec::new(); + let (total_num, valid_num, decoded_num) = typed.read_records( + num_records, Some(&mut definition_levels), None, &mut vals)?; + if valid_num != decoded_num { + panic!("Support only valid records, found {} null records in column type {}", + decoded_num - valid_num, stringify!{#ident}); + } } else { panic!("Schema and struct disagree on type for {}", stringify!{#ident}); } @@ -876,15 +882,21 @@ mod test { snippet, (quote! { { - let mut vals = Vec::new(); - if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, &mut vals)?; - } else { - panic!("Schema and struct disagree on type for {}", stringify!{ counter }); - } - for (i, r) in &mut records[..num_records].iter_mut().enumerate() { - r.counter = vals[i] as usize; - } + let mut vals = Vec::new(); + if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { + let mut definition_levels = Vec::new(); + let (total_num, valid_num, decoded_num) = typed.read_records( + num_records, Some(&mut definition_levels), None, &mut vals)?; + if valid_num != decoded_num { + panic!("Support only valid records, found {} null records in column type {}", + decoded_num - valid_num, stringify!{counter}); + } + } else { + panic!("Schema and struct disagree on type for {}", stringify!{counter}); + } + for (i, r) in &mut records[..num_records].iter_mut().enumerate() { + r.counter = vals[i] as usize; + } } }) .to_string() @@ -1291,7 +1303,13 @@ mod test { { let mut vals = Vec::new(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, &mut vals)?; + let mut definition_levels = Vec::new(); + let (total_num, valid_num, decoded_num) = typed.read_records( + num_records, Some(&mut definition_levels), None, &mut vals)?; + if valid_num != decoded_num { + panic!("Support only valid records, found {} null records in column type {}", + decoded_num - valid_num, stringify!{henceforth}); + } } else { panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); } @@ -1359,7 +1377,13 @@ mod test { { let mut vals = Vec::new(); if let ColumnReader::Int32ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, &mut vals)?; + let mut definition_levels = Vec::new(); + let (total_num, valid_num, decoded_num) = typed.read_records( + num_records, Some(&mut definition_levels), None, &mut vals)?; + if valid_num != decoded_num { + panic!("Support only valid records, found {} null records in column type {}", + decoded_num - valid_num, stringify!{henceforth}); + } } else { panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); } @@ -1427,7 +1451,13 @@ mod test { { let mut vals = Vec::new(); if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, &mut vals)?; + let mut definition_levels = Vec::new(); + let (total_num, valid_num, decoded_num) = typed.read_records( + num_records, Some(&mut definition_levels), None, &mut vals)?; + if valid_num != decoded_num { + panic!("Support only valid records, found {} null records in column type {}", + decoded_num - valid_num, stringify!{unique_id}); + } } else { panic!("Schema and struct disagree on type for {}", stringify!{ unique_id }); } diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 3743c6b55c7c..0860359b2df8 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -66,6 +66,24 @@ struct APartiallyCompleteRecord { pub byte_vec: Vec, } +// This struct has OPTIONAL columns +// If these fields are guaranteed to be valid +// we can load this struct into APartiallyCompleteRecord +#[derive(PartialEq, ParquetRecordWriter, Debug)] +struct APartiallyOptionalRecord { + pub bool: bool, + pub string: String, + pub maybe_i16: Option, + pub maybe_i32: Option, + pub maybe_u64: Option, + pub isize: isize, + pub float: f32, + pub double: f64, + pub now: chrono::NaiveDateTime, + pub date: chrono::NaiveDate, + pub byte_vec: Vec, +} + #[cfg(test)] mod tests { use super::*; @@ -218,6 +236,46 @@ mod tests { assert_eq!(drs[0], out[0]); } + #[test] + fn test_parquet_derive_read_optional_but_valid_column() { + let file = get_temp_file("test_parquet_derive_read_optional", &[]); + let drs: Vec = vec![APartiallyOptionalRecord { + bool: true, + string: "a string".into(), + maybe_i16: Some(-45), + maybe_i32: Some(456), + maybe_u64: Some(4563424), + isize: -365, + float: 3.5, + double: std::f64::NAN, + now: chrono::Utc::now().naive_local(), + date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(), + byte_vec: vec![0x65, 0x66, 0x67], + }]; + + let generated_schema = drs.as_slice().schema().unwrap(); + + let props = Default::default(); + let mut writer = + SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap(); + + let mut row_group = writer.next_row_group().unwrap(); + drs.as_slice().write_to_row_group(&mut row_group).unwrap(); + row_group.close().unwrap(); + writer.close().unwrap(); + + use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader}; + let reader = SerializedFileReader::new(file).unwrap(); + let mut out: Vec = Vec::new(); + + let mut row_group = reader.get_row_group(0).unwrap(); + out.read_from_row_group(&mut *row_group, 1).unwrap(); + + assert_eq!(drs[0].maybe_i16.unwrap(), out[0].i16); + assert_eq!(drs[0].maybe_i32.unwrap(), out[0].i32); + assert_eq!(drs[0].maybe_u64.unwrap(), out[0].u64); + } + /// Returns file handle for a temp file in 'target' directory with a provided content pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File { // build tmp path to a file in "target/debug/testdata"