Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update parquet::basic::LogicalType to be more idomatic #1612

Merged
merged 14 commits into from
Apr 24, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 49 additions & 49 deletions parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
use crate::{
basic::{
ConvertedType, DecimalType, IntType, LogicalType, Repetition, TimeType,
TimeUnit as ParquetTimeUnit, TimestampType, Type as PhysicalType,
ConvertedType, LogicalType, Repetition,
TimeUnit as ParquetTimeUnit, Type as PhysicalType,
},
errors::ParquetError,
};
Expand Down Expand Up @@ -324,24 +324,24 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
// create type from field
match field.data_type() {
DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::UNKNOWN(Default::default())))
.with_logical_type(Some(LogicalType::Unknown))
.with_repetition(repetition)
.build(),
DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
.with_repetition(repetition)
.build(),
DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
.with_logical_type(Some(LogicalType::Integer {
bit_width: 8,
is_signed: true,
})))
}))
.with_repetition(repetition)
.build(),
DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
.with_logical_type(Some(LogicalType::Integer {
bit_width: 16,
is_signed: true,
})))
}))
.with_repetition(repetition)
.build(),
DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
Expand All @@ -351,31 +351,31 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_repetition(repetition)
.build(),
DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
.with_logical_type(Some(LogicalType::Integer {
bit_width: 8,
is_signed: false,
})))
}))
.with_repetition(repetition)
.build(),
DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
.with_logical_type(Some(LogicalType::Integer {
bit_width: 16,
is_signed: false,
})))
}))
.with_repetition(repetition)
.build(),
DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
.with_logical_type(Some(LogicalType::Integer {
bit_width: 32,
is_signed: false,
})))
}))
.with_repetition(repetition)
.build(),
DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::INTEGER(IntType {
.with_logical_type(Some(LogicalType::Integer {
bit_width: 64,
is_signed: false,
})))
}))
.with_repetition(repetition)
.build(),
DataType::Float16 => Err(ArrowError("Float16 arrays not supported".to_string())),
Expand All @@ -389,42 +389,42 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
name,
PhysicalType::INT64,
)
.with_logical_type(Some(LogicalType::TIMESTAMP(TimestampType {
.with_logical_type(Some(LogicalType::Timestamp {
is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()),
unit: match time_unit {
TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()),
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
},
})))
}))
.with_repetition(repetition)
.build(),
DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::DATE(Default::default())))
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
.build(),
// date64 is cast to date32
DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::DATE(Default::default())))
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
.build(),
DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::TIME(TimeType {
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: false,
unit: ParquetTimeUnit::MILLIS(Default::default()),
})))
}))
.with_repetition(repetition)
.build(),
DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::TIME(TimeType {
.with_logical_type(Some(LogicalType::Time {
is_adjusted_to_u_t_c: false,
unit: match unit {
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
u => unreachable!("Invalid unit for Time64: {:?}", u),
},
})))
}))
.with_repetition(repetition)
.build(),
DataType::Duration(_) => Err(ArrowError(
Expand Down Expand Up @@ -465,17 +465,17 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(repetition)
.with_length(decimal_length_from_precision(*precision) as i32)
.with_logical_type(Some(LogicalType::DECIMAL(DecimalType {
.with_logical_type(Some(LogicalType::Decimal {
scale: *scale as i32,
precision: *precision as i32,
})))
}))
.with_precision(*precision as i32)
.with_scale(*scale as i32)
.build()
}
DataType::Utf8 | DataType::LargeUtf8 => {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_logical_type(Some(LogicalType::STRING(Default::default())))
.with_logical_type(Some(LogicalType::String))
.with_repetition(repetition)
.build()
}
Expand All @@ -487,7 +487,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_repetition(Repetition::REPEATED)
.build()?,
)])
.with_logical_type(Some(LogicalType::LIST(Default::default())))
.with_logical_type(Some(LogicalType::List))
.with_repetition(repetition)
.build()
}
Expand Down Expand Up @@ -527,7 +527,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_repetition(Repetition::REPEATED)
.build()?,
)])
.with_logical_type(Some(LogicalType::MAP(Default::default())))
.with_logical_type(Some(LogicalType::Map))
.with_repetition(repetition)
.build()
} else {
Expand Down Expand Up @@ -669,7 +669,7 @@ impl ParquetTypeConverter<'_> {
self.schema.get_basic_info().converted_type(),
) {
(None, ConvertedType::NONE) => Ok(DataType::Int32),
(Some(LogicalType::INTEGER(t)), _) => match (t.bit_width, t.is_signed) {
(Some(LogicalType::Integer { bit_width, is_signed }), _) => match (bit_width, is_signed) {
(8, true) => Ok(DataType::Int8),
(16, true) => Ok(DataType::Int16),
(32, true) => Ok(DataType::Int32),
Expand All @@ -678,20 +678,20 @@ impl ParquetTypeConverter<'_> {
(32, false) => Ok(DataType::UInt32),
_ => Err(ArrowError(format!(
"Cannot create INT32 physical type from {:?}",
t
self.schema.get_basic_info().logical_type(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use t @ LogicalType::Integer { ... } in the match expression, so that you can use t here

Copy link
Contributor Author

@tfeda tfeda Apr 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've spent some time looking into this. This syntax causes t to be evaluated as a LogicalType rather than a LogicalType::Integer. It then can't use it for match (t.bit_width, t.is_signed) . I also tried t @ LogicalType::Integer { bit_width, is_signed }, but then I run into borrow issues. Is there some order-of-op trick I'm missing?

Copy link
Contributor

@tustvold tustvold Apr 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this help - https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ec783c91c37289091cce85107031b3df

Btw you can drop the "ref" if you make LogicalType implement Copy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhh I missed the ref, thanks. Adding a ref seemed better than deriving Copy for LogicalType, TimeUnit, MilliSeconds, etc.

))),
},
(Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()),
(Some(LogicalType::DATE(_)), _) => Ok(DataType::Date32),
(Some(LogicalType::TIME(t)), _) => match t.unit {
(Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
(Some(LogicalType::Date), _) => Ok(DataType::Date32),
(Some(LogicalType::Time { unit, .. }), _) => match unit {
ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
_ => Err(ArrowError(format!(
"Cannot create INT32 physical type from {:?}",
t.unit
unit
))),
},
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
(Some(LogicalType::UNKNOWN(_)), _) => Ok(DataType::Null),
(Some(LogicalType::Unknown), _) => Ok(DataType::Null),
(None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
(None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
(None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
Expand All @@ -717,26 +717,26 @@ impl ParquetTypeConverter<'_> {
self.schema.get_basic_info().converted_type(),
) {
(None, ConvertedType::NONE) => Ok(DataType::Int64),
(Some(LogicalType::INTEGER(t)), _) if t.bit_width == 64 => {
match t.is_signed {
(Some(LogicalType::Integer { bit_width, is_signed }), _) if bit_width == 64 => {
match is_signed {
true => Ok(DataType::Int64),
false => Ok(DataType::UInt64),
}
}
(Some(LogicalType::TIME(t)), _) => match t.unit {
(Some(LogicalType::Time { unit, .. }), _) => match unit {
ParquetTimeUnit::MILLIS(_) => Err(ArrowError(
"Cannot create INT64 from MILLIS time unit".to_string(),
)),
ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
},
(Some(LogicalType::TIMESTAMP(t)), _) => Ok(DataType::Timestamp(
match t.unit {
(Some(LogicalType::Timestamp { is_adjusted_to_u_t_c, unit }), _) => Ok(DataType::Timestamp(
match unit {
ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
},
if t.is_adjusted_to_u_t_c {
if is_adjusted_to_u_t_c {
Some("UTC".to_string())
} else {
None
Expand All @@ -753,7 +753,7 @@ impl ParquetTypeConverter<'_> {
(None, ConvertedType::TIMESTAMP_MICROS) => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
(Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()),
(Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
(None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
(logical, converted) => Err(ArrowError(format!(
"Unable to convert parquet INT64 logical type {:?} or converted type {}",
Expand All @@ -768,7 +768,7 @@ impl ParquetTypeConverter<'_> {
self.schema.get_basic_info().logical_type(),
self.schema.get_basic_info().converted_type(),
) {
(Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()),
(Some(LogicalType::Decimal {..}), _) => Ok(self.to_decimal()),
(None, ConvertedType::DECIMAL) => Ok(self.to_decimal()),
(None, ConvertedType::INTERVAL) => {
// There is currently no reliable way of determining which IntervalUnit
Expand Down Expand Up @@ -804,10 +804,10 @@ impl ParquetTypeConverter<'_> {
#[allow(clippy::wrong_self_convention)]
fn from_byte_array(&self) -> Result<DataType> {
match (self.schema.get_basic_info().logical_type(), self.schema.get_basic_info().converted_type()) {
(Some(LogicalType::STRING(_)), _) => Ok(DataType::Utf8),
(Some(LogicalType::JSON(_)), _) => Ok(DataType::Binary),
(Some(LogicalType::BSON(_)), _) => Ok(DataType::Binary),
(Some(LogicalType::ENUM(_)), _) => Ok(DataType::Binary),
(Some(LogicalType::String), _) => Ok(DataType::Utf8),
(Some(LogicalType::Json), _) => Ok(DataType::Binary),
(Some(LogicalType::Bson), _) => Ok(DataType::Binary),
(Some(LogicalType::Enum), _) => Ok(DataType::Binary),
(None, ConvertedType::NONE) => Ok(DataType::Binary),
(None, ConvertedType::JSON) => Ok(DataType::Binary),
(None, ConvertedType::BSON) => Ok(DataType::Binary),
Expand All @@ -830,8 +830,8 @@ impl ParquetTypeConverter<'_> {
self.schema.get_basic_info().logical_type(),
self.schema.get_basic_info().converted_type(),
) {
(Some(LogicalType::LIST(_)), _) | (_, ConvertedType::LIST) => self.to_list(),
(Some(LogicalType::MAP(_)), _)
(Some(LogicalType::List), _) | (_, ConvertedType::LIST) => self.to_list(),
(Some(LogicalType::Map), _)
| (_, ConvertedType::MAP)
| (_, ConvertedType::MAP_KEY_VALUE) => self.to_map(),
(_, _) => {
Expand Down