diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java index 4eab6c459778..c213f0c5c526 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java @@ -15,11 +15,6 @@ package io.confluent.ksql.util; -import static org.apache.avro.Schema.create; -import static org.apache.avro.Schema.createArray; -import static org.apache.avro.Schema.createMap; -import static org.apache.avro.Schema.createUnion; - import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -28,7 +23,6 @@ import io.confluent.ksql.function.GenericsUtil; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.Operator; -import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.schema.ksql.SchemaConverters; import java.util.List; import java.util.Map; @@ -37,8 +31,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.BiPredicate; -import org.apache.avro.LogicalTypes; -import org.apache.avro.SchemaBuilder.FieldAssembler; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; @@ -46,8 +38,6 @@ public final class SchemaUtil { - private static final String DEFAULT_NAMESPACE = "ksql"; - public static final ColumnName ROWKEY_NAME = ColumnName.of("ROWKEY"); public static final ColumnName ROWTIME_NAME = ColumnName.of("ROWTIME"); public static final ColumnName WINDOWSTART_NAME = ColumnName.of("WINDOWSTART"); @@ -122,84 +112,6 @@ public static String buildAliasedFieldName(final String alias, final String fiel return prefix + fieldName; } - public static org.apache.avro.Schema buildAvroSchema( - final PersistenceSchema schema, - final String name - ) { - return buildAvroSchema(DEFAULT_NAMESPACE, name, schema.serializedSchema()); - } - - private static org.apache.avro.Schema buildAvroSchema( - final String namespace, - final String name, - final Schema schema - ) { - switch (schema.type()) { - case STRING: - return create(org.apache.avro.Schema.Type.STRING); - case BOOLEAN: - return create(org.apache.avro.Schema.Type.BOOLEAN); - case INT32: - return create(org.apache.avro.Schema.Type.INT); - case INT64: - return create(org.apache.avro.Schema.Type.LONG); - case FLOAT64: - return create(org.apache.avro.Schema.Type.DOUBLE); - case BYTES: - return createBytesSchema(schema); - case ARRAY: - return createArray(unionWithNull(buildAvroSchema(namespace, name, schema.valueSchema()))); - case MAP: - return createMap(unionWithNull(buildAvroSchema(namespace, name, schema.valueSchema()))); - case STRUCT: - return buildAvroSchemaFromStruct(namespace, name, schema); - default: - throw new KsqlException("Unsupported AVRO type: " + schema.type().name()); - } - } - - private static org.apache.avro.Schema createBytesSchema( - final Schema schema - ) { - DecimalUtil.requireDecimal(schema); - return LogicalTypes.decimal(DecimalUtil.precision(schema), DecimalUtil.scale(schema)) - .addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES)); - } - - private static org.apache.avro.Schema buildAvroSchemaFromStruct( - final String namespace, - final String name, - final Schema schema - ) { - final String avroName = avroify(name); - final FieldAssembler fieldAssembler = org.apache.avro.SchemaBuilder - .record(avroName) - .namespace(namespace) - 
.fields(); - - for (final Field field : schema.fields()) { - final String fieldName = avroify(field.name()); - final String fieldNamespace = namespace + "." + avroName; - - fieldAssembler - .name(fieldName) - .type(unionWithNull(buildAvroSchema(fieldNamespace, fieldName, field.schema()))) - .withDefault(null); - } - - return fieldAssembler.endRecord(); - } - - private static String avroify(final String name) { - return name - .replace(".", "_") - .replace("-", "_"); - } - - private static org.apache.avro.Schema unionWithNull(final org.apache.avro.Schema schema) { - return createUnion(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), schema); - } - public static String getFieldNameWithNoAlias(final String fieldName) { final int idx = fieldName.indexOf(FIELD_NAME_DELIMITER); if (idx < 0) { diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java index 8ebb1aa9094a..f454e3407454 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java @@ -127,263 +127,6 @@ public void shouldGetCorrectJavaClassForBytes() { assertThat(decClazz, equalTo(BigDecimal.class)); } - @Test - public void shouldCreateCorrectAvroSchemaWithNullableFields() { - // Given: - final ConnectSchema schema = (ConnectSchema) SchemaBuilder.struct() - .field("ordertime", Schema.OPTIONAL_INT64_SCHEMA) - .field("orderid", Schema.OPTIONAL_STRING_SCHEMA) - .field("itemid", Schema.OPTIONAL_STRING_SCHEMA) - .field("orderunits", Schema.OPTIONAL_FLOAT64_SCHEMA) - .field("arraycol", SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional().build()) - .field("mapcol", - SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA)) - .optional() - .build(); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(schema, false), "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo( - "{\"type\":\"record\",\"name\":\"orders\",\"namespace\":\"ksql\",\"fields\":" - + "[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":" - + "\"orderid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"itemid\"," - + "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"orderunits\",\"type\":" - + "[\"null\",\"double\"],\"default\":null},{\"name\":\"arraycol\",\"type\":[\"null\"," - + "{\"type\":\"array\",\"items\":[\"null\",\"double\"]}],\"default\":null},{\"name\":" - + "\"mapcol\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[\"null\",\"double\"]}]" - + ",\"default\":null}]}")); - } - - @Test - public void shouldSupportAvroStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("RAW_STRUCT"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"record\"," - + "\"name\":\"RAW_STRUCT\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + 
"]}" - )); - } - - @Test - public void shouldSupportAvroArrayOfStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("ARRAY_OF_STRUCTS"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"array\"," - + "\"items\":[" - + "\"null\"," - + "{\"type\":\"record\"," - + "\"name\":\"ARRAY_OF_STRUCTS\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}]}" - )); - } - - @Test - public void shouldSupportAvroMapOfStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("MAP_OF_STRUCTS"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"map\"," - + "\"values\":[" - + "\"null\"," - + "{\"type\":\"record\"," - + "\"name\":\"MAP_OF_STRUCTS\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}]}" - )); - } - - @Test - public void shouldSupportAvroNestedStructs() { - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(PersistenceSchema.from(SCHEMA, false), "bob"); - - // Then: - final org.apache.avro.Schema.Field rawStruct = avroSchema.getField("NESTED_STRUCTS"); - assertThat(rawStruct, is(notNullValue())); - assertThat(rawStruct.schema().getType(), is(org.apache.avro.Schema.Type.UNION)); - assertThat(rawStruct.schema().getTypes().get(0).getType(), - is(org.apache.avro.Schema.Type.NULL)); - - final String s0Schema = "{" - + "\"type\":\"record\"," - + "\"name\":\"s0\"," - + "\"namespace\":\"ksql.bob.NESTED_STRUCTS\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}"; - - final String ss0Schema = "{" - + "\"type\":\"record\"," - + "\"name\":\"ss0\"," - + "\"namespace\":\"ksql.bob.NESTED_STRUCTS.s1\"," - + "\"fields\":[" - + "{\"name\":\"f0\",\"type\":[\"null\",\"long\"],\"default\":null}," - + "{\"name\":\"f1\",\"type\":[\"null\",\"boolean\"],\"default\":null}" - + "]}"; - - final String s1Schema = "{" - + "\"type\":\"record\"," - + "\"name\":\"s1\"," - + "\"namespace\":\"ksql.bob.NESTED_STRUCTS\"," - + "\"fields\":[" - + "{\"name\":\"ss0\",\"type\":[\"null\"," + ss0Schema + "],\"default\":null}" - + "]}"; - - assertThat(rawStruct.schema().getTypes().get(1).toString(), is( - "{" - + "\"type\":\"record\"," - + "\"name\":\"NESTED_STRUCTS\"," - + "\"namespace\":\"ksql.bob\"," - + "\"fields\":[" - + "{\"name\":\"s0\",\"type\":[\"null\"," + s0Schema + "],\"default\":null}," - + 
"{\"name\":\"s1\",\"type\":[\"null\"," + s1Schema + "],\"default\":null}" - + "]}" - )); - } - - @Test - public void shouldCreateAvroSchemaForBoolean() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_BOOLEAN_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"boolean\"")); - } - - @Test - public void shouldCreateAvroSchemaForInt() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_INT32_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"int\"")); - } - - @Test - public void shouldCreateAvroSchemaForBigInt() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_INT64_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"long\"")); - } - - @Test - public void shouldCreateAvroSchemaForDouble() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_FLOAT64_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"double\"")); - } - - @Test - public void shouldCreateAvroSchemaForString() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema(Schema.OPTIONAL_STRING_SCHEMA); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("\"string\"")); - } - - @Test - public void shouldCreateAvroSchemaForArray() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema( - SchemaBuilder - .array(Schema.OPTIONAL_INT64_SCHEMA) - .build() - ); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("{" - + "\"type\":\"array\"," - + "\"items\":[\"null\",\"long\"]" - + "}")); - } - - @Test - public void shouldCreateAvroSchemaForMap() { - // Given: - final PersistenceSchema schema = unwrappedPersistenceSchema( - SchemaBuilder - .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.BOOLEAN_SCHEMA) - .build() - ); - - // When: - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema(schema, "orders"); - - // Then: - assertThat(avroSchema.toString(), equalTo("{" - + "\"type\":\"map\"," - + "\"values\":[\"null\",\"boolean\"]" - + "}")); - } - @Test public void shouldGetTheCorrectJavaTypeForBoolean() { final Schema schema = Schema.OPTIONAL_BOOLEAN_SCHEMA; diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index a712639239aa..994569fd60db 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -245,7 +245,7 @@ private Optional maybeCreateSinkDDL( ); } final SchemaRegistryClient srClient = serviceContext.getSchemaRegistryClient(); - AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient, ksqlConfig); return Optional.of(ddl); } diff --git 
a/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java b/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java index 1d4e8f815ceb..21e4fec5747b 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java @@ -21,6 +21,8 @@ import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.avro.AvroSchemas; import java.io.IOException; import org.apache.http.HttpStatus; @@ -32,10 +34,12 @@ private AvroUtil() { public static void throwOnInvalidSchemaEvolution( final String statementText, final CreateSourceCommand ddl, - final SchemaRegistryClient schemaRegistryClient + final SchemaRegistryClient schemaRegistryClient, + final KsqlConfig ksqlConfig ) { final KsqlTopic topic = ddl.getTopic(); - if (topic.getValueFormat().getFormat() != Format.AVRO) { + final FormatInfo format = topic.getValueFormat().getFormatInfo(); + if (format.getFormat() != Format.AVRO) { return; } @@ -43,9 +47,10 @@ public static void throwOnInvalidSchemaEvolution( ddl.getSchema(), ddl.getSerdeOptions() ); - final org.apache.avro.Schema avroSchema = SchemaUtil.buildAvroSchema( + final org.apache.avro.Schema avroSchema = AvroSchemas.getAvroSchema( physicalSchema.valueSchema(), - ddl.getSourceName().name() + format.getAvroFullSchemaName().orElse(KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME), + ksqlConfig ); final String topicName = topic.getKafkaTopicName(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java index 8cebd9481f90..f8aa0edb7aff 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTest.java @@ -480,15 +480,15 @@ public void shouldFailIfAvroSchemaNotEvolvable() { "Cannot register avro schema for T as the schema is incompatible with the current schema version registered for the topic.\n" + "KSQL schema: {" + "\"type\":\"record\"," + - "\"name\":\"T\"," + - "\"namespace\":\"ksql\"," + + "\"name\":\"KsqlDataSourceSchema\"," + + "\"namespace\":\"io.confluent.ksql.avro_schemas\"," + "\"fields\":[" + "{\"name\":\"COL0\",\"type\":[\"null\",\"long\"],\"default\":null}," + "{\"name\":\"COL1\",\"type\":[\"null\",\"string\"],\"default\":null}," + "{\"name\":\"COL2\",\"type\":[\"null\",\"string\"],\"default\":null}," + "{\"name\":\"COL3\",\"type\":[\"null\",\"double\"],\"default\":null}," + "{\"name\":\"COL4\",\"type\":[\"null\",\"boolean\"],\"default\":null}" + - "]" + + "],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"" + "}\n" + "Registered schema: \"int\""))); expectedException.expect(statementText(is( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java index 687ff76e0c6a..280fe131a6f3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java @@ -29,6 +29,7 @@ import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.GenericRowSerDe; +import io.confluent.ksql.serde.avro.AvroSchemas; import 
io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; @@ -551,11 +552,14 @@ private Deserializer getDeserializer( ).deserializer(); } - public void ensureSchema(final String topicName, final PhysicalSchema schema) { + public void ensureSchema( + final String topicName, + final PhysicalSchema schema, + final KsqlConfig ksqlConfig) { final SchemaRegistryClient srClient = serviceContext.get().getSchemaRegistryClient(); try { - final org.apache.avro.Schema avroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), "test-" + topicName); + final org.apache.avro.Schema avroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), "test_" + topicName, ksqlConfig); srClient.register(topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, avroSchema); } catch (final Exception e) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java index c660a85771e9..c5b6f68c9e4b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/AvroUtilTest.java @@ -37,14 +37,17 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.SchemaConverters; import io.confluent.ksql.schema.ksql.SchemaConverters.ConnectToSqlTypeConverter; +import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; +import io.confluent.ksql.serde.avro.AvroSchemas; import io.confluent.ksql.serde.connect.ConnectSchemaTranslator; import java.io.IOException; import java.util.Collections; +import java.util.Optional; import org.apache.kafka.connect.data.Schema; import org.junit.Before; import org.junit.Rule; @@ -57,7 +60,7 @@ @RunWith(MockitoJUnitRunner.class) public class AvroUtilTest { - private static final SourceName STREAM_NAME = SourceName.of("some-stream"); + private static final SourceName STREAM_NAME = SourceName.of("some_stream"); private static final String AVRO_SCHEMA_STRING = "{" + "\"namespace\": \"some.namespace\"," @@ -90,10 +93,17 @@ public class AvroUtilTest { private static final LogicalSchema SINGLE_FIELD_SCHEMA = toKsqlSchema(SINGLE_FIELD_AVRO_SCHEMA_STRING); + private static final LogicalSchema SCHEMA_WITH_MAPS = LogicalSchema.builder() + .valueColumn(ColumnName.of("notmap"), SqlTypes.BIGINT) + .valueColumn(ColumnName.of("mapcol"), SqlTypes.map(SqlTypes.INTEGER)) + .build(); + + private static final String SCHEMA_NAME = "schema_name"; + private static final KsqlTopic RESULT_TOPIC = new KsqlTopic( "actual-name", KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), - ValueFormat.of(FormatInfo.of(Format.AVRO)), + ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of(SCHEMA_NAME), Optional.empty())), false); @Rule @@ -104,11 +114,12 @@ public class AvroUtilTest { @Mock private CreateSourceCommand ddlCommand; + private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap()); + @Before public void setUp() { when(ddlCommand.getSerdeOptions()).thenReturn(SerdeOption.none()); when(ddlCommand.getSchema()).thenReturn(MUTLI_FIELD_SCHEMA); - when(ddlCommand.getSourceName()).thenReturn(STREAM_NAME); when(ddlCommand.getTopic()).thenReturn(RESULT_TOPIC); } @@ -118,7 +129,7 @@ public void 
shouldValidateSchemaEvolutionWithCorrectSubject() throws Exception { when(srClient.testCompatibility(anyString(), any())).thenReturn(true); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(eq(RESULT_TOPIC.getKafkaTopicName() + "-value"), any()); @@ -129,12 +140,31 @@ public void shouldValidateSchemaEvolutionWithCorrectSchema() throws Exception { // Given: final PhysicalSchema schema = PhysicalSchema.from(MUTLI_FIELD_SCHEMA, SerdeOption.none()); - final org.apache.avro.Schema expectedAvroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), STREAM_NAME.name()); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); + when(srClient.testCompatibility(anyString(), any())).thenReturn(true); + + // When: + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); + + // Then: + verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); + } + + @Test + public void shouldValidateSchemaWithMaps() throws Exception { + // Given: + when(ddlCommand.getSchema()).thenReturn(SCHEMA_WITH_MAPS); + final PhysicalSchema schema = PhysicalSchema + .from(SCHEMA_WITH_MAPS, SerdeOption.none()); + when(srClient.testCompatibility(anyString(), any())).thenReturn(true); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); + // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); @@ -149,11 +179,11 @@ public void shouldValidateWrappedSingleFieldSchemaEvolution() throws Exception { when(srClient.testCompatibility(anyString(), any())).thenReturn(true); - final org.apache.avro.Schema expectedAvroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), STREAM_NAME.name()); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); @@ -170,11 +200,11 @@ public void shouldValidateUnwrappedSingleFieldSchemaEvolution() throws Exception when(srClient.testCompatibility(anyString(), any())).thenReturn(true); - final org.apache.avro.Schema expectedAvroSchema = SchemaUtil - .buildAvroSchema(schema.valueSchema(), STREAM_NAME.name()); + final org.apache.avro.Schema expectedAvroSchema = AvroSchemas + .getAvroSchema(schema.valueSchema(), SCHEMA_NAME, ksqlConfig); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); // Then: verify(srClient).testCompatibility(any(), eq(expectedAvroSchema)); @@ -186,7 +216,7 @@ public void shouldNotThrowInvalidEvolution() throws Exception { when(srClient.testCompatibility(any(), any())).thenReturn(true); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test 
@@ -198,7 +228,7 @@ public void shouldReturnInvalidEvolution() throws Exception { expectedException.expectMessage("Cannot register avro schema for actual-name as the schema is incompatible with the current schema version registered for the topic"); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -208,7 +238,7 @@ public void shouldNotThrowInvalidEvolutionIfSubjectNotRegistered() throws Except .thenThrow(new RestClientException("Unknown subject", 404, 40401)); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -227,7 +257,7 @@ public void shouldThrowOnSrAuthorizationErrors() throws Exception { ))); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -241,7 +271,7 @@ public void shouldThrowOnAnyOtherEvolutionSrException() throws Exception { expectedException.expectMessage("Could not connect to Schema Registry service"); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } @Test @@ -255,7 +285,7 @@ public void shouldThrowOnAnyOtherEvolutionIOException() throws Exception { expectedException.expectMessage("Could not check Schema compatibility"); // When: - AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient); + AvroUtil.throwOnInvalidSchemaEvolution(STATEMENT_TEXT, ddlCommand, srClient, ksqlConfig); } private static LogicalSchema toKsqlSchema(final String avroSchemaString) { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index f87d8603e729..5e4b66647bca 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -329,7 +329,7 @@ ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Cannot register avro schema for OUTPUT as the schema is incompatible with the current schema version registered for the topic.\nKSQL schema: {\"type\":\"record\",\"name\":\"OUTPUT\",\"namespace\":\"ksql\",\"fields\":[{\"name\":\"C1\",\"type\":[\"null\",\"int\"],\"default\":null}]}\nRegistered schema: {\"type\":\"record\",\"name\":\"blah\",\"fields\":[{\"name\":\"C1\",\"type\":\"double\"}]}" + "message": "Cannot register avro schema for OUTPUT as the schema is incompatible with the current schema version registered for the topic.\nKSQL schema: {\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"C1\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}\nRegistered schema: {\"type\":\"record\",\"name\":\"blah\",\"fields\":[{\"name\":\"C1\",\"type\":\"double\"}]}" }, "topics": [ { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json b/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json index e65a5f63a955..0e842b147295 100644 --- 
a/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json @@ -207,7 +207,7 @@ }, { "name": "OUTPUT", - "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "int"]}]}]}, + "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","int"],"default":null}]}}]}]}, "format": "{FORMAT}" } ], @@ -245,7 +245,7 @@ }, { "name": "OUTPUT", - "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "int"]}]}]}, + "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","int"],"default":null}]}}]}]}, "format": "AVRO" } ], @@ -280,7 +280,7 @@ }, { "name": "OUTPUT", - "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "int"]}]}]}, + "schema": {"type": "record", "name": "ignored", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","int"],"default":null}]}}]}]}, "format": "{FORMAT}" } ], @@ -500,7 +500,7 @@ "topics": [ { "name": "OUTPUT", - "schema": {"type": "map", "values": ["null", "double"]}, + "schema": {"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","double"],"default":null}]}}, "format": "{FORMAT}" } ], @@ -527,7 +527,7 @@ "topics": [ { "name": "OUTPUT", - "schema": {"name": "ignored", "type": "record", "fields": [{"name": "FOO", "type": ["null",{"type": "map", "values": ["null", "double"]}]}]}, + "schema": {"name": "ignored", "type": "record", "fields": [{"name": "FOO", "type": ["null",{"type":"array","items":{"type":"record","name":"test","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","double"],"default":null}]}}]}]}, "format": "{FORMAT}" } ], diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java index 08e7d9bd2f5b..762cb6c0da47 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/KsqlResourceFunctionalTest.java @@ -38,6 +38,8 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.avro.AvroSchemas; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.SchemaUtil; import java.util.List; @@ -158,9 +160,10 @@ public void shouldInsertIntoValuesForAvroTopic() throws Exception { TEST_HARNESS.getSchemaRegistryClient() .register( "books" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, - SchemaUtil.buildAvroSchema( + AvroSchemas.getAvroSchema( schema.valueSchema(), - "books" + 
KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX + "books_value", + new KsqlConfig(REST_APP.getBaseConfig()) ) ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java index 8751a23a57cd..e583259ac971 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFunctionalTest.java @@ -84,6 +84,7 @@ public class StandaloneExecutorFunctionalTest { private String s1; private String s2; private String t1; + private KsqlConfig ksqlConfig; @BeforeClass public static void classSetUp() { @@ -103,8 +104,7 @@ public void setUp() throws Exception { .putAll(KsqlConfigTestUtil.baseTestConfig()) .put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TEST_HARNESS.kafkaBootstrapServers()) .build(); - - final KsqlConfig ksqlConfig = new KsqlConfig(properties); + ksqlConfig = new KsqlConfig(properties); when(configStore.getKsqlConfig()).thenReturn(ksqlConfig); @@ -256,7 +256,7 @@ public void shouldFailOnAvroWithoutSchemasIfSchemaNotAvailable() { @Test public void shouldFailOnAvroWithoutSchemasIfSchemaNotEvolvable() { // Given: - givenIncompatibleSchemaExists(s1); + givenIncompatibleSchemaExists(s1, ksqlConfig); givenScript("" + "SET 'auto.offset.reset' = 'earliest';" @@ -297,7 +297,9 @@ public void shouldHandleComments() { TEST_HARNESS.verifyAvailableRows(s1, DATA_SIZE, JSON, DATA_SCHEMA); } - private static void givenIncompatibleSchemaExists(final String topicName) { + private static void givenIncompatibleSchemaExists( + final String topicName, + final KsqlConfig ksqlConfig) { final LogicalSchema logical = LogicalSchema.builder() .valueColumn(ColumnName.of("ORDERID"), SqlTypes.struct() .field("fred", SqlTypes.INTEGER) @@ -310,7 +312,7 @@ private static void givenIncompatibleSchemaExists(final String topicName) { SerdeOption.none() ); - TEST_HARNESS.ensureSchema(topicName, incompatiblePhysical); + TEST_HARNESS.ensureSchema(topicName, incompatiblePhysical, ksqlConfig); } private void givenScript(final String contents) { diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java index 27ce0df99626..e7eeb1174d4e 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java @@ -15,18 +15,13 @@ package io.confluent.ksql.serde.avro; -import static java.util.Objects.requireNonNull; - import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import io.confluent.ksql.schema.connect.SchemaWalker; import io.confluent.ksql.schema.connect.SchemaWalker.Visitor; import io.confluent.ksql.serde.connect.ConnectDataTranslator; import io.confluent.ksql.serde.connect.DataTranslator; import io.confluent.ksql.util.DecimalUtil; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -35,7 +30,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; -import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; /** @@ -60,9 +54,8 @@ public class AvroDataTranslator 
implements DataTranslator { ) { this.ksqlSchema = throwOnInvalidSchema(Objects.requireNonNull(schema, "schema")); - this.avroCompatibleSchema = buildAvroCompatibleSchema( - this.ksqlSchema, - new Context(Collections.singleton(schemaFullName), useNamedMaps, true) + this.avroCompatibleSchema = AvroSchemas.getAvroCompatibleConnectSchema( + schema, schemaFullName, useNamedMaps ); this.innerTranslator = new ConnectDataTranslator(avroCompatibleSchema); @@ -106,129 +99,6 @@ private static Struct convertStruct( return struct; } - private static final class Context { - private static final String DELIMITER = "_"; - - static final String MAP_KEY_NAME = "MapKey"; - static final String MAP_VALUE_NAME = "MapValue"; - - private final Iterable names; - private final boolean useNamedMaps; - private boolean root; - - private Context( - final Iterable names, - final boolean useNamedMaps, - final boolean root - ) { - this.names = requireNonNull(names, "names"); - this.useNamedMaps = useNamedMaps; - this.root = root; - } - - Context with(final String name) { - return new Context(Iterables.concat(names, ImmutableList.of(name)), useNamedMaps, root); - } - - public String name() { - return String.join(DELIMITER, names); - } - } - - private static String avroCompatibleFieldName(final Field field) { - // Currently the only incompatible field names expected are fully qualified - // column identifiers. Once quoted identifier support is introduced we will - // need to implement something more generic here. - return field.name().replace(".", "_"); - } - - private static Schema buildAvroCompatibleSchema( - final Schema schema, - final Context context - ) { - final boolean notRoot = !context.root; - context.root = false; - - final SchemaBuilder schemaBuilder; - switch (schema.type()) { - default: - if (notRoot || !schema.isOptional()) { - return schema; - } - - schemaBuilder = new SchemaBuilder(schema.type()); - break; - - case STRUCT: - schemaBuilder = buildAvroCompatibleStruct(schema, context); - break; - - case ARRAY: - schemaBuilder = buildAvroCompatibleArray(schema, context); - break; - - case MAP: - schemaBuilder = buildAvroCompatibleMap(schema, context); - break; - } - - if (schema.isOptional() && notRoot) { - schemaBuilder.optional(); - } - - return schemaBuilder.build(); - } - - private static SchemaBuilder buildAvroCompatibleMap( - final Schema schema, final Context context - ) { - final Schema keySchema = - buildAvroCompatibleSchema(schema.keySchema(), context.with(Context.MAP_KEY_NAME)); - - final Schema valueSchema = - buildAvroCompatibleSchema(schema.valueSchema(), context.with(Context.MAP_VALUE_NAME)); - - final SchemaBuilder schemaBuilder = SchemaBuilder.map( - keySchema, - valueSchema - ); - - if (context.useNamedMaps) { - schemaBuilder.name(context.name()); - } - - return schemaBuilder; - } - - private static SchemaBuilder buildAvroCompatibleArray( - final Schema schema, - final Context context - ) { - final Schema valueSchema = buildAvroCompatibleSchema(schema.valueSchema(), context); - - return SchemaBuilder.array(valueSchema); - } - - private static SchemaBuilder buildAvroCompatibleStruct( - final Schema schema, - final Context context - ) { - final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); - - if (schema.name() == null) { - schemaBuilder.name(context.name()); - } - - for (final Field f : schema.fields()) { - final String fieldName = avroCompatibleFieldName(f); - final Schema fieldSchema = buildAvroCompatibleSchema(f.schema(), context.with(f.name())); - - schemaBuilder.field(fieldName, 
          fieldSchema);
-    }
-
-    return schemaBuilder;
-  }
-
   @SuppressWarnings("unchecked")
   private static Object replaceSchema(final Schema schema, final Object object) {
     if (object == null) {
diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java
new file mode 100644
index 000000000000..3bdf752f7d74
--- /dev/null
+++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.serde.avro;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import io.confluent.connect.avro.AvroData;
+import io.confluent.ksql.schema.ksql.PersistenceSchema;
+import io.confluent.ksql.util.KsqlConfig;
+import java.util.Collections;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+public final class AvroSchemas {
+  private AvroSchemas() {
+  }
+
+  public static org.apache.avro.Schema getAvroSchema(
+      final PersistenceSchema schema,
+      final String name,
+      final KsqlConfig ksqlConfig) {
+    final boolean useNamedMaps = ksqlConfig.getBoolean(KsqlConfig.KSQL_USE_NAMED_AVRO_MAPS);
+    return new AvroData(0).fromConnectSchema(
+        getAvroCompatibleConnectSchema(schema.serializedSchema(), name, useNamedMaps)
+    );
+  }
+
+  public static Schema getAvroCompatibleConnectSchema(
+      final Schema schema,
+      final String schemaFullName,
+      final boolean useNamedMaps) {
+    return buildAvroCompatibleSchema(
+        schema,
+        new Context(Collections.singleton(schemaFullName), useNamedMaps, true)
+    );
+  }
+
+  private static final class Context {
+    private static final String DELIMITER = "_";
+
+    static final String MAP_KEY_NAME = "MapKey";
+    static final String MAP_VALUE_NAME = "MapValue";
+
+    private final Iterable<String> names;
+    private final boolean useNamedMaps;
+    private boolean root;
+
+    private Context(
+        final Iterable<String> names,
+        final boolean useNamedMaps,
+        final boolean root
+    ) {
+      this.names = requireNonNull(names, "names");
+      this.useNamedMaps = useNamedMaps;
+      this.root = root;
+    }
+
+    Context with(final String name) {
+      return new Context(Iterables.concat(names, ImmutableList.of(name)), useNamedMaps, root);
+    }
+
+    public String name() {
+      return String.join(DELIMITER, names);
+    }
+  }
+
+  private static String avroCompatibleFieldName(final Field field) {
+    // Currently the only incompatible field names expected are fully qualified
+    // column identifiers. Once quoted identifier support is introduced we will
+    // need to implement something more generic here.
+    return field.name().replace(".", "_");
+  }
+
+  private static Schema buildAvroCompatibleSchema(
+      final Schema schema,
+      final Context context
+  ) {
+    final boolean notRoot = !context.root;
+    context.root = false;
+
+    final SchemaBuilder schemaBuilder;
+    switch (schema.type()) {
+      default:
+        if (notRoot || !schema.isOptional()) {
+          return schema;
+        }
+
+        schemaBuilder = new SchemaBuilder(schema.type());
+        break;
+
+      case STRUCT:
+        schemaBuilder = buildAvroCompatibleStruct(schema, context);
+        break;
+
+      case ARRAY:
+        schemaBuilder = buildAvroCompatibleArray(schema, context);
+        break;
+
+      case MAP:
+        schemaBuilder = buildAvroCompatibleMap(schema, context);
+        break;
+    }
+
+    if (schema.isOptional() && notRoot) {
+      schemaBuilder.optional();
+    }
+
+    return schemaBuilder.build();
+  }
+
+  private static SchemaBuilder buildAvroCompatibleMap(
+      final Schema schema, final Context context
+  ) {
+    final Schema keySchema =
+        buildAvroCompatibleSchema(schema.keySchema(), context.with(Context.MAP_KEY_NAME));
+
+    final Schema valueSchema =
+        buildAvroCompatibleSchema(schema.valueSchema(), context.with(Context.MAP_VALUE_NAME));
+
+    final SchemaBuilder schemaBuilder = SchemaBuilder.map(
+        keySchema,
+        valueSchema
+    );
+
+    if (context.useNamedMaps) {
+      schemaBuilder.name(context.name());
+    }
+
+    return schemaBuilder;
+  }
+
+  private static SchemaBuilder buildAvroCompatibleArray(
+      final Schema schema,
+      final Context context
+  ) {
+    final Schema valueSchema = buildAvroCompatibleSchema(schema.valueSchema(), context);
+
+    return SchemaBuilder.array(valueSchema);
+  }
+
+  private static SchemaBuilder buildAvroCompatibleStruct(
+      final Schema schema,
+      final Context context
+  ) {
+    final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+
+    if (schema.name() == null) {
+      schemaBuilder.name(context.name());
+    }
+
+    for (final Field f : schema.fields()) {
+      final String fieldName = avroCompatibleFieldName(f);
+      final Schema fieldSchema = buildAvroCompatibleSchema(f.schema(), context.with(f.name()));
+
+      schemaBuilder.field(fieldName, fieldSchema);
+    }
+
+    return schemaBuilder;
+  }
+}
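
End-of-patch note (not part of the diff above): schema generation for Schema Registry compatibility checks and test setup no longer goes through SchemaUtil.buildAvroSchema(schema, sourceName); callers now use the new AvroSchemas class with an explicit full schema name and a KsqlConfig, which supplies KsqlConfig.KSQL_USE_NAMED_AVRO_MAPS. A minimal sketch of the new call path follows; the wrapper class, column names, and empty config are illustrative assumptions, not code from this patch.

```java
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.serde.avro.AvroSchemas;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class AvroSchemasSketch {

  public static void main(final String[] args) {
    // Connect schema with a BIGINT column and a MAP<VARCHAR, INT> column (illustrative names).
    final ConnectSchema connectSchema = (ConnectSchema) SchemaBuilder.struct()
        .field("notmap", Schema.OPTIONAL_INT64_SCHEMA)
        .field("mapcol", SchemaBuilder
            .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA)
            .optional()
            .build())
        .optional()
        .build();

    // Before this patch: SchemaUtil.buildAvroSchema(PersistenceSchema.from(connectSchema, false), "orders")
    // After: the full record name is explicit, and the KsqlConfig decides how maps are named.
    final org.apache.avro.Schema avroSchema = AvroSchemas.getAvroSchema(
        PersistenceSchema.from(connectSchema, false),
        "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema",
        new KsqlConfig(Collections.emptyMap())
    );

    System.out.println(avroSchema);
  }
}
```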
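The updated expectations in serdes.json and elements.json follow from the same change: the generated record is now named io.confluent.ksql.avro_schemas.KsqlDataSourceSchema, and a MAP column with an optional STRING key surfaces in Avro as an array of key/value records (see the new "type":"array" schemas in those files). At the Connect level the column is still a MAP; naming is controlled by the useNamedMaps flag of getAvroCompatibleConnectSchema. A small sketch under the same illustrative-name assumptions:

```java
import io.confluent.ksql.serde.avro.AvroSchemas;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class AvroCompatibleSchemaSketch {

  public static void main(final String[] args) {
    // A struct with one optional-key map column, as a KSQL MAP<VARCHAR, INT> column would produce.
    final Schema valueSchema = SchemaBuilder.struct()
        .field("mapcol", SchemaBuilder
            .map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA)
            .optional()
            .build())
        .optional()
        .build();

    final Schema avroCompatible = AvroSchemas.getAvroCompatibleConnectSchema(
        valueSchema,
        "KsqlDataSourceSchema", // root name; nested names are joined with "_"
        true                    // useNamedMaps: the map schema is given a name below
    );

    // Still Schema.Type.MAP here; AvroData later converts it to the array-of-records shape
    // seen in the updated test expectations, since an optional key cannot be an Avro map key.
    System.out.println(avroCompatible.field("mapcol").schema().name()); // KsqlDataSourceSchema_mapcol
  }
}
```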