From aac1844ce707e3744595472b0357c101a91b5749 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Sat, 29 Jan 2022 15:53:10 +0000 Subject: [PATCH] Fix NullArrayReader (#1245) (#1246) --- parquet/src/arrow/array_reader.rs | 4 +++ parquet/src/arrow/arrow_reader.rs | 42 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 01e54f67fa6..e8d3dffb6fe 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -214,6 +214,10 @@ where // save definition and repetition buffers self.def_levels_buffer = self.record_reader.consume_def_levels()?; self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; + + // Must consume bitmap buffer + self.record_reader.consume_bitmap_buffer()?; + self.record_reader.reset(); Ok(Arc::new(array)) } diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 259a3c08e58..0dbc11853a3 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -327,6 +327,48 @@ mod tests { compare_batch_json(&mut record_batch_reader, projected_json_values, max_len); } + #[test] + fn test_null_column_reader_test() { + let mut file = tempfile::tempfile().unwrap(); + + let schema = " + message message { + OPTIONAL INT32 int32; + } + "; + let schema = Arc::new(parse_message_type(schema).unwrap()); + + let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]]; + generate_single_column_file_with_data::( + &[vec![], vec![]], + Some(&def_levels), + file.try_clone().unwrap(), // Cannot use &mut File (#1163) + schema, + Some(Field::new("int32", ArrowDataType::Null, true)), + &Default::default(), + ) + .unwrap(); + + file.rewind().unwrap(); + + let parquet_reader = SerializedFileReader::try_from(file).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader)); + let record_reader = arrow_reader.get_record_reader(2).unwrap(); + + let batches = record_reader.collect::>>().unwrap(); + + assert_eq!(batches.len(), 4); + for batch in &batches[0..3] { + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.column(0).null_count(), 2); + } + + assert_eq!(batches[3].num_rows(), 1); + assert_eq!(batches[3].num_columns(), 1); + assert_eq!(batches[3].column(0).null_count(), 1); + } + #[test] fn test_primitive_single_column_reader_test() { run_single_column_reader_tests::(