From 978f0cf82487540034294ec7dc3b526ed5486ab3 Mon Sep 17 00:00:00 2001 From: Kurt Harriger Date: Tue, 7 Apr 2015 12:02:16 -0600 Subject: [PATCH 1/4] AVRO-1649: Add SchemaCompatibilityResult Modify private methods without breaking backward compatibility --- .../org/apache/avro/SchemaCompatibility.java | 163 ++++++++++++------ 1 file changed, 107 insertions(+), 56 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java index e62ea0bfd7b..cd8ea58b300 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java +++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java @@ -45,6 +45,8 @@ private SchemaCompatibility() { /** Message to annotate reader/writer schema pairs that are compatible. */ public static final String READER_WRITER_COMPATIBLE_MESSAGE = "Reader schema can always successfully decode data written using the writer schema."; + public static final String READER_WRITER_INCOMPATABLE_MESSAGE = + "Data encoded using writer schema:%n%s%nwill or may fail to decode using reader schema:%n%s%n"; /** * Validates that the provided reader schema can be used to decode avro data written with the @@ -209,8 +211,8 @@ public String toString() { *

Provides memoization to handle recursive schemas.

*/ private static final class ReaderWriterCompatiblityChecker { - private final Map mMemoizeMap = - new HashMap(); + private final Map mMemoizeMap = + new HashMap(); /** * Reports the compatibility of a reader/writer schema pair. @@ -221,28 +223,49 @@ private static final class ReaderWriterCompatiblityChecker { * @param writer Writer schema to test. * @return the compatibility of the reader/writer schema pair. */ + @Deprecated // Use getCompatibilityResult public SchemaCompatibilityType getCompatibility( final Schema reader, final Schema writer + ) { + LOG.debug("Checking compatibility of reader {} with writer {}", reader, writer); + if( getCompatibilityResult(reader, writer).isCompatible() ) { + return SchemaCompatibilityType.COMPATIBLE; + } else { + return SchemaCompatibilityType.INCOMPATIBLE; + } + } + + /** + * Reports the compatibility of a reader/writer schema pair. + * + *

Memoizes the compatibility results.

+ * + * @param reader Reader schema to test. + * @param writer Writer schema to test. + * @return the compatibility of the reader/writer schema pair. + */ + public SchemaCompatibilityResult getCompatibilityResult( + final Schema reader, + final Schema writer ) { LOG.debug("Checking compatibility of reader {} with writer {}", reader, writer); final ReaderWriter pair = new ReaderWriter(reader, writer); - final SchemaCompatibilityType existing = mMemoizeMap.get(pair); + final SchemaCompatibilityResult existing = mMemoizeMap.get(pair); if (existing != null) { - if (existing == SchemaCompatibilityType.RECURSION_IN_PROGRESS) { + if (existing == SchemaCompatibilityResult.RECURSION_IN_PROGRESS) { // Break the recursion here. // schemas are compatible unless proven incompatible: - return SchemaCompatibilityType.COMPATIBLE; + return SchemaCompatibilityResult.COMPATIBLE; } return existing; } // Mark this reader/writer pair as "in progress": - mMemoizeMap.put(pair, SchemaCompatibilityType.RECURSION_IN_PROGRESS); - final SchemaCompatibilityType calculated = calculateCompatibility(reader, writer); + mMemoizeMap.put(pair, SchemaCompatibilityResult.RECURSION_IN_PROGRESS); + final SchemaCompatibilityResult calculated = calculateCompatibility(reader, writer); mMemoizeMap.put(pair, calculated); return calculated; } - /** * Calculates the compatibility of a reader/writer schema pair. * @@ -254,9 +277,9 @@ public SchemaCompatibilityType getCompatibility( * @param writer Writer schema to test. * @return the compatibility of the reader/writer schema pair. */ - private SchemaCompatibilityType calculateCompatibility( - final Schema reader, - final Schema writer + private SchemaCompatibilityResult calculateCompatibility( + final Schema reader, + final Schema writer ) { assert (reader != null); assert (writer != null); @@ -271,43 +294,40 @@ private SchemaCompatibilityType calculateCompatibility( case DOUBLE: case BYTES: case STRING: { - return SchemaCompatibilityType.COMPATIBLE; + return SchemaCompatibilityResult.COMPATIBLE; } case ARRAY: { - return getCompatibility(reader.getElementType(), writer.getElementType()); + return getCompatibilityResult(reader.getElementType(), writer.getElementType()); } case MAP: { - return getCompatibility(reader.getValueType(), writer.getValueType()); + return getCompatibilityResult(reader.getValueType(), writer.getValueType()); } case FIXED: { // fixed size and name must match: if (!schemaNameEquals(reader, writer)) { - return SchemaCompatibilityType.INCOMPATIBLE; + return SchemaCompatibilityResult.INCOMPATIBLE_NAME; } if (reader.getFixedSize() != writer.getFixedSize()) { - return SchemaCompatibilityType.INCOMPATIBLE; + return SchemaCompatibilityResult.INCOMPATIBLE_SIZE; } - return SchemaCompatibilityType.COMPATIBLE; + return SchemaCompatibilityResult.COMPATIBLE; } case ENUM: { // enum names must match: if (!schemaNameEquals(reader, writer)) { - return SchemaCompatibilityType.INCOMPATIBLE; + return SchemaCompatibilityResult.INCOMPATIBLE_NAME; } // reader symbols must contain all writer symbols: final Set symbols = new HashSet(writer.getEnumSymbols()); symbols.removeAll(reader.getEnumSymbols()); - // TODO: Report a human-readable error. - // if (!symbols.isEmpty()) { - // } return symbols.isEmpty() - ? SchemaCompatibilityType.COMPATIBLE - : SchemaCompatibilityType.INCOMPATIBLE; + ? 
SchemaCompatibilityResult.COMPATIBLE + : SchemaCompatibilityResult.INCOMPATIBLE_ENUM_MISSING_FIELDS; } case RECORD: { // record names must match: if (!schemaNameEquals(reader, writer)) { - return SchemaCompatibilityType.INCOMPATIBLE; + return SchemaCompatibilityResult.INCOMPATIBLE_NAME; } // Check that each field in the reader record can be populated from the writer record: @@ -318,28 +338,29 @@ private SchemaCompatibilityType calculateCompatibility( // reader field must have a default value. if (readerField.defaultValue() == null) { // reader field has no default value - return SchemaCompatibilityType.INCOMPATIBLE; + return SchemaCompatibilityResult.INCOMPATIBLE_MISSING_DEFAULT; } } else { - if (getCompatibility(readerField.schema(), writerField.schema()) - == SchemaCompatibilityType.INCOMPATIBLE) { - return SchemaCompatibilityType.INCOMPATIBLE; + SchemaCompatibilityResult compatibilityResult = getCompatibilityResult(readerField.schema(), writerField.schema()); + if (!compatibilityResult.isCompatible()) { + return compatibilityResult; } } } // All fields in the reader record can be populated from the writer record: - return SchemaCompatibilityType.COMPATIBLE; + return SchemaCompatibilityResult.COMPATIBLE; } case UNION: { // Check that each individual branch of the writer union can be decoded: for (final Schema writerBranch : writer.getTypes()) { - if (getCompatibility(reader, writerBranch) == SchemaCompatibilityType.INCOMPATIBLE) { - return SchemaCompatibilityType.INCOMPATIBLE; + SchemaCompatibilityResult schemaCompatibilityResult = getCompatibilityResult(reader, writerBranch); + if (!schemaCompatibilityResult.isCompatible()) { + return schemaCompatibilityResult; } } // Each schema in the writer union can be decoded with the reader: - return SchemaCompatibilityType.COMPATIBLE; + return SchemaCompatibilityResult.COMPATIBLE; } default: { @@ -352,48 +373,48 @@ private SchemaCompatibilityType calculateCompatibility( // Handle the corner case where writer is a union of a singleton branch: { X } === X if ((writer.getType() == Schema.Type.UNION) - && writer.getTypes().size() == 1) { - return getCompatibility(reader, writer.getTypes().get(0)); + && writer.getTypes().size() == 1) { + return getCompatibilityResult(reader, writer.getTypes().get(0)); } switch (reader.getType()) { - case NULL: return SchemaCompatibilityType.INCOMPATIBLE; - case BOOLEAN: return SchemaCompatibilityType.INCOMPATIBLE; - case INT: return SchemaCompatibilityType.INCOMPATIBLE; + case NULL: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case BOOLEAN: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case INT: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; case LONG: { return (writer.getType() == Type.INT) - ? SchemaCompatibilityType.COMPATIBLE - : SchemaCompatibilityType.INCOMPATIBLE; + ? SchemaCompatibilityResult.COMPATIBLE + : SchemaCompatibilityResult.INCOMPATIBLE_TYPE; } case FLOAT: { return ((writer.getType() == Type.INT) - || (writer.getType() == Type.LONG)) - ? SchemaCompatibilityType.COMPATIBLE - : SchemaCompatibilityType.INCOMPATIBLE; + || (writer.getType() == Type.LONG)) + ? SchemaCompatibilityResult.COMPATIBLE + : SchemaCompatibilityResult.INCOMPATIBLE_TYPE; } case DOUBLE: { return ((writer.getType() == Type.INT) - || (writer.getType() == Type.LONG) - || (writer.getType() == Type.FLOAT)) - ? SchemaCompatibilityType.COMPATIBLE - : SchemaCompatibilityType.INCOMPATIBLE; + || (writer.getType() == Type.LONG) + || (writer.getType() == Type.FLOAT)) + ? 
SchemaCompatibilityResult.COMPATIBLE + : SchemaCompatibilityResult.INCOMPATIBLE_TYPE; } - case BYTES: return SchemaCompatibilityType.INCOMPATIBLE; - case STRING: return SchemaCompatibilityType.INCOMPATIBLE; - case ARRAY: return SchemaCompatibilityType.INCOMPATIBLE; - case MAP: return SchemaCompatibilityType.INCOMPATIBLE; - case FIXED: return SchemaCompatibilityType.INCOMPATIBLE; - case ENUM: return SchemaCompatibilityType.INCOMPATIBLE; - case RECORD: return SchemaCompatibilityType.INCOMPATIBLE; + case BYTES: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case STRING: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case ARRAY: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case MAP: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case FIXED: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case ENUM: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; + case RECORD: return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; case UNION: { for (final Schema readerBranch : reader.getTypes()) { - if (getCompatibility(readerBranch, writer) == SchemaCompatibilityType.COMPATIBLE) { - return SchemaCompatibilityType.COMPATIBLE; + if (getCompatibilityResult(readerBranch, writer) == SchemaCompatibilityResult.COMPATIBLE) { + return SchemaCompatibilityResult.COMPATIBLE; } } // No branch in the reader union has been found compatible with the writer schema: - return SchemaCompatibilityType.INCOMPATIBLE; + return SchemaCompatibilityResult.INCOMPATIBLE_TYPE; } default: { @@ -415,6 +436,36 @@ public static enum SchemaCompatibilityType { RECURSION_IN_PROGRESS; } + /** + * Identifies the type of a schema compatibility result. + */ + public static enum SchemaCompatibilityResult { + COMPATIBLE(READER_WRITER_COMPATIBLE_MESSAGE), + + INCOMPATIBLE_NAME(READER_WRITER_INCOMPATABLE_MESSAGE + "Schema names must match."), + INCOMPATIBLE_SIZE(READER_WRITER_INCOMPATABLE_MESSAGE + " Fixed schemas are no the same size."), + INCOMPATIBLE_ENUM_MISSING_FIELDS(READER_WRITER_INCOMPATABLE_MESSAGE + " Reader schema is missing ENUM values."), + INCOMPATIBLE_MISSING_DEFAULT(READER_WRITER_INCOMPATABLE_MESSAGE + " New fields must have a default value."), + INCOMPATIBLE_TYPE(READER_WRITER_INCOMPATABLE_MESSAGE + " Schema types are incompatable."), + + /** Used internally to tag a reader/writer schema pair and prevent recursion. 
*/ + RECURSION_IN_PROGRESS(""); + + private final String description; + + SchemaCompatibilityResult(String description) { + this.description = description; + } + + protected String description(Schema reader, Schema writer) { + return String.format(description, writer.toString(true), reader.toString(true)); + } + + public boolean isCompatible() { + return this == COMPATIBLE; + } + } + // ----------------------------------------------------------------------------------------------- /** From 62b155e222f74371efc07e719c04dc43ab2e8cd4 Mon Sep 17 00:00:00 2001 From: Kurt Harriger Date: Tue, 7 Apr 2015 12:06:12 -0600 Subject: [PATCH 2/4] AVRO-1649: Return more informative description Return SchemaPairCompatibility with a more informative incompatibility description --- .../org/apache/avro/SchemaCompatibility.java | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java index cd8ea58b300..0fe6d3cc21f 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java +++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java @@ -60,29 +60,14 @@ public static SchemaPairCompatibility checkReaderWriterCompatibility( final Schema reader, final Schema writer ) { - final SchemaCompatibilityType compatibility = + final SchemaCompatibilityResult compatibility = new ReaderWriterCompatiblityChecker() - .getCompatibility(reader, writer); - - final String message; - switch (compatibility) { - case INCOMPATIBLE: { - message = String.format( - "Data encoded using writer schema:%n%s%n" - + "will or may fail to decode using reader schema:%n%s%n", - writer.toString(true), - reader.toString(true)); - break; - } - case COMPATIBLE: { - message = READER_WRITER_COMPATIBLE_MESSAGE; - break; - } - default: throw new AvroRuntimeException("Unknown compatibility: " + compatibility); - } + .getCompatibilityResult(reader, writer); + + final String message = compatibility.description(reader, writer); return new SchemaPairCompatibility( - compatibility, + compatibility.isCompatible() ? 
SchemaCompatibilityType.COMPATIBLE : SchemaCompatibilityType.INCOMPATIBLE, reader, writer, message); From 2d420bd46d73138d0116d646f130e17add222633 Mon Sep 17 00:00:00 2001 From: Kurt Harriger Date: Mon, 17 Aug 2015 11:05:43 -0600 Subject: [PATCH 3/4] Cleanup descriptions --- .../org/apache/avro/SchemaCompatibility.java | 10 ++++----- .../apache/avro/TestSchemaCompatibility.java | 21 ++++++------------- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java index 0fe6d3cc21f..2c3247909fb 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java +++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java @@ -427,11 +427,11 @@ public static enum SchemaCompatibilityType { public static enum SchemaCompatibilityResult { COMPATIBLE(READER_WRITER_COMPATIBLE_MESSAGE), - INCOMPATIBLE_NAME(READER_WRITER_INCOMPATABLE_MESSAGE + "Schema names must match."), - INCOMPATIBLE_SIZE(READER_WRITER_INCOMPATABLE_MESSAGE + " Fixed schemas are no the same size."), - INCOMPATIBLE_ENUM_MISSING_FIELDS(READER_WRITER_INCOMPATABLE_MESSAGE + " Reader schema is missing ENUM values."), - INCOMPATIBLE_MISSING_DEFAULT(READER_WRITER_INCOMPATABLE_MESSAGE + " New fields must have a default value."), - INCOMPATIBLE_TYPE(READER_WRITER_INCOMPATABLE_MESSAGE + " Schema types are incompatable."), + INCOMPATIBLE_NAME(READER_WRITER_INCOMPATABLE_MESSAGE + ". Schema names must match."), + INCOMPATIBLE_SIZE(READER_WRITER_INCOMPATABLE_MESSAGE + ". Fixed schemas are no the same size."), + INCOMPATIBLE_ENUM_MISSING_FIELDS(READER_WRITER_INCOMPATABLE_MESSAGE + ". Reader schema is missing ENUM values."), + INCOMPATIBLE_MISSING_DEFAULT(READER_WRITER_INCOMPATABLE_MESSAGE + ". New fields must have a default value."), + INCOMPATIBLE_TYPE(READER_WRITER_INCOMPATABLE_MESSAGE + ". Schema types are incompatible."), /** Used internally to tag a reader/writer schema pair and prevent recursion. */ RECURSION_IN_PROGRESS(""); diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java index 9b8cde19b58..8e1b560522c 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaCompatibility.java @@ -244,11 +244,8 @@ public void testValidateSchemaNewField() throws Exception { SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE, reader, WRITER_SCHEMA, - String.format( - "Data encoded using writer schema:%n%s%n" - + "will or may fail to decode using reader schema:%n%s%n", - WRITER_SCHEMA.toString(true), - reader.toString(true))); + SchemaCompatibility.SchemaCompatibilityResult.INCOMPATIBLE_MISSING_DEFAULT.description(reader,WRITER_SCHEMA) + ); // Test new field without default value. 
assertEquals(expectedResult, checkReaderWriterCompatibility(reader, WRITER_SCHEMA)); @@ -269,11 +266,8 @@ public void testValidateArrayWriterSchema() throws Exception { SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE, invalidReader, STRING_ARRAY_SCHEMA, - String.format( - "Data encoded using writer schema:%n%s%n" - + "will or may fail to decode using reader schema:%n%s%n", - STRING_ARRAY_SCHEMA.toString(true), - invalidReader.toString(true))); + SchemaCompatibility.SchemaCompatibilityResult.INCOMPATIBLE_TYPE + .description(invalidReader, STRING_ARRAY_SCHEMA)); assertEquals( validResult, @@ -297,11 +291,8 @@ public void testValidatePrimitiveWriterSchema() throws Exception { SchemaCompatibility.SchemaCompatibilityType.INCOMPATIBLE, INT_SCHEMA, STRING_SCHEMA, - String.format( - "Data encoded using writer schema:%n%s%n" - + "will or may fail to decode using reader schema:%n%s%n", - STRING_SCHEMA.toString(true), - INT_SCHEMA.toString(true))); + SchemaCompatibility.SchemaCompatibilityResult.INCOMPATIBLE_TYPE + .description(INT_SCHEMA, STRING_SCHEMA)); assertEquals( validResult, From 7b77a043b521b228852223e5d7df64ca8edec532 Mon Sep 17 00:00:00 2001 From: Kurt Harriger Date: Tue, 18 Aug 2015 08:47:05 -0600 Subject: [PATCH 4/4] AVRO-1649 Fix typo in message --- .../avro/src/main/java/org/apache/avro/SchemaCompatibility.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java index 2c3247909fb..e39e9ad513a 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java +++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java @@ -428,7 +428,7 @@ public static enum SchemaCompatibilityResult { COMPATIBLE(READER_WRITER_COMPATIBLE_MESSAGE), INCOMPATIBLE_NAME(READER_WRITER_INCOMPATABLE_MESSAGE + ". Schema names must match."), - INCOMPATIBLE_SIZE(READER_WRITER_INCOMPATABLE_MESSAGE + ". Fixed schemas are no the same size."), + INCOMPATIBLE_SIZE(READER_WRITER_INCOMPATABLE_MESSAGE + ". Fixed schemas are not the same size."), INCOMPATIBLE_ENUM_MISSING_FIELDS(READER_WRITER_INCOMPATABLE_MESSAGE + ". Reader schema is missing ENUM values."), INCOMPATIBLE_MISSING_DEFAULT(READER_WRITER_INCOMPATABLE_MESSAGE + ". New fields must have a default value."), INCOMPATIBLE_TYPE(READER_WRITER_INCOMPATABLE_MESSAGE + ". Schema types are incompatible."),
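Reviewer note (not part of the patch): a minimal usage sketch of how the richer incompatibility description surfaces through the existing public API once these changes are applied. The schema strings and the CompatibilityMessageDemo class name are illustrative assumptions, not code from this patch series.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaCompatibility;
    import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
    import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

    public class CompatibilityMessageDemo {
      public static void main(String[] args) {
        // Writer schema: a record with a single "id" field.
        Schema writer = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"}]}");

        // Reader schema: adds a "name" field with no default value, so data
        // written with the writer schema cannot be decoded with it.
        Schema reader = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

        SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);

        // With this patch applied, the description should carry the specific
        // INCOMPATIBLE_MISSING_DEFAULT hint instead of only the generic
        // "will or may fail to decode" text.
        if (result.getType() != SchemaCompatibilityType.COMPATIBLE) {
          System.err.println(result.getDescription());
        }
      }
    }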