Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept parquet schemas without explicitly required Map keys #5630

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 40 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,46 @@ mod tests {
assert_eq!(row_count, 300);
}

#[test]
fn test_read_incorrect_map_schema_file() {
let testdata = arrow::util::test_util::parquet_test_data();
// see https://github.com/apache/parquet-testing/pull/47
let path = format!("{testdata}/incorrect_map_schema.parquet");
let file = File::open(path).unwrap();
let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

let batch = record_reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 1);

let expected_schema = Schema::new(Fields::from(vec![Field::new(
"my_map",
ArrowDataType::Map(
Arc::new(Field::new(
"key_value",
ArrowDataType::Struct(Fields::from(vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
])),
false,
)),
false.into(),
),
true,
)]));
assert_eq!(batch.schema().as_ref(), &expected_schema);
jupiter marked this conversation as resolved.
Show resolved Hide resolved

assert_eq!(batch.num_rows(), 1);
assert_eq!(batch.column(0).null_count(), 0);
assert_eq!(
batch.column(0).as_map().keys().as_ref(),
&StringArray::from(vec!["parent", "name"])
);
assert_eq!(
batch.column(0).as_map().values().as_ref(),
&StringArray::from(vec!["another", "report"])
);
}

/// Parameters for single_column_reader_test
#[derive(Clone)]
struct TestOptions {
Expand Down
10 changes: 7 additions & 3 deletions parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ impl Visitor {
let map_key = &map_key_value.get_fields()[0];
let map_value = &map_key_value.get_fields()[1];

if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
return Err(arrow_err!("Map keys must be required"));
if map_key.get_basic_info().repetition() == Repetition::REPEATED {
jupiter marked this conversation as resolved.
Show resolved Hide resolved
return Err(arrow_err!("Map keys cannot be repeated"));
}

if map_value.get_basic_info().repetition() == Repetition::REPEATED {
Expand Down Expand Up @@ -346,7 +346,11 @@ impl Visitor {
// Need both columns to be projected
match (maybe_key, maybe_value) {
(Some(key), Some(value)) => {
let key_field = Arc::new(convert_field(map_key, &key, arrow_key));
let key_field = Arc::new(
convert_field(map_key, &key, arrow_key)
// The key is always non-nullable (#5630)
.with_nullable(false),
);
let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
let field_metadata = match arrow_map {
Some(field) => field.metadata().clone(),
Expand Down
24 changes: 24 additions & 0 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,12 @@ mod tests {
OPTIONAL int32 value;
}
}
REQUIRED group my_map4 (MAP) {
REPEATED group map {
OPTIONAL binary key (UTF8);
REQUIRED int32 value;
}
}
}
";

Expand Down Expand Up @@ -1075,6 +1081,24 @@ mod tests {
));
}

// // Map<String, Integer> (non-compliant nullable key)
// group my_map (MAP_KEY_VALUE) {
// repeated group map {
// optional binary key (UTF8);
// required int32 value;
// }
// }
{
arrow_fields.push(Field::new_map(
"my_map4",
"map",
Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
Field::new("value", DataType::Int32, false),
false,
false,
));
}

let parquet_group_type = parse_message_type(message_type).unwrap();

let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
Expand Down