Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 96 additions & 51 deletions avro/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::schema::{InnerDecimalSchema, UuidSchema};
use crate::{
AvroResult, Error,
bigdecimal::deserialize_big_decimal,
Expand Down Expand Up @@ -82,7 +83,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
enclosing_namespace: &Namespace,
reader: &mut R,
) -> AvroResult<Value> {
match *schema {
match schema {
Schema::Null => Ok(Value::Null),
Schema::Boolean => {
let mut buf = [0u8; 1];
Expand All @@ -101,39 +102,70 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
}
}
}
Schema::Decimal(DecimalSchema { ref inner, .. }) => match &**inner {
Schema::Fixed { .. } => {
match decode_internal(inner, names, enclosing_namespace, reader)? {
Schema::Decimal(DecimalSchema { inner, .. }) => match inner {
InnerDecimalSchema::Fixed(fixed) => {
match decode_internal(
&Schema::Fixed(fixed.copy_only_size()),
names,
enclosing_namespace,
reader,
)? {
Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
value => Err(Details::FixedValue(value).into()),
}
}
Schema::Bytes => match decode_internal(inner, names, enclosing_namespace, reader)? {
Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
value => Err(Details::BytesValue(value).into()),
},
schema => Err(Details::ResolveDecimalSchema(schema.into()).into()),
InnerDecimalSchema::Bytes => {
match decode_internal(&Schema::Bytes, names, enclosing_namespace, reader)? {
Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
value => Err(Details::BytesValue(value).into()),
}
}
},
Schema::BigDecimal => {
match decode_internal(&Schema::Bytes, names, enclosing_namespace, reader)? {
Value::Bytes(bytes) => deserialize_big_decimal(&bytes).map(Value::BigDecimal),
value => Err(Details::BytesValue(value).into()),
}
}
Schema::Uuid => {
Schema::Uuid(UuidSchema::String) => {
let Value::String(string) =
decode_internal(&Schema::String, names, enclosing_namespace, reader)?
else {
// decoding a String can also return a Null, indicating EOF
return Err(Error::new(Details::ReadBytes(std::io::Error::from(
ErrorKind::UnexpectedEof,
))));
};
let uuid = Uuid::parse_str(&string).map_err(Details::ConvertStrToUuid)?;
Ok(Value::Uuid(uuid))
}
Schema::Uuid(UuidSchema::Bytes) => {
let Value::Bytes(bytes) =
decode_internal(&Schema::Bytes, names, enclosing_namespace, reader)?
else {
// Calling decode_internal with Schema::Bytes can only return a Value::Bytes or an error
unreachable!();
unreachable!(
"decode_internal(Schema::Bytes) can only return a Value::Bytes or an error"
)
};

let uuid = if bytes.len() == 16 {
Uuid::from_slice(&bytes).map_err(Details::ConvertSliceToUuid)?
} else {
let string = std::str::from_utf8(&bytes).map_err(Details::ConvertToUtf8Error)?;
Uuid::parse_str(string).map_err(Details::ConvertStrToUuid)?
let uuid = Uuid::from_slice(&bytes).map_err(Details::ConvertSliceToUuid)?;
Ok(Value::Uuid(uuid))
}
Schema::Uuid(UuidSchema::Fixed(fixed)) => {
let Value::Fixed(n, bytes) = decode_internal(
&Schema::Fixed(fixed.copy_only_size()),
names,
enclosing_namespace,
reader,
)?
else {
unreachable!(
"decode_internal(Schema::Fixed) can only return a Value::Fixed or an error"
)
};
if n != 16 {
return Err(Details::ConvertFixedToUuid(n).into());
}
let uuid = Uuid::from_slice(&bytes).map_err(Details::ConvertSliceToUuid)?;
Ok(Value::Uuid(uuid))
}
Schema::Int => decode_int(reader),
Expand Down Expand Up @@ -189,13 +221,13 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
}
}
Schema::Fixed(FixedSchema { size, .. }) => {
let mut buf = vec![0u8; size];
let mut buf = vec![0u8; *size];
reader
.read_exact(&mut buf)
.map_err(|e| Details::ReadFixed(e, size))?;
Ok(Value::Fixed(size, buf))
.map_err(|e| Details::ReadFixed(e, *size))?;
Ok(Value::Fixed(*size, buf))
}
Schema::Array(ref inner) => {
Schema::Array(inner) => {
let mut items = Vec::new();

loop {
Expand All @@ -217,7 +249,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(

Ok(Value::Array(items))
}
Schema::Map(ref inner) => {
Schema::Map(inner) => {
let mut items = HashMap::new();

loop {
Expand All @@ -241,7 +273,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(

Ok(Value::Map(items))
}
Schema::Union(ref inner) => match zag_i64(reader).map_err(Error::into_details) {
Schema::Union(inner) => match zag_i64(reader).map_err(Error::into_details) {
Ok(index) => {
let variants = inner.variants();
let variant = variants
Expand All @@ -262,11 +294,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
}
Err(io_err) => Err(Error::new(io_err)),
},
Schema::Record(RecordSchema {
ref name,
ref fields,
..
}) => {
Schema::Record(RecordSchema { name, fields, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
// Benchmarks indicate ~10% improvement using this method.
let mut items = Vec::with_capacity(fields.len());
Expand All @@ -284,7 +312,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
}
Ok(Value::Record(items))
}
Schema::Enum(EnumSchema { ref symbols, .. }) => {
Schema::Enum(EnumSchema { symbols, .. }) => {
Ok(if let Value::Int(raw_index) = decode_int(reader)? {
let index = usize::try_from(raw_index)
.map_err(|e| Details::ConvertI32ToUsize(e, raw_index))?;
Expand All @@ -302,7 +330,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
return Err(Details::GetEnumUnknownIndexValue.into());
})
}
Schema::Ref { ref name } => {
Schema::Ref { name } => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if let Some(resolved) = names.get(&fully_qualified_name) {
decode_internal(
Expand All @@ -321,6 +349,7 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
#[cfg(test)]
#[allow(clippy::expect_fun_call)]
mod tests {
use crate::schema::{InnerDecimalSchema, UuidSchema};
use crate::{
Decimal,
decode::decode,
Expand Down Expand Up @@ -380,14 +409,13 @@ mod tests {
fn test_negative_decimal_value() -> TestResult {
use crate::{encode::encode, schema::Name};
use num_bigint::ToBigInt;
let inner = Box::new(Schema::Fixed(
FixedSchema::builder()
.name(Name::new("decimal")?)
.size(2)
.build(),
));
let schema = Schema::Decimal(DecimalSchema {
inner,
inner: InnerDecimalSchema::Fixed(
FixedSchema::builder()
.name(Name::new("decimal")?)
.size(2)
.build(),
),
precision: 4,
scale: 2,
});
Expand All @@ -408,16 +436,15 @@ mod tests {
fn test_decode_decimal_with_bigger_than_necessary_size() -> TestResult {
use crate::{encode::encode, schema::Name};
use num_bigint::ToBigInt;
let inner = Box::new(Schema::Fixed(FixedSchema {
size: 13,
name: Name::new("decimal")?,
aliases: None,
doc: None,
default: None,
attributes: Default::default(),
}));
let schema = Schema::Decimal(DecimalSchema {
inner,
inner: InnerDecimalSchema::Fixed(FixedSchema {
size: 13,
name: Name::new("decimal")?,
aliases: None,
doc: None,
default: None,
attributes: Default::default(),
}),
precision: 4,
scale: 2,
});
Expand Down Expand Up @@ -844,7 +871,7 @@ mod tests {
let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid, &mut &buffer[..])?;
let result = decode(&Schema::Uuid(UuidSchema::String), &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
Expand All @@ -854,20 +881,38 @@ mod tests {
fn avro_3926_encode_decode_uuid_to_fixed() -> TestResult {
use crate::encode::encode;

let schema = Schema::Fixed(FixedSchema {
let fixed = FixedSchema {
size: 16,
name: "uuid".into(),
aliases: None,
doc: None,
default: None,
attributes: Default::default(),
});
};

let schema = Schema::Fixed(fixed.clone());
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid(UuidSchema::Fixed(fixed)), &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
}

#[test]
fn encode_decode_uuid_to_bytes() -> TestResult {
use crate::encode::encode;

let schema = Schema::Bytes;
let value = Value::Uuid(Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")?);

let mut buffer = Vec::new();
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));

let result = decode(&Schema::Uuid, &mut &buffer[..])?;
let result = decode(&Schema::Uuid(UuidSchema::Bytes), &mut &buffer[..])?;
assert_eq!(result, value);

Ok(())
Expand Down
52 changes: 36 additions & 16 deletions avro/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::schema::{InnerDecimalSchema, UuidSchema};
use crate::{
AvroResult,
bigdecimal::serialize_big_decimal,
Expand Down Expand Up @@ -111,17 +112,24 @@ pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
.write(&x.to_le_bytes())
.map_err(|e| Details::WriteBytes(e).into()),
Value::Decimal(decimal) => match schema {
Schema::Decimal(DecimalSchema { inner, .. }) => match *inner.clone() {
Schema::Fixed(FixedSchema { size, .. }) => {
let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
Schema::Decimal(DecimalSchema { inner, .. }) => match inner {
InnerDecimalSchema::Fixed(fixed) => {
let bytes = decimal.to_sign_extended_bytes_with_len(fixed.size)?;
let num_bytes = bytes.len();
if num_bytes != size {
return Err(Details::EncodeDecimalAsFixedError(num_bytes, size).into());
if num_bytes != fixed.size {
return Err(
Details::EncodeDecimalAsFixedError(num_bytes, fixed.size).into()
);
}
encode(&Value::Fixed(size, bytes), inner, writer)
encode(
&Value::Fixed(fixed.size, bytes),
&Schema::Fixed(fixed.copy_only_size()),
writer,
)
}
InnerDecimalSchema::Bytes => {
encode(&Value::Bytes(decimal.try_into()?), &Schema::Bytes, writer)
}
Schema::Bytes => encode(&Value::Bytes(decimal.try_into()?), inner, writer),
_ => Err(Details::ResolveDecimalSchema(SchemaKind::from(*inner.clone())).into()),
},
_ => Err(Details::EncodeValueAsSchemaError {
value_kind: ValueKind::Decimal,
Expand All @@ -136,23 +144,35 @@ pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
.map_err(|e| Details::WriteBytes(e).into())
}
Value::Uuid(uuid) => match *schema {
Schema::Uuid | Schema::String => encode_bytes(
Schema::Uuid(UuidSchema::String) | Schema::String => encode_bytes(
// we need the call .to_string() to properly convert ASCII to UTF-8
#[allow(clippy::unnecessary_to_owned)]
&uuid.to_string(),
writer,
),
Schema::Fixed(FixedSchema { size, .. }) => {
Schema::Uuid(UuidSchema::Bytes) | Schema::Bytes => {
let bytes = uuid.as_bytes();
encode_bytes(bytes, writer)
}
Schema::Uuid(UuidSchema::Fixed(FixedSchema { size, .. }))
| Schema::Fixed(FixedSchema { size, .. }) => {
if size != 16 {
return Err(Details::ConvertFixedToUuid(size).into());
}

let bytes = uuid.as_bytes();
encode_bytes(bytes, writer)
writer
.write(bytes.as_slice())
.map_err(|e| Details::WriteBytes(e).into())
}
_ => Err(Details::EncodeValueAsSchemaError {
value_kind: ValueKind::Uuid,
supported_schema: vec![SchemaKind::Uuid, SchemaKind::Fixed],
supported_schema: vec![
SchemaKind::Uuid,
SchemaKind::Fixed,
SchemaKind::Bytes,
SchemaKind::String,
],
}
.into()),
},
Expand All @@ -163,18 +183,18 @@ pub(crate) fn encode_internal<W: Write, S: Borrow<Schema>>(
.map_err(|e| Details::WriteBytes(e).into())
}
Value::Bytes(bytes) => match *schema {
Schema::Bytes => encode_bytes(bytes, writer),
Schema::Bytes | Schema::Uuid(UuidSchema::Bytes) => encode_bytes(bytes, writer),
Schema::Fixed { .. } => writer
.write(bytes.as_slice())
.map_err(|e| Details::WriteBytes(e).into()),
_ => Err(Details::EncodeValueAsSchemaError {
value_kind: ValueKind::Bytes,
supported_schema: vec![SchemaKind::Bytes, SchemaKind::Fixed],
supported_schema: vec![SchemaKind::Bytes, SchemaKind::Fixed, SchemaKind::Uuid],
}
.into()),
},
Value::String(s) => match *schema {
Schema::String | Schema::Uuid => encode_bytes(s, writer),
Schema::String | Schema::Uuid(UuidSchema::String) => encode_bytes(s, writer),
Schema::Enum(EnumSchema { ref symbols, .. }) => {
if let Some(index) = symbols.iter().position(|item| item == s) {
encode_int(index as i32, writer)
Expand Down Expand Up @@ -937,7 +957,7 @@ pub(crate) mod tests {
#[test]
fn test_avro_3585_encode_uuids() {
let value = Value::String(String::from("00000000-0000-0000-0000-000000000000"));
let schema = Schema::Uuid;
let schema = Schema::Uuid(UuidSchema::String);
let mut buffer = Vec::new();
let encoded = encode(&value, &schema, &mut buffer);
assert!(encoded.is_ok());
Expand Down
Loading
Loading