Skip to content

Commit

Permalink
support def_level=1 but non-null column in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
Ye Yuan committed May 18, 2024
1 parent a126d50 commit 7caee7c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 13 deletions.
56 changes: 43 additions & 13 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ impl Field {
let write_batch_expr = quote! {
let mut vals = Vec::new();
if let #column_reader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{#ident});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{#ident});
}
Expand Down Expand Up @@ -876,15 +882,21 @@ mod test {
snippet,
(quote! {
{
let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ counter });
}
for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
r.counter = vals[i] as usize;
}
let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{counter});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{counter});
}
for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
r.counter = vals[i] as usize;
}
}
})
.to_string()
Expand Down Expand Up @@ -1291,7 +1303,13 @@ mod test {
{
let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{henceforth});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
}
Expand Down Expand Up @@ -1359,7 +1377,13 @@ mod test {
{
let mut vals = Vec::new();
if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{henceforth});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
}
Expand Down Expand Up @@ -1427,7 +1451,13 @@ mod test {
{
let mut vals = Vec::new();
if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
let mut definition_levels = Vec::new();
let (total_num, valid_num, decoded_num) = typed.read_records(
num_records, Some(&mut definition_levels), None, &mut vals)?;
if valid_num != decoded_num {
panic!("Support only valid records, found {} null records in column type {}",
decoded_num - valid_num, stringify!{unique_id});
}
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
}
Expand Down
58 changes: 58 additions & 0 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ struct APartiallyCompleteRecord {
pub byte_vec: Vec<u8>,
}

// A write-only test record (it derives `ParquetRecordWriter` but no reader)
// in which three of the integer fields are wrapped in `Option`, i.e. they
// are written as OPTIONAL columns.
//
// If every one of those optional fields actually holds a value, the data
// written from this struct can be loaded into `APartiallyCompleteRecord`,
// which reads the same columns (`i16`, `i32`, `u64`) as plain, non-optional
// values.
#[derive(PartialEq, ParquetRecordWriter, Debug)]
struct APartiallyOptionalRecord {
pub bool: bool,
pub string: String,
// Optional counterpart of the `i16` field read back by `APartiallyCompleteRecord`.
pub maybe_i16: Option<i16>,
// Optional counterpart of the `i32` field read back by `APartiallyCompleteRecord`.
pub maybe_i32: Option<i32>,
// Optional counterpart of the `u64` field read back by `APartiallyCompleteRecord`.
pub maybe_u64: Option<u64>,
pub isize: isize,
pub float: f32,
pub double: f64,
pub now: chrono::NaiveDateTime,
pub date: chrono::NaiveDate,
pub byte_vec: Vec<u8>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -218,6 +236,46 @@ mod tests {
assert_eq!(drs[0], out[0]);
}

/// Writes a single `APartiallyOptionalRecord` whose `Option` fields are all
/// `Some`, then reads the row group back into `APartiallyCompleteRecord`
/// (whose matching fields are not optional) and checks that the three
/// formerly-optional integer values survive the round trip.
#[test]
fn test_parquet_derive_read_optional_but_valid_column() {
    use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};

    let file = get_temp_file("test_parquet_derive_read_optional", &[]);
    let drs: Vec<APartiallyOptionalRecord> = vec![APartiallyOptionalRecord {
        bool: true,
        string: "a string".into(),
        // Every optional field is populated: the reader side has no way to
        // represent a missing value for these columns.
        maybe_i16: Some(-45),
        maybe_i32: Some(456),
        maybe_u64: Some(4563424),
        isize: -365,
        float: 3.5,
        // Associated constant instead of the legacy `std::f64::NAN` module constant.
        double: f64::NAN,
        now: chrono::Utc::now().naive_local(),
        date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
        byte_vec: vec![0x65, 0x66, 0x67],
    }];

    // The schema is derived from the writer-side struct, so the three
    // `Option` fields are declared from their `Option<_>` types.
    let generated_schema = drs.as_slice().schema().unwrap();

    let props = Default::default();
    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();

    let mut row_group = writer.next_row_group().unwrap();
    drs.as_slice().write_to_row_group(&mut row_group).unwrap();
    row_group.close().unwrap();
    writer.close().unwrap();

    // Read the same file back through the struct with non-optional fields.
    let reader = SerializedFileReader::new(file).unwrap();
    let mut out: Vec<APartiallyCompleteRecord> = Vec::new();

    let mut row_group = reader.get_row_group(0).unwrap();
    out.read_from_row_group(&mut *row_group, 1).unwrap();

    // Only the columns that differ in optionality between the two structs
    // are compared here.
    assert_eq!(drs[0].maybe_i16.unwrap(), out[0].i16);
    assert_eq!(drs[0].maybe_i32.unwrap(), out[0].i32);
    assert_eq!(drs[0].maybe_u64.unwrap(), out[0].u64);
}

/// Returns file handle for a temp file in 'target' directory with a provided content
pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
// build tmp path to a file in "target/debug/testdata"
Expand Down

0 comments on commit 7caee7c

Please sign in to comment.