Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 91 additions & 7 deletions datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,8 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
}
}
AvroSchema::Array(schema) => {
let sub_parent_field_name = format!("{parent_field_name}.element");
Self::child_schema_lookup(
&sub_parent_field_name,
parent_field_name,
&schema.items,
schema_lookup,
)?;
Expand Down Expand Up @@ -596,10 +595,7 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
})
.collect();

let sub_parent_field_name =
format!("{}.{}", parent_field_name, list_field.name());
let arrays =
self.build_struct_array(&rows, &sub_parent_field_name, fields)?;
let arrays = self.build_struct_array(&rows, parent_field_name, fields)?;
let data_type = DataType::Struct(fields.clone());
ArrayDataBuilder::new(data_type)
.len(rows.len())
Expand Down Expand Up @@ -1038,7 +1034,7 @@ where
mod test {
use crate::avro_to_arrow::{Reader, ReaderBuilder};
use arrow::array::Array;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, Fields};
use arrow::datatypes::{Field, TimeUnit};
use datafusion_common::assert_batches_eq;
use datafusion_common::cast::{
Expand Down Expand Up @@ -1720,4 +1716,92 @@ mod test {
assert_eq!(2, num_batches);
assert_eq!(28, sum_id);
}

/// Round-trips an Avro array of records through an Arrow read schema whose
/// list element field is named "item" rather than the conventional
/// "element".
///
/// Child lookups used to be keyed through the element field name (e.g.
/// "items.element.id" / "items.element.name"), so a reader-supplied schema
/// with a differently named element field could not be resolved. Lookups
/// are now keyed by the parent field alone ("items.id" / "items.name").
#[test]
fn test_list_of_structs_with_custom_field_name() {
    // Avro writer schema: a root record holding an array of (id, name) records.
    let avro_schema = apache_avro::Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "root",
            "fields": [
                {
                    "name": "items",
                    "type": {
                        "type": "array",
                        "items": {
                            "type": "record",
                            "name": "item_record",
                            "fields": [
                                {
                                    "name": "id",
                                    "type": "long"
                                },
                                {
                                    "name": "name",
                                    "type": "string"
                                }
                            ]
                        }
                    }
                }
            ]
        }"#,
    )
    .unwrap();

    // A single row whose list holds two entries, resolved against the
    // writer schema so it can be appended below.
    let record_value = apache_avro::to_value(serde_json::json!({
        "items": [
            {
                "id": 1,
                "name": "first"
            },
            {
                "id": 2,
                "name": "second"
            }
        ]
    }))
    .unwrap()
    .resolve(&avro_schema)
    .unwrap();

    // Encode the row into an in-memory Avro container.
    let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new());
    writer.append(record_value).unwrap();
    let encoded = writer.into_inner().unwrap();

    // Arrow read schema whose list element field is deliberately NOT
    // named "element".
    let item_type = DataType::Struct(Fields::from(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let element_field = Field::new("item", item_type, false);
    let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![Field::new(
        "items",
        DataType::List(Arc::new(element_field)),
        false,
    )]));

    let mut reader = ReaderBuilder::new()
        .with_schema(arrow_schema)
        .with_batch_size(10)
        .build(std::io::Cursor::new(encoded))
        .unwrap();

    // Before the fix this failed: schema_lookup held "items.element.id" /
    // "items.element.name" while build_struct_array looked up
    // "items.item.id" / "items.item.name". Both now use "items.id" /
    // "items.name".
    let batch = reader.next().unwrap().unwrap();

    let expected = [
        "+-----------------------------------------------+",
        "| items                                         |",
        "+-----------------------------------------------+",
        "| [{id: 1, name: first}, {id: 2, name: second}] |",
        "+-----------------------------------------------+",
    ];
    assert_batches_eq!(expected, &[batch]);
}
}