Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-1649: Teach SchemaCompatibility to be more descriptive with failing compatibility checks. #24

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
118 changes: 56 additions & 62 deletions lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
Expand Up @@ -45,6 +45,8 @@ private SchemaCompatibility() {
/** Message to annotate reader/writer schema pairs that are compatible. */
public static final String READER_WRITER_COMPATIBLE_MESSAGE =
"Reader schema can always successfully decode data written using the writer schema.";
/**
 * {@link String#format} template for incompatible reader/writer pairs; takes two
 * arguments: the pretty-printed writer schema, then the pretty-printed reader schema.
 * NOTE(review): the identifier misspells "INCOMPATIBLE"; it is kept as-is because the
 * SchemaCompatibilityType enum below references it — consider renaming before release.
 */
public static final String READER_WRITER_INCOMPATABLE_MESSAGE =
"Data encoded using writer schema:%n%s%nwill or may fail to decode using reader schema:%n%s%n";

/**
* Validates that the provided reader schema can be used to decode avro data written with the
Expand All @@ -62,28 +64,10 @@ public static SchemaPairCompatibility checkReaderWriterCompatibility(
new ReaderWriterCompatiblityChecker()
.getCompatibility(reader, writer);

final String message;
switch (compatibility) {
case INCOMPATIBLE: {
message = String.format(
"Data encoded using writer schema:%n%s%n"
+ "will or may fail to decode using reader schema:%n%s%n",
writer.toString(true),
reader.toString(true));
break;
}
case COMPATIBLE: {
message = READER_WRITER_COMPATIBLE_MESSAGE;
break;
}
default: throw new AvroRuntimeException("Unknown compatibility: " + compatibility);
}

return new SchemaPairCompatibility(
compatibility,
reader,
writer,
message);
writer);
}

// -----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -282,32 +266,29 @@ private SchemaCompatibilityType calculateCompatibility(
case FIXED: {
// fixed size and name must match:
if (!schemaNameEquals(reader, writer)) {
return SchemaCompatibilityType.INCOMPATIBLE;
return SchemaCompatibilityType.INCOMPATIBLE_NAME;
}
if (reader.getFixedSize() != writer.getFixedSize()) {
return SchemaCompatibilityType.INCOMPATIBLE;
return SchemaCompatibilityType.INCOMPATIBLE_SIZE;
}
return SchemaCompatibilityType.COMPATIBLE;
}
case ENUM: {
// enum names must match:
if (!schemaNameEquals(reader, writer)) {
return SchemaCompatibilityType.INCOMPATIBLE;
return SchemaCompatibilityType.INCOMPATIBLE_NAME;
}
// reader symbols must contain all writer symbols:
final Set<String> symbols = new HashSet<String>(writer.getEnumSymbols());
symbols.removeAll(reader.getEnumSymbols());
// TODO: Report a human-readable error.
// if (!symbols.isEmpty()) {
// }
return symbols.isEmpty()
? SchemaCompatibilityType.COMPATIBLE
: SchemaCompatibilityType.INCOMPATIBLE;
: SchemaCompatibilityType.INCOMPATIBLE_ENUM_MISSING_FIELDS;
}
case RECORD: {
// record names must match:
if (!schemaNameEquals(reader, writer)) {
return SchemaCompatibilityType.INCOMPATIBLE;
return SchemaCompatibilityType.INCOMPATIBLE_NAME;
}

// Check that each field in the reader record can be populated from the writer record:
Expand All @@ -318,12 +299,12 @@ private SchemaCompatibilityType calculateCompatibility(
// reader field must have a default value.
if (readerField.defaultValue() == null) {
// reader field has no default value
return SchemaCompatibilityType.INCOMPATIBLE;
return SchemaCompatibilityType.INCOMPATIBLE_MISSING_DEFAULT;
}
} else {
if (getCompatibility(readerField.schema(), writerField.schema())
== SchemaCompatibilityType.INCOMPATIBLE) {
return SchemaCompatibilityType.INCOMPATIBLE;
SchemaCompatibilityType compatibilityType = getCompatibility(readerField.schema(), writerField.schema());
if (!compatibilityType.isCompatible()) {
return compatibilityType;
}
}
}
Expand All @@ -334,8 +315,9 @@ private SchemaCompatibilityType calculateCompatibility(
case UNION: {
// Check that each individual branch of the writer union can be decoded:
for (final Schema writerBranch : writer.getTypes()) {
if (getCompatibility(reader, writerBranch) == SchemaCompatibilityType.INCOMPATIBLE) {
return SchemaCompatibilityType.INCOMPATIBLE;
SchemaCompatibilityType schemaCompatibilityType = getCompatibility(reader, writerBranch);
if (!schemaCompatibilityType.isCompatible()) {
return schemaCompatibilityType;
}
}
// Each schema in the writer union can be decoded with the reader:
Expand All @@ -357,43 +339,43 @@ private SchemaCompatibilityType calculateCompatibility(
}

switch (reader.getType()) {
case NULL: return SchemaCompatibilityType.INCOMPATIBLE;
case BOOLEAN: return SchemaCompatibilityType.INCOMPATIBLE;
case INT: return SchemaCompatibilityType.INCOMPATIBLE;
case NULL: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case BOOLEAN: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case INT: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case LONG: {
return (writer.getType() == Type.INT)
? SchemaCompatibilityType.COMPATIBLE
: SchemaCompatibilityType.INCOMPATIBLE;
: SchemaCompatibilityType.INCOMPATIBLE_TYPE;
}
case FLOAT: {
return ((writer.getType() == Type.INT)
|| (writer.getType() == Type.LONG))
? SchemaCompatibilityType.COMPATIBLE
: SchemaCompatibilityType.INCOMPATIBLE;
: SchemaCompatibilityType.INCOMPATIBLE_TYPE;

}
case DOUBLE: {
return ((writer.getType() == Type.INT)
|| (writer.getType() == Type.LONG)
|| (writer.getType() == Type.FLOAT))
? SchemaCompatibilityType.COMPATIBLE
: SchemaCompatibilityType.INCOMPATIBLE;
: SchemaCompatibilityType.INCOMPATIBLE_TYPE;
}
case BYTES: return SchemaCompatibilityType.INCOMPATIBLE;
case STRING: return SchemaCompatibilityType.INCOMPATIBLE;
case ARRAY: return SchemaCompatibilityType.INCOMPATIBLE;
case MAP: return SchemaCompatibilityType.INCOMPATIBLE;
case FIXED: return SchemaCompatibilityType.INCOMPATIBLE;
case ENUM: return SchemaCompatibilityType.INCOMPATIBLE;
case RECORD: return SchemaCompatibilityType.INCOMPATIBLE;
case BYTES: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case STRING: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case ARRAY: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case MAP: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case FIXED: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case ENUM: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case RECORD: return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
case UNION: {
for (final Schema readerBranch : reader.getTypes()) {
if (getCompatibility(readerBranch, writer) == SchemaCompatibilityType.COMPATIBLE) {
return SchemaCompatibilityType.COMPATIBLE;
}
}
// No branch in the reader union has been found compatible with the writer schema:
return SchemaCompatibilityType.INCOMPATIBLE;
return SchemaCompatibilityType.INCOMPATIBLE_TYPE;
}

default: {
Expand All @@ -408,11 +390,30 @@ private SchemaCompatibilityType calculateCompatibility(
* Identifies the type of a schema compatibility result.
*/
public static enum SchemaCompatibilityType {
COMPATIBLE,
INCOMPATIBLE,
COMPATIBLE(READER_WRITER_COMPATIBLE_MESSAGE),

INCOMPATIBLE_NAME(READER_WRITER_INCOMPATABLE_MESSAGE + "Schema names must match."),
INCOMPATIBLE_SIZE(READER_WRITER_INCOMPATABLE_MESSAGE + " Fixed schemas are no the same size."),
INCOMPATIBLE_ENUM_MISSING_FIELDS(READER_WRITER_INCOMPATABLE_MESSAGE + " Reader schema is missing ENUM values."),
INCOMPATIBLE_MISSING_DEFAULT(READER_WRITER_INCOMPATABLE_MESSAGE + " New fields must have a default value."),
INCOMPATIBLE_TYPE(READER_WRITER_INCOMPATABLE_MESSAGE + " Schema types are incompatable."),

/** Used internally to tag a reader/writer schema pair and prevent recursion. */
RECURSION_IN_PROGRESS;
RECURSION_IN_PROGRESS("");

private final String description;

SchemaCompatibilityType(String description) {
this.description = description;
}

protected String description(Schema reader, Schema writer) {
return String.format(description, writer.toString(true), reader.toString(true));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change and would require holding this back until at least 1.8. I don't see a need to make a breaking change, though. What about adding a version of getCompatibility that returns a SchemaPairCompatibility instead of the SchemaCompatibilityType? Then you could fill in the specific message (with even more detail, like record field path) and return all the way up for the same result.

You'd probably want an internal method to do the check and memoize results that getCompatibility calls and retrieves the compatibility type result from. calculateCompatibility could be used for that purpose, but I think it would be cleaner to memoize separately in a memoizeCalculateCompatibility. You could return the correct SchemaPairCompatibility from checkReaderWriterCompatibility. Does that sound reasonable? Then this could easily be a compatible change.


public boolean isCompatible() {
return this == COMPATIBLE;
}
}

// -----------------------------------------------------------------------------------------------
Expand All @@ -432,26 +433,20 @@ public static final class SchemaPairCompatibility {
/** Validated writer schema. */
private final Schema mWriter;

/** Human readable description of this result. */
private final String mDescription;

/**
 * Constructs a new instance.
 *
 * @param type of the schema compatibility.
 * @param reader schema that was validated.
 * @param writer schema that was validated.
 */
public SchemaPairCompatibility(
    SchemaCompatibilityType type,
    Schema reader,
    Schema writer) {
  mType = type;
  mReader = reader;
  mWriter = writer;
}

/**
 * Constructs a new instance.
 *
 * <p>Retained for source and binary compatibility with pre-existing callers; the
 * description is now derived from {@code type}, so the supplied one is ignored.
 *
 * @param type of the schema compatibility.
 * @param reader schema that was validated.
 * @param writer schema that was validated.
 * @param description ignored; descriptions are derived from the compatibility type.
 * @deprecated use {@link #SchemaPairCompatibility(SchemaCompatibilityType, Schema, Schema)}.
 */
@Deprecated
public SchemaPairCompatibility(
    SchemaCompatibilityType type,
    Schema reader,
    Schema writer,
    String description) {
  this(type, reader, writer);
}

/**
Expand Down Expand Up @@ -487,15 +482,15 @@ public Schema getWriter() {
* @return a human readable description of this validation result.
*/
public String getDescription() {
  // Derived on demand from the compatibility type's message template.
  return mType.description(mReader, mWriter);
}

/** {@inheritDoc} */
@Override
public String toString() {
return String.format(
"SchemaPairCompatibility{type:%s, readerSchema:%s, writerSchema:%s, description:%s}",
mType, mReader, mWriter, mDescription);
mType, mReader, mWriter, mType.description(mReader, mWriter));
}

/** {@inheritDoc} */
Expand All @@ -505,8 +500,7 @@ public boolean equals(Object other) {
final SchemaPairCompatibility result = (SchemaPairCompatibility) other;
return objectsEqual(result.mType, mType)
&& objectsEqual(result.mReader, mReader)
&& objectsEqual(result.mWriter, mWriter)
&& objectsEqual(result.mDescription, mDescription);
&& objectsEqual(result.mWriter, mWriter);
} else {
return false;
}
Expand All @@ -515,7 +509,7 @@ && objectsEqual(result.mWriter, mWriter)
/** {@inheritDoc} */
@Override
public int hashCode() {
  // mDescription was removed; hash covers type, reader, and writer only,
  // consistent with equals().
  return Arrays.hashCode(new Object[]{mType, mReader, mWriter});
}
}

Expand Down