Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposed change in AvroSchema to handle circular references. #445

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class AvroSchemaConverter {

private final boolean assumeRepeatedIsListElement;
private final boolean writeOldListStructure;

private ArrayList<Schema> schemapath;
private ArrayList<GroupType> grouppath;

public AvroSchemaConverter() {
this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
Expand Down Expand Up @@ -112,7 +115,13 @@ public MessageType convert(Schema avroSchema) {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
schemapath = new ArrayList<Schema>();
schemapath.add(avroSchema);
grouppath = new ArrayList<GroupType>();
MessageType m = new MessageType(avroSchema.getFullName());
grouppath.add(m);
m.addFields(convertFields(avroSchema.getFields()));
return m;
}

private List<Type> convertFields(List<Schema.Field> fields) {
Expand Down Expand Up @@ -149,7 +158,50 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
} else if (type.equals(Schema.Type.STRING)) {
builder = Types.primitive(BINARY, repetition).as(UTF8);
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
/*
* A schema might contain one of its parent schemas, either directly or indirectly.
* Example 1: the "Person" schema has a field "children" of type array-of-"Person" --> a "Person" can have multiple "Person" records in the field "children".
* Example 2: the "Person" schema has a field "contacts" which lists various contact options. Each contact option has an optional field "naturalperson" which is of type "Person".
*
* To solve that, whenever a record schema is encountered, we check whether this schema has already been used somewhere along the current path.
* If no, then it is just a regular tree structure with no circular reference, i.e. no schema has itself as a (direct or indirect) child.
* If yes, then this field is redefined as an INT64 containing a generated ID, and the records of that element can be found in the parent structure via the __ID field.
*/
int index = schemapath.lastIndexOf(schema); // Has the current schema been used in the schema tree already?
if (index == -1) {
/*
* No, it has not been used: this is the first time the schema appears in this branch of the tree, so we simply add it.
* While doing so we keep track of the path of schemas, so the recursive calls know which schemas are above them in the tree.
* We also keep the parallel path of generated GroupTypes, so the __ID column can be added to one of them in case it is needed.
*/
schemapath.add(schema);
GroupType group = new GroupType(repetition, fieldName);
grouppath.add(group);
group.addFields(convertFields(schema.getFields()));
schemapath.remove(schemapath.size()-1);
grouppath.remove(grouppath.size()-1);
return group;
} else {
/*
* We found a recursion like Schema1 -> Schema2 -> Schema3 -> ... -> SchemaN -> Schema2.
* In that case the field within SchemaN that reuses Schema2 as its datatype becomes an INT64, and the group generated for the reused Schema2 gets an additional __ID column.
* This __ID column will not be filled for the Schema1 fields, but it will be filled for the records stored on behalf of the SchemaN field, which references them through the __ID value.
*/
GroupType referencegroup = grouppath.get(index);
if (!referencegroup.containsField("__ID")) {
if (!referencegroup.isRepetition(REPEATED)) {
/*
* Originally this Avro schema can contain only a single record. But as we are reusing it to store the child
* records as well, it needs to be turned into an array.
*
* Open question: what is the most efficient way to do that?
*/
//TODO: Change referencegroup to an array object
}
referencegroup.addField(Types.primitive(INT64, repetition).named("__ID"));
}
builder = Types.primitive(INT64, repetition);
}
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(ENUM);
} else if (type.equals(Schema.Type.ARRAY)) {
Expand Down Expand Up @@ -443,3 +495,4 @@ private static Schema optional(Schema original) {
original));
}
}

Loading