diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 93a13a3b..45881168 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,7 +44,7 @@ jobs: - name: Install cargo-deny uses: taiki-e/install-action@v2 with: - tool: cargo-deny@0.14.22 + tool: cargo-deny@0.19.0 - name: Check dependency licenses (Apache-compatible) run: cargo deny check licenses diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 4bd16ad7..6643e3ba 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -56,7 +56,7 @@ snafu = "0.9.0" typed-builder = "^0.19" opendal = { version = "0.55", features = ["services-fs"] } pretty_assertions = "1" -serde_avro_fast = { version = "2.0.2", features = ["snappy", "zstandard"] } +apache-avro = { version = "0.21", features = ["snappy", "zstandard"] } indexmap = "2.5.0" roaring = "0.11" crc32fast = "1" diff --git a/crates/paimon/src/arrow/format/avro.rs b/crates/paimon/src/arrow/format/avro.rs index 08c9aba3..bbf32c2e 100644 --- a/crates/paimon/src/arrow/format/avro.rs +++ b/crates/paimon/src/arrow/format/avro.rs @@ -21,6 +21,8 @@ use crate::io::FileRead; use crate::spec::{DataField, DataType, MapType, RowType}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; +use apache_avro::types::Value; +use apache_avro::Reader; use arrow_array::{ BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, MapArray, RecordBatch, StringArray, @@ -31,165 +33,8 @@ use arrow_schema::SchemaRef; use async_stream::try_stream; use async_trait::async_trait; use futures::StreamExt; -use std::collections::HashMap; use std::sync::Arc; -// --------------------------------------------------------------------------- -// AvroValue: a serde_json::Value replacement that handles Avro bytes -// --------------------------------------------------------------------------- - -/// Lightweight value type that can represent all Avro primitives including bytes. -/// `serde_json::Value` rejects byte arrays, so we need our own. -#[derive(Debug, Clone, PartialEq)] -enum AvroValue { - Null, - Bool(bool), - Int(i64), - Float(f64), - String(String), - Bytes(Vec), - /// Avro array / sequence. - Array(Vec), - /// Union wrapper or record: `{"type": value}` produced by serde_avro_fast. - Object(HashMap), -} - -impl AvroValue { - fn as_bool(&self) -> Option { - match self { - AvroValue::Bool(b) => Some(*b), - _ => None, - } - } - - fn as_i64(&self) -> Option { - match self { - AvroValue::Int(n) => Some(*n), - _ => None, - } - } - - fn as_f64(&self) -> Option { - match self { - AvroValue::Float(f) => Some(*f), - AvroValue::Int(n) => Some(*n as f64), - _ => None, - } - } - - fn as_str(&self) -> Option<&str> { - match self { - AvroValue::String(s) => Some(s), - _ => None, - } - } - - fn as_bytes(&self) -> Option<&[u8]> { - match self { - AvroValue::Bytes(b) => Some(b), - AvroValue::String(s) => Some(s.as_bytes()), - _ => None, - } - } -} - -impl<'de> serde::Deserialize<'de> for AvroValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct AvroValueVisitor; - - impl<'de> serde::de::Visitor<'de> for AvroValueVisitor { - type Value = AvroValue; - - fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.write_str("any Avro value") - } - - fn visit_bool(self, v: bool) -> Result { - Ok(AvroValue::Bool(v)) - } - - fn visit_i8(self, v: i8) -> Result { - Ok(AvroValue::Int(v as i64)) - } - - fn visit_i16(self, v: i16) -> Result { - Ok(AvroValue::Int(v as i64)) - } - - fn visit_i32(self, v: i32) -> Result { - Ok(AvroValue::Int(v as i64)) - } - - fn visit_i64(self, v: i64) -> Result { - Ok(AvroValue::Int(v)) - } - - fn visit_u64(self, v: u64) -> Result { - Ok(AvroValue::Int(v as i64)) - } - - fn visit_f32(self, v: f32) -> Result { - Ok(AvroValue::Float(v as f64)) - } - - fn visit_f64(self, v: f64) -> Result { - Ok(AvroValue::Float(v)) - } - - fn visit_str(self, v: &str) -> Result { - Ok(AvroValue::String(v.to_owned())) - } - - fn visit_string(self, v: String) -> Result { - Ok(AvroValue::String(v)) - } - - fn visit_bytes(self, v: &[u8]) -> Result { - Ok(AvroValue::Bytes(v.to_vec())) - } - - fn visit_byte_buf(self, v: Vec) -> Result { - Ok(AvroValue::Bytes(v)) - } - - fn visit_none(self) -> Result { - Ok(AvroValue::Null) - } - - fn visit_unit(self) -> Result { - Ok(AvroValue::Null) - } - - fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { - let mut m = HashMap::new(); - while let Some((k, v)) = map.next_entry::()? { - m.insert(k, v); - } - Ok(AvroValue::Object(m)) - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'de>, - { - let mut v = Vec::new(); - while let Some(elem) = seq.next_element::()? { - v.push(elem); - } - Ok(AvroValue::Array(v)) - } - } - - deserializer.deserialize_any(AvroValueVisitor) - } -} - pub(crate) struct AvroFormatReader; const DEFAULT_BATCH_SIZE: usize = 8192; @@ -213,25 +58,20 @@ impl FormatFileReader for AvroFormatReader { let target_schema = build_target_arrow_schema(&read_fields)?; let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); - // Deserialize all Avro records from the OCF file. - let mut reader = - serde_avro_fast::object_container_file_encoding::Reader::from_slice(&file_bytes) - .map_err(|e| Error::UnexpectedError { - message: format!("Failed to open Avro file: {e}"), - source: Some(Box::new(e)), - })?; - - let mut all_records: Vec> = Vec::new(); - for result in reader.deserialize_borrowed::>() { - let record = result.map_err(|e| Error::UnexpectedError { + // Collect Avro records directly as apache_avro::Value, avoiding intermediate conversion. + let all_records: Vec = Reader::new(&file_bytes[..]) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to open Avro file: {e}"), + source: Some(Box::new(e)), + })? + .collect::, _>>() + .map_err(|e| Error::UnexpectedError { message: format!("Failed to deserialize Avro record: {e}"), source: Some(Box::new(e)), })?; - all_records.push(record); - } // Apply row selection filtering. - let records: Vec> = match row_selection { + let records: Vec = match row_selection { Some(ref ranges) => { let total_rows = all_records.len(); let mask = ranges_to_mask(total_rows, ranges); @@ -255,6 +95,88 @@ impl FormatFileReader for AvroFormatReader { } } +// --------------------------------------------------------------------------- +// Value access helpers — work directly with apache_avro::types::Value +// --------------------------------------------------------------------------- + +/// Find the position of a named field from the first record. +/// All records in an Avro file share the same schema, so the index is valid +/// for every record. +fn field_index(records: &[Value], name: &str) -> Option { + match records.first() { + Some(Value::Record(fields)) => fields.iter().position(|(n, _)| n == name), + _ => None, + } +} + +/// Look up a field by cached index, unwrapping unions. +/// If `idx` is `None` (field not in schema or empty records), returns `None` directly +/// without linear search, since all records share the same schema. +fn get_field_at(record: &Value, idx: Option) -> Option<&Value> { + match (record, idx) { + (Value::Record(fields), Some(i)) => fields.get(i).map(|(_, v)| v).and_then(unwrap_value), + _ => None, + } +} + +/// Unwrap Avro union/null values, returning `None` for null. +fn unwrap_value(v: &Value) -> Option<&Value> { + match v { + Value::Null => None, + Value::Union(_, inner) => unwrap_value(inner), + other => Some(other), + } +} + +fn value_as_bool(v: &Value) -> Option { + match v { + Value::Boolean(b) => Some(*b), + _ => None, + } +} + +fn value_as_i64(v: &Value) -> Option { + match v { + Value::Int(n) => Some(i64::from(*n)), + Value::Long(n) => Some(*n), + Value::Date(n) | Value::TimeMillis(n) => Some(i64::from(*n)), + Value::TimeMicros(n) + | Value::TimestampMillis(n) + | Value::TimestampMicros(n) + | Value::TimestampNanos(n) + | Value::LocalTimestampMillis(n) + | Value::LocalTimestampMicros(n) + | Value::LocalTimestampNanos(n) => Some(*n), + _ => None, + } +} + +fn value_as_f64(v: &Value) -> Option { + match v { + Value::Float(f) => Some(f64::from(*f)), + Value::Double(f) => Some(*f), + Value::Int(n) => Some(f64::from(*n)), + Value::Long(n) => Some(*n as f64), + _ => None, + } +} + +fn value_as_str(v: &Value) -> Option<&str> { + match v { + Value::String(s) => Some(s), + Value::Enum(_, s) => Some(s), + _ => None, + } +} + +fn value_as_bytes(v: &Value) -> Option<&[u8]> { + match v { + Value::Bytes(b) | Value::Fixed(_, b) => Some(b), + Value::String(s) => Some(s.as_bytes()), + _ => None, + } +} + // --------------------------------------------------------------------------- // Row ranges → boolean mask // --------------------------------------------------------------------------- @@ -280,7 +202,7 @@ fn ranges_to_mask(total_rows: usize, ranges: &[RowRange]) -> Vec { // --------------------------------------------------------------------------- fn records_to_batch( - records: &[HashMap], + records: &[Value], fields: &[DataField], schema: &SchemaRef, ) -> crate::Result { @@ -308,23 +230,26 @@ fn records_to_batch( } fn build_column( - records: &[HashMap], + records: &[Value], name: &str, data_type: &DataType, num_rows: usize, ) -> crate::Result> { + // Pre-compute field position once; O(1) per-row access thereafter. + let idx = field_index(records, name); + Ok(match data_type { DataType::Boolean(_) => { let arr: BooleanArray = (0..num_rows) - .map(|i| get_field(&records[i], name).and_then(|v| v.as_bool())) + .map(|i| get_field_at(&records[i], idx).and_then(value_as_bool)) .collect(); Arc::new(arr) } DataType::TinyInt(_) => { let arr: Int8Array = (0..num_rows) .map(|i| { - get_field(&records[i], name) - .and_then(|v| v.as_i64()) + get_field_at(&records[i], idx) + .and_then(value_as_i64) .map(|v| v as i8) }) .collect(); @@ -333,8 +258,8 @@ fn build_column( DataType::SmallInt(_) => { let arr: Int16Array = (0..num_rows) .map(|i| { - get_field(&records[i], name) - .and_then(|v| v.as_i64()) + get_field_at(&records[i], idx) + .and_then(value_as_i64) .map(|v| v as i16) }) .collect(); @@ -343,8 +268,8 @@ fn build_column( DataType::Int(_) => { let arr: Int32Array = (0..num_rows) .map(|i| { - get_field(&records[i], name) - .and_then(|v| v.as_i64()) + get_field_at(&records[i], idx) + .and_then(value_as_i64) .map(|v| v as i32) }) .collect(); @@ -352,15 +277,15 @@ fn build_column( } DataType::BigInt(_) => { let arr: Int64Array = (0..num_rows) - .map(|i| get_field(&records[i], name).and_then(|v| v.as_i64())) + .map(|i| get_field_at(&records[i], idx).and_then(value_as_i64)) .collect(); Arc::new(arr) } DataType::Float(_) => { let arr: Float32Array = (0..num_rows) .map(|i| { - get_field(&records[i], name) - .and_then(|v| v.as_f64()) + get_field_at(&records[i], idx) + .and_then(value_as_f64) .map(|v| v as f32) }) .collect(); @@ -368,19 +293,19 @@ fn build_column( } DataType::Double(_) => { let arr: Float64Array = (0..num_rows) - .map(|i| get_field(&records[i], name).and_then(|v| v.as_f64())) + .map(|i| get_field_at(&records[i], idx).and_then(value_as_f64)) .collect(); Arc::new(arr) } DataType::Char(_) | DataType::VarChar(_) => { let arr: StringArray = (0..num_rows) - .map(|i| get_field(&records[i], name).and_then(|v| v.as_str())) + .map(|i| get_field_at(&records[i], idx).and_then(value_as_str)) .collect(); Arc::new(arr) } DataType::Binary(_) | DataType::VarBinary(_) => { let values: Vec> = (0..num_rows) - .map(|i| get_field(&records[i], name).and_then(|v| v.as_bytes())) + .map(|i| get_field_at(&records[i], idx).and_then(value_as_bytes)) .collect(); let arr: BinaryArray = values.into_iter().collect(); Arc::new(arr) @@ -388,8 +313,8 @@ fn build_column( DataType::Date(_) => { let arr: Date32Array = (0..num_rows) .map(|i| { - get_field(&records[i], name) - .and_then(|v| v.as_i64()) + get_field_at(&records[i], idx) + .and_then(value_as_i64) .map(|v| v as i32) }) .collect(); @@ -404,12 +329,12 @@ fn build_column( })?; let arr: Decimal128Array = (0..num_rows) .map(|i| { - get_field(&records[i], name).and_then(|v| match v { - // Avro decimal is encoded as big-endian two's complement bytes. - AvroValue::Bytes(b) => Some(bytes_to_i128_be(b)), - AvroValue::Int(n) => Some(*n as i128), - // serde_avro_fast may deserialize decimal as string representation. - AvroValue::String(s) => parse_decimal_string(s, scale), + get_field_at(&records[i], idx).and_then(|v| match v { + Value::Bytes(b) | Value::Fixed(_, b) => Some(bytes_to_i128_be(b)), + Value::Decimal(d) => Vec::::try_from(d.clone()) + .ok() + .map(|b| bytes_to_i128_be(&b)), + Value::BigDecimal(bd) => parse_decimal_string(&bd.to_string(), scale), _ => None, }) }) @@ -445,14 +370,15 @@ fn build_column( } fn build_timestamp_column( - records: &[HashMap], + records: &[Value], name: &str, num_rows: usize, precision: u32, tz: Option>, ) -> Arc { + let idx = field_index(records, name); let values: Vec> = (0..num_rows) - .map(|i| get_field(&records[i], name).and_then(|v| v.as_i64())) + .map(|i| get_field_at(&records[i], idx).and_then(value_as_i64)) .collect(); match precision { 0..=3 => Arc::new(TimestampMillisecondArray::from(values).with_timezone_opt(tz)), @@ -462,7 +388,7 @@ fn build_timestamp_column( } fn build_array_column( - records: &[HashMap], + records: &[Value], name: &str, element_type: &DataType, num_rows: usize, @@ -471,16 +397,16 @@ fn build_array_column( let arrow_element_field = arrow_schema::Field::new("element", arrow_element_type, element_type.is_nullable()); + let idx = field_index(records, name); let mut offsets = vec![0i32]; - let mut element_records: Vec> = Vec::new(); + let mut element_records: Vec = Vec::new(); for record in records.iter().take(num_rows) { - match get_field(record, name) { - Some(AvroValue::Array(arr)) => { + match get_field_at(record, idx) { + Some(Value::Array(arr)) => { for elem in arr { - let mut m = HashMap::new(); - m.insert("element".to_string(), elem.clone()); - element_records.push(m); + element_records + .push(Value::Record(vec![("element".to_string(), elem.clone())])); } offsets.push(offsets.last().unwrap() + arr.len() as i32); } @@ -500,7 +426,7 @@ fn build_array_column( let offsets_buf = OffsetBuffer::new(ScalarBuffer::from(offsets)); let nulls = NullBuffer::new(BooleanBuffer::from( (0..num_rows) - .map(|i| get_field(&records[i], name).is_some()) + .map(|i| get_field_at(&records[i], idx).is_some()) .collect::>(), )); @@ -518,7 +444,7 @@ fn build_array_column( } fn build_map_column( - records: &[HashMap], + records: &[Value], name: &str, map_type: &MapType, num_rows: usize, @@ -526,20 +452,20 @@ fn build_map_column( let arrow_key_type = crate::arrow::paimon_type_to_arrow(map_type.key_type())?; let arrow_value_type = crate::arrow::paimon_type_to_arrow(map_type.value_type())?; + let idx = field_index(records, name); let mut offsets = vec![0i32]; - let mut key_records: Vec> = Vec::new(); - let mut value_records: Vec> = Vec::new(); + let mut key_records: Vec = Vec::new(); + let mut value_records: Vec = Vec::new(); for record in records.iter().take(num_rows) { - match get_field_raw(record, name) { - Some(AvroValue::Object(map)) => { + match get_field_at(record, idx) { + Some(Value::Map(map)) => { for (k, v) in map { - let mut km = HashMap::new(); - km.insert("key".to_string(), AvroValue::String(k.clone())); - key_records.push(km); - let mut vm = HashMap::new(); - vm.insert("value".to_string(), v.clone()); - value_records.push(vm); + key_records.push(Value::Record(vec![( + "key".to_string(), + Value::String(k.clone()), + )])); + value_records.push(Value::Record(vec![("value".to_string(), v.clone())])); } offsets.push(offsets.last().unwrap() + map.len() as i32); } @@ -584,7 +510,7 @@ fn build_map_column( let offsets_buf = OffsetBuffer::new(ScalarBuffer::from(offsets)); let nulls = NullBuffer::new(BooleanBuffer::from( (0..num_rows) - .map(|i| get_field_raw(&records[i], name).is_some()) + .map(|i| get_field_at(&records[i], idx).is_some()) .collect::>(), )); @@ -603,15 +529,16 @@ fn build_map_column( } fn build_row_column( - records: &[HashMap], + records: &[Value], name: &str, row_type: &RowType, num_rows: usize, ) -> crate::Result> { - let sub_records: Vec> = (0..num_rows) - .map(|i| match get_field_raw(&records[i], name) { - Some(AvroValue::Object(obj)) => obj.clone(), - _ => HashMap::new(), + let idx = field_index(records, name); + let sub_records: Vec = (0..num_rows) + .map(|i| match get_field_at(&records[i], idx) { + Some(v @ Value::Record(_)) => v.clone(), + _ => Value::Record(vec![]), }) .collect(); @@ -631,7 +558,7 @@ fn build_row_column( let nulls = NullBuffer::new(BooleanBuffer::from( (0..num_rows) - .map(|i| get_field_raw(&records[i], name).is_some()) + .map(|i| get_field_at(&records[i], idx).is_some()) .collect::>(), )); @@ -653,7 +580,7 @@ fn parse_decimal_string(s: &str, scale: i8) -> Option { None => (s, ""), }; let frac_len = frac_part.len() as i8; - let combined = format!("{}{}", integer_part, frac_part); + let combined = format!("{integer_part}{frac_part}"); let unscaled: i128 = combined.parse().ok()?; // Adjust if the fractional digits differ from the target scale. let result = if frac_len < scale { @@ -679,36 +606,6 @@ fn bytes_to_i128_be(bytes: &[u8]) -> i128 { i128::from_be_bytes(buf) } -/// Look up a field in an Avro record, unwrapping union encoding. -fn get_field<'a>(record: &'a HashMap, name: &str) -> Option<&'a AvroValue> { - record.get(name).and_then(unwrap_avro_union) -} - -/// Look up a field without union unwrapping — for Map/Row types whose Object -/// shape overlaps with union wrappers. -fn get_field_raw<'a>(record: &'a HashMap, name: &str) -> Option<&'a AvroValue> { - record.get(name).and_then(|v| match v { - AvroValue::Null => None, - other => Some(other), - }) -} - -/// Unwrap Avro union encoding: `{"type": value}` → `value`, or `"null"` → `None`. -fn unwrap_avro_union(v: &AvroValue) -> Option<&AvroValue> { - match v { - AvroValue::Null => None, - AvroValue::Object(map) if map.len() == 1 => { - let (key, inner) = map.iter().next().unwrap(); - if key == "null" { - None - } else { - Some(inner) - } - } - other => Some(other), - } -} - #[cfg(test)] mod tests { use super::*; @@ -718,78 +615,95 @@ mod tests { }; use arrow_array::Array; - // Helper to build AvroValue variants concisely in tests. - fn av_int(v: i64) -> AvroValue { - AvroValue::Int(v) + // Helper to build Value variants concisely in tests. + fn av_int(v: i64) -> Value { + Value::Long(v) } - fn av_float(v: f64) -> AvroValue { - AvroValue::Float(v) + fn av_float(v: f64) -> Value { + Value::Double(v) } - fn av_bool(v: bool) -> AvroValue { - AvroValue::Bool(v) + fn av_bool(v: bool) -> Value { + Value::Boolean(v) } - fn av_str(v: &str) -> AvroValue { - AvroValue::String(v.to_string()) + fn av_str(v: &str) -> Value { + Value::String(v.to_string()) } - fn av_bytes(v: &[u8]) -> AvroValue { - AvroValue::Bytes(v.to_vec()) + fn av_bytes(v: &[u8]) -> Value { + Value::Bytes(v.to_vec()) } - fn av_null() -> AvroValue { - AvroValue::Null + fn av_null() -> Value { + Value::Null } - fn av_union(key: &str, val: AvroValue) -> AvroValue { - AvroValue::Object(HashMap::from([(key.to_string(), val)])) + fn av_union(val: Value) -> Value { + Value::Union(1, Box::new(val)) } // ----------------------------------------------------------------------- - // unwrap_avro_union + // unwrap_value // ----------------------------------------------------------------------- #[test] - fn test_unwrap_avro_union_null() { - assert!(unwrap_avro_union(&av_null()).is_none()); + fn test_unwrap_value_null() { + assert!(unwrap_value(&av_null()).is_none()); } #[test] - fn test_unwrap_avro_union_plain_value() { + fn test_unwrap_value_plain() { let v = av_int(42); - assert_eq!(unwrap_avro_union(&v), Some(&av_int(42))); + assert_eq!(unwrap_value(&v), Some(&Value::Long(42))); } #[test] - fn test_unwrap_avro_union_wrapped_value() { - let v = av_union("int", av_int(42)); - assert_eq!(unwrap_avro_union(&v), Some(&av_int(42))); + fn test_unwrap_value_union() { + let v = av_union(av_int(42)); + assert_eq!(unwrap_value(&v), Some(&Value::Long(42))); } #[test] - fn test_unwrap_avro_union_null_key() { - let v = av_union("null", av_null()); - assert!(unwrap_avro_union(&v).is_none()); + fn test_unwrap_value_union_null() { + let v = Value::Union(0, Box::new(Value::Null)); + assert!(unwrap_value(&v).is_none()); } // ----------------------------------------------------------------------- - // get_field + // get_field_at // ----------------------------------------------------------------------- + fn make_record(fields: Vec<(&str, Value)>) -> Value { + Value::Record( + fields + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + ) + } + #[test] - fn test_get_field_present() { - let mut record = HashMap::new(); - record.insert("name".to_string(), av_str("alice")); - assert_eq!(get_field(&record, "name"), Some(&av_str("alice"))); + fn test_get_field_at_present() { + let record = make_record(vec![("name", av_str("alice"))]); + assert_eq!( + get_field_at(&record, Some(0)), + Some(&Value::String("alice".to_string())) + ); } #[test] - fn test_get_field_missing() { - let record: HashMap = HashMap::new(); - assert!(get_field(&record, "name").is_none()); + fn test_get_field_at_missing() { + let record = make_record(vec![]); + assert!(get_field_at(&record, None).is_none()); } #[test] - fn test_get_field_union_wrapped() { - let mut record = HashMap::new(); - record.insert("age".to_string(), av_union("int", av_int(30))); - assert_eq!(get_field(&record, "age"), Some(&av_int(30))); + fn test_get_field_at_union_wrapped() { + let record = make_record(vec![("age", av_union(av_int(30)))]); + assert_eq!(get_field_at(&record, Some(0)), Some(&Value::Long(30))); + } + + #[test] + fn test_get_field_at_no_index() { + let record = make_record(vec![("x", av_int(1)), ("y", av_int(2))]); + // idx is None — returns None directly without linear search. + assert!(get_field_at(&record, None).is_none()); } // ----------------------------------------------------------------------- @@ -821,13 +735,15 @@ mod tests { // build_column + records_to_batch // ----------------------------------------------------------------------- - fn make_records(rows: Vec>) -> Vec> { + fn make_records(rows: Vec>) -> Vec { rows.into_iter() .map(|fields| { - fields - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect() + Value::Record( + fields + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + ) }) .collect() } @@ -966,6 +882,35 @@ mod tests { assert_eq!(bytes_to_i128_be(&[0x00, 0x01]), 1); } + #[test] + fn test_build_column_map_with_union() { + use std::collections::HashMap; + // Map field wrapped in a union (nullable map), as Avro encodes nullable types. + let mut map1 = HashMap::new(); + map1.insert("k1".to_string(), av_int(10)); + map1.insert("k2".to_string(), av_int(20)); + let records = vec![ + Value::Record(vec![( + "m".to_string(), + Value::Union(1, Box::new(Value::Map(map1))), + )]), + Value::Record(vec![( + "m".to_string(), + Value::Union(0, Box::new(av_null())), + )]), + ]; + let map_type = MapType::new( + DataType::VarChar(VarCharType::new(100).unwrap()), + DataType::Int(IntType::new()), + ); + let col = build_map_column(&records, "m", &map_type, 2).unwrap(); + let arr = col.as_any().downcast_ref::().unwrap(); + assert_eq!(arr.len(), 2); + // First row has 2 entries, second row is null. + assert!(!arr.is_null(0)); + assert!(arr.is_null(1)); + } + #[test] fn test_records_to_batch_basic() { let fields = vec![ @@ -994,7 +939,7 @@ mod tests { DataType::Int(IntType::new()), )]; let schema = crate::arrow::build_target_arrow_schema(&fields).unwrap(); - let records: Vec> = vec![]; + let records: Vec = vec![]; let batch = records_to_batch(&records, &fields, &schema).unwrap(); assert_eq!(batch.num_rows(), 0); } diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index 65402eb4..56a4d3c3 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -68,6 +68,14 @@ pub enum Error { display("Paimon hitting invalid config: {}", message) )] ConfigInvalid { message: String }, + #[snafu( + visibility(pub(crate)), + display("Paimon hitting unexpected avro error {}: {:?}", message, source) + )] + DataUnexpected { + message: String, + source: Box, + }, #[snafu( visibility(pub(crate)), display("Paimon hitting invalid file index format: {}", message) @@ -119,6 +127,15 @@ impl From for Error { } } +impl From for Error { + fn from(source: apache_avro::Error) -> Self { + Error::DataUnexpected { + message: "Failed to process Avro data".to_string(), + source: Box::new(source), + } + } +} + impl From for Error { fn from(source: parquet::errors::ParquetError) -> Self { Error::ParquetDataUnexpected { diff --git a/crates/paimon/src/spec/index_manifest.rs b/crates/paimon/src/spec/index_manifest.rs index 64948a6f..8ca5b865 100644 --- a/crates/paimon/src/spec/index_manifest.rs +++ b/crates/paimon/src/spec/index_manifest.rs @@ -19,8 +19,6 @@ use crate::io::FileIO; use crate::spec::manifest_common::FileKind; use crate::spec::IndexFileMeta; use serde::{Deserialize, Serialize}; -use serde_avro_fast::object_container_file_encoding::Reader; -use snafu::ResultExt; use std::fmt::{Display, Formatter}; use crate::Result; @@ -129,12 +127,7 @@ impl IndexManifest { /// Read index manifest entries from Avro-encoded bytes. pub fn read_from_bytes(bytes: &[u8]) -> Result> { - let mut reader = Reader::from_slice(bytes) - .whatever_context::<_, crate::Error>("read index manifest avro")?; - reader - .deserialize::() - .collect::, _>>() - .whatever_context::<_, crate::Error>("deserialize index manifest entry") + crate::spec::from_avro_bytes(bytes) } /// Write index manifest entries to a file. @@ -147,6 +140,7 @@ impl IndexManifest { #[cfg(test)] mod tests { + use apache_avro::{from_avro_datum, from_value, to_avro_datum, to_value, types::Value, Schema}; use indexmap::IndexMap; use super::*; @@ -210,7 +204,7 @@ mod tests { }, }; - let schema: serde_avro_fast::Schema = r#"["null", { + let schema = Schema::parse_str(r#"["null", { "type": "record", "name": "org.apache.paimon.avro.generated.record", "fields": [ @@ -241,12 +235,16 @@ mod tests { } ] }]"# - .parse().unwrap(); - - let serializer_config = &mut serde_avro_fast::ser::SerializerConfig::new(&schema); - let encoded = serde_avro_fast::to_single_object_vec(&sample, serializer_config).unwrap(); - let decoded: IndexManifestEntry = - serde_avro_fast::from_single_object_slice(encoded.as_slice(), &schema).unwrap(); + ) + .unwrap(); + + let value = to_value(&sample).unwrap().resolve(&schema).unwrap(); + let encoded = to_avro_datum(&schema, value).unwrap(); + let decoded_value = from_avro_datum(&schema, &mut encoded.as_slice(), None).unwrap(); + let decoded: IndexManifestEntry = match decoded_value { + Value::Union(_, inner) => from_value(inner.as_ref()).unwrap(), + other => from_value(&other).unwrap(), + }; assert_eq!(sample, decoded); } } diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index 84f36291..193359e5 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -18,8 +18,6 @@ use crate::io::FileIO; use crate::spec::manifest_entry::ManifestEntry; use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA; -use serde_avro_fast::object_container_file_encoding::Reader; -use snafu::ResultExt; use crate::Result; @@ -46,12 +44,7 @@ impl Manifest { /// Read manifest entries from bytes. fn read_from_bytes(bytes: &[u8]) -> Result> { - let mut reader = - Reader::from_slice(bytes).whatever_context::<_, crate::Error>("read manifest avro")?; - reader - .deserialize::() - .collect::, _>>() - .whatever_context::<_, crate::Error>("deserialize manifest entry") + crate::spec::from_avro_bytes(bytes) } /// Write manifest entries to a file. diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index 864ea002..887dfeef 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -15,18 +15,17 @@ // specific language governing permissions and limitations // under the License. +use apache_avro::{from_value, to_value, Codec, Reader, Schema, Writer}; use serde::de::DeserializeOwned; use serde::Serialize; -use serde_avro_fast::object_container_file_encoding::{Compression, Reader}; -use snafu::ResultExt; pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { - let mut reader = Reader::from_slice(bytes) - .whatever_context::<_, crate::Error>("read avro object container")?; - reader - .deserialize::() - .collect::, _>>() - .whatever_context::<_, crate::Error>("deserialize avro records") + Reader::new(bytes)? + .map(|r| { + let value = r?; + from_value::(&value).map_err(crate::Error::from) + }) + .collect() } /// Serialize records into Avro Object Container File bytes. @@ -34,25 +33,13 @@ pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result(schema_json: &str, records: &[T]) -> crate::Result> { - let schema: serde_avro_fast::Schema = - schema_json - .parse() - .map_err( - |e: serde_avro_fast::schema::SchemaError| crate::Error::DataInvalid { - message: format!("invalid avro schema: {e}"), - source: Some(Box::new(e)), - }, - )?; - serde_avro_fast::object_container_file_encoding::write_all( - &schema, - Compression::Null, - Vec::new(), - records.iter(), - ) - .map_err(|e| crate::Error::DataInvalid { - message: format!("avro serialization failed: {e}"), - source: Some(Box::new(e)), - }) + let schema = Schema::parse_str(schema_json)?; + let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null); + for record in records { + let value = to_value(record).and_then(|v| v.resolve(&schema))?; + writer.append(value)?; + } + Ok(writer.into_inner()?) } #[cfg(test)]