From f42337edc33e401106f4dab7a4deea74d4363a46 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 1 Mar 2021 09:28:50 +0200 Subject: [PATCH 1/6] use logical types to map to Arrow types (cherry picked from commit 780e9662b0221e66becfbb669040f50e25b8778e) --- rust/parquet/src/arrow/schema.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index fe9f60666bc9d..579a5f435e083 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -792,6 +792,7 @@ impl ParquetTypeConverter<'_> { /// This function takes care of logical type and repetition. fn to_group_type(&self) -> Result> { if self.is_repeated() { + dbg!(self.schema.get_basic_info()); self.to_struct().map(|opt| { opt.map(|dt| { DataType::List(Box::new(Field::new( From 5b8578f343cf8bdd5340b35c7e330d6ddc9478e0 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 15 Mar 2021 03:19:38 +0200 Subject: [PATCH 2/6] address review comments --- rust/parquet/src/arrow/schema.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 579a5f435e083..fe9f60666bc9d 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -792,7 +792,6 @@ impl ParquetTypeConverter<'_> { /// This function takes care of logical type and repetition. 
fn to_group_type(&self) -> Result> { if self.is_repeated() { - dbg!(self.schema.get_basic_info()); self.to_struct().map(|opt| { opt.map(|dt| { DataType::List(Box::new(Field::new( From b4eb0de2ea4bc31b9eeb13873b9d7b0d4eef0ee6 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 14 Mar 2021 23:11:10 +0200 Subject: [PATCH 3/6] schema printer for logical types --- rust/parquet/src/schema/printer.rs | 365 ++++++++++++++++++++++++++--- 1 file changed, 334 insertions(+), 31 deletions(-) diff --git a/rust/parquet/src/schema/printer.rs b/rust/parquet/src/schema/printer.rs index 81ada8f6f9927..4079534986529 100644 --- a/rust/parquet/src/schema/printer.rs +++ b/rust/parquet/src/schema/printer.rs @@ -45,7 +45,7 @@ use std::{fmt, io}; -use crate::basic::{ConvertedType, Type as PhysicalType}; +use crate::basic::{ConvertedType, LogicalType, TimeUnit, Type as PhysicalType}; use crate::file::metadata::{ ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData, }; @@ -195,6 +195,15 @@ impl<'a> Printer<'a> { } } +#[inline] +fn print_timeunit(unit: &TimeUnit) -> &str { + match unit { + TimeUnit::MILLIS(_) => "MILLIS", + TimeUnit::MICROS(_) => "MICROS", + TimeUnit::NANOS(_) => "NANOS", + } +} + #[allow(unused_must_use)] impl<'a> Printer<'a> { pub fn print(&mut self, tp: &Type) { @@ -215,20 +224,61 @@ impl<'a> Printer<'a> { _ => format!("{}", physical_type), }; // Also print logical type if it is available - let converted_type_str = match basic_info.converted_type() { - ConvertedType::NONE => format!(""), - decimal @ ConvertedType::DECIMAL => { - // For decimal type we should print precision and scale if they - // are > 0, e.g. 
DECIMAL(9, 2) - - // DECIMAL(9) - DECIMAL - let precision_scale = match (precision, scale) { - (p, s) if p > 0 && s > 0 => format!(" ({}, {})", p, s), - (p, 0) if p > 0 => format!(" ({})", p), - _ => format!(""), - }; - format!(" ({}{})", decimal, precision_scale) + // If there is a logical type, do not print converted type + let logical_type_str = match basic_info.logical_type() { + Some(logical_type) => match logical_type { + LogicalType::INTEGER(t) => { + format!(" (INTEGER({},{}))", t.bit_width, t.is_signed) + } + LogicalType::DECIMAL(t) => { + format!(" (DECIMAL({},{}))", t.precision, t.scale) + } + LogicalType::TIMESTAMP(t) => { + format!( + " (TIMESTAMP({},{}))", + print_timeunit(&t.unit), + t.is_adjusted_to_u_t_c + ) + } + LogicalType::TIME(t) => { + format!( + " (TIME({},{}))", + print_timeunit(&t.unit), + t.is_adjusted_to_u_t_c + ) + } + LogicalType::DATE(_) => " (DATE)".to_string(), + LogicalType::BSON(_) => " (BSON)".to_string(), + LogicalType::JSON(_) => " (JSON)".to_string(), + LogicalType::STRING(_) => " (STRING)".to_string(), + LogicalType::UUID(_) => " (UUID)".to_string(), + LogicalType::ENUM(_) => " (ENUM)".to_string(), + LogicalType::LIST(_) => " (LIST)".to_string(), + LogicalType::MAP(_) => " (MAP)".to_string(), + LogicalType::UNKNOWN(_) => " (UNKNOWN)".to_string(), + }, + None => { + // Also print converted type if it is available + match basic_info.converted_type() { + ConvertedType::NONE => format!(""), + decimal @ ConvertedType::DECIMAL => { + // For decimal type we should print precision and scale if they + // are > 0, e.g. 
DECIMAL(9, 2) - + // DECIMAL(9) - DECIMAL + let precision_scale = match (precision, scale) { + (p, s) if p > 0 && s > 0 => { + format!(" ({}, {})", p, s) + } + (p, 0) if p > 0 => format!(" ({})", p), + _ => format!(""), + }; + format!(" ({}{})", decimal, precision_scale) + } + other_converted_type => { + format!(" ({})", other_converted_type) + } + } } - other_converted_type => format!(" ({})", other_converted_type), }; write!( self.output, @@ -236,7 +286,7 @@ impl<'a> Printer<'a> { basic_info.repetition(), phys_type_str, basic_info.name(), - converted_type_str + logical_type_str ); } Type::GroupType { @@ -246,7 +296,9 @@ impl<'a> Printer<'a> { if basic_info.has_repetition() { let r = basic_info.repetition(); write!(self.output, "{} group {} ", r, basic_info.name()); - if basic_info.converted_type() != ConvertedType::NONE { + if let Some(logical_type) = basic_info.logical_type() { + write!(self.output, "({:?}) ", logical_type); + } else if basic_info.converted_type() != ConvertedType::NONE { write!(self.output, "({}) ", basic_info.converted_type()); } writeln!(self.output, "{{"); @@ -273,7 +325,11 @@ mod tests { use std::sync::Arc; - use crate::basic::{Repetition, Type as PhysicalType}; + use crate::basic::{ + DateType, DecimalType, IntType, LogicalType, Repetition, TimeType, TimestampType, + Type as PhysicalType, + }; + use crate::errors::Result; use crate::schema::{parser::parse_message_type, types::Type}; fn assert_print_parse_message(message: Type) { @@ -301,18 +357,259 @@ mod tests { assert_eq!(&mut s, "REQUIRED INT32 field (INT_32);"); } + #[inline] + fn build_primitive_type( + name: &str, + physical_type: PhysicalType, + logical_type: Option, + converted_type: ConvertedType, + repetition: Repetition, + ) -> Result { + Type::primitive_type_builder(name, physical_type) + .with_repetition(repetition) + .with_logical_type(logical_type) + .with_converted_type(converted_type) + .build() + } + #[test] - fn test_print_primitive_type_without_logical() { - let mut s = 
String::new(); - { - let mut p = Printer::new(&mut s); - let field = Type::primitive_type_builder("field", PhysicalType::DOUBLE) - .with_repetition(Repetition::REQUIRED) + fn test_print_logical_types() { + let types_and_strings = vec![ + ( + build_primitive_type( + "field", + PhysicalType::INT32, + Some(LogicalType::INTEGER(IntType { + bit_width: 32, + is_signed: true, + })), + ConvertedType::NONE, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED INT32 field (INTEGER(32,true));", + ), + ( + build_primitive_type( + "field", + PhysicalType::INT32, + Some(LogicalType::INTEGER(IntType { + bit_width: 8, + is_signed: false, + })), + ConvertedType::NONE, + Repetition::OPTIONAL, + ) + .unwrap(), + "OPTIONAL INT32 field (INTEGER(8,false));", + ), + ( + build_primitive_type( + "field", + PhysicalType::INT32, + Some(LogicalType::INTEGER(IntType { + bit_width: 16, + is_signed: true, + })), + ConvertedType::INT_16, + Repetition::REPEATED, + ) + .unwrap(), + "REPEATED INT32 field (INTEGER(16,true));", + ), + ( + build_primitive_type( + "field", + PhysicalType::INT64, + None, + ConvertedType::NONE, + Repetition::REPEATED, + ) + .unwrap(), + "REPEATED INT64 field;", + ), + ( + build_primitive_type( + "field", + PhysicalType::FLOAT, + None, + ConvertedType::NONE, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED FLOAT field;", + ), + ( + build_primitive_type( + "booleans", + PhysicalType::BOOLEAN, + None, + ConvertedType::NONE, + Repetition::OPTIONAL, + ) + .unwrap(), + "OPTIONAL BOOLEAN booleans;", + ), + ( + build_primitive_type( + "field", + PhysicalType::INT64, + Some(LogicalType::TIMESTAMP(TimestampType { + is_adjusted_to_u_t_c: true, + unit: TimeUnit::MILLIS(Default::default()), + })), + ConvertedType::NONE, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED INT64 field (TIMESTAMP(MILLIS,true));", + ), + ( + build_primitive_type( + "field", + PhysicalType::INT32, + Some(LogicalType::DATE(DateType {})), + ConvertedType::NONE, + Repetition::OPTIONAL, + ) + .unwrap(), + 
"OPTIONAL INT32 field (DATE);", + ), + ( + build_primitive_type( + "field", + PhysicalType::INT32, + Some(LogicalType::TIME(TimeType { + unit: TimeUnit::MILLIS(Default::default()), + is_adjusted_to_u_t_c: false, + })), + ConvertedType::TIME_MILLIS, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED INT32 field (TIME(MILLIS,false));", + ), + ( + build_primitive_type( + "field", + PhysicalType::BYTE_ARRAY, + None, + ConvertedType::NONE, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED BYTE_ARRAY field;", + ), + ( + build_primitive_type( + "field", + PhysicalType::BYTE_ARRAY, + None, + ConvertedType::UTF8, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED BYTE_ARRAY field (UTF8);", + ), + ( + build_primitive_type( + "field", + PhysicalType::BYTE_ARRAY, + Some(LogicalType::JSON(Default::default())), + ConvertedType::JSON, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED BYTE_ARRAY field (JSON);", + ), + ( + build_primitive_type( + "field", + PhysicalType::BYTE_ARRAY, + Some(LogicalType::BSON(Default::default())), + ConvertedType::BSON, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED BYTE_ARRAY field (BSON);", + ), + ( + build_primitive_type( + "field", + PhysicalType::BYTE_ARRAY, + Some(LogicalType::STRING(Default::default())), + ConvertedType::NONE, + Repetition::REQUIRED, + ) + .unwrap(), + "REQUIRED BYTE_ARRAY field (STRING);", + ), + ]; + + types_and_strings.into_iter().for_each(|(field, expected)| { + let mut s = String::new(); + { + let mut p = Printer::new(&mut s); + p.print(&field); + } + assert_eq!(&s, expected) + }); + } + + #[inline] + fn decimal_length_from_precision(precision: usize) -> i32 { + (10.0_f64.powi(precision as i32).log2() / 8.0).ceil() as i32 + } + + #[test] + fn test_print_flba_logical_types() { + let types_and_strings = vec![ + ( + Type::primitive_type_builder("field", PhysicalType::FIXED_LEN_BYTE_ARRAY) + .with_logical_type(None) + .with_converted_type(ConvertedType::INTERVAL) + .with_length(12) + 
.with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + "REQUIRED FIXED_LEN_BYTE_ARRAY (12) field (INTERVAL);", + ), + ( + Type::primitive_type_builder("field", PhysicalType::FIXED_LEN_BYTE_ARRAY) + .with_logical_type(Some(LogicalType::UUID(Default::default()))) + .with_length(16) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + "REQUIRED FIXED_LEN_BYTE_ARRAY (16) field (UUID);", + ), + ( + Type::primitive_type_builder( + "decimal", + PhysicalType::FIXED_LEN_BYTE_ARRAY, + ) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + precision: 32, + scale: 20, + }))) + .with_precision(32) + .with_scale(20) + .with_length(decimal_length_from_precision(32)) + .with_repetition(Repetition::REPEATED) .build() - .unwrap(); - p.print(&field); - } - assert_eq!(&mut s, "REQUIRED DOUBLE field;"); + .unwrap(), + "REPEATED FIXED_LEN_BYTE_ARRAY (14) decimal (DECIMAL(32,20));", + ), + ]; + + types_and_strings.into_iter().for_each(|(field, expected)| { + let mut s = String::new(); + { + let mut p = Printer::new(&mut s); + p.print(&field); + } + assert_eq!(&s, expected) + }); } #[test] @@ -329,8 +626,12 @@ mod tests { .with_converted_type(ConvertedType::UTF8) .with_id(1) .build(); - let f3 = - Type::primitive_type_builder("f3", PhysicalType::FIXED_LEN_BYTE_ARRAY) + let f3 = Type::primitive_type_builder("f3", PhysicalType::BYTE_ARRAY) + .with_logical_type(Some(LogicalType::STRING(Default::default()))) + .with_id(1) + .build(); + let f4 = + Type::primitive_type_builder("f4", PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_repetition(Repetition::REPEATED) .with_converted_type(ConvertedType::INTERVAL) .with_length(12) @@ -339,6 +640,7 @@ mod tests { let mut struct_fields = Vec::new(); struct_fields.push(Arc::new(f1.unwrap())); struct_fields.push(Arc::new(f2.unwrap())); + struct_fields.push(Arc::new(f3.unwrap())); let field = Type::group_type_builder("field") .with_repetition(Repetition::OPTIONAL) .with_fields(&mut struct_fields) @@ -347,7 +649,7 @@ mod tests { 
.unwrap(); let mut fields = Vec::new(); fields.push(Arc::new(field)); - fields.push(Arc::new(f3.unwrap())); + fields.push(Arc::new(f4.unwrap())); let message = Type::group_type_builder("schema") .with_fields(&mut fields) .with_id(2) @@ -359,8 +661,9 @@ mod tests { OPTIONAL group field { REQUIRED INT32 f1 (INT_32); OPTIONAL BYTE_ARRAY f2 (UTF8); + OPTIONAL BYTE_ARRAY f3 (STRING); } - REPEATED FIXED_LEN_BYTE_ARRAY (12) f3 (INTERVAL); + REPEATED FIXED_LEN_BYTE_ARRAY (12) f4 (INTERVAL); }"; assert_eq!(&mut s, expected); } From 117b26a1976ad92431217df75c4eb7e88c90b81b Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 15 Mar 2021 02:58:49 +0200 Subject: [PATCH 4/6] implement logical type parser --- rust/parquet/src/basic.rs | 73 +---- rust/parquet/src/schema/parser.rs | 410 ++++++++++++++++++++++++++++- rust/parquet/src/schema/printer.rs | 172 +++++++----- 3 files changed, 512 insertions(+), 143 deletions(-) diff --git a/rust/parquet/src/basic.rs b/rust/parquet/src/basic.rs index 9e37516b8fbb4..6b69bfb3bfec5 100644 --- a/rust/parquet/src/basic.rs +++ b/rust/parquet/src/basic.rs @@ -847,84 +847,27 @@ impl str::FromStr for LogicalType { fn from_str(s: &str) -> result::Result { match s { - "INTEGER(8,true)" => Ok(LogicalType::INTEGER(IntType { + // The type is a placeholder that gets updated elsewhere + "INTEGER" => Ok(LogicalType::INTEGER(IntType { bit_width: 8, - is_signed: true, - })), - "INTEGER(16,true)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 16, - is_signed: true, - })), - "INTEGER(32,true)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 32, - is_signed: true, - })), - "INTEGER(64,true)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 64, - is_signed: true, - })), - "INTEGER(8,false)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 8, - is_signed: false, - })), - "INTEGER(16,false)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 16, - is_signed: false, - })), - "INTEGER(32,false)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 32, - 
is_signed: false, - })), - "INTEGER(64,false)" => Ok(LogicalType::INTEGER(IntType { - bit_width: 64, is_signed: false, })), "MAP" => Ok(LogicalType::MAP(MapType {})), "LIST" => Ok(LogicalType::LIST(ListType {})), "ENUM" => Ok(LogicalType::ENUM(EnumType {})), - // TODO: ARROW-11365 - // "DECIMAL" => Ok(LogicalType::DECIMAL), - "DATE" => Ok(LogicalType::DATE(DateType {})), - "TIME(MILLIS,true)" => Ok(LogicalType::TIME(TimeType { - is_adjusted_to_u_t_c: true, - unit: TimeUnit::MILLIS(parquet::MilliSeconds {}), - })), - "TIME(MILLIS,false)" => Ok(LogicalType::TIME(TimeType { - is_adjusted_to_u_t_c: false, - unit: TimeUnit::MILLIS(parquet::MilliSeconds {}), + "DECIMAL" => Ok(LogicalType::DECIMAL(DecimalType { + precision: -1, + scale: -1, })), - "TIME(MICROS,true)" => Ok(LogicalType::TIME(TimeType { - is_adjusted_to_u_t_c: true, - unit: TimeUnit::MICROS(parquet::MicroSeconds {}), - })), - "TIME(MICROS,false)" => Ok(LogicalType::TIME(TimeType { + "DATE" => Ok(LogicalType::DATE(DateType {})), + "TIME" => Ok(LogicalType::TIME(TimeType { is_adjusted_to_u_t_c: false, - unit: TimeUnit::MICROS(parquet::MicroSeconds {}), - })), - "TIMESTAMP(MILLIS,true)" => Ok(LogicalType::TIMESTAMP(TimestampType { - is_adjusted_to_u_t_c: true, unit: TimeUnit::MILLIS(parquet::MilliSeconds {}), })), - "TIMESTAMP(MILLIS,false)" => Ok(LogicalType::TIMESTAMP(TimestampType { + "TIMESTAMP" => Ok(LogicalType::TIMESTAMP(TimestampType { is_adjusted_to_u_t_c: false, unit: TimeUnit::MILLIS(parquet::MilliSeconds {}), })), - "TIMESTAMP(MICROS,true)" => Ok(LogicalType::TIMESTAMP(TimestampType { - is_adjusted_to_u_t_c: true, - unit: TimeUnit::MICROS(parquet::MicroSeconds {}), - })), - "TIMESTAMP(MICROS,false)" => Ok(LogicalType::TIMESTAMP(TimestampType { - is_adjusted_to_u_t_c: false, - unit: TimeUnit::MICROS(parquet::MicroSeconds {}), - })), - "TIMESTAMP(NANOS,true)" => Ok(LogicalType::TIMESTAMP(TimestampType { - is_adjusted_to_u_t_c: true, - unit: TimeUnit::MICROS(parquet::MicroSeconds {}), - })), - 
"TIMESTAMP(NANOS,false)" => Ok(LogicalType::TIMESTAMP(TimestampType { - is_adjusted_to_u_t_c: false, - unit: TimeUnit::MICROS(parquet::MicroSeconds {}), - })), "STRING" => Ok(LogicalType::STRING(StringType {})), "JSON" => Ok(LogicalType::JSON(JsonType {})), "BSON" => Ok(LogicalType::BSON(BsonType {})), diff --git a/rust/parquet/src/schema/parser.rs b/rust/parquet/src/schema/parser.rs index 1f7db5b410d6f..af1dc52be1a41 100644 --- a/rust/parquet/src/schema/parser.rs +++ b/rust/parquet/src/schema/parser.rs @@ -44,7 +44,10 @@ use std::sync::Arc; -use crate::basic::{ConvertedType, Repetition, Type as PhysicalType}; +use crate::basic::{ + ConvertedType, DecimalType, IntType, LogicalType, Repetition, TimeType, TimeUnit, + TimestampType, Type as PhysicalType, +}; use crate::errors::{ParquetError, Result}; use crate::schema::types::{Type, TypePtr}; @@ -150,6 +153,7 @@ fn assert_token(token: Option<&str>, expected: &str) -> Result<()> { } // Utility function to parse i32 or return general error. +#[inline] fn parse_i32( value: Option<&str>, not_found_msg: &str, @@ -160,6 +164,38 @@ fn parse_i32( .and_then(|v| v.parse::().map_err(|_| general_err!(parse_fail_msg))) } +// Utility function to parse boolean or return general error. +#[inline] +fn parse_bool( + value: Option<&str>, + not_found_msg: &str, + parse_fail_msg: &str, +) -> Result { + value + .ok_or_else(|| general_err!(not_found_msg)) + .and_then(|v| { + v.to_lowercase() + .parse::() + .map_err(|_| general_err!(parse_fail_msg)) + }) +} + +// Utility function to parse TimeUnit or return general error. 
+fn parse_timeunit( + value: Option<&str>, + not_found_msg: &str, + parse_fail_msg: &str, +) -> Result { + value + .ok_or_else(|| general_err!(not_found_msg)) + .and_then(|v| match v.to_uppercase().as_str() { + "MILLIS" => Ok(TimeUnit::MILLIS(Default::default())), + "MICROS" => Ok(TimeUnit::MICROS(Default::default())), + "NANOS" => Ok(TimeUnit::NANOS(Default::default())), + _ => Err(general_err!(parse_fail_msg)), + }) +} + impl<'a> Parser<'a> { // Entry function to parse message type, uses internal tokenizer. fn parse_message_type(&mut self) -> Result { @@ -222,18 +258,29 @@ impl<'a> Parser<'a> { .next() .ok_or_else(|| general_err!("Expected name, found None"))?; - // Parse converted type if exists - let converted_type = if let Some("(") = self.tokenizer.next() { + // Parse logical or converted type if exists + let (logical_type, converted_type) = if let Some("(") = self.tokenizer.next() { let tpe = self .tokenizer .next() .ok_or_else(|| general_err!("Expected converted type, found None")) - .and_then(|v| v.to_uppercase().parse::())?; + .and_then(|v| { + // Try logical type first + let upper = v.to_uppercase(); + let logical = upper.parse::(); + match logical { + Ok(logical) => Ok(( + Some(logical.clone()), + ConvertedType::from(Some(logical)), + )), + Err(_) => Ok((None, upper.parse::()?)), + } + })?; assert_token(self.tokenizer.next(), ")")?; tpe } else { self.tokenizer.backtrack(); - ConvertedType::NONE + (None, ConvertedType::NONE) }; // Parse optional id @@ -246,6 +293,7 @@ impl<'a> Parser<'a> { let mut fields = self.parse_child_types()?; let mut builder = Type::group_type_builder(name) + .with_logical_type(logical_type) .with_converted_type(converted_type) .with_fields(&mut fields); if let Some(rep) = repetition { @@ -281,19 +329,142 @@ impl<'a> Parser<'a> { .ok_or_else(|| general_err!("Expected name, found None"))?; // Parse converted type - let (converted_type, precision, scale) = if let Some("(") = self.tokenizer.next() + let (logical_type, converted_type, 
precision, scale) = if let Some("(") = + self.tokenizer.next() { - let tpe = self + let (mut logical, mut converted) = self .tokenizer .next() - .ok_or_else(|| general_err!("Expected converted type, found None")) - .and_then(|v| v.to_uppercase().parse::<ConvertedType>())?; + .ok_or_else(|| { + general_err!("Expected logical or converted type, found None") + }) + .and_then(|v| { + let upper = v.to_uppercase(); + let logical = upper.parse::<LogicalType>(); + match logical { + Ok(logical) => Ok(( + Some(logical.clone()), + ConvertedType::from(Some(logical)), + )), + Err(_) => Ok((None, upper.parse::<ConvertedType>()?)), + } + })?; // Parse precision and scale for decimals let mut precision: i32 = -1; let mut scale: i32 = -1; - if tpe == ConvertedType::DECIMAL { + // Parse the concrete logical type + if let Some(tpe) = &logical { + match tpe { + LogicalType::DECIMAL(_) => { + if let Some("(") = self.tokenizer.next() { + precision = parse_i32( + self.tokenizer.next(), + "Expected precision, found None", + "Failed to parse precision for DECIMAL type", + )?; + if let Some(",") = self.tokenizer.next() { + scale = parse_i32( + self.tokenizer.next(), + "Expected scale, found None", + "Failed to parse scale for DECIMAL type", + )?; // TODO: this might not cater for the case of no scale correctly + assert_token(self.tokenizer.next(), ")")?; + logical = Some(LogicalType::DECIMAL(DecimalType { + precision, + scale, + })); + converted = ConvertedType::from(logical.clone()); + } else { + scale = 0; + logical = Some(LogicalType::DECIMAL(DecimalType { + precision, + scale, + })); + converted = ConvertedType::from(logical.clone()); + } + } + } + LogicalType::TIME(_) => { + if let Some("(") = self.tokenizer.next() { + let unit = parse_timeunit( + self.tokenizer.next(), + "Invalid timeunit found", + "Failed to parse timeunit for TIME type", + )?; + if let Some(",") = self.tokenizer.next() { + let is_adjusted_to_u_t_c = parse_bool( + self.tokenizer.next(), + "Invalid boolean found", + "Failure to parse timezone info for TIME 
type", + )?; + assert_token(self.tokenizer.next(), ")")?; + logical = Some(LogicalType::TIME(TimeType { + unit, + is_adjusted_to_u_t_c, + })); + converted = ConvertedType::from(logical.clone()); + } else { + // Invalid token for unit + self.tokenizer.backtrack(); + } + } + } + LogicalType::TIMESTAMP(_) => { + if let Some("(") = self.tokenizer.next() { + let unit = parse_timeunit( + self.tokenizer.next(), + "Invalid timeunit found", + "Failed to parse timeunit for TIMESTAMP type", + )?; + if let Some(",") = self.tokenizer.next() { + let is_adjusted_to_u_t_c = parse_bool( + self.tokenizer.next(), + "Invalid boolean found", + "Failure to parse timezone info for TIMESTAMP type", + )?; + assert_token(self.tokenizer.next(), ")")?; + logical = Some(LogicalType::TIMESTAMP(TimestampType { + unit, + is_adjusted_to_u_t_c, + })); + converted = ConvertedType::from(logical.clone()); + } else { + // Invalid token for unit + self.tokenizer.backtrack(); + } + } + } + LogicalType::INTEGER(_) => { + if let Some("(") = self.tokenizer.next() { + let bit_width = parse_i32( + self.tokenizer.next(), + "Invalid bit_width found", + "Failed to parse bit_width for INTEGER type", + )? 
as i8; + // TODO: check the bit width against the physical type + if let Some(",") = self.tokenizer.next() { + let is_signed = parse_bool( + self.tokenizer.next(), + "Invalid boolean found", + "Failed to parse is_signed for INTEGER type", + )?; + assert_token(self.tokenizer.next(), ")")?; + logical = Some(LogicalType::INTEGER(IntType { + bit_width, + is_signed, + })); + converted = ConvertedType::from(logical.clone()); + } else { + // Invalid token for bit width + self.tokenizer.backtrack(); + } + } + } + _ => {} + } + } else if converted == ConvertedType::DECIMAL { if let Some("(") = self.tokenizer.next() { // Parse precision precision = parse_i32( @@ -322,10 +493,10 @@ impl<'a> Parser<'a> { } assert_token(self.tokenizer.next(), ")")?; - (tpe, precision, scale) + (logical, converted, precision, scale) } else { self.tokenizer.backtrack(); - (ConvertedType::NONE, -1, -1) + (None, ConvertedType::NONE, -1, -1) }; // Parse optional id @@ -339,6 +510,7 @@ impl<'a> Parser<'a> { let mut builder = Type::primitive_type_builder(name, physical_type) .with_repetition(repetition) + .with_logical_type(logical_type) .with_converted_type(converted_type) .with_length(length) .with_precision(precision) @@ -503,6 +675,129 @@ mod tests { assert!(result.is_ok()); } + #[test] + fn test_parse_message_type_integer() { + // Invalid integer syntax + let schema = " + message root { + optional int64 f1 (INTEGER()); + } + "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_err()); + + // Invalid integer syntax, needs both bit-width and signedness + let schema = " + message root { + optional int64 f1 (INTEGER(32,)); + } + "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_err()); + + // Invalid integer because of non-numeric bit width + let schema = " + message root { + optional int32 f1 (INTEGER(eight,true)); + } 
+ "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_err()); + + // Valid types + let schema = " + message root { + optional int32 f1 (INTEGER(8,false)); + optional int32 f2 (INTEGER(8,true)); + optional int32 f3 (INTEGER(16,false)); + optional int32 f4 (INTEGER(16,true)); + optional int32 f5 (INTEGER(32,false)); + optional int32 f6 (INTEGER(32,true)); + optional int64 f7 (INTEGER(64,false)); + optional int64 f7 (INTEGER(64,true)); + } + "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_ok()); + } + + #[test] + fn test_parse_message_type_temporal() { + // Invalid timestamp syntax + let schema = " + message root { + optional int64 f1 (TIMESTAMP(); + } + "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_err()); + + // Invalid timestamp syntax, needs both unit and UTC adjustment + let schema = " + message root { + optional int64 f1 (TIMESTAMP(MILLIS,)); + } + "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_err()); + + // Invalid timestamp because of unknown unit + let schema = " + message root { + optional int64 f1 (TIMESTAMP(YOCTOS,)); + } + "; + let mut iter = Tokenizer::from_str(schema); + let result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_err()); + + // Valid types + let schema = " + message root { + optional int32 f1 (DATE); + optional int32 f2 (TIME(MILLIS,true)); + optional int64 f3 (TIME(MICROS,false)); + optional int64 f4 (TIME(NANOS,true)); + optional int64 f5 (TIMESTAMP(MILLIS,true)); + optional int64 f6 (TIMESTAMP(MICROS,true)); + optional int64 f7 (TIMESTAMP(NANOS,false)); + } + "; + let mut iter = Tokenizer::from_str(schema); + let 
result = Parser { + tokenizer: &mut iter, + } + .parse_message_type(); + assert!(result.is_ok()); + } + #[test] fn test_parse_message_type_decimal() { // It is okay for decimal to omit precision and scale with right syntax. @@ -598,6 +893,10 @@ mod tests { "f1", PhysicalType::FIXED_LEN_BYTE_ARRAY, ) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + precision: 9, + scale: 3, + }))) .with_converted_type(ConvertedType::DECIMAL) .with_length(5) .with_precision(9) @@ -610,6 +909,10 @@ mod tests { "f2", PhysicalType::FIXED_LEN_BYTE_ARRAY, ) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + precision: 38, + scale: 18, + }))) .with_converted_type(ConvertedType::DECIMAL) .with_length(16) .with_precision(38) @@ -657,6 +960,9 @@ mod tests { Arc::new( Type::group_type_builder("a1") .with_repetition(Repetition::OPTIONAL) + .with_logical_type(Some(LogicalType::LIST( + Default::default(), + ))) .with_converted_type(ConvertedType::LIST) .with_fields(&mut vec![Arc::new( Type::primitive_type_builder( @@ -674,6 +980,9 @@ mod tests { Arc::new( Type::group_type_builder("b1") .with_repetition(Repetition::OPTIONAL) + .with_logical_type(Some(LogicalType::LIST( + Default::default(), + ))) .with_converted_type(ConvertedType::LIST) .with_fields(&mut vec![Arc::new( Type::group_type_builder("b2") @@ -760,6 +1069,7 @@ mod tests { ), Arc::new( Type::primitive_type_builder("_5", PhysicalType::INT32) + .with_logical_type(Some(LogicalType::DATE(Default::default()))) .with_converted_type(ConvertedType::DATE) .build() .unwrap(), @@ -778,4 +1088,80 @@ mod tests { .unwrap(); assert_eq!(message, expected); } + + #[test] + fn test_parse_message_type_compare_4() { + let schema = " + message root { + required int32 _1 (INTEGER(8,true)); + required int32 _2 (INTEGER(16,false)); + required float _3; + required double _4; + optional int32 _5 (TIME(MILLIS,false)); + optional binary _6 (STRING); + } + "; + let mut iter = Tokenizer::from_str(schema); + let message = Parser { + 
tokenizer: &mut iter, + } + .parse_message_type() + .unwrap(); + + let mut fields = vec![ + Arc::new( + Type::primitive_type_builder("_1", PhysicalType::INT32) + .with_repetition(Repetition::REQUIRED) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 8, + is_signed: true, + }))) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_2", PhysicalType::INT32) + .with_repetition(Repetition::REQUIRED) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 16, + is_signed: false, + }))) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_3", PhysicalType::FLOAT) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_4", PhysicalType::DOUBLE) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_5", PhysicalType::INT32) + .with_logical_type(Some(LogicalType::TIME(TimeType { + unit: TimeUnit::MILLIS(Default::default()), + is_adjusted_to_u_t_c: false, + }))) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_6", PhysicalType::BYTE_ARRAY) + .with_logical_type(Some(LogicalType::STRING(Default::default()))) + .build() + .unwrap(), + ), + ]; + + let expected = Type::group_type_builder("root") + .with_fields(&mut fields) + .build() + .unwrap(); + assert_eq!(message, expected); + } } diff --git a/rust/parquet/src/schema/printer.rs b/rust/parquet/src/schema/printer.rs index 4079534986529..e57513a22cea9 100644 --- a/rust/parquet/src/schema/printer.rs +++ b/rust/parquet/src/schema/printer.rs @@ -204,6 +204,70 @@ fn print_timeunit(unit: &TimeUnit) -> &str { } } +#[inline] +fn print_logical_and_converted( + logical_type: &Option, + converted_type: ConvertedType, + precision: i32, + scale: i32, +) -> String { + match logical_type { + Some(logical_type) => match logical_type { + LogicalType::INTEGER(t) => { + format!("INTEGER({},{})", t.bit_width, t.is_signed) + } + 
LogicalType::DECIMAL(t) => { + format!("DECIMAL({},{})", t.precision, t.scale) + } + LogicalType::TIMESTAMP(t) => { + format!( + "TIMESTAMP({},{})", + print_timeunit(&t.unit), + t.is_adjusted_to_u_t_c + ) + } + LogicalType::TIME(t) => { + format!( + "TIME({},{})", + print_timeunit(&t.unit), + t.is_adjusted_to_u_t_c + ) + } + LogicalType::DATE(_) => "DATE".to_string(), + LogicalType::BSON(_) => "BSON".to_string(), + LogicalType::JSON(_) => "JSON".to_string(), + LogicalType::STRING(_) => "STRING".to_string(), + LogicalType::UUID(_) => "UUID".to_string(), + LogicalType::ENUM(_) => "ENUM".to_string(), + LogicalType::LIST(_) => "LIST".to_string(), + LogicalType::MAP(_) => "MAP".to_string(), + LogicalType::UNKNOWN(_) => "UNKNOWN".to_string(), + }, + None => { + // Also print converted type if it is available + match converted_type { + ConvertedType::NONE => format!(""), + decimal @ ConvertedType::DECIMAL => { + // For decimal type we should print precision and scale if they + // are > 0, e.g. 
DECIMAL(9, 2) - + // DECIMAL(9) - DECIMAL + let precision_scale = match (precision, scale) { + (p, s) if p > 0 && s > 0 => { + format!("({}, {})", p, s) + } + (p, 0) if p > 0 => format!("({})", p), + _ => format!(""), + }; + format!("{}{}", decimal, precision_scale) + } + other_converted_type => { + format!("{}", other_converted_type) + } + } + } + } +} + #[allow(unused_must_use)] impl<'a> Printer<'a> { pub fn print(&mut self, tp: &Type) { @@ -225,69 +289,30 @@ impl<'a> Printer<'a> { }; // Also print logical type if it is available // If there is a logical type, do not print converted type - let logical_type_str = match basic_info.logical_type() { - Some(logical_type) => match logical_type { - LogicalType::INTEGER(t) => { - format!(" (INTEGER({},{}))", t.bit_width, t.is_signed) - } - LogicalType::DECIMAL(t) => { - format!(" (DECIMAL({},{}))", t.precision, t.scale) - } - LogicalType::TIMESTAMP(t) => { - format!( - " (TIMESTAMP({},{}))", - print_timeunit(&t.unit), - t.is_adjusted_to_u_t_c - ) - } - LogicalType::TIME(t) => { - format!( - " (TIME({},{}))", - print_timeunit(&t.unit), - t.is_adjusted_to_u_t_c - ) - } - LogicalType::DATE(_) => " (DATE)".to_string(), - LogicalType::BSON(_) => " (BSON)".to_string(), - LogicalType::JSON(_) => " (JSON)".to_string(), - LogicalType::STRING(_) => " (STRING)".to_string(), - LogicalType::UUID(_) => " (UUID)".to_string(), - LogicalType::ENUM(_) => " (ENUM)".to_string(), - LogicalType::LIST(_) => " (LIST)".to_string(), - LogicalType::MAP(_) => " (MAP)".to_string(), - LogicalType::UNKNOWN(_) => " (UNKNOWN)".to_string(), - }, - None => { - // Also print converted type if it is available - match basic_info.converted_type() { - ConvertedType::NONE => format!(""), - decimal @ ConvertedType::DECIMAL => { - // For decimal type we should print precision and scale if they - // are > 0, e.g. 
DECIMAL(9, 2) - - // DECIMAL(9) - DECIMAL - let precision_scale = match (precision, scale) { - (p, s) if p > 0 && s > 0 => { - format!(" ({}, {})", p, s) - } - (p, 0) if p > 0 => format!(" ({})", p), - _ => format!(""), - }; - format!(" ({}{})", decimal, precision_scale) - } - other_converted_type => { - format!(" ({})", other_converted_type) - } - } - } - }; - write!( - self.output, - "{} {} {}{};", - basic_info.repetition(), - phys_type_str, - basic_info.name(), - logical_type_str + let logical_type_str = print_logical_and_converted( + &basic_info.logical_type(), + basic_info.converted_type(), + precision, + scale, ); + if logical_type_str.is_empty() { + write!( + self.output, + "{} {} {};", + basic_info.repetition(), + phys_type_str, + basic_info.name() + ); + } else { + write!( + self.output, + "{} {} {} ({});", + basic_info.repetition(), + phys_type_str, + basic_info.name(), + logical_type_str + ); + } } Type::GroupType { ref basic_info, @@ -296,10 +321,14 @@ impl<'a> Printer<'a> { if basic_info.has_repetition() { let r = basic_info.repetition(); write!(self.output, "{} group {} ", r, basic_info.name()); - if let Some(logical_type) = basic_info.logical_type() { - write!(self.output, "({:?}) ", logical_type); - } else if basic_info.converted_type() != ConvertedType::NONE { - write!(self.output, "({}) ", basic_info.converted_type()); + let logical_str = print_logical_and_converted( + &basic_info.logical_type(), + basic_info.converted_type(), + 0, + 0, + ); + if !logical_str.is_empty() { + write!(self.output, "({}) ", logical_str); } writeln!(self.output, "{{"); } else { @@ -338,6 +367,7 @@ mod tests { let mut p = Printer::new(&mut s); p.print(&message); } + println!("{}", &s); let parsed = parse_message_type(&s).unwrap(); assert_eq!(message, parsed); } @@ -678,6 +708,7 @@ mod tests { let a1 = Type::group_type_builder("a1") .with_repetition(Repetition::OPTIONAL) + .with_logical_type(Some(LogicalType::LIST(Default::default()))) 
.with_converted_type(ConvertedType::LIST) .with_fields(&mut vec![Arc::new(a2)]) .build() @@ -702,6 +733,7 @@ mod tests { let b1 = Type::group_type_builder("b1") .with_repetition(Repetition::OPTIONAL) + .with_logical_type(Some(LogicalType::LIST(Default::default()))) .with_converted_type(ConvertedType::LIST) .with_fields(&mut vec![Arc::new(b2)]) .build() @@ -760,6 +792,10 @@ mod tests { fn test_print_and_parse_decimal() { let f1 = Type::primitive_type_builder("f1", PhysicalType::INT32) .with_repetition(Repetition::OPTIONAL) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + precision: 9, + scale: 2, + }))) .with_converted_type(ConvertedType::DECIMAL) .with_precision(9) .with_scale(2) @@ -768,6 +804,10 @@ mod tests { let f2 = Type::primitive_type_builder("f2", PhysicalType::INT32) .with_repetition(Repetition::OPTIONAL) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + precision: 9, + scale: 0, + }))) .with_converted_type(ConvertedType::DECIMAL) .with_precision(9) .with_scale(0) From ffab87f144d40711c08a86a27f6997185c4e6e21 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 18 Mar 2021 20:44:23 +0200 Subject: [PATCH 5/6] test logical types, remove TODO --- rust/parquet/src/arrow/schema.rs | 51 +++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index fe9f60666bc9d..e8be9c3937c71 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -942,11 +942,14 @@ mod tests { REQUIRED BOOLEAN boolean; REQUIRED INT32 int8 (INT_8); REQUIRED INT32 int16 (INT_16); + REQUIRED INT32 uint8 (INTEGER(8,false)); + REQUIRED INT32 uint16 (INTEGER(16,false)); REQUIRED INT32 int32; REQUIRED INT64 int64 ; OPTIONAL DOUBLE double; OPTIONAL FLOAT float; OPTIONAL BINARY string (UTF8); + OPTIONAL BINARY string_2 (STRING); } "; let parquet_group_type = parse_message_type(message_type).unwrap(); @@ -959,11 +962,14 @@ mod tests { 
Field::new("boolean", DataType::Boolean, false), Field::new("int8", DataType::Int8, false), Field::new("int16", DataType::Int16, false), + Field::new("uint8", DataType::UInt8, false), + Field::new("uint16", DataType::UInt16, false), Field::new("int32", DataType::Int32, false), Field::new("int64", DataType::Int64, false), Field::new("double", DataType::Float64, true), Field::new("float", DataType::Float32, true), Field::new("string", DataType::Utf8, true), + Field::new("string_2", DataType::Utf8, true), ]; assert_eq!(&arrow_fields, converted_arrow_schema.fields()); @@ -1508,9 +1514,11 @@ mod tests { message test_schema { REQUIRED BOOLEAN boolean; REQUIRED INT32 int8 (INT_8); + REQUIRED INT32 uint8 (INTEGER(8,false)); REQUIRED INT32 int16 (INT_16); + REQUIRED INT32 uint16 (INTEGER(16,false)); REQUIRED INT32 int32; - REQUIRED INT64 int64 ; + REQUIRED INT64 int64; OPTIONAL DOUBLE double; OPTIONAL FLOAT float; OPTIONAL BINARY string (UTF8); @@ -1518,8 +1526,10 @@ mod tests { OPTIONAL INT32 date (DATE); OPTIONAL INT32 time_milli (TIME_MILLIS); OPTIONAL INT64 time_micro (TIME_MICROS); + OPTIONAL INT64 time_nano (TIME(NANOS,false)); OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS); REQUIRED INT64 ts_micro (TIMESTAMP_MICROS); + REQUIRED INT64 ts_nano (TIMESTAMP(NANOS,true)); } "; let parquet_group_type = parse_message_type(message_type).unwrap(); @@ -1534,7 +1544,9 @@ mod tests { let arrow_fields = vec![ Field::new("boolean", DataType::Boolean, false), Field::new("int8", DataType::Int8, false), + Field::new("uint8", DataType::UInt8, false), Field::new("int16", DataType::Int16, false), + Field::new("uint16", DataType::UInt16, false), Field::new("int32", DataType::Int32, false), Field::new("int64", DataType::Int64, false), Field::new("double", DataType::Float64, true), @@ -1548,6 +1560,7 @@ mod tests { Field::new("date", DataType::Date32, true), Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true), Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), 
true), + Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true), Field::new( "ts_milli", DataType::Timestamp(TimeUnit::Millisecond, None), @@ -1558,24 +1571,28 @@ mod tests { DataType::Timestamp(TimeUnit::Microsecond, None), false, ), + Field::new( + "ts_nano", + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_string())), + false, + ), ]; assert_eq!(arrow_fields, converted_arrow_fields); } #[test] - #[ignore = "To be addressed as part of ARROW-11365"] fn test_field_to_column_desc() { let message_type = " message arrow_schema { REQUIRED BOOLEAN boolean; REQUIRED INT32 int8 (INT_8); - REQUIRED INT32 int16 (INT_16); + REQUIRED INT32 int16 (INTEGER(16,true)); REQUIRED INT32 int32; REQUIRED INT64 int64; OPTIONAL DOUBLE double; OPTIONAL FLOAT float; - OPTIONAL BINARY string (UTF8); + OPTIONAL BINARY string (STRING); OPTIONAL GROUP bools (LIST) { REPEATED GROUP list { OPTIONAL BOOLEAN element; @@ -1587,20 +1604,20 @@ mod tests { } } OPTIONAL INT32 date (DATE); - OPTIONAL INT32 time_milli (TIME_MILLIS); + OPTIONAL INT32 time_milli (TIME(MILLIS,false)); OPTIONAL INT64 time_micro (TIME_MICROS); OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS); - REQUIRED INT64 ts_micro (TIMESTAMP_MICROS); + REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false)); REQUIRED GROUP struct { REQUIRED BOOLEAN bools; - REQUIRED INT32 uint32 (UINT_32); + REQUIRED INT32 uint32 (INTEGER(32,false)); REQUIRED GROUP int32 (LIST) { REPEATED GROUP list { OPTIONAL INT32 element; } } } - REQUIRED BINARY dictionary_strings (UTF8); + REQUIRED BINARY dictionary_strings (STRING); } "; let parquet_group_type = parse_message_type(message_type).unwrap(); @@ -1674,8 +1691,20 @@ mod tests { .iter() .zip(converted_arrow_schema.columns()) .for_each(|(a, b)| { - // TODO: ARROW-11365: If parsing v1 format, there should be no logical type - assert_eq!(a, b); + // Only check logical type if it's set on the Parquet side. 
+ // This is because the Arrow conversion always sets logical type, + // even if there wasn't originally one. + // This is not an issue, but is an inconvenience for this test. + match a.logical_type() { + Some(_) => { + assert_eq!(a, b) + } + None => { + assert_eq!(a.name(), b.name()); + assert_eq!(a.physical_type(), b.physical_type()); + assert_eq!(a.converted_type(), b.converted_type()); + } + }; }); } @@ -1694,7 +1723,7 @@ mod tests { fn test_metadata() { let message_type = " message test_schema { - OPTIONAL BINARY string (UTF8); + OPTIONAL BINARY string (STRING); } "; let parquet_group_type = parse_message_type(message_type).unwrap(); From 6aaf4fae3322b7dfdece25123063acea00a19166 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 24 Mar 2021 14:03:17 +0200 Subject: [PATCH 6/6] address review comments --- rust/parquet/src/arrow/schema.rs | 2 +- rust/parquet/src/schema/parser.rs | 106 +++++++++++++++++++++++++----- 2 files changed, 91 insertions(+), 17 deletions(-) diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index e8be9c3937c71..b15bb7e414028 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -1514,7 +1514,7 @@ mod tests { message test_schema { REQUIRED BOOLEAN boolean; REQUIRED INT32 int8 (INT_8); - REQUIRED INT32 uint8 (INTEGER(8,false)); + REQUIRED INT32 uint8 (INTEGER(8,false)); REQUIRED INT32 int16 (INT_16); REQUIRED INT32 uint16 (INTEGER(16,false)); REQUIRED INT32 int32; diff --git a/rust/parquet/src/schema/parser.rs b/rust/parquet/src/schema/parser.rs index af1dc52be1a41..3ce347c8745b7 100644 --- a/rust/parquet/src/schema/parser.rs +++ b/rust/parquet/src/schema/parser.rs @@ -367,9 +367,9 @@ impl<'a> Parser<'a> { if let Some(",") = self.tokenizer.next() { scale = parse_i32( self.tokenizer.next(), - "Invalid boolean found", - "Failure to parse timezone info for TIME type", - )?; // TODO: this might not cater for the case of no scale correctly + "Expected scale, found None", + 
"Failed to parse scale for DECIMAL type", + )?; assert_token(self.tokenizer.next(), ")")?; logical = Some(LogicalType::DECIMAL(DecimalType { precision, @@ -397,7 +397,7 @@ impl<'a> Parser<'a> { let is_adjusted_to_u_t_c = parse_bool( self.tokenizer.next(), "Invalid boolean found", - "Failure to parse timezone info for TIME type", + "Failed to parse timezone info for TIME type", )?; assert_token(self.tokenizer.next(), ")")?; logical = Some(LogicalType::TIME(TimeType { @@ -422,7 +422,7 @@ impl<'a> Parser<'a> { let is_adjusted_to_u_t_c = parse_bool( self.tokenizer.next(), "Invalid boolean found", - "Failure to parse timezone info for TIMESTAMP type", + "Failed to parse timezone info for TIMESTAMP type", )?; assert_token(self.tokenizer.next(), ")")?; logical = Some(LogicalType::TIMESTAMP(TimestampType { @@ -443,12 +443,29 @@ impl<'a> Parser<'a> { "Invalid bit_width found", "Failed to parse bit_width for INTEGER type", )? as i8; - // TODO: check the unit against the physical type + match physical_type { + PhysicalType::INT32 => { + match bit_width { + 8 | 16 | 32 => {} + _ => { + return Err(general_err!("Incorrect bit width {} for INT32", bit_width)) + } + } + } + PhysicalType::INT64 => { + if bit_width != 64 { + return Err(general_err!("Incorrect bit width {} for INT64", bit_width)) + } + } + _ => { + return Err(general_err!("Logical type INTEGER cannot be used with physical type {}", physical_type)) + } + } if let Some(",") = self.tokenizer.next() { let is_signed = parse_bool( self.tokenizer.next(), "Invalid boolean found", - "Failure to parse timezone info for TIMESTAMP type", + "Failed to parse is_signed for INTEGER type", )?; assert_token(self.tokenizer.next(), ")")?; logical = Some(LogicalType::INTEGER(IntType { @@ -688,7 +705,10 @@ mod tests { tokenizer: &mut iter, } .parse_message_type(); - assert!(result.is_err()); + assert_eq!( + result, + Err(general_err!("Failed to parse bit_width for INTEGER type")) + ); // Invalid integer syntax, needs both bit-width and 
UTC sign let schema = " @@ -701,7 +721,10 @@ mod tests { tokenizer: &mut iter, } .parse_message_type(); - assert!(result.is_err()); + assert_eq!( + result, + Err(general_err!("Incorrect bit width 32 for INT64")) + ); // Invalid integer because of non-numeric bit width let schema = " @@ -714,7 +737,10 @@ mod tests { tokenizer: &mut iter, } .parse_message_type(); - assert!(result.is_err()); + assert_eq!( + result, + Err(general_err!("Failed to parse bit_width for INTEGER type")) + ); // Valid types let schema = " @@ -750,7 +776,10 @@ mod tests { tokenizer: &mut iter, } .parse_message_type(); - assert!(result.is_err()); + assert_eq!( + result, + Err(general_err!("Failed to parse timeunit for TIMESTAMP type")) + ); // Invalid timestamp syntax, needs both unit and UTC adjustment let schema = " @@ -763,7 +792,12 @@ mod tests { tokenizer: &mut iter, } .parse_message_type(); - assert!(result.is_err()); + assert_eq!( + result, + Err(general_err!( + "Failed to parse timezone info for TIMESTAMP type" + )) + ); // Invalid timestamp because of unknown unit let schema = " @@ -776,7 +810,10 @@ mod tests { tokenizer: &mut iter, } .parse_message_type(); - assert!(result.is_err()); + assert_eq!( + result, + Err(general_err!("Failed to parse timeunit for TIMESTAMP type")) + ); // Valid types let schema = " @@ -1097,8 +1134,12 @@ mod tests { required int32 _2 (INTEGER(16,false)); required float _3; required double _4; - optional int32 _5 (TIME(MILLIS,false)); - optional binary _6 (STRING); + optional int32 _5 (DATE); + optional int32 _6 (TIME(MILLIS,false)); + optional int64 _7 (TIME(MICROS,true)); + optional int64 _8 (TIMESTAMP(MILLIS,true)); + optional int64 _9 (TIMESTAMP(NANOS,false)); + optional binary _10 (STRING); } "; let mut iter = Tokenizer::from_str(schema); @@ -1143,6 +1184,12 @@ mod tests { ), Arc::new( Type::primitive_type_builder("_5", PhysicalType::INT32) + .with_logical_type(Some(LogicalType::DATE(Default::default()))) + .build() + .unwrap(), + ), + Arc::new( + 
Type::primitive_type_builder("_6", PhysicalType::INT32) .with_logical_type(Some(LogicalType::TIME(TimeType { unit: TimeUnit::MILLIS(Default::default()), is_adjusted_to_u_t_c: false, @@ -1151,7 +1198,34 @@ mod tests { .unwrap(), ), Arc::new( - Type::primitive_type_builder("_6", PhysicalType::BYTE_ARRAY) + Type::primitive_type_builder("_7", PhysicalType::INT64) + .with_logical_type(Some(LogicalType::TIME(TimeType { + unit: TimeUnit::MICROS(Default::default()), + is_adjusted_to_u_t_c: true, + }))) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_8", PhysicalType::INT64) + .with_logical_type(Some(LogicalType::TIMESTAMP(TimestampType { + unit: TimeUnit::MILLIS(Default::default()), + is_adjusted_to_u_t_c: true, + }))) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_9", PhysicalType::INT64) + .with_logical_type(Some(LogicalType::TIMESTAMP(TimestampType { + unit: TimeUnit::NANOS(Default::default()), + is_adjusted_to_u_t_c: false, + }))) + .build() + .unwrap(), + ), + Arc::new( + Type::primitive_type_builder("_10", PhysicalType::BYTE_ARRAY) .with_logical_type(Some(LogicalType::STRING(Default::default()))) .build() .unwrap(),