ARROW-7932: [Rust] implement array_reader for temporal types
Add date/time/timestamp/interval/duration LogicalType support to array_reader
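
For orientation, the new match arms pair each temporal Arrow type with the Parquet physical type it is stored as and route the values through the existing UInt32/UInt64 converters. Below is a minimal sketch of that pairing, assuming the arrow crate's enum shapes at the time of this change; the helper `expected_physical_width` is hypothetical and not part of the commit.

```rust
// Hypothetical helper (not in the commit): a compact view of the pairing the
// new match arms rely on. Variant shapes (e.g. `Date32(_)`) follow the arrow
// crate as of this change.
use arrow::datatypes::{DataType as ArrowType, IntervalUnit};

/// Parquet physical integer width expected for each newly supported Arrow type.
fn expected_physical_width(t: &ArrowType) -> Option<u8> {
    match t {
        // Read from a Parquet INT32 column through UInt32Converter.
        ArrowType::Date32(_)
        | ArrowType::Time32(_)
        | ArrowType::Interval(IntervalUnit::YearMonth) => Some(32),
        // Read from a Parquet INT64 column through UInt64Converter.
        ArrowType::Timestamp(_, _)
        | ArrowType::Date64(_)
        | ArrowType::Time64(_)
        | ArrowType::Interval(IntervalUnit::DayTime)
        | ArrowType::Duration(_) => Some(64),
        _ => None,
    }
}
```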

Closes #6489 from mcassels/read_datetime_from_parquet and squashes the following commits:

715e3e1 <Morgan Cassels> code cleanup
4f50624 <Morgan Cassels> cargo fmt
e27a01b <Morgan Cassels> finish adding tests
0970647 <Morgan Cassels> add tests
c93396f <Morgan Cassels> support reading datetime arrow types from parquet

Authored-by: Morgan Cassels <morgan@urbanlogiq.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
Morgan Cassels authored and nealrichardson committed Mar 4, 2020
1 parent d60b6d7 commit 4b83c45
Showing 1 changed file with 158 additions and 4 deletions.
162 changes: 158 additions & 4 deletions rust/parquet/src/arrow/array_reader.rs
@@ -31,7 +31,7 @@ use arrow::array::{
    Int16BufferBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{DataType as ArrowType, Field};
use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit};

use crate::arrow::converter::{
    BinaryConverter, BoolConverter, Converter, Float32Converter, Float64Converter,
@@ -217,6 +217,54 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
                    &mut RecordReader<DoubleType>,
                >(&mut self.record_reader))
            },
            (ArrowType::Timestamp(_, _), PhysicalType::INT64) => unsafe {
                UInt64Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int64Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Date32(_), PhysicalType::INT32) => unsafe {
                UInt32Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int32Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Date64(_), PhysicalType::INT64) => unsafe {
                UInt64Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int64Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Time32(_), PhysicalType::INT32) => unsafe {
                UInt32Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int32Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Time64(_), PhysicalType::INT64) => unsafe {
                UInt64Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int64Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => unsafe {
                UInt32Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int32Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => unsafe {
                UInt64Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int64Type>,
                >(&mut self.record_reader))
            },
            (ArrowType::Duration(_), PhysicalType::INT64) => unsafe {
                UInt64Converter::convert(transmute::<
                    &mut RecordReader<T>,
                    &mut RecordReader<Int64Type>,
                >(&mut self.record_reader))
            },
            (arrow_type, physical_type) => Err(general_err!(
                "Reading {:?} type from parquet {:?} is not supported yet.",
                arrow_type,
@@ -902,17 +950,20 @@ mod tests {
    use crate::arrow::array_reader::{
        build_array_reader, ArrayReader, PrimitiveArrayReader, StructArrayReader,
    };
    use crate::basic::Encoding;
    use crate::basic::{Encoding, Type as PhysicalType};
    use crate::column::page::Page;
    use crate::data_type::{DataType, Int32Type};
    use crate::data_type::{DataType, Int32Type, Int64Type};
    use crate::errors::Result;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
    use crate::util::test_common::page_util::InMemoryPageIterator;
    use crate::util::test_common::{get_test_file, make_pages};
    use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray};
    use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32};
    use arrow::datatypes::{
        DataType as ArrowType, Field, Int32Type as ArrowInt32, UInt32Type as ArrowUInt32,
        UInt64Type as ArrowUInt64,
    };
    use rand::distributions::range::SampleRange;
    use std::any::Any;
    use std::collections::VecDeque;
@@ -1050,6 +1101,109 @@ mod tests {
        }
    }

    macro_rules! test_primitive_array_reader_one_type {
        ($arrow_parquet_type:ty, $physical_type:expr, $logical_type_str:expr, $result_arrow_type:ty, $result_primitive_type:ty) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
            }}
            ",
                $physical_type, $logical_type_str
            );
            let schema = parse_message_type(&message_type)
                .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t))))
                .unwrap();

            let column_desc = schema.column(0);

            // Construct page iterator
            {
                let mut data = Vec::new();
                let mut page_lists = Vec::new();
                make_column_chuncks::<$arrow_parquet_type>(
                    column_desc.clone(),
                    Encoding::PLAIN,
                    100,
                    1,
                    200,
                    &mut Vec::new(),
                    &mut Vec::new(),
                    &mut data,
                    &mut page_lists,
                    true,
                    2,
                );
                let page_iterator = InMemoryPageIterator::new(
                    schema.clone(),
                    column_desc.clone(),
                    page_lists,
                );
                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                    Box::new(page_iterator),
                    column_desc.clone(),
                )
                .unwrap();

                let array = array_reader.next_batch(50).unwrap();

                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap();

                assert_eq!(
                    &PrimitiveArray::<$result_arrow_type>::from(
                        data[0..50]
                            .iter()
                            .map(|x| *x as $result_primitive_type)
                            .collect::<Vec<$result_primitive_type>>()
                    ),
                    array
                );
            }
        }};
    }

    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            Int32Type,
            PhysicalType::INT32,
            "DATE",
            ArrowUInt32,
            u32
        );
        test_primitive_array_reader_one_type!(
            Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            ArrowUInt32,
            u32
        );
        test_primitive_array_reader_one_type!(
            Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            ArrowUInt64,
            u64
        );
        test_primitive_array_reader_one_type!(
            Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            ArrowUInt64,
            u64
        );
        test_primitive_array_reader_one_type!(
            Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            ArrowUInt64,
            u64
        );
    }

    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        // Construct column schema
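For a concrete sense of what one invocation of `test_primitive_array_reader_one_type!` above feeds into the parser, here is roughly what the DATE case produces once the `format!` call is expanded. This standalone snippet is illustrative only (the function name `date_column_schema` is hypothetical) and reuses APIs already imported in the test module.

```rust
// Illustration only: the schema the DATE invocation of the test macro builds.
// `{:?}` on PhysicalType::INT32 renders as "INT32", so the generated message
// type is equivalent to the literal below.
use std::rc::Rc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn date_column_schema() -> Rc<SchemaDescriptor> {
    let message_type = "
        message test_schema {
            REQUIRED INT32 leaf (DATE);
        }
    ";
    let schema = parse_message_type(message_type)
        .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t))))
        .expect("valid test schema");
    // The single leaf column carries the DATE logical annotation; with this
    // commit the array reader produces a PrimitiveArray<UInt32Type> for it,
    // which is what the test asserts against.
    schema
}
```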
