From a59954ddb17c9e027f83b62f5b92f2e77b01a8ca Mon Sep 17 00:00:00 2001 From: Rohan Date: Wed, 9 Oct 2019 16:03:25 -0700 Subject: [PATCH] fix: fix Avro schema validation (#3499) * fix: use the right schema for checking compatibility This patch fixes how we check schema compatibility for new streams/tables. Previously, we had our own code for generating an avro schema that was just used to generate the schema used to check for compatibility. The schema we actually write to the registry is generated by the avro serializer and avro converter. Now we use this schema when checking compatibility as well. * refactor: clean up dead code The previous commit switched us over to using the avro serializer to generate avro schemas from ksql schemas. This patch cleans up the ksql code for generating avro schemas. * fix build * fix tests --- .../io/confluent/ksql/util/SchemaUtil.java | 88 ------ .../confluent/ksql/util/SchemaUtilTest.java | 257 ------------------ .../confluent/ksql/engine/EngineExecutor.java | 2 +- .../java/io/confluent/ksql/util/AvroUtil.java | 13 +- .../confluent/ksql/engine/KsqlEngineTest.java | 6 +- .../integration/IntegrationTestHarness.java | 10 +- .../io/confluent/ksql/util/AvroUtilTest.java | 68 +++-- .../query-validation-tests/elements.json | 2 +- .../query-validation-tests/serdes.json | 10 +- .../KsqlResourceFunctionalTest.java | 7 +- .../StandaloneExecutorFunctionalTest.java | 12 +- .../ksql/serde/avro/AvroDataTranslator.java | 134 +-------- .../ksql/serde/avro/AvroSchemas.java | 176 ++++++++++++ 13 files changed, 265 insertions(+), 520 deletions(-) create mode 100644 ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java index 4eab6c459778..c213f0c5c526 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java +++ 
b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java @@ -15,11 +15,6 @@ package io.confluent.ksql.util; -import static org.apache.avro.Schema.create; -import static org.apache.avro.Schema.createArray; -import static org.apache.avro.Schema.createMap; -import static org.apache.avro.Schema.createUnion; - import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -28,7 +23,6 @@ import io.confluent.ksql.function.GenericsUtil; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.Operator; -import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.schema.ksql.SchemaConverters; import java.util.List; import java.util.Map; @@ -37,8 +31,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.BiPredicate; -import org.apache.avro.LogicalTypes; -import org.apache.avro.SchemaBuilder.FieldAssembler; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; @@ -46,8 +38,6 @@ public final class SchemaUtil { - private static final String DEFAULT_NAMESPACE = "ksql"; - public static final ColumnName ROWKEY_NAME = ColumnName.of("ROWKEY"); public static final ColumnName ROWTIME_NAME = ColumnName.of("ROWTIME"); public static final ColumnName WINDOWSTART_NAME = ColumnName.of("WINDOWSTART"); @@ -122,84 +112,6 @@ public static String buildAliasedFieldName(final String alias, final String fiel return prefix + fieldName; } - public static org.apache.avro.Schema buildAvroSchema( - final PersistenceSchema schema, - final String name - ) { - return buildAvroSchema(DEFAULT_NAMESPACE, name, schema.serializedSchema()); - } - - private static org.apache.avro.Schema buildAvroSchema( - final String namespace, - final String name, - final Schema schema - ) { - switch (schema.type()) { - case STRING: - return create(org.apache.avro.Schema.Type.STRING); - case 
BOOLEAN: - return create(org.apache.avro.Schema.Type.BOOLEAN); - case INT32: - return create(org.apache.avro.Schema.Type.INT); - case INT64: - return create(org.apache.avro.Schema.Type.LONG); - case FLOAT64: - return create(org.apache.avro.Schema.Type.DOUBLE); - case BYTES: - return createBytesSchema(schema); - case ARRAY: - return createArray(unionWithNull(buildAvroSchema(namespace, name, schema.valueSchema()))); - case MAP: - return createMap(unionWithNull(buildAvroSchema(namespace, name, schema.valueSchema()))); - case STRUCT: - return buildAvroSchemaFromStruct(namespace, name, schema); - default: - throw new KsqlException("Unsupported AVRO type: " + schema.type().name()); - } - } - - private static org.apache.avro.Schema createBytesSchema( - final Schema schema - ) { - DecimalUtil.requireDecimal(schema); - return LogicalTypes.decimal(DecimalUtil.precision(schema), DecimalUtil.scale(schema)) - .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)); - } - - private static org.apache.avro.Schema buildAvroSchemaFromStruct( - final String namespace, - final String name, - final Schema schema - ) { - final String avroName = avroify(name); - final FieldAssembler fieldAssembler = org.apache.avro.SchemaBuilder - .record(avroName) - .namespace(namespace) - .fields(); - - for (final Field field : schema.fields()) { - final String fieldName = avroify(field.name()); - final String fieldNamespace = namespace + "." 
+ avroName; - - fieldAssembler - .name(fieldName) - .type(unionWithNull(buildAvroSchema(fieldNamespace, fieldName, field.schema()))) - .withDefault(null); - } - - return fieldAssembler.endRecord(); - } - - private static String avroify(final String name) { - return name - .replace(".", "_") - .replace("-", "_"); - } - - private static org.apache.avro.Schema unionWithNull(final org.apache.avro.Schema schema) { - return createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), schema); - } - public static String getFieldNameWithNoAlias(final String fieldName) { final int idx = fieldName.indexOf(FIELD_NAME_DELIMITER); if (idx < 0) { diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java index 8ebb1aa9094a..f454e3407454 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java @@ -127,263 +127,6 @@ public void shouldGetCorrectJavaClassForBytes() { assertThat(decClazz, equalTo(BigDecimal.class)); } - @Test - public void shouldCreateCorrectAvroSchemaWithNullableFields() { - // Given: - final ConnectSchema schema = (ConnectSchema) SchemaBuilder.struct() - .field("ordertime", Schema.OPTIONAL_INT64_SCHEMA) - .field("orderid", Schema.OPTIONAL_STRING_SCHEMA) - .field("itemid", Schema.OPTIONAL_STRING_SCHEMA) - .field("orderunits", Schema.OPTIONAL_FLOAT64_SCHEMA) - .field("arraycol", SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build()) - .field("mapcol", - SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA)) - .optional() - .build(); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(schema, false), "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo( - "{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"ksql\",\"fields\":" - + 
"[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":" - + "\"orderid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"itemid\"," - + "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"orderunits\",\"type\":" - + "[\"null\",\"double\"],\"default\":null},{\"name\":\"arraycol\",\"type\":[\"null\"," - + "{\"type\":\"array\",\"items\":[\"null\",\"double\"]}],\"default\":null},{\"name\":" - + "\"mapcol\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[\"null\",\"double\"]}]" - + ",\"default\":null}]}")); - } - - @Test - public void shouldSupportAvroStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("RAW_STRUCT"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"record\"," - + "\"name\":\"RAW_STRUCT\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}" - )); - } - - @Test - public void shouldSupportAvroArrayOfStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("ARRAY_OF_STRUCTS"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - 
assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"array\"," - + "\"items\":[" - + "\"null\"," - + "{\"type\":\"record\"," - + "\"name\":\"ARRAY_OF_STRUCTS\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}]}" - )); - } - - @Test - public void shouldSupportAvroMapOfStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("MAP_OF_STRUCTS"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"map\"," - + "\"values\":[" - + "\"null\"," - + "{\"type\":\"record\"," - + "\"name\":\"MAP_OF_STRUCTS\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}]}" - )); - } - - @Test - public void shouldSupportAvroNestedStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("NESTED_STRUCTS"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - - final String s0Schema = "{" - + "\"type\":\"record\"," - + "\"name\":\"s0\"," - + "\"namespace\":\"ksql.bob.NESTED_STRUCTS\"," 
- + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}"; - - final String ss0Schema = "{" - + "\"type\":\"record\"," - + "\"name\":\"ss0\"," - + "\"namespace\":\"ksql.bob.NESTED_STRUCTS.s1\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}"; - - final String s1Schema = "{" - + "\"type\":\"record\"," - + "\"name\":\"s1\"," - + "\"namespace\":\"ksql.bob.NESTED_STRUCTS\"," - + "\"fields\":[" - + "{\"name\":\"ss0\",\"type\":[\"null\"," + ss0Schema + "],\"default\":null}" - + "]}"; - - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"record\"," - + "\"name\":\"NESTED_STRUCTS\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"s0\",\"type\":[\"null\"," + s0Schema + "],\"default\":null}," - + "{\"name\":\"s1\",\"type\":[\"null\"," + s1Schema + "],\"default\":null}" - + "]}" - )); - } - - @Test - public void shouldCreateAvroSchemaForBoolean() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_BOOLEAN_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"boolean\"")); - } - - @Test - public void shouldCreateAvroSchemaForInt() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_INT32_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"int\"")); - } - - @Test - public void shouldCreateAvroSchemaForBigInt() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_INT64_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = 
SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"long\"")); - } - - @Test - public void shouldCreateAvroSchemaForDouble() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_FLOAT64_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"double\"")); - } - - @Test - public void shouldCreateAvroSchemaForString() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_STRING_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"string\"")); - } - - @Test - public void shouldCreateAvroSchemaForArray() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema( - SchemaBuilder - .array(Schema.OPTIONAL_INT64_SCHEMA) - .build() - ); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("{" - + "\"type\":\"array\"," - + "\"items\":[\"null\",\"long\"]" - + "}")); - } - - @Test - public void shouldCreateAvroSchemaForMap() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema( - SchemaBuilder - .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.BOOLEAN_SCHEMA) - .build() - ); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("{" - + "\"type\":\"map\"," - + "\"values\":[\"null\",\"boolean\"]" - + "}")); - } - @Test public void shouldGetTheCorrectJavaTypeForBoolean() { final Schema schema = Schema.OPTIONAL_BOOLEAN_SCHEMA; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java 
b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index a712639239aa..994569fd60db 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -245,7 +245,7 @@ private Optional maybeCreateSinkDDL( ); } final SchemaRegistryClient srClient = serviceContext.getSchemaRegistryClient(); - AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient, ksqlConfig); return Optional.of(ddl); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java b/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java index 1d4e8f815ceb..21e4fec5747b 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java @@ -21,6 +21,8 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.avro.AvroSchemas; import java.io.IOException; import org.apache.http.HttpStatus; @@ -32,10 +34,12 @@ private AvroUtil() { public static void throwOnInvalidSchemaEvolution( final String statementText, final CreateSourceCommand ddl, - final SchemaRegistryClient schemaRegistryClient + final SchemaRegistryClient schemaRegistryClient, + final KsqlConfig ksqlConfig ) { final KsqlTopic topic = ddl.getTopic(); - if (topic.getValueFormat().getFormat() != Format.AVRO) { + final FormatInfo format = topic.getValueFormat().getFormatInfo(); + if (format.getFormat() != Format.AVRO) { return; } @@ -43,9 +47,10 @@ public static void throwOnInvalidSchemaEvolution( ddl.getSchema(), ddl.getSerdeOptions() ); - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema( + final org.apache.avro.Schema avroSchema = AvroSchemas.getAvroSchema( 
physicalSchema.valueSchema(), - ddl.getSourceName().name() + format.getAvroFullSchemaName().orElse(KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME), + ksqlConfig ); final String topicName = topic.getKafkaTopicName(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 8cebd9481f90..f8aa0edb7aff 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -480,15 +480,15 @@ public void shouldFailIfAvroSchemaNotEvolvable() { "Cannot register avro schema for T as the schema is incompatible with the current schema version registered for the topic.\n" + "KSQL schema: {" + "\"type\":\"record\"," + - "\"name\":\"T\"," + - "\"namespace\":\"ksql\"," + + "\"name\":\"KsqlDataSourceSchema\"," + + "\"namespace\":\"io.confluent.ksql.avro_schemas\"," + "\"fields\":[" + "{\"name\":\"COL0\",\"type\":[\"null\",\"long\"],\"default\":null}," + "{\"name\":\"COL1\",\"type\":[\"null\",\"string\"],\"default\":null}," + "{\"name\":\"COL2\",\"type\":[\"null\",\"string\"],\"default\":null}," + "{\"name\":\"COL3\",\"type\":[\"null\",\"double\"],\"default\":null}," + "{\"name\":\"COL4\",\"type\":[\"null\",\"boolean\"],\"default\":null}" + - "]" + + "],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"" + "}\n" + "Registered schema: \"int\""))); expectedException.expect(statementText(is( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java index 687ff76e0c6a..280fe131a6f3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java @@ -29,6 +29,7 @@ import io.confluent.ksql.serde.Format; import 
io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.GenericRowSerDe; +import io.confluent.ksql.serde.avro.AvroSchemas; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; @@ -551,11 +552,14 @@ private Deserializer getDeserializer( ).deserializer(); } - public void ensureSchema(final String topicName, final PhysicalSchema schema) { + public void ensureSchema( + final String topicName, + final PhysicalSchema schema, + final KsqlConfig ksqlConfig) { final SchemaRegistryClient srClient = serviceContext.get().getSchemaRegistryClient(); try { - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), "test-" + topicName); + final org.apache.avro.Schema avroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), "test_" + topicName, ksqlConfig); srClient.register(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, avroSchema); } catch (final Exception e) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java index c660a85771e9..c5b6f68c9e4b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java @@ -37,14 +37,17 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter; +import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; +import io.confluent.ksql.serde.avro.AvroSchemas; import io.confluent.ksql.serde.connect.ConnectSchemaTranslator; import java.io.IOException; import 
java.util.Collections; +import java.util.Optional; import org.apache.kafka.connect.data.Schema; import org.junit.Before; import org.junit.Rule; @@ -57,7 +60,7 @@ @RunWith(MockitoJUnitRunner.class) public class AvroUtilTest { - private static final SourceName STREAM_NAME = SourceName.of("some-stream"); + private static final SourceName STREAM_NAME = SourceName.of("some_stream"); private static final String AVRO_SCHEMA_STRING = "{" + "\"namespace\": \"some.namespace\"," @@ -90,10 +93,17 @@ public class AvroUtilTest { private static final LogicalSchema SINGLE_FIELD_SCHEMA = toKsqlSchema(SINGLE_FIELD_AVRO_SCHEMA_STRING); + private static final LogicalSchema SCHEMA_WITH_MAPS = LogicalSchema.builder() + .valueColumn(ColumnName.of("notmap"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("mapcol"), SqlTypes.map(SqlTypes.INTEGER)) + .build(); + + private static final String SCHEMA_NAME = "schema_name"; + private static final KsqlTopic RESULT_TOPIC = new KsqlTopic( "actual-name", KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), - ValueFormat.of(FormatInfo.of(Format.AVRO)), + ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of(SCHEMA_NAME), Optional.empty())), false); @Rule @@ -104,11 +114,12 @@ public class AvroUtilTest { @Mock private CreateSourceCommand ddlCommand; + private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap()); + @Before public void setUp() { when(ddlCommand.getSerdeOptions()).thenReturn(SerdeOption.none()); when(ddlCommand.getSchema()).thenReturn(MUTLI_FIELD_SCHEMA); - when(ddlCommand.getSourceName()).thenReturn(STREAM_NAME); when(ddlCommand.getTopic()).thenReturn(RESULT_TOPIC); } @@ -118,7 +129,7 @@ public void shouldValidateSchemaEvolutionWithCorrectSubject() throws Exception { when(srClient.testCompatibility(anyString(), any())).thenReturn(true); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // 
Then: verify(srClient).testCompatibility(eq(RESULT_TOPIC.getKafkaTopicName() + "-value"), any()); @@ -129,12 +140,31 @@ public void shouldValidateSchemaEvolutionWithCorrectSchema() throws Exception { // Given: final PhysicalSchema schema = PhysicalSchema.from(MUTLI_FIELD_SCHEMA, SerdeOption.none()); - final org.apache.avro.Schema expectedAvroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), STREAM_NAME.name()); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); + when(srClient.testCompatibility(anyString(), any())).thenReturn(true); + + // When: + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); + + // Then: + verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); + } + + @Test + public void shouldValidateSchemaWithMaps() throws Exception { + // Given: + when(ddlCommand.getSchema()).thenReturn(SCHEMA_WITH_MAPS); + final PhysicalSchema schema = PhysicalSchema + .from(SCHEMA_WITH_MAPS, SerdeOption.none()); + when(srClient.testCompatibility(anyString(), any())).thenReturn(true); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); + // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); @@ -149,11 +179,11 @@ public void shouldValidateWrappedSingleFieldSchemaEvolution() throws Exception { when(srClient.testCompatibility(anyString(), any())).thenReturn(true); - final org.apache.avro.Schema expectedAvroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), STREAM_NAME.name()); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); // When: - 
AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); @@ -170,11 +200,11 @@ public void shouldValidateUnwrappedSingleFieldSchemaEvolution() throws Exception when(srClient.testCompatibility(anyString(), any())).thenReturn(true); - final org.apache.avro.Schema expectedAvroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), STREAM_NAME.name()); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); @@ -186,7 +216,7 @@ public void shouldNotThrowInvalidEvolution() throws Exception { when(srClient.testCompatibility(any(), any())).thenReturn(true); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -198,7 +228,7 @@ public void shouldReturnInvalidEvolution() throws Exception { expectedException.expectMessage("Cannot register avro schema for actual-name as the schema is incompatible with the current schema version registered for the topic"); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -208,7 +238,7 @@ public void shouldNotThrowInvalidEvolutionIfSubjectNotRegistered() throws Except .thenThrow(new RestClientException("Unknown subject", 404, 40401)); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + 
AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -227,7 +257,7 @@ public void shouldThrowOnSrAuthorizationErrors() throws Exception { ))); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -241,7 +271,7 @@ public void shouldThrowOnAnyOtherEvolutionSrException() throws Exception { expectedException.expectMessage("Could not connect to Schema Registry service"); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -255,7 +285,7 @@ public void shouldThrowOnAnyOtherEvolutionIOException() throws Exception { expectedException.expectMessage("Could not check Schema compatibility"); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } private static LogicalSchema toKsqlSchema(final String avroSchemaString) { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index f87d8603e729..5e4b66647bca 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -329,7 +329,7 @@ ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Cannot register avro schema for OUTPUT as the schema is incompatible with the current schema version registered for the topic.\nKSQL schema: {\"type\":\"record\",\"name\":\"OUTPUT\",\"namespace\":\"ksql\",\"fields\":[{\"name\":\"C1\",\"type\":[\"null\",\"int\"],\"default\":null}]}\nRegistered schema: 
{\"type\":\"record\",\"name\":\"blah\",\"fields\":[{\"name\":\"C1\",\"type\":\"double\"}]}" + "message": "Cannot register avro schema for OUTPUT as the schema is incompatible with the current schema version registered for the topic.\nKSQL schema: {\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"C1\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}\nRegistered schema: {\"type\":\"record\",\"name\":\"blah\",\"fields\":[{\"name\":\"C1\",\"type\":\"double\"}]}" }, "topics": [ { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json b/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json index e65a5f63a955..0e842b147295 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json @@ -207,7 +207,7 @@ }, { "name": "OUTPUT", - "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "int"]}]}]}, + "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","int"],"default":null}]}}]}]}, "format": "{FORMAT}" } ], @@ -245,7 +245,7 @@ }, { "name": "OUTPUT", - "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "int"]}]}]}, + "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","int"],"default":null}]}}]}]}, "format": "AVRO" } ], @@ -280,7 +280,7 @@ }, { "name": 
"OUTPUT", - "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "int"]}]}]}, + "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","int"],"default":null}]}}]}]}, "format": "{FORMAT}" } ], @@ -500,7 +500,7 @@ "topics": [ { "name": "OUTPUT", - "schema": {"type": "map", "values": ["null", "double"]}, + "schema": {"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","double"],"default":null}]}}, "format": "{FORMAT}" } ], @@ -527,7 +527,7 @@ "topics": [ { "name": "OUTPUT", - "schema": {"name": "ignored", "type": "record", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "double"]}]}]}, + "schema": {"name": "ignored", "type": "record", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","double"],"default":null}]}}]}]}, "format": "{FORMAT}" } ], diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java index 08e7d9bd2f5b..762cb6c0da47 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java @@ -38,6 +38,8 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.avro.AvroSchemas; +import io.confluent.ksql.util.KsqlConfig; import 
io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.SchemaUtil; import java.util.List; @@ -158,9 +160,10 @@ public void shouldInsertIntoValuesForAvroTopic() throws Exception { TEST_HARNESS.getSchemaRegistryClient() .register( "books" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, - SchemaUtil.buildAvroSchema( + AvroSchemas.getAvroSchema( schema.valueSchema(), - "books" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX + "books_value", + new KsqlConfig(REST_APP.getBaseConfig()) ) ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java index 8751a23a57cd..e583259ac971 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java @@ -84,6 +84,7 @@ public class StandaloneExecutorFunctionalTest { private String s1; private String s2; private String t1; + private KsqlConfig ksqlConfig; @BeforeClass public static void classSetUp() { @@ -103,8 +104,7 @@ public void setUp() throws Exception { .putAll(KsqlConfigTestUtil.baseTestConfig()) .put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TEST_HARNESS.kafkaBootstrapServers()) .build(); - - final KsqlConfig ksqlConfig = new KsqlConfig(properties); + ksqlConfig = new KsqlConfig(properties); when(configStore.getKsqlConfig()).thenReturn(ksqlConfig); @@ -256,7 +256,7 @@ public void shouldFailOnAvroWithoutSchemasIfSchemaNotAvailable() { @Test public void shouldFailOnAvroWithoutSchemasIfSchemaNotEvolvable() { // Given: - givenIncompatibleSchemaExists(s1); + givenIncompatibleSchemaExists(s1, ksqlConfig); givenScript("" + "SET 'auto.offset.reset' = 'earliest';" @@ -297,7 +297,9 @@ public void shouldHandleComments() { TEST_HARNESS.verifyAvailableRows(s1, DATA_SIZE, JSON, DATA_SCHEMA); } - private static void 
givenIncompatibleSchemaExists(final String topicName) { + private static void givenIncompatibleSchemaExists( + final String topicName, + final KsqlConfig ksqlConfig) { final LogicalSchema logical = LogicalSchema.builder() .valueColumn(ColumnName.of("ORDERID"), SqlTypes.struct() .field("fred", SqlTypes.INTEGER) @@ -310,7 +312,7 @@ private static void givenIncompatibleSchemaExists(final String topicName) { SerdeOption.none() ); - TEST_HARNESS.ensureSchema(topicName, incompatiblePhysical); + TEST_HARNESS.ensureSchema(topicName, incompatiblePhysical, ksqlConfig); } private void givenScript(final String contents) { diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java index 27ce0df99626..e7eeb1174d4e 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java @@ -15,18 +15,13 @@ package io.confluent.ksql.serde.avro; -import static java.util.Objects.requireNonNull; - import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import io.confluent.ksql.schema.connect.SchemaWalker; import io.confluent.ksql.schema.connect.SchemaWalker.Visitor; import io.confluent.ksql.serde.connect.ConnectDataTranslator; import io.confluent.ksql.serde.connect.DataTranslator; import io.confluent.ksql.util.DecimalUtil; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -35,7 +30,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; -import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; /** @@ -60,9 +54,8 @@ public class AvroDataTranslator implements DataTranslator { ) { 
this.ksqlSchema = throwOnInvalidSchema(Objects.requireNonNull(schema, "schema")); - this.avroCompatibleSchema = buildAvroCompatibleSchema( - this.ksqlSchema, - new Context(Collections.singleton(schemaFullName), useNamedMaps, true) + this.avroCompatibleSchema = AvroSchemas.getAvroCompatibleConnectSchema( + schema, schemaFullName, useNamedMaps ); this.innerTranslator = new ConnectDataTranslator(avroCompatibleSchema); @@ -106,129 +99,6 @@ private static Struct convertStruct( return struct; } - private static final class Context { - private static final String DELIMITER = "_"; - - static final String MAP_KEY_NAME = "MapKey"; - static final String MAP_VALUE_NAME = "MapValue"; - - private final Iterable names; - private final boolean useNamedMaps; - private boolean root; - - private Context( - final Iterable names, - final boolean useNamedMaps, - final boolean root - ) { - this.names = requireNonNull(names, "names"); - this.useNamedMaps = useNamedMaps; - this.root = root; - } - - Context with(final String name) { - return new Context(Iterables.concat(names, ImmutableList.of(name)), useNamedMaps, root); - } - - public String name() { - return String.join(DELIMITER, names); - } - } - - private static String avroCompatibleFieldName(final Field field) { - // Currently the only incompatible field names expected are fully qualified - // column identifiers. Once quoted identifier support is introduced we will - // need to implement something more generic here. 
- return field.name().replace(".", "_"); - } - - private static Schema buildAvroCompatibleSchema( - final Schema schema, - final Context context - ) { - final boolean notRoot = !context.root; - context.root = false; - - final SchemaBuilder schemaBuilder; - switch (schema.type()) { - default: - if (notRoot || !schema.isOptional()) { - return schema; - } - - schemaBuilder = new SchemaBuilder(schema.type()); - break; - - case STRUCT: - schemaBuilder = buildAvroCompatibleStruct(schema, context); - break; - - case ARRAY: - schemaBuilder = buildAvroCompatibleArray(schema, context); - break; - - case MAP: - schemaBuilder = buildAvroCompatibleMap(schema, context); - break; - } - - if (schema.isOptional() && notRoot) { - schemaBuilder.optional(); - } - - return schemaBuilder.build(); - } - - private static SchemaBuilder buildAvroCompatibleMap( - final Schema schema, final Context context - ) { - final Schema keySchema = - buildAvroCompatibleSchema(schema.keySchema(), context.with(Context.MAP_KEY_NAME)); - - final Schema valueSchema = - buildAvroCompatibleSchema(schema.valueSchema(), context.with(Context.MAP_VALUE_NAME)); - - final SchemaBuilder schemaBuilder = SchemaBuilder.map( - keySchema, - valueSchema - ); - - if (context.useNamedMaps) { - schemaBuilder.name(context.name()); - } - - return schemaBuilder; - } - - private static SchemaBuilder buildAvroCompatibleArray( - final Schema schema, - final Context context - ) { - final Schema valueSchema = buildAvroCompatibleSchema(schema.valueSchema(), context); - - return SchemaBuilder.array(valueSchema); - } - - private static SchemaBuilder buildAvroCompatibleStruct( - final Schema schema, - final Context context - ) { - final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); - - if (schema.name() == null) { - schemaBuilder.name(context.name()); - } - - for (final Field f : schema.fields()) { - final String fieldName = avroCompatibleFieldName(f); - final Schema fieldSchema = buildAvroCompatibleSchema(f.schema(), 
context.with(f.name()));
-
-      schemaBuilder.field(fieldName, fieldSchema);
-    }
-
-    return schemaBuilder;
-  }
-
   @SuppressWarnings("unchecked")
   private static Object replaceSchema(final Schema schema, final Object object) {
     if (object == null) {
diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java
new file mode 100644
index 000000000000..3bdf752f7d74
--- /dev/null
+++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.serde.avro;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import io.confluent.connect.avro.AvroData;
+import io.confluent.ksql.schema.ksql.PersistenceSchema;
+import io.confluent.ksql.util.KsqlConfig;
+import java.util.Collections;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+/**
+ * Builds the Avro compatible Connect schema, and the Avro schema derived from it, for a KSQL
+ * schema.
+ */
+public final class AvroSchemas {
+  private AvroSchemas() {
+  }
+
+  /**
+   * Converts a persisted KSQL schema to the Avro schema {@link AvroData} derives for it.
+   *
+   * @param schema the schema to convert; its serialized Connect schema is used.
+   * @param name the root name from which the names of unnamed structs, and of maps when named
+   *     maps are enabled, are derived.
+   * @param ksqlConfig the config from which {@link KsqlConfig#KSQL_USE_NAMED_AVRO_MAPS} is read.
+   * @return the Avro schema.
+   */
+  public static org.apache.avro.Schema getAvroSchema(
+      final PersistenceSchema schema,
+      final String name,
+      final KsqlConfig ksqlConfig
+  ) {
+    requireNonNull(schema, "schema");
+    requireNonNull(ksqlConfig, "ksqlConfig");
+
+    final boolean useNamedMaps = ksqlConfig.getBoolean(KsqlConfig.KSQL_USE_NAMED_AVRO_MAPS);
+
+    // NOTE(review): 0 is presumably AvroData's schema cache size - confirm against its API.
+    return new AvroData(0).fromConnectSchema(
+        getAvroCompatibleConnectSchema(schema.serializedSchema(), name, useNamedMaps)
+    );
+  }
+
+  /**
+   * Rebuilds a Connect schema in the form that is used for Avro.
+   *
+   * <p>The root schema is never optional in the result, unnamed structs are given a name derived
+   * from {@code schemaFullName} and the path of names leading to them, and any {@code .} in a
+   * field name is replaced with {@code _}.
+   *
+   * @param schema the Connect schema to rebuild.
+   * @param schemaFullName the name used for the root of the schema.
+   * @param useNamedMaps whether map schemas should also be given a derived name.
+   * @return the Avro compatible Connect schema.
+   */
+  public static Schema getAvroCompatibleConnectSchema(
+      final Schema schema,
+      final String schemaFullName,
+      final boolean useNamedMaps
+  ) {
+    requireNonNull(schema, "schema");
+
+    return buildAvroCompatibleSchema(
+        schema,
+        new Context(Collections.singleton(schemaFullName), useNamedMaps, true)
+    );
+  }
+
+  /**
+   * Tracks the path of names from the root schema to the schema currently being rebuilt.
+   */
+  private static final class Context {
+    private static final String DELIMITER = "_";
+
+    static final String MAP_KEY_NAME = "MapKey";
+    static final String MAP_VALUE_NAME = "MapValue";
+
+    private final Iterable<String> names;
+    private final boolean useNamedMaps;
+    // Cleared by buildAvroCompatibleSchema once the root schema has been visited.
+    private boolean root;
+
+    private Context(
+        final Iterable<String> names,
+        final boolean useNamedMaps,
+        final boolean root
+    ) {
+      this.names = requireNonNull(names, "names");
+      this.useNamedMaps = useNamedMaps;
+      this.root = root;
+    }
+
+    /** Returns a context for a child schema, with {@code name} appended to the path. */
+    Context with(final String name) {
+      return new Context(Iterables.concat(names, ImmutableList.of(name)), useNamedMaps, root);
+    }
+
+    /** Returns the path of names joined with {@code _}. */
+    public String name() {
+      return String.join(DELIMITER, names);
+    }
+  }
+
+  private static String avroCompatibleFieldName(final Field field) {
+    // Currently the only incompatible field names expected are fully qualified
+    // column identifiers. Once quoted identifier support is introduced we will
+    // need to implement something more generic here.
+    return field.name().replace(".", "_");
+  }
+
+  /**
+   * Rebuilds {@code schema}, recursing into struct, array and map schemas.
+   *
+   * <p>Optionality is kept for every schema except the root, which is always returned required.
+   */
+  private static Schema buildAvroCompatibleSchema(
+      final Schema schema,
+      final Context context
+  ) {
+    final boolean notRoot = !context.root;
+    // Contexts derived from this one while recursing must not be treated as the root.
+    context.root = false;
+
+    final SchemaBuilder schemaBuilder;
+    switch (schema.type()) {
+      default:
+        if (notRoot || !schema.isOptional()) {
+          return schema;
+        }
+
+        // An optional root of another type is rebuilt from its type alone, so it is required.
+        // NOTE(review): the original's name and parameters are not copied - confirm intended.
+        schemaBuilder = new SchemaBuilder(schema.type());
+        break;
+
+      case STRUCT:
+        schemaBuilder = buildAvroCompatibleStruct(schema, context);
+        break;
+
+      case ARRAY:
+        schemaBuilder = buildAvroCompatibleArray(schema, context);
+        break;
+
+      case MAP:
+        schemaBuilder = buildAvroCompatibleMap(schema, context);
+        break;
+    }
+
+    if (schema.isOptional() && notRoot) {
+      schemaBuilder.optional();
+    }
+
+    return schemaBuilder.build();
+  }
+
+  private static SchemaBuilder buildAvroCompatibleMap(
+      final Schema schema, final Context context
+  ) {
+    final Schema keySchema =
+        buildAvroCompatibleSchema(schema.keySchema(), context.with(Context.MAP_KEY_NAME));
+
+    final Schema valueSchema =
+        buildAvroCompatibleSchema(schema.valueSchema(), context.with(Context.MAP_VALUE_NAME));
+
+    final SchemaBuilder schemaBuilder = SchemaBuilder.map(
+        keySchema,
+        valueSchema
+    );
+
+    if (context.useNamedMaps) {
+      schemaBuilder.name(context.name());
+    }
+
+    return schemaBuilder;
+  }
+
+  private static SchemaBuilder buildAvroCompatibleArray(
+      final Schema schema,
+      final Context context
+  ) {
+    final Schema valueSchema = buildAvroCompatibleSchema(schema.valueSchema(), context);
+
+    return SchemaBuilder.array(valueSchema);
+  }
+
+  private static SchemaBuilder buildAvroCompatibleStruct(
+      final Schema schema,
+      final Context context
+  ) {
+    final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+
+    if (schema.name() == null) {
+      schemaBuilder.name(context.name());
+    }
+
+    for (final Field f : schema.fields()) {
+      final String fieldName = avroCompatibleFieldName(f);
+      final Schema fieldSchema = buildAvroCompatibleSchema(f.schema(), context.with(f.name()));
+
+      schemaBuilder.field(fieldName, fieldSchema);
+    }
+
+    return schemaBuilder;
+  }
+}