Skip to content

Commit

Permalink
fix: fix Avro schema validation (#3499)
Browse files Browse the repository at this point in the history
* fix: use the right schema for checking compatibility

This patch fixes how we check schema compatibility for new streams/tables.
Previously, we had our own code for generating an Avro schema, and it was used
only to produce the schema for the compatibility check. The schema we
actually write to the registry is generated by the Avro serializer and
Avro converter. Now we use that same schema when checking compatibility as well.

* refactor: clean up dead code

The previous commit switched us over to using the avro serializer to
generate avro schemas from ksql schemas. This patch cleans up the ksql
code for generating avro schemas.

* fix build

* fix tests
  • Loading branch information
rodesai authored Oct 9, 2019
1 parent 378b8af commit a59954d
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 520 deletions.
88 changes: 0 additions & 88 deletions ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@

package io.confluent.ksql.util;

import static org.apache.avro.Schema.create;
import static org.apache.avro.Schema.createArray;
import static org.apache.avro.Schema.createMap;
import static org.apache.avro.Schema.createUnion;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -28,7 +23,6 @@
import io.confluent.ksql.function.GenericsUtil;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.Operator;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import java.util.List;
import java.util.Map;
Expand All @@ -37,17 +31,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiPredicate;
import org.apache.avro.LogicalTypes;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class SchemaUtil {

private static final String DEFAULT_NAMESPACE = "ksql";

public static final ColumnName ROWKEY_NAME = ColumnName.of("ROWKEY");
public static final ColumnName ROWTIME_NAME = ColumnName.of("ROWTIME");
public static final ColumnName WINDOWSTART_NAME = ColumnName.of("WINDOWSTART");
Expand Down Expand Up @@ -122,84 +112,6 @@ public static String buildAliasedFieldName(final String alias, final String fiel
return prefix + fieldName;
}

/**
 * Builds the Avro schema for the serialized form of a persistence schema.
 *
 * <p>The schema is generated under the default {@code ksql} namespace.
 *
 * @param schema the persistence schema whose serialized schema is converted
 * @param name the name passed on for any Avro record that gets generated
 * @return the generated Avro schema
 */
public static org.apache.avro.Schema buildAvroSchema(
    final PersistenceSchema schema,
    final String name
) {
  final Schema connectSchema = schema.serializedSchema();
  return buildAvroSchema(DEFAULT_NAMESPACE, name, connectSchema);
}

/**
 * Recursively translates a Connect schema into an Avro schema.
 *
 * <p>Array items and map values are wrapped in a union with {@code null}. The schema returned
 * for the top-level type itself is not wrapped.
 *
 * @param namespace the Avro namespace handed to any record built from a struct
 * @param name the name handed to any record built from a struct
 * @param schema the Connect schema to translate
 * @return the Avro schema corresponding to {@code schema}
 * @throws KsqlException if the Connect type is not handled by this method
 */
private static org.apache.avro.Schema buildAvroSchema(
    final String namespace,
    final String name,
    final Schema schema
) {
  switch (schema.type()) {
    // Composite types: recurse into the nested schema(s).
    case STRUCT:
      return buildAvroSchemaFromStruct(namespace, name, schema);
    case ARRAY: {
      final org.apache.avro.Schema items =
          buildAvroSchema(namespace, name, schema.valueSchema());
      return org.apache.avro.Schema.createArray(unionWithNull(items));
    }
    case MAP: {
      // Only the value schema is translated; the key schema is not consulted.
      final org.apache.avro.Schema values =
          buildAvroSchema(namespace, name, schema.valueSchema());
      return org.apache.avro.Schema.createMap(unionWithNull(values));
    }
    case BYTES:
      return createBytesSchema(schema);

    // Primitive types: direct one-to-one mapping.
    case STRING:
      return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
    case BOOLEAN:
      return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN);
    case INT32:
      return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT);
    case INT64:
      return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
    case FLOAT64:
      return org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE);

    default:
      throw new KsqlException("Unsupported AVRO type: " + schema.type().name());
  }
}

/**
 * Builds an Avro {@code bytes} schema carrying the decimal logical type.
 *
 * <p>The precision and scale are read from the supplied Connect schema, which is first passed
 * to {@code DecimalUtil.requireDecimal}.
 *
 * @param schema the Connect BYTES schema to convert
 * @return an Avro bytes schema annotated with the decimal logical type
 */
private static org.apache.avro.Schema createBytesSchema(
    final Schema schema
) {
  DecimalUtil.requireDecimal(schema);

  final int precision = DecimalUtil.precision(schema);
  final int scale = DecimalUtil.scale(schema);
  final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(precision, scale);

  final org.apache.avro.Schema bytesSchema = create(org.apache.avro.Schema.Type.BYTES);
  return decimalType.addToSchema(bytesSchema);
}

/**
 * Builds an Avro record schema from a Connect struct schema.
 *
 * <p>Every struct field becomes a record field whose type is a union of {@code null} and the
 * translated field schema, with a {@code null} default. Record and field names are passed
 * through {@code avroify} first.
 *
 * @param namespace the namespace of the generated record
 * @param name the (un-sanitised) name of the generated record
 * @param schema the Connect struct schema whose fields are converted
 * @return the generated Avro record schema
 */
private static org.apache.avro.Schema buildAvroSchemaFromStruct(
    final String namespace,
    final String name,
    final Schema schema
) {
  final String recordName = avroify(name);
  // Schemas nested inside this record are built under a namespace qualified by its name.
  final String nestedNamespace = namespace + "." + recordName;

  final FieldAssembler<org.apache.avro.Schema> assembler = org.apache.avro.SchemaBuilder
      .record(recordName)
      .namespace(namespace)
      .fields();

  schema.fields().forEach(connectField -> {
    final String avroFieldName = avroify(connectField.name());
    assembler
        .name(avroFieldName)
        .type(unionWithNull(
            buildAvroSchema(nestedNamespace, avroFieldName, connectField.schema())))
        .withDefault(null);
  });

  return assembler.endRecord();
}

/**
 * Replaces every {@code '.'} and {@code '-'} in the supplied name with {@code '_'}.
 *
 * @param name the name to sanitise
 * @return the name with dots and dashes replaced by underscores
 */
private static String avroify(final String name) {
  final String withoutDots = name.replace('.', '_');
  return withoutDots.replace('-', '_');
}

/**
 * Wraps the supplied schema in a union whose first branch is {@code null}.
 *
 * @param schema the schema to place in the second branch of the union
 * @return a union schema of {@code null} and {@code schema}, in that order
 */
private static org.apache.avro.Schema unionWithNull(final org.apache.avro.Schema schema) {
  final org.apache.avro.Schema nullSchema = create(org.apache.avro.Schema.Type.NULL);
  return createUnion(nullSchema, schema);
}

public static String getFieldNameWithNoAlias(final String fieldName) {
final int idx = fieldName.indexOf(FIELD_NAME_DELIMITER);
if (idx < 0) {
Expand Down
257 changes: 0 additions & 257 deletions ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,263 +127,6 @@ public void shouldGetCorrectJavaClassForBytes() {
assertThat(decClazz, equalTo(BigDecimal.class));
}

@Test
public void shouldCreateCorrectAvroSchemaWithNullableFields() {
  // Given: an optional struct mixing primitive, array and map columns.
  final Schema arrayColumn =
      SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build();
  final SchemaBuilder mapColumn =
      SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA);

  final ConnectSchema ordersSchema = (ConnectSchema) SchemaBuilder.struct()
      .field("ordertime", Schema.OPTIONAL_INT64_SCHEMA)
      .field("orderid", Schema.OPTIONAL_STRING_SCHEMA)
      .field("itemid", Schema.OPTIONAL_STRING_SCHEMA)
      .field("orderunits", Schema.OPTIONAL_FLOAT64_SCHEMA)
      .field("arraycol", arrayColumn)
      .field("mapcol", mapColumn)
      .optional()
      .build();

  final String expectedJson =
      "{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"ksql\",\"fields\":"
          + "[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":"
          + "\"orderid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"itemid\","
          + "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"orderunits\",\"type\":"
          + "[\"null\",\"double\"],\"default\":null},{\"name\":\"arraycol\",\"type\":[\"null\","
          + "{\"type\":\"array\",\"items\":[\"null\",\"double\"]}],\"default\":null},{\"name\":"
          + "\"mapcol\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[\"null\",\"double\"]}]"
          + ",\"default\":null}]}";

  // When:
  final String actualJson = SchemaUtil
      .buildAvroSchema(PersistenceSchema.from(ordersSchema, false), "orders")
      .toString();

  // Then:
  assertThat(actualJson, equalTo(expectedJson));
}

@Test
public void shouldSupportAvroStructs() {
  // Given:
  final String expectedRecord = "{"
      + "\"type\":\"record\","
      + "\"name\":\"RAW_STRUCT\","
      + "\"namespace\":\"ksql.bob\","
      + "\"fields\":["
      + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null},"
      + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}"
      + "]}";

  // When:
  final org.apache.avro.Schema.Field structField = SchemaUtil
      .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob")
      .getField("RAW_STRUCT");

  // Then: the field exists and is a [null, record] union.
  assertThat(structField, is(notNullValue()));

  final org.apache.avro.Schema union = structField.schema();
  assertThat(union.getType(), is(org.apache.avro.Schema.Type.UNION));
  assertThat(union.getTypes().get(0).getType(), is(org.apache.avro.Schema.Type.NULL));
  assertThat(union.getTypes().get(1).toString(), is(expectedRecord));
}

@Test
public void shouldSupportAvroArrayOfStructs() {
  // Given:
  final String expectedArray = "{"
      + "\"type\":\"array\","
      + "\"items\":["
      + "\"null\","
      + "{\"type\":\"record\","
      + "\"name\":\"ARRAY_OF_STRUCTS\","
      + "\"namespace\":\"ksql.bob\","
      + "\"fields\":["
      + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null},"
      + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}"
      + "]}]}";

  // When:
  final org.apache.avro.Schema.Field arrayField = SchemaUtil
      .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob")
      .getField("ARRAY_OF_STRUCTS");

  // Then: the field exists and is a [null, array] union.
  assertThat(arrayField, is(notNullValue()));

  final org.apache.avro.Schema union = arrayField.schema();
  assertThat(union.getType(), is(org.apache.avro.Schema.Type.UNION));
  assertThat(union.getTypes().get(0).getType(), is(org.apache.avro.Schema.Type.NULL));
  assertThat(union.getTypes().get(1).toString(), is(expectedArray));
}

@Test
public void shouldSupportAvroMapOfStructs() {
  // Given:
  final String expectedMap = "{"
      + "\"type\":\"map\","
      + "\"values\":["
      + "\"null\","
      + "{\"type\":\"record\","
      + "\"name\":\"MAP_OF_STRUCTS\","
      + "\"namespace\":\"ksql.bob\","
      + "\"fields\":["
      + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null},"
      + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}"
      + "]}]}";

  // When:
  final org.apache.avro.Schema.Field mapField = SchemaUtil
      .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob")
      .getField("MAP_OF_STRUCTS");

  // Then: the field exists and is a [null, map] union.
  assertThat(mapField, is(notNullValue()));

  final org.apache.avro.Schema union = mapField.schema();
  assertThat(union.getType(), is(org.apache.avro.Schema.Type.UNION));
  assertThat(union.getTypes().get(0).getType(), is(org.apache.avro.Schema.Type.NULL));
  assertThat(union.getTypes().get(1).toString(), is(expectedMap));
}

@Test
public void shouldSupportAvroNestedStructs() {
  // Given: the expected JSON, assembled from the innermost record outwards.
  final String expectedS0 = "{"
      + "\"type\":\"record\","
      + "\"name\":\"s0\","
      + "\"namespace\":\"ksql.bob.NESTED_STRUCTS\","
      + "\"fields\":["
      + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null},"
      + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}"
      + "]}";

  final String expectedSs0 = "{"
      + "\"type\":\"record\","
      + "\"name\":\"ss0\","
      + "\"namespace\":\"ksql.bob.NESTED_STRUCTS.s1\","
      + "\"fields\":["
      + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null},"
      + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}"
      + "]}";

  final String expectedS1 = "{"
      + "\"type\":\"record\","
      + "\"name\":\"s1\","
      + "\"namespace\":\"ksql.bob.NESTED_STRUCTS\","
      + "\"fields\":["
      + "{\"name\":\"ss0\",\"type\":[\"null\"," + expectedSs0 + "],\"default\":null}"
      + "]}";

  final String expectedOuter = "{"
      + "\"type\":\"record\","
      + "\"name\":\"NESTED_STRUCTS\","
      + "\"namespace\":\"ksql.bob\","
      + "\"fields\":["
      + "{\"name\":\"s0\",\"type\":[\"null\"," + expectedS0 + "],\"default\":null},"
      + "{\"name\":\"s1\",\"type\":[\"null\"," + expectedS1 + "],\"default\":null}"
      + "]}";

  // When:
  final org.apache.avro.Schema.Field nestedField = SchemaUtil
      .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob")
      .getField("NESTED_STRUCTS");

  // Then: the field exists and is a [null, record] union.
  assertThat(nestedField, is(notNullValue()));

  final org.apache.avro.Schema union = nestedField.schema();
  assertThat(union.getType(), is(org.apache.avro.Schema.Type.UNION));
  assertThat(union.getTypes().get(0).getType(), is(org.apache.avro.Schema.Type.NULL));
  assertThat(union.getTypes().get(1).toString(), is(expectedOuter));
}

@Test
public void shouldCreateAvroSchemaForBoolean() {
  // Given: an unwrapped optional BOOLEAN schema.
  final PersistenceSchema booleanSchema =
      unwrappedPersistenceSchema(Schema.OPTIONAL_BOOLEAN_SCHEMA);

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(booleanSchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo("\"boolean\""));
}

@Test
public void shouldCreateAvroSchemaForInt() {
  // Given: an unwrapped optional INT32 schema.
  final PersistenceSchema intSchema =
      unwrappedPersistenceSchema(Schema.OPTIONAL_INT32_SCHEMA);

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(intSchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo("\"int\""));
}

@Test
public void shouldCreateAvroSchemaForBigInt() {
  // Given: an unwrapped optional INT64 schema.
  final PersistenceSchema bigIntSchema =
      unwrappedPersistenceSchema(Schema.OPTIONAL_INT64_SCHEMA);

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(bigIntSchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo("\"long\""));
}

@Test
public void shouldCreateAvroSchemaForDouble() {
  // Given: an unwrapped optional FLOAT64 schema.
  final PersistenceSchema doubleSchema =
      unwrappedPersistenceSchema(Schema.OPTIONAL_FLOAT64_SCHEMA);

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(doubleSchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo("\"double\""));
}

@Test
public void shouldCreateAvroSchemaForString() {
  // Given: an unwrapped optional STRING schema.
  final PersistenceSchema stringSchema =
      unwrappedPersistenceSchema(Schema.OPTIONAL_STRING_SCHEMA);

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(stringSchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo("\"string\""));
}

@Test
public void shouldCreateAvroSchemaForArray() {
  // Given: an unwrapped array whose elements are optional INT64 values.
  final Schema arrayOfLongs = SchemaBuilder
      .array(Schema.OPTIONAL_INT64_SCHEMA)
      .build();
  final PersistenceSchema arraySchema = unwrappedPersistenceSchema(arrayOfLongs);

  final String expectedJson = "{"
      + "\"type\":\"array\","
      + "\"items\":[\"null\",\"long\"]"
      + "}";

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(arraySchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo(expectedJson));
}

@Test
public void shouldCreateAvroSchemaForMap() {
  // Given: an unwrapped map from optional STRING keys to BOOLEAN values.
  final Schema stringToBoolean = SchemaBuilder
      .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.BOOLEAN_SCHEMA)
      .build();
  final PersistenceSchema mapSchema = unwrappedPersistenceSchema(stringToBoolean);

  final String expectedJson = "{"
      + "\"type\":\"map\","
      + "\"values\":[\"null\",\"boolean\"]"
      + "}";

  // When:
  final String avroJson = SchemaUtil.buildAvroSchema(mapSchema, "orders").toString();

  // Then:
  assertThat(avroJson, equalTo(expectedJson));
}

@Test
public void shouldGetTheCorrectJavaTypeForBoolean() {
final Schema schema = Schema.OPTIONAL_BOOLEAN_SCHEMA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private Optional<DdlCommand> maybeCreateSinkDDL(
);
}
final SchemaRegistryClient srClient = serviceContext.getSchemaRegistryClient();
AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient);
AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient, ksqlConfig);
return Optional.of(ddl);
}

Expand Down
Loading

0 comments on commit a59954d

Please sign in to comment.