Skip to content

Commit

Permalink
Make avro maps' Debug implementation deterministic
Browse files Browse the repository at this point in the history
  • Loading branch information
umanwizard committed Jun 16, 2021
1 parent 4802872 commit c2e51da
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 26 deletions.
8 changes: 4 additions & 4 deletions src/avro/src/decode.rs
Expand Up @@ -34,7 +34,7 @@ use crate::schema::{
RecordField, ResolvedDefaultValueField, ResolvedRecordField, SchemaNode, SchemaPiece,
SchemaPieceOrNamed,
};
use crate::types::{Scalar, Value};
use crate::types::{AvroMap, Scalar, Value};
use crate::{
util::{safe_len, zag_i32, zag_i64, TsUnit},
TrivialDecoder, ValueDecoder,
Expand Down Expand Up @@ -704,7 +704,7 @@ pub mod public_decoders {

use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
use crate::error::{DecodeError, Error as AvroError};
use crate::types::{DecimalValue, Scalar, Value};
use crate::types::{AvroMap, DecimalValue, Scalar, Value};
use crate::{
AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
};
Expand Down Expand Up @@ -1177,7 +1177,7 @@ pub mod public_decoders {
let val = a.decode_field(d)?;
entries.insert(name, val);
}
Ok(Value::Map(entries))
Ok(Value::Map(AvroMap(entries)))
}
}
}
Expand Down Expand Up @@ -1230,7 +1230,7 @@ pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
let mut a = ValueArrayAccess::new(val);
d.array(&mut a)
}
Value::Map(val) => {
Value::Map(AvroMap(val)) => {
let vals: Vec<_> = val.clone().into_iter().collect();
let mut m = ValueMapAccess::new(vals.as_slice());
d.map(&mut m)
Expand Down
5 changes: 3 additions & 2 deletions src/avro/src/encode.rs
Expand Up @@ -25,6 +25,7 @@ use std::convert::TryInto;
use std::mem::transmute;

use crate::schema::{Schema, SchemaNode, SchemaPiece};
use crate::types::AvroMap;
use crate::types::{DecimalValue, Value};
use crate::util::{zig_i32, zig_i64};

Expand Down Expand Up @@ -140,7 +141,7 @@ pub fn encode_ref(value: &Value, schema: SchemaNode, buffer: &mut Vec<u8>) {
buffer.push(0u8);
}
}
Value::Map(items) => {
Value::Map(AvroMap(items)) => {
if let SchemaPiece::Map(inner) = schema.inner {
if !items.is_empty() {
encode_long(items.len() as i64, buffer);
Expand Down Expand Up @@ -200,7 +201,7 @@ mod tests {
let mut buf = Vec::new();
let empty: HashMap<String, Value> = HashMap::new();
encode(
&Value::Map(empty),
&Value::Map(AvroMap(empty)),
&r#"{"type": "map", "values": "int"}"#.parse().unwrap(),
&mut buf,
);
Expand Down
4 changes: 2 additions & 2 deletions src/avro/src/reader.rs
Expand Up @@ -35,7 +35,7 @@ use crate::schema::{
SchemaPieceRefOrNamed,
};
use crate::schema::{ResolvedRecordField, Schema};
use crate::types::Value;
use crate::types::{AvroMap, Value};
use crate::util::{self};
use crate::{Codec, SchemaResolutionError};

Expand Down Expand Up @@ -64,7 +64,7 @@ impl Header {
return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
}

if let Value::Map(meta) = decode(meta_schema.top_node(), reader)? {
if let Value::Map(AvroMap(meta)) = decode(meta_schema.top_node(), reader)? {
// TODO: surface original parse schema errors instead of coalescing them here
let json = meta
.get("avro.schema")
Expand Down
3 changes: 2 additions & 1 deletion src/avro/src/schema.rs
Expand Up @@ -46,6 +46,7 @@ use types::{DecimalValue, Value as AvroValue};
use crate::error::Error as AvroError;
use crate::reader::SchemaResolver;
use crate::types;
use crate::types::AvroMap;
use crate::util::MapHelper;

pub fn resolve_schemas(
Expand Down Expand Up @@ -1739,7 +1740,7 @@ impl<'a> SchemaNode<'a> {
.iter()
.map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
.collect::<Result<HashMap<_, _>, ParseSchemaError>>()?;
AvroValue::Map(map)
AvroValue::Map(AvroMap(map))
}
(String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
AvroValue::Fixed(*size, s.clone().into_bytes())
Expand Down
30 changes: 22 additions & 8 deletions src/avro/src/types.rs
Expand Up @@ -91,6 +91,20 @@ impl From<Scalar> for Value {
}
}

/// The values stored in an Avro map.
// This simple wrapper exists so we can Debug-print the values deterministically, i.e. in sorted order
// by keys.
#[derive(Clone, PartialEq)]
pub struct AvroMap(pub HashMap<String, Value>);

impl fmt::Debug for AvroMap {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut entries = self.0.clone().into_iter().collect::<Vec<_>>();
entries.sort_by_key(|(k, _)| k.clone());
f.debug_map().entries(entries).finish()
}
}

/// Represents any valid Avro value
/// More information about Avro values can be found in the
/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
Expand Down Expand Up @@ -151,7 +165,7 @@ pub enum Value {
/// An `array` Avro value.
Array(Vec<Value>),
/// A `map` Avro value.
Map(HashMap<String, Value>),
Map(AvroMap),
/// A `record` Avro value.
///
/// A Record is represented by a vector of (`<field name>`, `value`).
Expand Down Expand Up @@ -221,11 +235,11 @@ where
T: ToAvro,
{
fn avro(self) -> Value {
Value::Map(
Value::Map(AvroMap(
self.into_iter()
.map(|(key, value)| (key, value.avro()))
.collect::<_>(),
)
))
}
}

Expand All @@ -234,11 +248,11 @@ where
T: ToAvro,
{
fn avro(self) -> Value {
Value::Map(
Value::Map(AvroMap(
self.into_iter()
.map(|(key, value)| (key.to_owned(), value.avro()))
.collect::<_>(),
)
))
}
}

Expand Down Expand Up @@ -342,12 +356,12 @@ impl ToAvro for JsonValue {
JsonValue::Array(items) => {
Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
}
JsonValue::Object(items) => Value::Map(
JsonValue::Object(items) => Value::Map(AvroMap(
items
.into_iter()
.map(|(key, value)| (key, value.avro()))
.collect::<_>(),
),
)),
}
}
}
Expand Down Expand Up @@ -415,7 +429,7 @@ impl Value {
let node = schema.step(&**inner);
items.iter().all(|item| item.validate(node))
}
(&Value::Map(ref items), SchemaPiece::Map(inner)) => {
(&Value::Map(AvroMap(ref items)), SchemaPiece::Map(inner)) => {
let node = schema.step(&**inner);
items.iter().all(|(_, value)| value.validate(node))
}
Expand Down
9 changes: 5 additions & 4 deletions src/avro/tests/io.rs
Expand Up @@ -29,6 +29,7 @@ use std::str::FromStr;
use chrono::{NaiveDate, NaiveDateTime};
use lazy_static::lazy_static;
use mz_avro::schema::resolve_schemas;
use mz_avro::types::AvroMap;
use mz_avro::{
error::Error as AvroError,
from_avro_datum, to_avro_datum,
Expand All @@ -49,7 +50,7 @@ lazy_static! {
(r#"{"type": "fixed", "name": "Test", "size": 1}"#, Value::Fixed(1, vec![b'B'])),
(r#"{"type": "enum", "name": "Test", "symbols": ["A", "B"]}"#, Value::Enum(1, "B".to_string())),
(r#"{"type": "array", "items": "long"}"#, Value::Array(vec![Value::Long(1), Value::Long(3), Value::Long(2)])),
(r#"{"type": "map", "values": "long"}"#, Value::Map([("a".to_string(), Value::Long(1i64)), ("b".to_string(), Value::Long(3i64)), ("c".to_string(), Value::Long(2i64))].iter().cloned().collect())),
(r#"{"type": "map", "values": "long"}"#, Value::Map(AvroMap([("a".to_string(), Value::Long(1i64)), ("b".to_string(), Value::Long(3i64)), ("c".to_string(), Value::Long(2i64))].iter().cloned().collect()))),
(r#"["string", "null", "long"]"#, Value::Union{
index: 1,
inner: Box::new(Value::Null),
Expand Down Expand Up @@ -83,7 +84,7 @@ lazy_static! {
//(r#"{"type": "fixed", "name": "F", "size": 2}"#, r#""\u00FF\u00FF""#, Value::Bytes(vec![0xff, 0xff])),
//(r#"{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}"#, r#""FOO""#, Value::Enum(0, "FOO".to_string())),
(r#"{"type": "array", "items": "int"}"#, "[1, 2, 3]", Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)])),
(r#"{"type": "map", "values": "int"}"#, r#"{"a": 1, "b": 2}"#, Value::Map([("a".to_string(), Value::Int(1)), ("b".to_string(), Value::Int(2))].iter().cloned().collect())),
(r#"{"type": "map", "values": "int"}"#, r#"{"a": 1, "b": 2}"#, Value::Map(AvroMap([("a".to_string(), Value::Int(1)), ("b".to_string(), Value::Int(2))].iter().cloned().collect()))),
//(r#"["int", "null"]"#, "5", Value::Union(Box::new(Value::Int(5)))),
(r#"{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}"#, r#"{"A": 5}"#,Value::Record(vec![("A".to_string(), Value::Int(5))])),
];
Expand Down Expand Up @@ -802,11 +803,11 @@ fn test_complex_resolutions() {
),
(
"f0_2".to_owned(),
Value::Map(
Value::Map(AvroMap(
vec![("a".to_string(), Value::Long(42))]
.into_iter()
.collect(),
),
)),
),
]),
),
Expand Down
7 changes: 4 additions & 3 deletions src/avro/tests/schema.rs
Expand Up @@ -29,6 +29,7 @@ use std::str::FromStr;

use chrono::{NaiveDate, NaiveDateTime};
use lazy_static::lazy_static;
use mz_avro::types::AvroMap;
use mz_avro::{types::DecimalValue, types::Value, Schema};

lazy_static! {
Expand Down Expand Up @@ -118,9 +119,9 @@ lazy_static! {
(r#"{"type": "array",
"items": {"type": "enum", "name": "Test", "symbols": ["A", "B"]}}"#, Value::Array(vec![Value::Enum(0, "A".to_owned())])),
// Map examples
(r#"{"type": "map", "values": "long"}"#, Value::Map(HashMap::new())),
(r#"{"type": "map", "values": "long"}"#, Value::Map(AvroMap(HashMap::new()))),
(r#"{"type": "map",
"values": {"type": "enum", "name": "Test", "symbols": ["A", "B"]}}"#, Value::Map(HashMap::new())),
"values": {"type": "enum", "name": "Test", "symbols": ["A", "B"]}}"#, Value::Map(AvroMap(HashMap::new()))),
// Union examples
(r#"["null", "int"]"#, Value::Union{index:0, inner:Box::new(Value::Null),n_variants:2,null_variant:Some(0)}),
(r#"["null", "int"]"#, Value::Union{index:1, inner:Box::new(Value::Int(42)),n_variants:2,null_variant:Some(0)}),
Expand Down Expand Up @@ -161,7 +162,7 @@ lazy_static! {
("bytesField".into(), Value::Bytes(vec![0])),
("nullField".into(), Value::Null),
("arrayField".into(), Value::Array(vec![Value::Double(0.0)])),
("mapField".into(), Value::Map(HashMap::new())),
("mapField".into(), Value::Map(AvroMap(HashMap::new()))),
("unionField".into(), Value::Union{index:1, inner:Box::new(Value::Double(0.0)),n_variants:3,null_variant:None}),
("enumField".into(), Value::Enum(1, "B".into())),
("fixedField".into(), Value::Fixed(4, vec![0, 0, 0, 0])),
Expand Down
3 changes: 2 additions & 1 deletion src/interchange/src/avro/encode.rs
Expand Up @@ -14,6 +14,7 @@ use byteorder::{NetworkEndian, WriteBytesExt};
use chrono::Timelike;
use itertools::Itertools;
use lazy_static::lazy_static;
use mz_avro::types::AvroMap;
use repr::adt::jsonb::JsonbRef;
use repr::{ColumnName, ColumnType, Datum, RelationDesc, Row, ScalarType};
use serde_json::json;
Expand Down Expand Up @@ -333,7 +334,7 @@ impl<'a> mz_avro::types::ToAvro for TypedDatum<'a> {
(key.to_string(), value)
})
.collect();
Value::Map(elements)
Value::Map(AvroMap(elements))
}
ScalarType::Record { fields, .. } => {
let list = datum.unwrap_list();
Expand Down
3 changes: 2 additions & 1 deletion src/testdrive/src/format/avro.rs
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::num::TryFromIntError;

use mz_avro::types::AvroMap;
use regex::Regex;

use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -157,7 +158,7 @@ pub fn from_json(json: &JsonValue, schema: SchemaNode) -> Result<Value, String>
)?,
);
}
Ok(Value::Map(map))
Ok(Value::Map(AvroMap(map)))
}
(val, SchemaPiece::Union(us)) => {
let variants = us.variants();
Expand Down

0 comments on commit c2e51da

Please sign in to comment.