Skip to content
Permalink
Browse files
AVRO-3512: Fix the aliases' namespace when parsing named schemata (#1685)

* AVRO-3512: Fix the aliases' namespace when parsing named schemata

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

* AVRO-3512: Fix formatting issue

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
  • Loading branch information
martin-g committed May 16, 2022
1 parent 57c5428 commit bed2ec96a8d8ecbffcf02f0ec73e4f4ecb886fb9
Showing 3 changed files with 137 additions and 10 deletions.
@@ -398,7 +398,7 @@
<title>Aliases</title>
<p>Named types and fields may have aliases. An implementation
may optionally use aliases to map a writer's schema to the
reader's. This faciliates both schema evolution as well as
reader's. This facilitates both schema evolution as well as
processing disparate datasets.</p>
<p>Aliases function by re-writing the writer's schema using
aliases from the reader's schema. For example, if the
@@ -257,7 +257,7 @@ Complex types (`record`, `enum`, `array`, `map`, `fixed`) have no namespace, but
A schema or protocol may not contain multiple definitions of a fullname. Further, a name must be defined before it is used ("before" in the depth-first, left-to-right traversal of the JSON parse tree, where the types attribute of a protocol is always deemed to come "before" the messages attribute.)

### Aliases
Named types and fields may have aliases. An implementation may optionally use aliases to map a writer's schema to the reader's. This faciliates both schema evolution as well as processing disparate datasets.
Named types and fields may have aliases. An implementation may optionally use aliases to map a writer's schema to the reader's. This facilitates both schema evolution as well as processing disparate datasets.

Aliases function by re-writing the writer's schema using aliases from the reader's schema. For example, if the writer's schema was named "Foo" and the reader's schema is named "Bar" and has an alias of "Foo", then the implementation would act as though "Foo" were named "Bar" when reading. Similarly, if data was written as a record with a field named "x" and is read as a record with a field named "y" with alias "x", then the implementation would act as though "x" were named "y" when reading.

@@ -1115,6 +1115,7 @@ impl Parser {
Some(ref ns) => format!("{}.{}", ns, alias),
None => alias.clone(),
};

let alias_name = Name::new(alias_fullname.as_str()).unwrap();
self.resolving_schemas.remove(&alias_name);
self.parsed_schemas.insert(alias_name, schema.clone());
@@ -1148,8 +1149,6 @@ impl Parser {
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let name = Name::parse(complex)?;
let aliases = complex.aliases();
let fields_opt = complex.get("fields");

if fields_opt.is_none() {
@@ -1158,6 +1157,9 @@ impl Parser {
}
}

let name = Name::parse(complex)?;
let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

let mut lookup = BTreeMap::new();
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
self.register_resolving_schema(&fully_qualified_name, &aliases);
@@ -1199,9 +1201,6 @@ impl Parser {
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let name = Name::parse(complex)?;
let aliases = complex.aliases();
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let symbols_opt = complex.get("symbols");

if symbols_opt.is_none() {
@@ -1210,6 +1209,10 @@ impl Parser {
}
}

let name = Name::parse(complex)?;
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

let symbols: Vec<String> = symbols_opt
.and_then(|v| v.as_array())
.ok_or(Error::GetEnumSymbolsField)
@@ -1297,9 +1300,6 @@ impl Parser {
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let name = Name::parse(complex)?;
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let aliases = complex.aliases();
let size_opt = complex.get("size");
if size_opt.is_none() {
if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
@@ -1316,6 +1316,10 @@ impl Parser {
.and_then(|v| v.as_i64())
.ok_or(Error::GetFixedSizeField)?;

let name = Name::parse(complex)?;
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace);

let schema = Schema::Fixed {
name,
aliases: aliases.clone(),
@@ -1329,6 +1333,29 @@ impl Parser {
}
}

/// Qualifies every relative alias with the namespace of the type it belongs to.
///
/// Per the Avro specification, a type alias is either fully namespace-qualified
/// or relative to the namespace of the name it is an alias for. For example, if
/// a type named "a.b" has aliases "c" and "x.y", the fully qualified names of
/// its aliases are "a.c" and "x.y".
///
/// An alias that already contains a '.' is returned unchanged, and so is every
/// alias when `namespace` is `None`. A `None` alias list stays `None`.
///
/// <https://avro.apache.org/docs/current/spec.html#Aliases>
fn fix_aliases_namespace(aliases: Aliases, namespace: &Namespace) -> Aliases {
    // Only a dot-free alias combined with a known namespace needs a prefix;
    // every other combination keeps the alias exactly as written.
    let qualify = |alias: &String| -> String {
        match namespace {
            Some(ns) if !alias.contains('.') => format!("{}.{}", ns, alias),
            _ => alias.clone(),
        }
    };

    aliases.map(|names| names.iter().map(qualify).collect())
}

fn get_schema_type_name(name: Name, value: Value) -> Name {
match value.get("type") {
Some(Value::Object(complex_type)) => match complex_type.name() {
@@ -3505,4 +3532,104 @@ mod tests {
let _schema = Schema::parse_str(&schema_str).expect("test failed");
assert_eq!(schema, _schema);
}

/// AVRO-3512: aliases of a record are resolved against the record's namespace.
/// A bare alias gets the namespace prepended, while aliases that already
/// contain a dot (including a leading one) are kept as written.
#[test]
fn avro_3512_alias_with_null_namespace_record() {
    let schema_json = r#"
{
"type": "record",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"fields" : [
{"name": "time", "type": "long"}
]
}
"#;
    let schema = Schema::parse_str(schema_json).unwrap();

    // First make sure we got the expected schema variant ...
    let maybe_aliases = match schema {
        Schema::Record { ref aliases, .. } => aliases,
        _ => panic!("The Schema should be a record: {:?}", schema),
    };

    // ... and then that the aliases survived parsing.
    let parsed = match maybe_aliases {
        Some(list) => list,
        None => panic!("'aliases' must be Some"),
    };

    assert_eq!(parsed.len(), 3);
    assert_eq!(parsed[0], "space.b");
    assert_eq!(parsed[1], "x.y");
    assert_eq!(parsed[2], ".c");
}

/// AVRO-3512: aliases of an enum are resolved against the enum's namespace.
/// A bare alias gets the namespace prepended, while aliases that already
/// contain a dot (including a leading one) are kept as written.
#[test]
fn avro_3512_alias_with_null_namespace_enum() {
    let schema_json = r#"
{
"type": "enum",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"symbols" : [
"symbol1", "symbol2"
]
}
"#;
    let schema = Schema::parse_str(schema_json).unwrap();

    // First make sure we got the expected schema variant ...
    let maybe_aliases = match schema {
        Schema::Enum { ref aliases, .. } => aliases,
        _ => panic!("The Schema should be an enum: {:?}", schema),
    };

    // ... and then that the aliases survived parsing.
    let parsed = match maybe_aliases {
        Some(list) => list,
        None => panic!("'aliases' must be Some"),
    };

    assert_eq!(parsed.len(), 3);
    assert_eq!(parsed[0], "space.b");
    assert_eq!(parsed[1], "x.y");
    assert_eq!(parsed[2], ".c");
}

/// AVRO-3512: aliases of a fixed are resolved against the fixed's namespace.
/// A bare alias gets the namespace prepended, while aliases that already
/// contain a dot (including a leading one) are kept as written.
#[test]
fn avro_3512_alias_with_null_namespace_fixed() {
    let schema_json = r#"
{
"type": "fixed",
"name": "a",
"namespace": "space",
"aliases": ["b", "x.y", ".c"],
"size" : 12
}
"#;
    let schema = Schema::parse_str(schema_json).unwrap();

    // First make sure we got the expected schema variant ...
    let maybe_aliases = match schema {
        Schema::Fixed { ref aliases, .. } => aliases,
        _ => panic!("The Schema should be a fixed: {:?}", schema),
    };

    // ... and then that the aliases survived parsing.
    let parsed = match maybe_aliases {
        Some(list) => list,
        None => panic!("'aliases' must be Some"),
    };

    assert_eq!(parsed.len(), 3);
    assert_eq!(parsed[0], "space.b");
    assert_eq!(parsed[1], "x.y");
    assert_eq!(parsed[2], ".c");
}
}

0 comments on commit bed2ec9

Please sign in to comment.