Commit a59613b: clean up

nevi-me committed Jan 20, 2021
1 parent df553ca
Showing 3 changed files with 55 additions and 53 deletions.
rust/parquet/src/arrow/array_reader.rs (7 additions & 6 deletions)
@@ -1389,12 +1389,13 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
         let item_reader_type = item_reader.get_data_type().clone();

         match item_reader_type {
-            ArrowType::FixedSizeList(_, _) | ArrowType::Dictionary(_, _) => {
-                Err(ArrowError(format!(
-                    "reading List({:?}) into arrow not supported yet",
-                    item_type
-                )))
-            }
+            ArrowType::List(_)
+            | ArrowType::FixedSizeList(_, _)
+            | ArrowType::Struct(_)
+            | ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
+                "reading List({:?}) into arrow not supported yet",
+                item_type
+            ))),
             _ => {
                 let arrow_type = self
                     .arrow_schema
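The clean-up here folds four unsupported list item types into a single match arm. As a stand-alone illustration of the `|`-pattern grouping used above, here is a minimal sketch; it assumes arrow's `DataType` imported as `ArrowType` (as `array_reader.rs` does), and `is_supported_list_item` is a hypothetical helper, not part of the crate:

```rust
use arrow::datatypes::DataType as ArrowType;

// Hypothetical helper mirroring the match arm above: returns true if a
// list's item type can currently be read into an Arrow array.
fn is_supported_list_item(item: &ArrowType) -> bool {
    !matches!(
        item,
        ArrowType::List(_)
            | ArrowType::FixedSizeList(_, _)
            | ArrowType::Struct(_)
            | ArrowType::Dictionary(_, _)
    )
}

fn main() {
    assert!(is_supported_list_item(&ArrowType::Int32));
    assert!(!is_supported_list_item(&ArrowType::Struct(vec![])));
}
```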
rust/parquet/src/arrow/arrow_writer.rs (1 addition & 2 deletions)
@@ -215,9 +215,8 @@ fn write_leaf(
             .as_any()
             .downcast_ref::<arrow_array::Int32Array>()
             .expect("Unable to get int32 array");
-        let slice = get_numeric_array_slice::<Int32Type, _>(&array, &indices);
         typed.write_batch(
-            slice.as_slice(),
+            get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
             Some(levels.definition.as_slice()),
             levels.repetition.as_deref(),
         )?
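The deleted `slice` binding was used only once, so the refactor inlines the call. For context, the surrounding code reaches the concrete array through the usual `Any`-based downcast from a type-erased `ArrayRef`; a minimal sketch of that idiom (with the crate-private `get_numeric_array_slice` replaced by a plain collect):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};

fn main() {
    // A type-erased column, as write_leaf receives it.
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // Downcast to the concrete array type before extracting values.
    let array = column
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("Unable to get int32 array");

    let values: Vec<i32> = (0..array.len()).map(|i| array.value(i)).collect();
    assert_eq!(values, vec![1, 2, 3]);
}
```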
rust/parquet/src/arrow/levels.rs (47 additions & 45 deletions)
@@ -18,39 +18,37 @@
 //! Parquet definition and repetition levels
 //!
 //! Contains the algorithm for computing definition and repetition levels.
-//! The algorithm works by tracking the slots of an array that should ultimately be populated when
-//! writing to Parquet.
-//! Parquet achieves nesting through definition levels and repetition levels [1].
-//! Definition levels specify how many optional fields in the part for the column are defined.
-//! Repetition levels specify at what repeated field (list) in the path a column is defined.
+//! The algorithm works by tracking the slots of an array that should
+//! ultimately be populated when writing to Parquet.
+//! Parquet achieves nesting through definition levels and repetition levels \[1\].
+//! Definition levels specify how many optional fields in the part for the column
+//! are defined.
+//! Repetition levels specify at what repeated field (list) in the path a column
+//! is defined.
 //!
-//! In a nested data structure such as `a.b.c`, one can see levels as defining whether a record is
-//! defined at `a`, `a.b`, or `a.b.c`. Optional fields are nullable fields, thus if all 3 fiedls
-//! are nullable, the maximum definition will be = 3.
+//! In a nested data structure such as `a.b.c`, one can see levels as defining
+//! whether a record is defined at `a`, `a.b`, or `a.b.c`.
+//! Optional fields are nullable fields, thus if all 3 fields
+//! are nullable, the maximum definition could be = 3 if there are no lists.
 //!
-//! The algorithm in this module computes the necessary information to enable the writer to keep
-//! track of which columns are at which levels, and to ultimately extract the correct values at
-//! the correct slots from Arrow arrays.
+//! The algorithm in this module computes the necessary information to enable
+//! the writer to keep track of which columns are at which levels, and to extract
+//! the correct values at the correct slots from Arrow arrays.
 //!
-//! It works by walking a record batch's arrays, keeping track of what values are non-null, their
-//! positions and computing what their levels are.
-//! We use an eager approach that increments definition levels where incrementable, and decrements
-//! if a value being checked is null.
+//! It works by walking a record batch's arrays, keeping track of what values
+//! are non-null, their positions and computing what their levels are.
 //!
-//! [1] https://github.com/apache/parquet-format#nested-encoding
+//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)

 use arrow::array::{make_array, ArrayRef, StructArray};
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;

-/// Keeps track of the level information per array that is needed to write an Arrow aray to Parquet.
+/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
 ///
 /// When a nested schema is traversed, intermediate [LevelInfo] structs are created to track
 /// the state of parent arrays. When a primitive Arrow array is encountered, a final [LevelInfo]
 /// is created, and this is what is used to index into the array when writing data to Parquet.
-///
-/// Note: for convenience, the final primitive array's level info can omit some values below if
-/// none of that array's parents were repetitive (i.e `is_list` is false)
 #[derive(Debug, Eq, PartialEq, Clone)]
 pub(crate) struct LevelInfo {
     /// Array's definition levels
@@ -59,11 +57,11 @@ pub(crate) struct LevelInfo {
     pub repetition: Option<Vec<i16>>,
     /// Array's offsets, 64-bit is used to accommodate large offset arrays
     pub array_offsets: Vec<i64>,
-    // TODO: Convert to an Arrow Buffer after ARROW-10766 is merged.
     /// Array's logical validity mask, whcih gets unpacked for list children.
     /// If the parent of an array is null, all children are logically treated as
     /// null. This mask keeps track of that.
+    ///
+    /// TODO: Convert to an Arrow Buffer after ARROW-10766 is merged.
     pub array_mask: Vec<bool>,
     /// The maximum definition at this level, 0 at the record batch
     pub max_definition: i16,
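To make the definition/repetition scheme from the module docs concrete, here is a hand-derived example that follows the parquet-format nested-encoding rules referenced above; the numbers are worked out by hand per the spec, not produced by `LevelInfo`:

```rust
fn main() {
    // Column: nullable list of nullable int32, max definition = 3
    // (optional list + repeated element slot + optional element).
    // Rows: [[1, null], null, []]
    //
    //   slot            def  rep  meaning
    //   row 0, value 1   3    0   new row, element fully defined
    //   row 0, null      2    1   repeat within row 0, element is null
    //   row 1, null      0    0   the list itself is null
    //   row 2, []        1    0   the list is defined but empty
    let definition: Vec<i16> = vec![3, 2, 0, 1];
    let repetition: Vec<i16> = vec![0, 1, 0, 0];
    assert_eq!(definition.len(), repetition.len());
}
```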
@@ -80,7 +78,7 @@ impl LevelInfo {
     pub(crate) fn new_from_batch(batch: &RecordBatch) -> Self {
         let num_rows = batch.num_rows();
         Self {
-            // a batch is treated as all-defined
+            // a batch has no definition level yet
             definition: vec![0; num_rows],
             // a batch has no repetition as it is not a list
             repetition: None,
@@ -111,7 +109,6 @@ impl LevelInfo {
             repetition: self.repetition.clone(),
             array_offsets: self.array_offsets.clone(),
             array_mask,
-            // nulls will have all definitions being 0, so max value is reduced
             max_definition: self.max_definition.max(1),
             is_list: self.is_list,
             is_nullable: true, // always nullable as all values are nulls
@@ -140,8 +137,6 @@ impl LevelInfo {
             | DataType::Binary
             | DataType::LargeBinary => {
                 // we return a vector of 1 value to represent the primitive
-                // it is safe to inherit the parent level's repetition, but we have to calculate
-                // the child's own definition levels
                 vec![self.calculate_child_levels(
                     array_offsets,
                     array_mask,
@@ -152,21 +147,21 @@ impl LevelInfo {
             DataType::FixedSizeBinary(_) => unimplemented!(),
             DataType::Decimal(_, _) => unimplemented!(),
             DataType::List(list_field) | DataType::LargeList(list_field) => {
-                let array_data = array.data();
-                let child_data = array_data.child_data().get(0).unwrap();
-                // // get list offsets
-                let child_array = make_array(child_data.clone());
-                let (child_offsets, child_mask) =
-                    Self::get_array_offsets_and_masks(&child_array);
-
+                // Calculate the list level
                 let list_level = self.calculate_child_levels(
                     array_offsets,
                     array_mask,
                     true,
                     field.is_nullable(),
                 );

-                // if datatype is a primitive, we can construct levels of the child array
+                // Construct the child array of the list, and get its offset + mask
+                let array_data = array.data();
+                let child_data = array_data.child_data().get(0).unwrap();
+                let child_array = make_array(child_data.clone());
+                let (child_offsets, child_mask) =
+                    Self::get_array_offsets_and_masks(&child_array);
+
                 match child_array.data_type() {
                     // TODO: The behaviour of a <list<null>> is untested
                     DataType::Null => vec![list_level],
@@ -203,13 +198,10 @@ impl LevelInfo {
                     }
                     DataType::FixedSizeBinary(_) => unimplemented!(),
                     DataType::Decimal(_, _) => unimplemented!(),
-                    DataType::List(_) | DataType::LargeList(_) => {
+                    DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
                         list_level.calculate_array_levels(&child_array, list_field)
                     }
                     DataType::FixedSizeList(_, _) => unimplemented!(),
-                    DataType::Struct(_) => {
-                        list_level.calculate_array_levels(&child_array, list_field)
-                    }
                     DataType::Union(_) => unimplemented!(),
                 }
             }
@@ -259,29 +251,34 @@ impl LevelInfo {
     /// - a value is optional or required (is_nullable)
     /// - a list value is repeated + optional or required (is_list)
     ///
-    /// *Examples:*
-    ///
     /// A record batch always starts at a populated definition = level 0.
     /// When a batch only has a primitive, i.e. `<batch<primitive[a]>>`, column `a`
     /// can only have a maximum level of 1 if it is not null.
     /// If it is not null, we increment by 1, such that the null slots will = level 1.
     /// The above applies to types that have no repetition (anything not a list or map).
     ///
     /// If a batch has lists, then we increment by up to 2 levels:
-    /// - 1 level for the list
-    /// - 1 level if the list itself is nullable
+    /// - 1 level for the list (repeated)
+    /// - 1 level if the list itself is nullable (optional)
     ///
     /// A list's child then gets incremented using the above rules.
     ///
-    /// A special case is when at the root of the schema. We always increment the
+    /// *Exceptions*
+    ///
+    /// There are 2 exceptions from the above rules:
+    ///
+    /// 1. When at the root of the schema: We always increment the
     /// level regardless of whether the child is nullable or not. If we do not do
     /// this, we could have a non-nullable array having a definition of 0.
     ///
+    /// 2. List parent, non-list child: We always increment the level in this case,
+    /// regardless of whether the child is nullable or not.
+    ///
+    /// *Examples*
+    ///
     /// A batch with only a primitive that's non-nullable. `<primitive[required]>`:
     /// * We don't increment the definition level as the array is not optional.
-    /// * This would leave us with a definition of 0, so the special case applies.
+    /// * This would leave us with a definition of 0, so the first exception applies.
     /// * The definition level becomes 1.
     ///
/// A batch with only a primitive that's nullable. `<primitive[optional]>`:
Expand All @@ -291,7 +288,7 @@ impl LevelInfo {
/// * We calculate the level twice, for the list, and for the child.
/// * At the list, the level becomes 1, where 0 indicates that the list is
/// empty, and 1 says it's not (determined through offsets).
/// * At the primitive level
/// * At the primitive level, the second exception applies. The level becomes 2.
fn calculate_child_levels(
&self,
// we use 64-bit offsets to also accommodate large arrays
@@ -307,8 +304,12 @@
         // determine the total level increment based on data types
         let max_definition = match is_list {
             false => {
+                // first exception, start of a batch, and not list
                 if self.max_definition == 0 {
                     1
+                } else if self.is_list {
+                    // second exception, always increment after a list
+                    self.max_definition + 1
                 } else {
                     self.max_definition + is_nullable as i16
                 }
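The two inserted branches implement the exceptions documented on `calculate_child_levels`. A stand-alone sketch of the same rule for a non-list child; the function name and signature are illustrative, not the module's private API:

```rust
fn next_max_definition(parent_max: i16, parent_is_list: bool, is_nullable: bool) -> i16 {
    if parent_max == 0 {
        // first exception: at the root of the schema, always increment
        1
    } else if parent_is_list {
        // second exception: a list parent always increments its child's level
        parent_max + 1
    } else {
        // otherwise only optional (nullable) children add a definition level
        parent_max + is_nullable as i16
    }
}

fn main() {
    assert_eq!(next_max_definition(0, false, false), 1); // root, required child
    assert_eq!(next_max_definition(1, true, false), 2); // child of a list parent
    assert_eq!(next_max_definition(1, false, true), 2); // nullable child
    assert_eq!(next_max_definition(1, false, false), 1); // required child
}
```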
@@ -568,7 +569,8 @@ impl LevelInfo {
     }

     /// Get the offsets of an array as 64-bit values, and validity masks as booleans
-    /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained from validity bitmap
+    /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained
+    ///   from validity bitmap
     /// - List array offsets will be the value offsets, masks are computed from offsets
     fn get_array_offsets_and_masks(array: &ArrayRef) -> (Vec<i64>, Vec<bool>) {
         match array.data_type() {
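To make the first bullet above concrete: for a primitive array the offsets form a plain running sequence and the mask mirrors the validity bitmap. A hand-worked sketch of that behaviour; it assumes list-style `len + 1` offsets and does not call the private helper:

```rust
use arrow::array::{Array, Int32Array};

fn main() {
    let array = Int32Array::from(vec![Some(1), None, Some(3)]);

    // Offsets for a primitive array: a running sequence over the slots
    // (assumed here to have len + 1 entries, like list offsets).
    let offsets: Vec<i64> = (0..=array.len() as i64).collect();

    // The mask comes straight from the validity bitmap.
    let mask: Vec<bool> = (0..array.len()).map(|i| array.is_valid(i)).collect();

    assert_eq!(offsets, vec![0, 1, 2, 3]);
    assert_eq!(mask, vec![true, false, true]);
}
```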
