Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rust/arrow/src/array/equal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ impl PartialEq for BooleanArray {
}
}

/// `==` support for `StringArray`.
///
/// The comparison itself is not implemented here; it is forwarded to the array's
/// `equals` method so that `==` and `equals` always agree.
impl PartialEq for StringArray {
    fn eq(&self, other: &Self) -> bool {
        Self::equals(self, other)
    }
}

impl ArrayEqual for ListArray {
fn equals(&self, other: &dyn Array) -> bool {
if !base_equal(&self.data(), &other.data()) {
Expand Down
200 changes: 190 additions & 10 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp::min;
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::mem::size_of;
Expand All @@ -34,14 +34,15 @@ use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{DataType as ArrowType, Field};

use crate::arrow::converter::{
BooleanConverter, Converter, Float32Converter, Float64Converter, Int16Converter,
BoolConverter, Converter, Float32Converter, Float64Converter, Int16Converter,
Int32Converter, Int64Converter, Int8Converter, UInt16Converter, UInt32Converter,
UInt64Converter, UInt8Converter,
UInt64Converter, UInt8Converter, Utf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{Repetition, Type as PhysicalType};
use crate::basic::{LogicalType, Repetition, Type as PhysicalType};
use crate::column::page::PageIterator;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FloatType, Int32Type, Int64Type,
Int96Type,
Expand Down Expand Up @@ -151,7 +152,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
// convert to arrays
let array = match (&self.data_type, T::get_physical_type()) {
(ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe {
BooleanConverter::convert(transmute::<
BoolConverter::convert(transmute::<
&mut RecordReader<T>,
&mut RecordReader<BoolType>,
>(&mut self.record_reader))
Expand Down Expand Up @@ -238,6 +239,182 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
}

/// Array reader for leaf columns whose values are turned into an arrow array by a
/// `Converter` rather than by reinterpreting a primitive buffer.
///
/// Each batch is read from the current column reader into a `Vec<T::T>`, paired with
/// its definition levels to form a `Vec<Option<T::T>>`, and handed to `C::convert` to
/// build the resulting `ArrayRef`.
///
/// Type parameters:
/// * `T` - parquet data type of the column being read.
/// * `C` - converter from `Vec<Option<T::T>>` to `ArrayRef`.
pub struct ComplexObjectArrayReader<T, C>
where
T: DataType,
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
{
// Arrow data type of the produced arrays; derived from `column_desc` in `new`.
data_type: ArrowType,
// Source of page readers; one column reader is created per item it yields.
pages: Box<dyn PageIterator>,
// Definition levels of the most recently read batch; stays `None` when the
// column's maximum definition level is 0.
def_levels_buffer: Option<Vec<i16>>,
// Repetition levels of the most recently read batch; stays `None` when the
// column's maximum repetition level is 0.
rep_levels_buffer: Option<Vec<i16>>,
// Descriptor of the column being read.
column_desc: ColumnDescPtr,
// Reader currently being drained; `None` until the first call to `next_batch`.
column_reader: Option<ColumnReaderImpl<T>>,
// Zero-sized markers that tie the generic parameters to the struct.
_parquet_type_marker: PhantomData<T>,
_converter_marker: PhantomData<C>,
}

impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
where
    T: DataType,
    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
{
    /// Returns `self` as `&dyn Any` so callers can downcast to the concrete reader.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the arrow data type of the arrays produced by this reader.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records and converts them to an arrow array with `C`.
    ///
    /// Fewer than `batch_size` records are returned when the page iterator runs out.
    /// The definition and repetition levels of the records read are kept and can be
    /// retrieved through `get_def_levels` / `get_rep_levels` until the next call.
    ///
    /// # Errors
    ///
    /// Returns an error if no column reader could be created on first use
    /// ("No page left!"), or if reading a batch, advancing to the next column reader,
    /// or converting the values fails.
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        // Try to initialize the column reader on first use.
        if self.column_reader.is_none() {
            let init_result = self.next_column_reader()?;
            if !init_result {
                return Err(general_err!("No page left!"));
            }
        }

        assert!(self.column_reader.is_some());

        // Loop invariants: the descriptor does not change while reading a batch.
        let max_def_level = self.column_desc.max_def_level();
        let max_rep_level = self.column_desc.max_rep_level();

        let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
        data_buffer.resize_with(batch_size, T::T::default);

        // Level buffers are only allocated when the column actually carries levels.
        let mut def_levels_buffer: Option<Vec<i16>> = if max_def_level > 0 {
            Some(vec![0; batch_size])
        } else {
            None
        };

        let mut rep_levels_buffer: Option<Vec<i16>> = if max_rep_level > 0 {
            Some(vec![0; batch_size])
        } else {
            None
        };

        let mut num_read = 0;

        while num_read < batch_size {
            let num_to_read = batch_size - num_read;
            // Every slice below starts at `num_read`, i.e. right after what previous
            // iterations have already filled in.
            let cur_data_buf = &mut data_buffer[num_read..];
            let cur_def_levels_buf =
                def_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
            let cur_rep_levels_buf =
                rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
            let (data_read, levels_read) =
                self.column_reader.as_mut().unwrap().read_batch(
                    num_to_read,
                    cur_def_levels_buf,
                    cur_rep_levels_buf,
                    cur_data_buf,
                )?;

            // Fill space: when more levels than values were read, walk backwards and
            // move each value to the index of the level entry that equals the maximum
            // definition level, leaving the other slots for nulls.
            if levels_read > data_read {
                if let Some(def_levels) = def_levels_buffer.as_ref() {
                    // `def_levels` is the buffer for the whole batch, whereas
                    // `level_pos` / `data_pos` are relative to this `read_batch`
                    // call, so the levels must be viewed from `num_read` onwards.
                    let cur_def_levels = &def_levels[num_read..];
                    let (mut level_pos, mut data_pos) = (levels_read, data_read);
                    while level_pos > 0 && data_pos > 0 {
                        if cur_def_levels[level_pos - 1] == max_def_level {
                            cur_data_buf.swap(level_pos - 1, data_pos - 1);
                            data_pos -= 1;
                        }
                        level_pos -= 1;
                    }
                }
            }

            let values_read = max(levels_read, data_read);
            num_read += values_read;
            // Current column reader exhausted && page iterator exhausted.
            if values_read < num_to_read && !self.next_column_reader()? {
                break;
            }
        }

        // Drop the unused tail when fewer than `batch_size` records were available.
        data_buffer.truncate(num_read);
        if let Some(buf) = def_levels_buffer.as_mut() {
            buf.truncate(num_read);
        }
        if let Some(buf) = rep_levels_buffer.as_mut() {
            buf.truncate(num_read);
        }

        self.def_levels_buffer = def_levels_buffer;
        self.rep_levels_buffer = rep_levels_buffer;

        // A slot holds a real value only if its definition level is the maximum one;
        // without definition levels every slot is a value.
        let data: Vec<Option<T::T>> = match &self.def_levels_buffer {
            Some(def_levels) => data_buffer
                .into_iter()
                .zip(def_levels.iter())
                .map(|(value, def_level)| {
                    if *def_level == max_def_level {
                        Some(value)
                    } else {
                        None
                    }
                })
                .collect(),
            None => data_buffer.into_iter().map(Some).collect(),
        };

        C::convert(data)
    }

    /// Returns the definition levels of the last batch read, if the column has any.
    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_ref().map(|t| t.as_slice())
    }

    /// Returns the repetition levels of the last batch read, if the column has any.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_ref().map(|t| t.as_slice())
    }
}

impl<T, C> ComplexObjectArrayReader<T, C>
where
    T: DataType,
    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
{
    /// Builds a reader over `pages` for the column described by `column_desc`.
    ///
    /// The arrow data type is derived from the column descriptor via
    /// `parquet_to_arrow_field`; an error from that conversion is returned as is.
    /// No column reader is created here; that happens lazily in
    /// `next_column_reader`.
    fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
        let arrow_field = parquet_to_arrow_field(column_desc.clone())?;
        let data_type = arrow_field.data_type().clone();

        let reader = Self {
            data_type,
            pages,
            def_levels_buffer: None,
            rep_levels_buffer: None,
            column_desc,
            column_reader: None,
            _parquet_type_marker: PhantomData,
            _converter_marker: PhantomData,
        };
        Ok(reader)
    }

    /// Advances to the next item of the page iterator and installs a column reader
    /// for it.
    ///
    /// Returns `Ok(true)` when a new column reader was installed and `Ok(false)` when
    /// the page iterator has nothing left. If the iterator yields an error it is
    /// returned and the current column reader is left untouched.
    fn next_column_reader(&mut self) -> Result<bool> {
        let next_page = match self.pages.next() {
            None => return Ok(false),
            Some(page) => page,
        };

        let reader = ColumnReaderImpl::<T>::new(self.column_desc.clone(), next_page?);
        self.column_reader = Some(reader);
        Ok(true)
    }
}

/// Implementation of struct array reader.
pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
Expand Down Expand Up @@ -653,11 +830,14 @@ impl<'a> ArrayReaderBuilder {
PhysicalType::DOUBLE => Ok(Box::new(
PrimitiveArrayReader::<DoubleType>::new(page_iterator, column_desc)?,
)),
PhysicalType::BYTE_ARRAY => Ok(Box::new(PrimitiveArrayReader::<
ByteArrayType,
>::new(
page_iterator, column_desc
)?)),
PhysicalType::BYTE_ARRAY
if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 =>
{
Ok(Box::new(ComplexObjectArrayReader::<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, would the ComplexObjectArrayReader allow us to implement arrow::BinaryArray and arrow::FixedSizeBinaryArray in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we just need a converter for each of those types.

ByteArrayType,
Utf8Converter,
>::new(page_iterator, column_desc)?))
}
other => Err(ArrowError(format!(
"Unable to create primite array reader for parquet physical type {}",
other
Expand Down
Loading