diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java b/ksqldb-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java index bf0d3cffb528..eb28363151d0 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/schema/connect/SqlSchemaFormatter.java @@ -161,6 +161,9 @@ public String visitMap(final Schema schema, final String key, final String value } public String visitStruct(final Schema schema, final List fields) { + if (fields.isEmpty()) { + return "STRUCT< >"; + } return fields.stream() .collect(Collectors.joining(", ", STRUCT_START, STRUCTURED_END)); } diff --git a/ksqldb-common/src/test/java/io/confluent/ksql/schema/connect/SqlSchemaFormatterTest.java b/ksqldb-common/src/test/java/io/confluent/ksql/schema/connect/SqlSchemaFormatterTest.java index 025e8605e752..38cd1ec9bb30 100644 --- a/ksqldb-common/src/test/java/io/confluent/ksql/schema/connect/SqlSchemaFormatterTest.java +++ b/ksqldb-common/src/test/java/io/confluent/ksql/schema/connect/SqlSchemaFormatterTest.java @@ -172,6 +172,15 @@ public void shouldFormatOptionalMap() { assertThat(STRICT.format(schema), is("MAP")); } + @Test + public void shouldFormatEmptyStruct() { + // Given: + final Schema struct = SchemaBuilder.struct().optional().build(); + + // Then: + assertThat(DEFAULT.format(struct), is("STRUCT< >")); + } + @Test public void shouldFormatStruct() { // Given: diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java index fe5a8b69b17f..3c36cb823b03 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java @@ -396,6 +396,12 @@ public void shouldIgnoreConnectMapWithUnsupportedKey() { ); } + @Test + public void shouldInferEmptyStruct() { + final Schema emptyStruct = SchemaBuilder.struct().optional().build(); + shouldInferConnectType(emptyStruct, emptyStruct); + } + @Test public void shouldInferComplexConnectSchema() { final Schema arrayInner = SchemaBuilder.struct() diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_empty_struct_creation/5.5.0_1581572087372/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_empty_struct_creation/5.5.0_1581572087372/spec.json index 9a8ec17612a1..2cdaf562c241 100644 --- a/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_empty_struct_creation/5.5.0_1581572087372/spec.json +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/create-struct_-_empty_struct_creation/5.5.0_1581572087372/spec.json @@ -101,7 +101,7 @@ } ], "schemas" : { "CSAS_BIG_STRUCT_0.KsqlTopic.Source" : "STRUCT NOT NULL", - "CSAS_BIG_STRUCT_0.BIG_STRUCT" : "STRUCT> NOT NULL" + "CSAS_BIG_STRUCT_0.BIG_STRUCT" : "STRUCT> NOT NULL" }, "configs" : { "ksql.extension.dir" : "ext",