Skip to content

Commit

Permalink
Run End Encoding DataType
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 16, 2023
1 parent ddba53b commit 80c2d78
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 31 deletions.
1 change: 1 addition & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
new_null_sized_decimal(data_type, length, std::mem::size_of::<i128>())
}
DataType::Decimal256(_, _) => new_null_sized_decimal(data_type, length, 32),
DataType::RunEndEncodedType(_, _) => todo!(),
}
}

Expand Down
13 changes: 10 additions & 3 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
],
_ => unreachable!(),
},
DataType::FixedSizeList(_, _) | DataType::Struct(_) => {
[empty_buffer, MutableBuffer::new(0)]
}
DataType::FixedSizeList(_, _)
| DataType::Struct(_)
| DataType::RunEndEncodedType(_, _) => [empty_buffer, MutableBuffer::new(0)],
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => [
MutableBuffer::new(capacity * mem::size_of::<u8>()),
empty_buffer,
Expand Down Expand Up @@ -654,6 +654,12 @@ impl ArrayData {
DataType::Dictionary(_, data_type) => {
vec![Self::new_empty(data_type)]
}
DataType::RunEndEncodedType(run_ends, values) => {
vec![
Self::new_empty(run_ends.data_type()),
Self::new_empty(values.data_type()),
]
}
};

// Data was constructed correctly above
Expand Down Expand Up @@ -1508,6 +1514,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
// same as ListType
DataTypeLayout::new_fixed_width(size_of::<i32>())
}
DataType::RunEndEncodedType(_, _) => DataTypeLayout::new_empty(), // all in child data,
}
}

Expand Down
1 change: 1 addition & 0 deletions arrow-data/src/equal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ fn equal_values(
},
DataType::Float16 => primitive_equal::<f16>(lhs, rhs, lhs_start, rhs_start, len),
DataType::Map(_, _) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
DataType::RunEndEncodedType(_, _) => todo!(),
}
}

Expand Down
16 changes: 16 additions & 0 deletions arrow-data/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ fn build_extend(array: &ArrayData) -> Extend {
UnionMode::Sparse => union::build_extend_sparse(array),
UnionMode::Dense => union::build_extend_dense(array),
},
DataType::RunEndEncodedType(_, _) => todo!(),
}
}

Expand Down Expand Up @@ -281,6 +282,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
UnionMode::Sparse => union::extend_nulls_sparse,
UnionMode::Dense => union::extend_nulls_dense,
},
DataType::RunEndEncodedType(_, _) => todo!(),
})
}

Expand Down Expand Up @@ -473,6 +475,20 @@ impl<'a> MutableArrayData<'a> {
})
.collect::<Vec<_>>(),
},
DataType::RunEndEncodedType(_, _) => {
let run_ends_child = arrays
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
let value_child = arrays
.iter()
.map(|array| &array.child_data()[1])
.collect::<Vec<_>>();
vec![
MutableArrayData::new(run_ends_child, use_nulls, array_capacity),
MutableArrayData::new(value_child, use_nulls, array_capacity),
]
}
DataType::FixedSizeList(_, _) => {
let childs = arrays
.iter()
Expand Down
1 change: 1 addition & 0 deletions arrow-integration-test/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value {
DataType::Map(_, keys_sorted) => {
json!({"name": "map", "keysSorted": keys_sorted})
}
DataType::RunEndEncodedType(_, _) => todo!(),
}
}

Expand Down
1 change: 1 addition & 0 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ pub(crate) fn get_fb_field_type<'a>(
children: Some(fbb.create_vector(&children[..])),
}
}
RunEndEncodedType(_, _) => todo!(),
}
}

Expand Down
17 changes: 17 additions & 0 deletions arrow-schema/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,19 @@ pub enum DataType {
/// child fields may be respectively "entries", "key", and "value", but this is
/// not enforced.
Map(Box<Field>, bool),

/// Run-end encoding (REE) is a variation of run-length encoding (RLE). These
/// encodings are well-suited for representing data containing sequences of the
/// same value, called runs. In a run-end encoded array, each run is represented as
/// a value and an integer giving the index in the array where the run ends.
///
/// A run-end encoded array has no buffers by itself, but has two child arrays. The
/// first child array, called the run ends array, holds either 16, 32, or 64-bit
/// signed integers. The actual values of each run are held in the second child array.
///
/// These child arrays are prescribed the standard names of "run_ends" and "values"
/// respectively.
RunEndEncodedType(Box<Field>, Box<Field>),
}

/// An absolute length of time in seconds, milliseconds, microseconds or nanoseconds.
Expand Down Expand Up @@ -438,6 +451,10 @@ impl DataType {
+ (std::mem::size_of::<Field>() * fields.capacity())
}
DataType::Dictionary(dt1, dt2) => dt1.size() + dt2.size(),
DataType::RunEndEncodedType(run_ends, values) => {
run_ends.size() - std::mem::size_of_val(run_ends) + values.size()
- std::mem::size_of_val(values)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions arrow-schema/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl Field {
| DataType::List(_)
| DataType::Map(_, _)
| DataType::Dictionary(_, _)
| DataType::RunEndEncodedType(_, _)
| DataType::FixedSizeList(_, _)
| DataType::FixedSizeBinary(_)
| DataType::Utf8
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ fn write_leaves<W: Write>(
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),
)),
ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) => {
ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _, _) | ArrowDataType::RunEndEncodedType(_, _) => {
Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {:?} to parquet that is not yet implemented",
Expand Down
37 changes: 10 additions & 27 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
let dict_field = Field::new(name, *value.clone(), field.is_nullable());
arrow_to_parquet_type(&dict_field)
}
DataType::RunEndEncodedType(_, _) => Err(arrow_err!(
"Converting RunEndEncodedType to parquet not supported",
)),
}
}

Expand Down Expand Up @@ -640,7 +643,7 @@ mod tests {
ProjectionMask::all(),
None,
)
.unwrap();
.unwrap();
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
}

Expand Down Expand Up @@ -1342,20 +1345,9 @@ mod tests {
))),
false,
),
Field::new(
"decimal_int32",
DataType::Decimal128(8, 2),
false,
),
Field::new(
"decimal_int64",
DataType::Decimal128(16, 2),
false,
),
Field::new(
"decimal_fix_length",
DataType::Decimal128(30, 2),
false, ),
Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
];

assert_eq!(arrow_fields, converted_arrow_fields);
Expand Down Expand Up @@ -1491,18 +1483,9 @@ mod tests {
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
Field::new(
"decimal_int32",
DataType::Decimal128(8, 2),
false),
Field::new("decimal_int64",
DataType::Decimal128(16, 2),
false),
Field::new(
"decimal_fix_length",
DataType::Decimal128(30, 2),
false,
),
Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
];
let arrow_schema = Schema::new(arrow_fields);
let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema).unwrap();
Expand Down

0 comments on commit 80c2d78

Please sign in to comment.