diff --git a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs index 5b1f534ad78b..0b106849abac 100644 --- a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs @@ -26,8 +26,8 @@ use apache_avro::{ }; use arrow::array::{ make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef, - BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait, - PrimitiveArray, StringArray, StringBuilder, StringDictionaryBuilder, + BooleanBuilder, LargeBinaryArray, LargeStringArray, ListBuilder, NullArray, + OffsetSizeTrait, PrimitiveArray, StringArray, StringBuilder, StringDictionaryBuilder, }; use arrow::array::{BinaryArray, FixedSizeBinaryArray, GenericListArray}; use arrow::buffer::{Buffer, MutableBuffer}; @@ -48,6 +48,7 @@ use arrow::util::bit_util; use datafusion_common::arrow_err; use datafusion_common::error::{DataFusionError, Result}; use num_traits::NumCast; +use std::borrow::Cow; use std::collections::BTreeMap; use std::io::Read; use std::sync::Arc; @@ -316,14 +317,14 @@ impl AvroArrowArrayReader<'_, R> { if let Some(value) = self.field_lookup(col_name, row) { let value = maybe_resolve_union(value); // value can be an array or a scalar - let vals: Vec> = if let Value::String(v) = value { - vec![Some(v.to_string())] + let vals: Vec> = if let Value::String(v) = value { + vec![Some(v.as_str())] } else if let Value::Array(n) = value { n.iter() .map(resolve_string) - .collect::>>>()? + .collect::>>>()? .into_iter() - .collect::>>() + .collect::>>() } else if let Value::Null = value { vec![None] } else if !matches!(value, Value::Record(_)) { @@ -344,13 +345,8 @@ impl AvroArrowArrayReader<'_, R> { .ok_or_else(||SchemaError( "Cast failed for ListBuilder during nested data parsing".to_string(), ))?; - for val in vals { - if let Some(v) = val { - builder.values().append_value(&v) - } else { - builder.values().append_null() - }; - } + vals.into_iter() + .for_each(|val| builder.values().append_option(val)); // Append to the list builder.append(true); @@ -359,13 +355,15 @@ impl AvroArrowArrayReader<'_, R> { let builder = builder.as_any_mut().downcast_mut::>>().ok_or_else(||SchemaError( "Cast failed for ListBuilder during nested data parsing".to_string(), ))?; - for val in vals { + vals.into_iter().try_for_each(|val| { if let Some(v) = val { - let _ = builder.values().append(&v)?; + builder.values().append(v)?; } else { - builder.values().append_null() - }; - } + builder.values().append_null(); + } + + Result::<_, ArrowError>::Ok(()) + })?; // Append to the list builder.append(true); @@ -538,6 +536,21 @@ impl AvroArrowArrayReader<'_, R> { .into_iter() .collect::() .into_data(), + DataType::Binary => flatten_binary_values(rows) + .into_iter() + .collect::() + .into_data(), + DataType::LargeBinary => flatten_binary_values(rows) + .into_iter() + .collect::() + .into_data(), + DataType::FixedSizeBinary(n) => { + FixedSizeBinaryArray::try_from_sparse_iter_with_size( + flatten_binary_values(rows).into_iter(), + *n, + )? + .into_data() + } DataType::List(field) => { let child = self.build_nested_list_array::( parent_field_name, @@ -732,7 +745,7 @@ impl AvroArrowArrayReader<'_, R> { ))) } }, - DataType::Utf8 | DataType::LargeUtf8 => Arc::new( + DataType::Utf8 => Arc::new( rows.iter() .map(|row| { let maybe_value = self.field_lookup(&field_path, row); @@ -742,17 +755,34 @@ impl AvroArrowArrayReader<'_, R> { } }) .collect::>()?, - ) - as ArrayRef, - DataType::Binary | DataType::LargeBinary => Arc::new( + ) as ArrayRef, + DataType::LargeUtf8 => Arc::new( + rows.iter() + .map(|row| { + let maybe_value = self.field_lookup(&field_path, row); + match maybe_value { + None => Ok(None), + Some(v) => resolve_string(v), + } + }) + .collect::>()?, + ) as ArrayRef, + DataType::Binary => Arc::new( rows.iter() .map(|row| { let maybe_value = self.field_lookup(&field_path, row); maybe_value.and_then(resolve_bytes) }) .collect::(), - ) - as ArrayRef, + ) as ArrayRef, + DataType::LargeBinary => Arc::new( + rows.iter() + .map(|row| { + let maybe_value = self.field_lookup(&field_path, row); + maybe_value.and_then(resolve_bytes) + }) + .collect::(), + ) as ArrayRef, DataType::FixedSizeBinary(ref size) => { Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size( rows.iter().map(|row| { @@ -895,11 +925,31 @@ fn flatten_values<'a>(values: &[&'a Value]) -> Vec<&'a Value> { .collect() } +/// Flattens a list into binary values, dropping Value::Null in the process. +/// This is useful for interpreting any Avro array as bytes, dropping nulls. +/// See `resolve_bytes`. +#[inline] +fn flatten_binary_values<'a>(values: &[&'a Value]) -> Vec>> { + values + .iter() + .flat_map(|row| { + let row = maybe_resolve_union(row); + if let Value::Array(values) = row { + values.iter().map(resolve_bytes).collect::>>() + } else if let Value::Null = row { + vec![] + } else { + vec![resolve_bytes(row)] + } + }) + .collect::>>() +} + /// Flattens a list into string values, dropping Value::Null in the process. /// This is useful for interpreting any Avro array as string, dropping nulls. -/// See `value_as_string`. +/// See `resolve_string`. #[inline] -fn flatten_string_values(values: &[&Value]) -> Vec> { +fn flatten_string_values<'a>(values: &[&'a Value]) -> Vec> { values .iter() .flat_map(|row| { @@ -920,15 +970,14 @@ fn flatten_string_values(values: &[&Value]) -> Vec> { /// Reads an Avro value as a string, regardless of its type. /// This is useful if the expected datatype is a string, in which case we preserve -/// all the values regardless of they type. -fn resolve_string(v: &Value) -> ArrowResult> { +/// all the values regardless of their type. +fn resolve_string(v: &Value) -> ArrowResult> { let v = if let Value::Union(_, b) = v { b } else { v }; match v { - Value::String(s) => Ok(Some(s.clone())), - Value::Bytes(bytes) => String::from_utf8(bytes.to_vec()) - .map_err(|e| AvroError::new(AvroErrorDetails::ConvertToUtf8(e))) + Value::String(s) | Value::Enum(_, s) => Ok(Some(s.as_str())), + Value::Bytes(bytes) | Value::Fixed(_, bytes) => str::from_utf8(bytes.as_slice()) + .map_err(|e| AvroError::new(AvroErrorDetails::ConvertToUtf8Error(e))) .map(Some), - Value::Enum(_, s) => Ok(Some(s.clone())), Value::Null => Ok(None), other => Err(AvroError::new(AvroErrorDetails::GetString(other.clone()))), } @@ -948,16 +997,22 @@ fn resolve_u8(v: &Value) -> Option { } } -fn resolve_bytes(v: &Value) -> Option> { +fn resolve_bytes(v: &'_ Value) -> Option> { let v = match v { Value::Union(_, inner) => inner.as_ref(), _ => v, }; match v { - Value::Bytes(bytes) => Some(bytes.clone()), - Value::String(s) => Some(s.as_bytes().to_vec()), - Value::Array(items) => items.iter().map(resolve_u8).collect::>>(), + Value::Bytes(bytes) | Value::Fixed(_, bytes) => { + Some(Cow::Borrowed(bytes.as_slice())) + } + Value::String(s) => Some(Cow::Borrowed(s.as_bytes())), + Value::Array(items) => items + .iter() + .map(resolve_u8) + .collect::>>() + .map(Cow::Owned), _ => None, } } @@ -1141,42 +1196,149 @@ mod test { }] }], "default": null + }, + { + "name": "byte_list", + "type": ["null", { + "type": "array", + "items": ["null", "bytes"] + }], + "default": null } ] }"#, ) - .unwrap(); - let r1 = apache_avro::to_value(serde_json::json!({ - "headers": [ - { - "name": "a", - "value": "b" - } - ] - })) - .unwrap() - .resolve(&schema) - .unwrap(); + .unwrap(); + + use apache_avro::types::Value; + + // Create records using Avro Value types directly for proper byte handling + let r1 = Value::Record(vec![ + ( + "headers".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![Value::Union( + 1, + Box::new(Value::Record(vec![ + ( + "name".to_string(), + Value::Union(1, Box::new(Value::String("a".to_string()))), + ), + ( + "value".to_string(), + Value::Union(1, Box::new(Value::String("b".to_string()))), + ), + ])), + )])), + ), + ), + ( + "byte_list".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![ + Value::Union(1, Box::new(Value::Bytes(b"hello".to_vec()))), + Value::Union(1, Box::new(Value::Bytes(b"world".to_vec()))), + ])), + ), + ), + ]); + + let r2 = Value::Record(vec![ + ( + "headers".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![ + Value::Union( + 1, + Box::new(Value::Record(vec![ + ( + "name".to_string(), + Value::Union( + 1, + Box::new(Value::String("x".to_string())), + ), + ), + ( + "value".to_string(), + Value::Union( + 1, + Box::new(Value::String("y".to_string())), + ), + ), + ])), + ), + Value::Union( + 1, + Box::new(Value::Record(vec![ + ( + "name".to_string(), + Value::Union( + 1, + Box::new(Value::String("z".to_string())), + ), + ), + ( + "value".to_string(), + Value::Union( + 1, + Box::new(Value::String("w".to_string())), + ), + ), + ])), + ), + ])), + ), + ), + ( + "byte_list".to_string(), + Value::Union(0, Box::new(Value::Null)), + ), + ]); + + let r3 = Value::Record(vec![ + ( + "headers".to_string(), + Value::Union(0, Box::new(Value::Null)), + ), + ( + "byte_list".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![Value::Union( + 1, + Box::new(Value::Bytes(b"foo".to_vec())), + )])), + ), + ), + ]); let mut w = apache_avro::Writer::new(&schema, vec![]); w.append(r1).unwrap(); + w.append(r2).unwrap(); + w.append(r3).unwrap(); let bytes = w.into_inner().unwrap(); let mut reader = ReaderBuilder::new() .read_schema() - .with_batch_size(2) + .with_batch_size(3) .build(std::io::Cursor::new(bytes)) .unwrap(); let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_rows(), 1); - assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + let expected = [ - "+-----------------------+", - "| headers |", - "+-----------------------+", - "| [{name: a, value: b}] |", - "+-----------------------+", + "+--------------------------------------------+--------------------------+", + "| headers | byte_list |", + "+--------------------------------------------+--------------------------+", + "| [{name: a, value: b}] | [68656c6c6f, 776f726c64] |", + "| [{name: x, value: y}, {name: z, value: w}] | |", + "| | [666f6f] |", + "+--------------------------------------------+--------------------------+", ]; assert_batches_eq!(expected, &[batch]); } @@ -1804,4 +1966,174 @@ mod test { ]; assert_batches_eq!(expected, &[batch]); } + + #[test] + fn test_avro_fixed_and_list_of_fixed() { + let schema = apache_avro::Schema::parse_str( + r#" + { + "type": "record", + "name": "test_fixed", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "hash", + "type": [ + "null", + { + "type": "fixed", + "name": "md5", + "size": 16 + } + ], + "default": null + }, + { + "name": "hashes", + "type": [ + "null", + { + "type": "array", + "items": [ + "null", + { + "type": "fixed", + "name": "hash_value", + "size": 8 + } + ] + } + ], + "default": null + } + ] + }"#, + ) + .unwrap(); + + use apache_avro::types::Value; + + // Record with both fields populated + let r1 = Value::Record(vec![ + ("id".to_string(), Value::Int(1)), + ( + "hash".to_string(), + Value::Union( + 1, + Box::new(Value::Fixed( + 16, + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], + )), + ), + ), + ( + "hashes".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![ + Value::Union( + 1, + Box::new(Value::Fixed(8, vec![1, 2, 3, 4, 5, 6, 7, 8])), + ), + Value::Union( + 1, + Box::new(Value::Fixed(8, vec![8, 7, 6, 5, 4, 3, 2, 1])), + ), + ])), + ), + ), + ]); + + // Record with null hash and populated list + let r2 = Value::Record(vec![ + ("id".to_string(), Value::Int(2)), + ("hash".to_string(), Value::Union(0, Box::new(Value::Null))), + ( + "hashes".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![ + Value::Union(1, Box::new(Value::Fixed(8, vec![255; 8]))), + Value::Union(0, Box::new(Value::Null)), + ])), + ), + ), + ]); + + // Record with populated hash and null list + let r3 = Value::Record(vec![ + ("id".to_string(), Value::Int(3)), + ( + "hash".to_string(), + Value::Union(1, Box::new(Value::Fixed(16, vec![0; 16]))), + ), + ("hashes".to_string(), Value::Union(0, Box::new(Value::Null))), + ]); + + // Record with both fields null + let r4 = Value::Record(vec![ + ("id".to_string(), Value::Int(4)), + ("hash".to_string(), Value::Union(0, Box::new(Value::Null))), + ("hashes".to_string(), Value::Union(0, Box::new(Value::Null))), + ]); + + // Record with list containing all nulls + let r5 = Value::Record(vec![ + ("id".to_string(), Value::Int(5)), + ( + "hash".to_string(), + Value::Union( + 1, + Box::new(Value::Fixed( + 16, + vec![170, 187, 204, 221, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + )), + ), + ), + ( + "hashes".to_string(), + Value::Union( + 1, + Box::new(Value::Array(vec![ + Value::Union(0, Box::new(Value::Null)), + Value::Union(0, Box::new(Value::Null)), + ])), + ), + ), + ]); + + let mut w = apache_avro::Writer::new(&schema, vec![]); + w.append(r1).unwrap(); + w.append(r2).unwrap(); + w.append(r3).unwrap(); + w.append(r4).unwrap(); + w.append(r5).unwrap(); + let bytes = w.into_inner().unwrap(); + + let mut reader = ReaderBuilder::new() + .read_schema() + .with_batch_size(10) + .build(std::io::Cursor::new(bytes)) + .unwrap(); + + let batch = reader.next().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 5); + assert_eq!(batch.num_columns(), 3); + + let expected = [ + "+----+----------------------------------+--------------------------------------+", + "| id | hash | hashes |", + "+----+----------------------------------+--------------------------------------+", + "| 1 | 0102030405060708090a0b0c0d0e0f10 | [0102030405060708, 0807060504030201] |", + "| 2 | | [ffffffffffffffff, ] |", + "| 3 | 00000000000000000000000000000000 | |", + "| 4 | | |", + "| 5 | aabbccdd0102030405060708090a0b0c | [, ] |", + "+----+----------------------------------+--------------------------------------+" + ]; + assert_batches_eq!(expected, &[batch]); + } }