From a77cebe88dae366a2e5367bc172379cfad87abf7 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 11 Feb 2020 13:13:20 -0800 Subject: [PATCH] feat: support Protobuf in ksqlDB (#4469) --- config/ksql-server.properties | 2 +- .../SchemaRegistryTopicSchemaSupplier.java | 45 +--- .../KsqlSchemaRegistryClientFactory.java | 7 + .../DefaultSchemaInjectorFunctionalTest.java | 10 +- ...SchemaRegistryTopicSchemaSupplierTest.java | 50 +---- .../KsqlSchemaRegistryClientFactoryTest.java | 4 +- .../datagen/SchemaRegistryClientFactory.java | 6 +- ksql-functional-tests/pom.xml | 10 + .../confluent/ksql/test/model/TopicNode.java | 58 +---- .../test/serde/avro/AvroSerdeSupplier.java | 35 --- .../ValueSpecProtobufSerdeSupplier.java | 34 +++ .../ksql/test/tools/TestCaseBuilderUtil.java | 17 +- .../ksql/test/tools/TestExecutorUtil.java | 27 +-- .../io/confluent/ksql/test/tools/Topic.java | 12 +- .../confluent/ksql/test/utils/SerdeUtil.java | 46 +++- .../ksql/test/EndToEndEngineTestUtil.java | 16 -- .../ksql/test/SchemaTranslationTest.java | 25 ++- .../ksql/test/rest/RestTestExecutor.java | 2 +- .../ksql/test/tools/StubKafkaServiceTest.java | 4 +- .../query-validation-tests/collect-list.json | 20 +- .../query-validation-tests/collect-set.json | 10 +- .../query-validation-tests/concat.json | 4 +- .../query-validation-tests/delimited.json | 2 +- .../query-validation-tests/elements.json | 20 +- .../query-validation-tests/group-by.json | 8 +- .../query-validation-tests/histogram.json | 4 +- .../query-validation-tests/initcap.json | 2 +- .../join-with-custom-timestamp.json | 10 +- .../query-validation-tests/joins.json | 201 ++++++++++++++++-- .../query-validation-tests/protobuf.json | 69 ++++++ .../query-validation-tests/replace.json | 2 +- .../query-validation-tests/serdes.json | 132 +++++++++++- .../query-validation-tests/simple-struct.json | 10 +- .../query-validation-tests/substring.json | 6 +- .../query-validation-tests/topk-group-by.json | 8 +- .../resources/query-validation-tests/url.json | 20 +- .../schema-validation-tests/avro-basic.json | 1 + .../schema-validation-tests/real-schema.json | 1 + ksql-serde/pom.xml | 17 +- .../java/io/confluent/ksql/serde/Format.java | 31 ++- .../confluent/ksql/serde/FormatFactory.java | 5 +- .../confluent/ksql/serde/avro/AvroFormat.java | 17 ++ .../ksql/serde/protobuf/ProtobufFormat.java | 59 +++++ .../serde/protobuf/ProtobufSerdeFactory.java | 115 ++++++++++ pom.xml | 26 +++ 45 files changed, 888 insertions(+), 322 deletions(-) delete mode 100644 ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java create mode 100644 ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/protobuf.json create mode 100644 ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java create mode 100644 ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java diff --git a/config/ksql-server.properties b/config/ksql-server.properties index 1737ca0ba58a..ab323599654f 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -62,4 +62,4 @@ bootstrap.servers=localhost:9092 # ksql.connect.worker.config=config/connect.properties # Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry: -#ksql.schema.registry.url=? +# ksql.schema.registry.url=? 
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplier.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplier.java index 4572a656448f..af658e990847 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplier.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplier.java @@ -16,21 +16,18 @@ package io.confluent.ksql.schema.ksql.inference; import com.google.common.annotations.VisibleForTesting; -import io.confluent.connect.avro.AvroData; -import io.confluent.connect.avro.AvroDataConfig; import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.ksql.links.DocumentationLinks; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.connect.ConnectSchemaTranslator; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; -import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.function.Function; -import org.apache.avro.Schema.Parser; import org.apache.http.HttpStatus; import org.apache.kafka.connect.data.Schema; @@ -40,29 +37,26 @@ public class SchemaRegistryTopicSchemaSupplier implements TopicSchemaSupplier { private final SchemaRegistryClient srClient; - private final Function toAvroTranslator; - private final Function toConnectTranslator; private final Function toKsqlTranslator; + private final Function formatFactory; public SchemaRegistryTopicSchemaSupplier(final SchemaRegistryClient srClient) { this( srClient, - schema -> new Parser().parse(schema), - new AvroData(new AvroDataConfig(Collections.emptyMap()))::toConnectSchema, - new ConnectSchemaTranslator()::toKsqlSchema); + new ConnectSchemaTranslator()::toKsqlSchema, + FormatFactory::fromName + ); } @VisibleForTesting SchemaRegistryTopicSchemaSupplier( final SchemaRegistryClient srClient, - final Function toAvroTranslator, - final Function toConnectTranslator, - final Function toKsqlTranslator + final Function toKsqlTranslator, + final Function formatFactory ) { this.srClient = Objects.requireNonNull(srClient, "srClient"); - this.toAvroTranslator = Objects.requireNonNull(toAvroTranslator, "toAvroTranslator"); - this.toConnectTranslator = Objects.requireNonNull(toConnectTranslator, "toConnectTranslator"); this.toKsqlTranslator = Objects.requireNonNull(toKsqlTranslator, "toKsqlTranslator"); + this.formatFactory = Objects.requireNonNull(formatFactory, "formatFactory"); } @Override @@ -100,32 +94,15 @@ public SchemaResult fromParsedSchema( final int id, final ParsedSchema parsedSchema ) { - try { - final Schema connectSchema; - - switch (parsedSchema.schemaType()) { - case AvroSchema.TYPE: - connectSchema = toConnectSchema(parsedSchema.canonicalString()); - break; - case "JSON": - case "PROTOBUF": - default: - throw new KsqlException("Unsupported schema type: " + parsedSchema.schemaType()); - } - + final Format format = formatFactory.apply(parsedSchema.schemaType()); + final Schema connectSchema = toKsqlTranslator.apply(format.toConnectSchema(parsedSchema)); return SchemaResult.success(SchemaAndId.schemaAndId(connectSchema, id)); } catch (final Exception e) { return 
notCompatible(topic, parsedSchema.canonicalString(), e); } } - private Schema toConnectSchema(final String avroSchemaString) { - final org.apache.avro.Schema avroSchema = toAvroTranslator.apply(avroSchemaString); - final Schema connectSchema = toConnectTranslator.apply(avroSchema); - return toKsqlTranslator.apply(connectSchema); - } - private static SchemaResult notFound(final String topicName) { return SchemaResult.failure(new KsqlException( "Avro schema for message values on topic " + topicName diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java index 87631d2ac031..2e51c4290838 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactory.java @@ -16,10 +16,15 @@ package io.confluent.ksql.schema.registry; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import io.confluent.kafka.schemaregistry.SchemaProvider; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.RestService; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.ksql.util.KsqlConfig; +import java.util.List; import java.util.Map; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -41,6 +46,7 @@ public class KsqlSchemaRegistryClientFactory { interface SchemaRegistryClientFactory { CachedSchemaRegistryClient create(RestService service, int identityMapCapacity, + List providers, Map clientConfigs, Map httpHeaders); } @@ -113,6 +119,7 @@ public SchemaRegistryClient get() { return schemaRegistryClientFactory.create( restService, 1000, + ImmutableList.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()), schemaRegistryClientConfigs, httpHeaders ); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java index 84481a6917b2..e425b9bfe709 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorFunctionalTest.java @@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.connect.avro.AvroData; -import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.metastore.MetaStore; @@ -69,7 +69,7 @@ public class DefaultSchemaInjectorFunctionalTest { @Mock private SchemaRegistryClient srClient; @Mock - private ParsedSchema schema; + private AvroSchema avroSchema; @Mock private MetaStore metaStore; private DefaultSchemaInjector schemaInjector; @@ -499,9 +499,9 @@ private void shouldInferSchema( try { when(srClient.getLatestSchemaMetadata(any())) .thenReturn(new SchemaMetadata(1, 1, avroSchema.toString())); - when(srClient.getSchemaBySubjectAndId(any(), 
anyInt())).thenReturn(schema); - when(schema.schemaType()).thenReturn("AVRO"); - when(schema.canonicalString()).thenReturn(avroSchema.toString()); + when(srClient.getSchemaBySubjectAndId(any(), anyInt())).thenReturn(this.avroSchema); + when(this.avroSchema.schemaType()).thenReturn("AVRO"); + when(this.avroSchema.rawSchema()).thenReturn(avroSchema); } catch (final Exception e) { throw new AssertionError(e); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplierTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplierTest.java index 86ea3f2c6fe1..c92be5ec7b96 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplierTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegistryTopicSchemaSupplierTest.java @@ -30,6 +30,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaResult; +import io.confluent.ksql.serde.Format; import io.confluent.ksql.util.KsqlException; import java.io.IOException; import java.util.Optional; @@ -57,26 +58,22 @@ public class SchemaRegistryTopicSchemaSupplierTest { @Mock private SchemaRegistryClient srClient; @Mock - private Function toAvroTranslator; - @Mock - private Function toConnectTranslator; - @Mock private Function toKsqlTranslator; @Mock - private org.apache.avro.Schema avroSchema; - @Mock private ParsedSchema parsedSchema; @Mock private Schema connectSchema; @Mock private Schema ksqlSchema; + @Mock + private Format format; private SchemaRegistryTopicSchemaSupplier supplier; @Before public void setUp() throws Exception { supplier = new SchemaRegistryTopicSchemaSupplier( - srClient, toAvroTranslator, toConnectTranslator, toKsqlTranslator); + srClient, toKsqlTranslator, f -> format); when(srClient.getLatestSchemaMetadata(any())) .thenReturn(new SchemaMetadata(SCHEMA_ID, -1, AVRO_SCHEMA)); @@ -88,11 +85,7 @@ public void setUp() throws Exception { when(parsedSchema.canonicalString()).thenReturn(AVRO_SCHEMA); - when(toAvroTranslator.apply(any())) - .thenReturn(avroSchema); - - when(toConnectTranslator.apply(any())) - .thenReturn(connectSchema); + when(format.toConnectSchema(parsedSchema)).thenReturn(connectSchema); when(toKsqlTranslator.apply(any())) .thenReturn(ksqlSchema); @@ -226,28 +219,10 @@ public void shouldThrowFromGetValueWithIdSchemaOnOtherException() throws Excepti supplier.getValueSchema(TOPIC_NAME, Optional.of(42)); } - @Test - public void shouldReturnErrorFromGetValueSchemaIfCanNotConvertToAvroSchema() { - // Given: - when(toAvroTranslator.apply(any())) - .thenThrow(new RuntimeException("it went boom")); - - // When: - final SchemaResult result = supplier.getValueSchema(TOPIC_NAME, Optional.empty()); - - // Then: - assertThat(result.schemaAndId, is(Optional.empty())); - assertThat(result.failureReason.get().getMessage(), containsString( - "Unable to verify if the schema for topic some-topic is compatible with KSQL.")); - assertThat(result.failureReason.get().getMessage(), containsString( - "it went boom")); - assertThat(result.failureReason.get().getMessage(), containsString(AVRO_SCHEMA)); - } - @Test public void shouldReturnErrorFromGetValueSchemaIfCanNotConvertToConnectSchema() { // Given: - when(toConnectTranslator.apply(any())) + when(format.toConnectSchema(any())) 
.thenThrow(new RuntimeException("it went boom")); // When: @@ -299,21 +274,12 @@ public void shouldRequestCorrectSchemaOnGetValueSchemaWithId() throws Exception } @Test - public void shouldPassWriteSchemaToAvroTranslator() { - // When: - supplier.getValueSchema(TOPIC_NAME, Optional.empty()); - - // Then: - verify(toAvroTranslator).apply(AVRO_SCHEMA); - } - - @Test - public void shouldPassWriteSchemaToConnectTranslator() { + public void shouldPassWriteSchemaToFormat() { // When: supplier.getValueSchema(TOPIC_NAME, Optional.empty()); // Then: - verify(toConnectTranslator).apply(avroSchema); + verify(format).toConnectSchema(parsedSchema); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactoryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactoryTest.java index d7d034dcb7e2..734754ceaa31 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactoryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/registry/KsqlSchemaRegistryClientFactoryTest.java @@ -66,7 +66,7 @@ public class KsqlSchemaRegistryClientFactoryTest { @Before public void setUp() { - when(srClientFactory.create(any(), anyInt(), any(), any())) + when(srClientFactory.create(any(), anyInt(), any(), any(), any())) .thenReturn(mock(CachedSchemaRegistryClient.class)); when(restServiceSupplier.get()).thenReturn(restService); @@ -166,7 +166,7 @@ public void shouldPassBasicAuthCredentialsToSchemaRegistryClient() { // Then: verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass())); - srClientFactory.create(same(restService), anyInt(), eq(expectedConfigs), any()); + srClientFactory.create(same(restService), anyInt(), any(), eq(expectedConfigs), any()); } private static Map defaultConfigs() { diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java index 78e9aa60c607..6dc5cb2a7280 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java @@ -15,8 +15,11 @@ package io.confluent.ksql.datagen; +import com.google.common.collect.ImmutableList; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.util.KsqlConfig; @@ -42,8 +45,9 @@ static Optional getSrClient( } return Optional.of(new CachedSchemaRegistryClient( - ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY), + ImmutableList.of(ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)), 100, + ImmutableList.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()), ksqlConfig.originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX) )); } diff --git a/ksql-functional-tests/pom.xml b/ksql-functional-tests/pom.xml index beedd30ffac8..4680cd625f5d 100644 --- a/ksql-functional-tests/pom.xml +++ b/ksql-functional-tests/pom.xml @@ -146,6 +146,16 @@ test + + com.google.protobuf + protobuf-java + + + + com.google.protobuf + protobuf-java-util + + diff --git 
a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TopicNode.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TopicNode.java index d696be93db7d..1aa05cb22aee 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TopicNode.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/model/TopicNode.java @@ -19,29 +19,16 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.NullNode; -import io.confluent.connect.avro.AvroData; -import io.confluent.ksql.name.ColumnName; -import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; -import io.confluent.ksql.schema.ksql.SchemaConverters; -import io.confluent.ksql.schema.ksql.types.SqlStruct; -import io.confluent.ksql.test.TestFrameworkException; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.ksql.test.tools.Topic; import io.confluent.ksql.test.tools.exceptions.InvalidFieldException; +import io.confluent.ksql.test.utils.SerdeUtil; import java.util.Optional; -import org.apache.avro.Schema; public final class TopicNode { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String FORMAT_REPLACE_ERROR = - "To use {FORMAT} in your topics please set the 'format' test case element"; - private final String name; - private final String format; - private final Optional avroSchema; + private final Optional schema; private final int numPartitions; private final int replicas; @@ -53,8 +40,7 @@ public final class TopicNode { @JsonProperty("replicas") final Integer replicas ) { this.name = name == null ? "" : name; - this.avroSchema = buildAvroSchema(requireNonNull(schema, "schema")); - this.format = format == null ? "" : format; + this.schema = SerdeUtil.buildSchema(requireNonNull(schema, "schema"), format); this.numPartitions = numPartitions == null ? 1 : numPartitions; this.replicas = replicas == null ? 
1 : replicas; @@ -64,41 +50,7 @@ public final class TopicNode { } public Topic build() { - return new Topic(name, numPartitions, replicas, avroSchema); + return new Topic(name, numPartitions, replicas, schema); } - private LogicalSchema logicalSchema() { - if (!avroSchema.isPresent()) { - throw new TestFrameworkException("Test framework requires " - + "the schema of any topic using format KAFKA"); - } - - final org.apache.kafka.connect.data.Schema valueSchema = new AvroData(1) - .toConnectSchema(avroSchema.get()); - - final SqlStruct valueType = (SqlStruct) SchemaConverters.connectToSqlConverter() - .toSqlType(valueSchema); - - final Builder schemaBuilder = LogicalSchema.builder(); - - valueType.fields().forEach(field -> schemaBuilder.valueColumn( - ColumnName.of(field.name()), - field.type())); - - return schemaBuilder.build(); - } - - private static Optional buildAvroSchema(final JsonNode schema) { - if (schema instanceof NullNode) { - return Optional.empty(); - } - - try { - final String schemaString = OBJECT_MAPPER.writeValueAsString(schema); - final Schema.Parser parser = new Schema.Parser(); - return Optional.of(parser.parse(schemaString)); - } catch (final Exception e) { - throw new InvalidFieldException("schema", "failed to parse", e); - } - } } \ No newline at end of file diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java deleted file mode 100644 index 7f395a248ee3..000000000000 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/avro/AvroSerdeSupplier.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.test.serde.avro; - -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.confluent.ksql.test.serde.SerdeSupplier; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; - -public class AvroSerdeSupplier implements SerdeSupplier { -  @Override -  public Serializer getSerializer(final SchemaRegistryClient schemaRegistryClient) { -    return new KafkaAvroSerializer(schemaRegistryClient); -  } - -  @Override -  public Deserializer getDeserializer(final SchemaRegistryClient schemaRegistryClient) { -    return new KafkaAvroDeserializer(schemaRegistryClient); -  } -} \ No newline at end of file diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java new file mode 100644 index 000000000000..43292d309eb1 --- /dev/null +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/serde/protobuf/ValueSpecProtobufSerdeSupplier.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.test.serde.protobuf; + +import io.confluent.connect.protobuf.ProtobufConverter; +import io.confluent.connect.protobuf.ProtobufData; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.ksql.test.serde.ConnectSerdeSupplier; +import org.apache.kafka.connect.data.Schema; + +public class ValueSpecProtobufSerdeSupplier extends ConnectSerdeSupplier { + + public ValueSpecProtobufSerdeSupplier() { + super(ProtobufConverter::new); + } + + @Override + protected Schema fromParsedSchema(final ProtobufSchema schema) { + return new ProtobufData(1).toConnectSchema(schema); + } +} diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java index 49eb7b157ce0..bf1f2b628f00 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java @@ -19,7 +19,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; -import io.confluent.connect.avro.AvroData; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.MetaStoreImpl; @@ -30,7 +30,6 @@ import io.confluent.ksql.parser.SqlBaseParser; import io.confluent.ksql.parser.tree.CreateSource; import io.confluent.ksql.schema.ksql.SchemaConverters; -import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.test.model.RecordNode; import io.confluent.ksql.test.model.TopicNode; @@ -146,25 +145,27 @@ private static Topic createTopicFromStatement( final KsqlTopic ksqlTopic = TopicFactory.create(statement.getProperties()); final ValueFormat valueFormat = ksqlTopic.getValueFormat(); - final Optional avroSchema; - if (valueFormat.getFormat() == FormatFactory.AVRO) { + final Optional schema; + if (valueFormat.getFormat().supportsSchemaInference()) { // add avro schema final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); statement.getElements().forEach(e -> schemaBuilder.field( e.getName().name(), SchemaConverters.sqlToConnectConverter().toConnectSchema(e.getType().getSqlType())) ); - avroSchema = Optional.of(new AvroData(1) - .fromConnectSchema(addNames(schemaBuilder.build()))); + + schema = Optional.of( + valueFormat.getFormat().toParsedSchema(addNames(schemaBuilder.build())) + ); } else { - avroSchema = Optional.empty(); + schema = Optional.empty(); } return new Topic( ksqlTopic.getKafkaTopicName(), KsqlConstants.legacyDefaultSinkPartitionCount, KsqlConstants.legacyDefaultSinkReplicaCount, - avroSchema + schema ); }; diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java index 3281e203f81f..adc31036c733 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutorUtil.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; +import io.confluent.kafka.schemaregistry.ParsedSchema; import 
io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.KsqlExecutionContext; @@ -42,7 +43,6 @@ import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector; import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier; -import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -62,7 +62,6 @@ import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; -import org.apache.avro.Schema; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; @@ -152,14 +151,13 @@ private static Topic buildSinkTopic( ) { final String kafkaTopicName = sinkDataSource.getKafkaTopicName(); - final Optional avroSchema = - getAvroSchema(sinkDataSource, schemaRegistryClient); + final Optional schema = getSchema(sinkDataSource, schemaRegistryClient); final Topic sinkTopic = new Topic( kafkaTopicName, KsqlConstants.legacyDefaultSinkPartitionCount, KsqlConstants.legacyDefaultSinkReplicaCount, - avroSchema + schema ); if (stubKafkaService.topicExists(sinkTopic)) { @@ -170,14 +168,18 @@ private static Topic buildSinkTopic( return sinkTopic; } - private static Optional getAvroSchema( + private static Optional getSchema( final DataSource dataSource, final SchemaRegistryClient schemaRegistryClient) { - if (dataSource.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) { + if (dataSource.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference()) { try { - final SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata( - dataSource.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX); - return Optional.of(new org.apache.avro.Schema.Parser().parse(schemaMetadata.getSchema())); + final String subject = + dataSource.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX; + + final SchemaMetadata metadata = schemaRegistryClient.getLatestSchemaMetadata(subject); + return Optional.of( + schemaRegistryClient.getSchemaBySubjectAndId(subject, metadata.getId()) + ); } catch (final Exception e) { // do nothing } @@ -232,10 +234,9 @@ private static void initializeTopics( topic.getNumPartitions(), topic.getReplicas()); - topic.getAvroSchema().ifPresent(schema -> { + topic.getSchema().ifPresent(schema -> { try { - srClient - .register(topic.getName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, schema); + srClient.register(topic.getName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, schema); } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java index 795cc66ce71a..d63cc1bcebe7 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/Topic.java @@ -17,24 +17,24 @@ import static java.util.Objects.requireNonNull; +import io.confluent.kafka.schemaregistry.ParsedSchema; import java.util.Optional; -import org.apache.avro.Schema; public class Topic { private final String name; private final int numPartitions; private final short replicas; - private final Optional avroSchema; + private final Optional schema; 
public Topic( final String name, final int numPartitions, final int replicas, - final Optional avroSchema + final Optional schema ) { this.name = requireNonNull(name, "name"); - this.avroSchema = requireNonNull(avroSchema, "schema"); + this.schema = requireNonNull(schema, "schema"); this.numPartitions = numPartitions; this.replicas = (short) replicas; } @@ -43,8 +43,8 @@ public String getName() { return name; } - public Optional getAvroSchema() { - return avroSchema; + public Optional getSchema() { + return schema; } public int getNumPartitions() { diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java index 2bf50f659194..462067dde69b 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/utils/SerdeUtil.java @@ -15,7 +15,12 @@ package io.confluent.ksql.test.utils; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; import com.google.common.collect.ImmutableMap; +import io.confluent.connect.avro.AvroData; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import io.confluent.ksql.model.WindowType; @@ -26,12 +31,15 @@ import io.confluent.ksql.serde.delimited.DelimitedFormat; import io.confluent.ksql.serde.json.JsonFormat; import io.confluent.ksql.serde.kafka.KafkaFormat; +import io.confluent.ksql.serde.protobuf.ProtobufFormat; import io.confluent.ksql.test.serde.SerdeSupplier; import io.confluent.ksql.test.serde.avro.ValueSpecAvroSerdeSupplier; import io.confluent.ksql.test.serde.json.ValueSpecJsonSerdeSupplier; import io.confluent.ksql.test.serde.kafka.KafkaSerdeSupplier; +import io.confluent.ksql.test.serde.protobuf.ValueSpecProtobufSerdeSupplier; import io.confluent.ksql.test.serde.string.StringSerdeSupplier; import io.confluent.ksql.test.tools.exceptions.InvalidFieldException; +import java.util.Optional; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; @@ -44,6 +52,8 @@ public final class SerdeUtil { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private SerdeUtil() { } @@ -52,19 +62,39 @@ public static SerdeSupplier getSerdeSupplier( final LogicalSchema schema ) { switch (format.name()) { - case AvroFormat.NAME: - return new ValueSpecAvroSerdeSupplier(); - case JsonFormat.NAME: - return new ValueSpecJsonSerdeSupplier(); - case DelimitedFormat.NAME: - return new StringSerdeSupplier(); - case KafkaFormat.NAME: - return new KafkaSerdeSupplier(schema); + case AvroFormat.NAME: return new ValueSpecAvroSerdeSupplier(); + case ProtobufFormat.NAME: return new ValueSpecProtobufSerdeSupplier(); + case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(); + case DelimitedFormat.NAME: return new StringSerdeSupplier(); + case KafkaFormat.NAME: return new KafkaSerdeSupplier(schema); default: throw new InvalidFieldException("format", "unsupported value: " + format); } } + public static Optional buildSchema(final JsonNode schema, final String format) { + if (schema instanceof NullNode) { + return Optional.empty(); + } + + // 
format == null is the legacy case + if (format == null || format.equalsIgnoreCase(AvroFormat.NAME)) { + try { + final String schemaString = OBJECT_MAPPER.writeValueAsString(schema); + final org.apache.avro.Schema avroSchema = + new org.apache.avro.Schema.Parser().parse(schemaString); + return Optional.of( + new AvroFormat() + .toParsedSchema(new AvroData(1).toConnectSchema(avroSchema)) + ); + } catch (final Exception e) { + throw new InvalidFieldException("schema", "failed to parse", e); + } + } + + throw new InvalidFieldException("schema", "not supported with format: " + format); + } + @SuppressWarnings("unchecked") public static SerdeSupplier getKeySerdeSupplier( final KeyFormat keyFormat, diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/EndToEndEngineTestUtil.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/EndToEndEngineTestUtil.java index 71485eb9d305..bbb2956fe2d3 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/EndToEndEngineTestUtil.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/EndToEndEngineTestUtil.java @@ -18,19 +18,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Maps; import io.confluent.connect.avro.AvroData; import io.confluent.ksql.test.tools.TestCase; import io.confluent.ksql.test.tools.TestExecutor; -import io.confluent.ksql.test.tools.exceptions.InvalidFieldException; import java.nio.file.Path; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.stream.Collectors; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; @@ -233,17 +230,4 @@ static String buildTestName( return fileName + " - " + testName + pf; } - static Optional buildAvroSchema(final JsonNode schema) { - if (schema instanceof NullNode) { - return Optional.empty(); - } - - try { - final String schemaString = OBJECT_MAPPER.writeValueAsString(schema); - final org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser(); - return Optional.of(parser.parse(schemaString)); - } catch (final Exception e) { - throw new InvalidFieldException("schema", "failed to parse", e); - } - } } \ No newline at end of file diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/SchemaTranslationTest.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/SchemaTranslationTest.java index b7b33e20316a..04a576b70ef3 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/SchemaTranslationTest.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/SchemaTranslationTest.java @@ -2,7 +2,6 @@ import static io.confluent.ksql.test.EndToEndEngineTestUtil.avroToJson; import static io.confluent.ksql.test.EndToEndEngineTestUtil.avroToValueSpec; -import static io.confluent.ksql.test.EndToEndEngineTestUtil.buildAvroSchema; import static io.confluent.ksql.test.EndToEndEngineTestUtil.buildTestName; import static java.util.Objects.requireNonNull; @@ -11,6 +10,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import io.confluent.avro.random.generator.Generator; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import 
io.confluent.ksql.serde.avro.AvroFormat; import io.confluent.ksql.test.loader.JsonTestLoader; import io.confluent.ksql.test.loader.TestFile; import io.confluent.ksql.test.tools.Record; @@ -18,7 +19,9 @@ import io.confluent.ksql.test.tools.Topic; import io.confluent.ksql.test.tools.VersionBounds; import io.confluent.ksql.test.tools.conditions.PostConditions; +import io.confluent.ksql.test.tools.exceptions.InvalidFieldException; import io.confluent.ksql.test.tools.exceptions.MissingFieldException; +import io.confluent.ksql.test.utils.SerdeUtil; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -133,15 +136,25 @@ public Stream buildTests(final Path testPath) { static class SttCaseNode { private final String name; - private final Schema schema; + private final AvroSchema schema; SttCaseNode( @JsonProperty("name") final String name, + @JsonProperty("format") final String format, @JsonProperty("schema") final JsonNode schema ) { + if (!format.equalsIgnoreCase(AvroFormat.NAME)) { + throw new InvalidFieldException( + "format", + "unsupported format for schema translation test: " + format + ); + } + this.name = name == null ? "" : name; - this.schema = buildAvroSchema(requireNonNull(schema, "schema")) - .orElseThrow(() -> new MissingFieldException("schema")); + this.schema = (AvroSchema) + SerdeUtil + .buildSchema(requireNonNull(schema, "schema"), format) + .orElseThrow(() -> new MissingFieldException("schema")); if (this.name.isEmpty()) { throw new MissingFieldException("name"); @@ -160,10 +173,10 @@ Stream buildTests(final Path testPath) { Optional.of(schema) ); - final List inputRecords = generateInputRecords(srcTopic, schema); + final List inputRecords = generateInputRecords(srcTopic, schema.rawSchema()); final List outputRecords = getOutputRecords(OUTPUT_TOPIC, inputRecords); - final String csasStatement = schema.getFields() + final String csasStatement = schema.rawSchema().getFields() .stream() .map(Schema.Field::name) .collect( diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java index e840a5c4f782..f35a963d97ae 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java @@ -166,7 +166,7 @@ private void initializeTopics(final List topics) { createJob ); - topic.getAvroSchema().ifPresent(schema -> { + topic.getSchema().ifPresent(schema -> { try { serviceContext.getSchemaRegistryClient() .register(topic.getName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX, schema); diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/tools/StubKafkaServiceTest.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/tools/StubKafkaServiceTest.java index a5b5f0ff2cb4..e4c78fab8092 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/tools/StubKafkaServiceTest.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/tools/StubKafkaServiceTest.java @@ -18,11 +18,11 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.ksql.test.tools.stubs.StubKafkaRecord; import io.confluent.ksql.test.tools.stubs.StubKafkaService; import java.util.List; import java.util.Optional; -import org.apache.avro.Schema; import 
org.apache.kafka.clients.producer.ProducerRecord; import org.hamcrest.CoreMatchers; import org.junit.Before; @@ -35,7 +35,7 @@ public class StubKafkaServiceTest { @Mock - private Schema avroSchema; + private ParsedSchema avroSchema; @Mock private ProducerRecord producerRecord; @Mock diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/collect-list.json b/ksql-functional-tests/src/test/resources/query-validation-tests/collect-list.json index 5bc2a6c0c787..4155a8026ccb 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/collect-list.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/collect-list.json @@ -10,7 +10,7 @@ "tests": [ { "name": "collect_list int", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE integer) WITH (kafka_topic='test_topic',value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -30,7 +30,7 @@ }, { "name": "collect_list long", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -50,7 +50,7 @@ }, { "name": "collect_list double", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE double) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -70,7 +70,7 @@ }, { "name": "collect_list string", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE varchar) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -92,7 +92,7 @@ }, { "name": "collect_list bool map", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE map) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value['key1']) AS collected FROM test group by id;" @@ -110,7 +110,7 @@ }, { "name": "collect_list int table", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, VALUE integer) WITH (kafka_topic='test_topic',value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -132,7 +132,7 @@ }, { "name": "collect_list long table", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -154,7 +154,7 @@ }, { "name": "collect_list double table", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, VALUE double) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 
as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -176,7 +176,7 @@ }, { "name": "collect_list string table", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, VALUE varchar) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value) as collected FROM test group by id;" @@ -201,7 +201,7 @@ }, { "name": "collect_list bool map table", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE map) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_list(value['key1']) AS collected FROM test group by id;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/collect-set.json b/ksql-functional-tests/src/test/resources/query-validation-tests/collect-set.json index b409311ef655..6f86487098dc 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/collect-set.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/collect-set.json @@ -10,7 +10,7 @@ "tests": [ { "name": "collect_set int", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE integer) WITH (kafka_topic='test_topic',value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_set(value) as collected FROM test group by id;" @@ -30,7 +30,7 @@ }, { "name": "collect_set long", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_set(value) as collected FROM test group by id;" @@ -50,7 +50,7 @@ }, { "name": "collect_set double", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE double) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_set(value) as collected FROM test group by id;" @@ -70,7 +70,7 @@ }, { "name": "collect_set string", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE varchar) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_set(value) as collected FROM test group by id;" @@ -92,7 +92,7 @@ }, { "name": "collect_set bool map", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE map) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, collect_set(value['key1']) AS collected FROM test group by id;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/concat.json b/ksql-functional-tests/src/test/resources/query-validation-tests/concat.json index 51cfc0923a4a..cfcdafe491e4 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/concat.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/concat.json @@ -5,7 +5,7 @@ "tests": [ { "name": "concat fields using CONCAT", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE 
STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT CONCAT('prefix-', CONCAT(source, '-postfix')) AS THING FROM TEST;" @@ -21,7 +21,7 @@ }, { "name": "concat fields using '+' operator", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT 'prefix-' + source + '-postfix' AS THING FROM TEST;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json index a2828d8265ff..dde39bc6ef34 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json @@ -24,7 +24,7 @@ }, { "name": "select delimited value_format into another format", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter=',');", "CREATE STREAM S2 WITH(value_format='{FORMAT}') as SELECT id, name, value FROM test;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index 14db162b6fcb..436640e89090 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -41,7 +41,7 @@ }, { "name": "validate with elements OK", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 INT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -113,7 +113,7 @@ }, { "name": "validate boolean elements OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 BOOLEAN) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -143,7 +143,7 @@ }, { "name": "validate int elements OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 INT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -173,7 +173,7 @@ }, { "name": "validate bigint elements OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 BIGINT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -203,7 +203,7 @@ }, { "name": "validate double elements OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 DOUBLE) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -223,7 +223,7 @@ }, { "name": "validate string elements OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 STRING) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -256,7 +256,7 @@ }, { "name": "validate array element OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 ARRAY) WITH 
(kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -289,7 +289,7 @@ }, { "name": "validate map element OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 MAP) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -322,7 +322,7 @@ }, { "name": "validate struct element OK", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (V0 STRUCT) WITH (kafka_topic='input', value_format='{FORMAT}');", "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" @@ -359,7 +359,7 @@ ], "expectedException": { "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Cannot register avro schema for OUTPUT as the schema is incompatible with the current schema version registered for the topic.\nKSQL schema: {\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"C1\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}\nRegistered schema: {\"type\":\"record\",\"name\":\"blah\",\"fields\":[{\"name\":\"C1\",\"type\":\"double\"}]}" + "message": "Cannot register avro schema for OUTPUT as the schema is incompatible with the current schema version registered for the topic.\nKSQL schema: {\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"C1\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}\nRegistered schema: {\"type\":\"record\",\"name\":\"blah\",\"fields\":[{\"name\":\"C1\",\"type\":\"double\"}],\"connect.name\":\"blah\"}" }, "topics": [ { diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json index 6d62aedc8684..d034e607f9fb 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -51,7 +51,7 @@ "CREATE STREAM TEST (data VARCHAR) WITH (kafka_topic='test_topic', KEY='data', value_format='{FORMAT}');", "CREATE TABLE OUTPUT AS SELECT data, COUNT(*) FROM TEST GROUP BY DATA;" ], - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "inputs": [ {"topic": "test_topic", "key": "d1", "value": {"DATA": "d1"}, "timestamp": 1}, {"topic": "test_topic", "key": "d2", "value": {"DATA": "d2"}, "timestamp": 2}, @@ -167,7 +167,7 @@ "CREATE STREAM TEST (ROWKEY INT KEY, f1 INT, f2 VARCHAR) WITH (kafka_topic='test_topic', KEY='f1', value_format='{FORMAT}');", "CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f2, f1;" ], - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "inputs": [ {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}}, {"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}}, @@ -260,7 +260,7 @@ "CREATE TABLE TEST (ROWKEY INT KEY, f1 INT, f2 VARCHAR) WITH (kafka_topic='test_topic', KEY='f1', value_format='{FORMAT}');", "CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f2, f1;" ], - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "inputs": [ {"topic": "test_topic", "key": 1, "value": {"F1": 1, "F2": "a"}}, {"topic": "test_topic", "key": 2, "value": {"F1": 2, "F2": "b"}}, 
@@ -368,7 +368,7 @@ "CREATE STREAM TEST (data VARCHAR) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE TABLE OUTPUT AS SELECT data, COUNT(*) FROM TEST GROUP BY DATA;" ], - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "inputs": [ {"topic": "test_topic", "value": {"DATA": "d1"}}, {"topic": "test_topic", "value": {"DATA": "d2"}}, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/histogram.json b/ksql-functional-tests/src/test/resources/query-validation-tests/histogram.json index d5bb57adfe35..c6ad36e70d33 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/histogram.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/histogram.json @@ -10,7 +10,7 @@ "tests": [ { "name": "histogram string", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, VALUE varchar) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, histogram(value) as counts FROM test group by id;" @@ -32,7 +32,7 @@ }, { "name": "histogram on a table", - "format": ["AVRO","JSON"], + "format": ["AVRO","JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ID bigint, NAME varchar, REGION string) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE TABLE COUNT_BY_REGION AS SELECT region, histogram(name) AS COUNTS FROM TEST GROUP BY region;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/initcap.json b/ksql-functional-tests/src/test/resources/query-validation-tests/initcap.json index e7150bba8607..45dc5ad1b344 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/initcap.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/initcap.json @@ -5,7 +5,7 @@ "tests": [ { "name": "do initcap", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "properties": { "ksql.functions.substring.legacy.args": false }, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/join-with-custom-timestamp.json b/ksql-functional-tests/src/test/resources/query-validation-tests/join-with-custom-timestamp.json index 1c5f237ef871..5dac6aae0560 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/join-with-custom-timestamp.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/join-with-custom-timestamp.json @@ -10,7 +10,7 @@ "tests": [ { "name": "stream stream inner join with ts", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM S1 (ROWKEY BIGINT KEY, ID bigint, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='{FORMAT}', key='ID');", "CREATE STREAM S2 (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 varchar) WITH (kafka_topic='s2', value_format='{FORMAT}', key='ID');", @@ -31,7 +31,7 @@ }, { "name": "stream stream inner join with ts extractor both sides", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM S1 (ROWKEY BIGINT KEY, ID bigint, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='{FORMAT}', key='ID');", "CREATE STREAM S2 (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 varchar, RTS bigint) WITH (timestamp='RTS', kafka_topic='s2', value_format='{FORMAT}', key='ID');", @@ -52,7 +52,7 @@ }, { "name": "stream table join with ts extractor both sides", - "format": ["AVRO", "JSON"], + "format": 
["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM S1 (ROWKEY BIGINT KEY, ID bigint, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='{FORMAT}', key='ID');", "CREATE TABLE T1 (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 varchar, RTS bigint) WITH (timestamp='RTS', kafka_topic='t1', value_format='{FORMAT}', key='ID');", @@ -73,7 +73,7 @@ }, { "name": "table table inner join with ts", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE S1 (ROWKEY BIGINT KEY, ID bigint, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 varchar) WITH (kafka_topic='s2', value_format='{FORMAT}', key='ID');", @@ -94,7 +94,7 @@ }, { "name": "table table inner join with ts extractor both sides", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE S1 (ROWKEY BIGINT KEY, ID bigint, NAME varchar, TS bigint) WITH (timestamp='TS', kafka_topic='s1', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 varchar, RTS bigint) WITH (timestamp='RTS', kafka_topic='s2', value_format='{FORMAT}', key='ID');", diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json index 97da7a6c2788..bea933d148d7 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -41,6 +41,47 @@ ] } }, + { + "name": "stream stream left join - PROTOBUF", + "format": ["PROTOBUF"], + "statements": [ + "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", + "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", + "CREATE STREAM LEFT_OUTER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t left join TEST_STREAM tt WITHIN 11 seconds ON t.id = tt.id;" + ], + "inputs": [ + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "left_topic", "key": 10, "value": {"ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "right_topic", "key": 100, "value": {"ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 16000}, + {"topic": "left_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000} + ], + "outputs": [ + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "window": {"start": 0, "end": 11000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 0, "T_ROWKEY": 0, "T_ID": 0, "T_NAME": "zero", "T_VALUE": 0}, "timestamp": 0}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000009-store-changelog", "window": {"start": 10000, "end": 
21000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 10000, "TT_ROWKEY": 0, "TT_ID": 0, "TT_F1": "blah", "TT_F2": 50}, "timestamp": 10000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "window": {"start": 11000, "end": 22000, "type": "time"}, "key": 10, "value": {"T_ROWTIME": 11000, "T_ROWKEY": 10, "T_ID": 10, "T_NAME": "100", "T_VALUE": 5}, "timestamp": 11000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "window": {"start": 13000, "end": 24000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 13000, "T_ROWKEY": 0, "T_ID": 0, "T_NAME": "foo", "T_VALUE": 100}, "timestamp": 13000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000009-store-changelog", "window": {"start": 15000, "end": 26000, "type": "time"}, "key": 0, "value": {"TT_ROWTIME": 15000, "TT_ROWKEY": 0, "TT_ID": 0, "TT_F1": "a", "TT_F2": 10}, "timestamp": 15000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-OUTEROTHER-0000000009-store-changelog", "window": {"start": 16000, "end": 27000, "type": "time"}, "key": 100, "value": {"TT_ROWTIME": 16000, "TT_ROWKEY": 100, "TT_ID": 100, "TT_F1": "newblah", "TT_F2": 150}, "timestamp": 16000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "window": {"start": 17000, "end": 28000, "type": "time"}, "key": 90, "value": {"T_ROWTIME": 17000, "T_ROWKEY": 90, "T_ID": 90, "T_NAME": "ninety", "T_VALUE": 90}, "timestamp": 17000}, + {"topic": "_confluent-ksql-some.ksql.service.idquery_CSAS_LEFT_OUTER_JOIN_0-KSTREAM-JOINTHIS-0000000008-store-changelog", "window": {"start": 30000, "end": 41000, "type": "time"}, "key": 0, "value": {"T_ROWTIME": 30000, "T_ROWKEY": 0, "T_ID": 0, "T_NAME": "bar", "T_VALUE": 99}, "timestamp": 30000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "", "F2": 0}, "timestamp": 0}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "blah", "F2": 50}, "timestamp": 13000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "LEFT_OUTER_JOIN", "key": 90, "value": {"T_ID": 90, "NAME": "ninety", "VALUE": 90, "F1": "", "F2": 0}, "timestamp": 17000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "", "F2": 0}, "timestamp": 30000} + ], + "post": { + "sources": [ + {"name": "LEFT_OUTER_JOIN", "type": "stream", "keyField": "T_ID"} + ] + } + }, { "name": "stream stream left join - KAFKA", "statements": [ @@ -55,7 +96,7 @@ }, { "name": "stream stream left join with rowkey - rekey", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "enabled": false, "comment": "disabled until https://github.com/confluentinc/ksql/issues/4094 is done", "statements": [ @@ -90,7 +131,7 @@ }, { "name": "stream stream left join - rekey", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "enabled": false, "comment": "disabled until 
https://github.com/confluentinc/ksql/issues/4094 is done", "statements": [ @@ -244,7 +285,7 @@ }, { "name": "stream stream inner join", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -273,7 +314,7 @@ }, { "name": "stream stream inner join all left fields some right", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -297,7 +338,7 @@ }, { "name": "stream stream inner join all right fields some left", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -329,7 +370,7 @@ }, { "name": "stream stream inner join all fields", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -366,7 +407,7 @@ }, { "name": "stream stream inner join with different before and after windows", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -389,7 +430,7 @@ }, { "name": "stream stream inner join with out of order messages", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -511,6 +552,43 @@ ] } }, + { + "name": "stream stream outer join - PROTOBUF", + "statements": [ + "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='PROTOBUF', key='ID');", + "CREATE STREAM TEST_STREAM (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='PROTOBUF', key='ID');", + "CREATE STREAM LEFT_OUTER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t FULL OUTER join TEST_STREAM tt WITHIN 11 seconds on t.id = tt.id;" + ], + "inputs": [ + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, 
+ {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "left_topic", "key": 10, "value": {"ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 30000}, + {"topic": "left_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000}, + {"topic": "right_topic", "key": 100, "value": {"ID": 100, "F1": "newblah", "F2": 150}, "timestamp": 20000} + + ], + "outputs": [ + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "", "F2": 0}, "timestamp": 0}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "blah", "F2": 50}, "timestamp": 13000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "", "F2": 0}, "timestamp": 30000}, + {"topic": "LEFT_OUTER_JOIN", "key": 90, "value": {"T_ID": 90, "NAME": "ninety", "VALUE": 90, "F1": "", "F2": 0}, "timestamp": 17000}, + {"topic": "LEFT_OUTER_JOIN", "key": 100, "value": {"T_ID": 0, "NAME": "", "VALUE": 0, "F1": "newblah", "F2": 150}, "timestamp": 20000} + ], + "post": { + "comments": [ + "key field is null because field 'T_ID' does not always match the key of the output record due to it being a full outer join" + ], + "sources": [ + {"name": "LEFT_OUTER_JOIN", "type": "stream", "keyField": null} + ] + } + }, { "name": "stream stream outer join - right join key in projection", "statements": [ @@ -580,6 +658,37 @@ ] } }, + { + "name": "table table left join - PROTOBUF", + "statements": [ + "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='PROTOBUF', key='ID');", + "CREATE TABLE TEST_TABLE (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='PROTOBUF', key='ID');", + "CREATE TABLE LEFT_OUTER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t left join TEST_TABLE tt on t.id = tt.id;" + ], + "inputs": [ + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "left_topic", "key": 10, "value": {"ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 16000}, + {"topic": "left_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 17000} + ], + "outputs": [ + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, 
"F1": "", "F2": 0}, "timestamp": 0}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "LEFT_OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "blah", "F2": 50}, "timestamp": 13000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "LEFT_OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 16000}, + {"topic": "LEFT_OUTER_JOIN", "key": 90, "value": {"T_ID": 90, "NAME": "ninety", "VALUE": 90, "F1": "", "F2": 0}, "timestamp": 17000} + ], + "post": { + "sources": [ + {"name": "LEFT_OUTER_JOIN", "type": "table", "keyField": "T_ID"} + ] + } + }, { "name": "table table left join - join key not in projection", "statements": [ @@ -678,7 +787,7 @@ }, { "name": "table table inner join", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE TEST_TABLE (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='{FORMAT}', key='ID');", @@ -802,6 +911,40 @@ ] } }, + { + "name": "table table outer join - PROTOBUF", + "statements": [ + "CREATE TABLE TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='left_topic', value_format='PROTOBUF', key='ID');", + "CREATE TABLE TEST_TABLE (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='right_topic', value_format='PROTOBUF', key='ID');", + "CREATE TABLE OUTER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t FULL OUTER join TEST_TABLE tt on t.id = tt.id;" + ], + "inputs": [ + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "left_topic", "key": 10, "value": {"ID": 10, "NAME": "100", "VALUE": 5}, "timestamp": 11000}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 13000}, + {"topic": "right_topic", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "right_topic", "key": 15, "value": {"ID": 15, "F1": "c", "F2": 20}, "timestamp": 15500}, + {"topic": "left_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 16000} + ], + "outputs": [ + {"topic": "OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "", "F2": 0}, "timestamp": 0}, + {"topic": "OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "zero", "VALUE": 0, "F1": "blah", "F2": 50}, "timestamp": 10000}, + {"topic": "OUTER_JOIN", "key": 10, "value": {"T_ID": 10, "NAME": "100", "VALUE": 5, "F1": "", "F2": 0}, "timestamp": 11000}, + {"topic": "OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "blah", "F2": 50}, "timestamp": 13000}, + {"topic": "OUTER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "OUTER_JOIN", "key": 15, "value": {"T_ID": 0, "NAME": "", "VALUE": 0, "F1": "c", "F2": 20}, "timestamp": 15500}, + {"topic": "OUTER_JOIN", 
"key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 16000} + ], + "post": { + "comments": [ + "key field is null because field 'T_ID' does not always match the key of the output record due to it being a full outer join" + ], + "sources": [ + {"name": "OUTER_JOIN", "type": "table", "keyField": null} + ] + } + }, { "name": "table table outer join - right join key in projection", "statements": [ @@ -865,6 +1008,34 @@ ] } }, + { + "name": "stream table left join - PROTOBUF", + "statements": [ + "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='PROTOBUF', key='ID');", + "CREATE TABLE TEST_TABLE (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='test_table', value_format='PROTOBUF', key='ID');", + "CREATE STREAM LEFT_JOIN as SELECT t.id, name, value, f1, f2 FROM test t left join test_table tt on t.id = tt.id;" + ], + "inputs": [ + {"topic": "test_table", "key": 0, "value": {"ID": 0, "F1": "zero", "F2": 0}, "timestamp": 0}, + {"topic": "test_table", "key": 10, "value": {"ID": 10, "F1": "100", "F2": 5}, "timestamp": 10000}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "blah", "VALUE": 50}, "timestamp": 10000}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 10000}, + {"topic": "test_table", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 15000}, + {"topic": "test_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 15000} + ], + "outputs": [ + {"topic": "LEFT_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "blah", "VALUE": 50, "F1": "zero", "F2": 0}, "timestamp": 10000}, + {"topic": "LEFT_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "zero", "F2": 0}, "timestamp": 10000}, + {"topic": "LEFT_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "LEFT_JOIN", "key": 90, "value": {"T_ID": 90, "NAME": "ninety", "VALUE": 90, "F1": "", "F2": 0}, "timestamp": 15000} + ], + "post": { + "sources": [ + {"name": "LEFT_JOIN", "type": "stream", "keyField": "T_ID"} + ] + } + }, { "name": "stream table left join - join key not in projection", "statements": [ @@ -926,7 +1097,7 @@ }, { "name": "stream table inner join", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE TEST_TABLE (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='test_table', value_format='{FORMAT}', key='ID');", @@ -1011,7 +1182,7 @@ }, { "name": "join using ROWKEY in the criteria", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE TEST_TABLE (ROWKEY BIGINT KEY, ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='test_table', value_format='{FORMAT}', key='ID');", @@ -1793,7 +1964,7 @@ }, { "name": "on INT column - KAFKA", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM L (l0 INT, l1 INT) WITH (kafka_topic='left_topic', 
value_format='{FORMAT}');", "CREATE STREAM R (r0 INT, r1 INT) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", @@ -1814,7 +1985,7 @@ }, { "name": "on BIGINT column - KAFKA", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM L (l0 BIGINT, l1 INT) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", "CREATE STREAM R (r0 BIGINT, r1 INT) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", @@ -1835,7 +2006,7 @@ }, { "name": "on DOUBLE column - KAFKA", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM L (l0 DOUBLE, l1 INT) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", "CREATE STREAM R (r0 DOUBLE, r1 INT) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", @@ -1856,7 +2027,7 @@ }, { "name": "on STRING column - KAFKA", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM L (l0 STRING, l1 INT) WITH (kafka_topic='left_topic', value_format='{FORMAT}');", "CREATE STREAM R (r0 STRING, r1 INT) WITH (kafka_topic='right_topic', value_format='{FORMAT}');", diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/protobuf.json b/ksql-functional-tests/src/test/resources/query-validation-tests/protobuf.json new file mode 100644 index 000000000000..0a475cf36347 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/protobuf.json @@ -0,0 +1,69 @@ +{ + "tests": [ + { + "name": "protobuf primitives", + "statements": [ + "CREATE STREAM INPUT (i INTEGER, l BIGINT, d DOUBLE, b BOOLEAN, s STRING) WITH (kafka_topic='input', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT as SELECT * FROM input;" + ], + "inputs": [{"topic": "input", "value": {"i": 1, "l": 1, "d": 1.2, "b": true, "s": "foo"}}], + "outputs": [{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.2, "B": true, "S": "foo"}}] + }, + { + "name": "protobuf containers", + "statements": [ + "CREATE STREAM INPUT (astr ARRAY, mstr MAP) WITH (kafka_topic='input', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT as SELECT * FROM input;" + ], + "inputs": [{"topic": "input", "value": {"astr": ["1", "2"], "mstr": {"1": "a"}}}], + "outputs": [{"topic": "OUTPUT", "value": {"ASTR": ["1", "2"], "MSTR": {"1": "a"}}}] + }, + { + "name": "protobuf defaults - top level nulls are defaulted to 0", + "statements": [ + "CREATE STREAM INPUT (i INTEGER) WITH (kafka_topic='input', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT as SELECT * FROM input;" + ], + "inputs": [ + {"topic": "input", "value": {"i": 0}}, + {"topic": "input", "value": {"i": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"I": 0}}, + {"topic": "OUTPUT", "value": {"I": 0}} + ] + }, + { + "name": "protobuf defaults - nested nulls are defaulted to 0", + "statements": [ + "CREATE STREAM INPUT (i ARRAY) WITH (kafka_topic='input', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT as SELECT * FROM input;" + ], + "inputs": [ + {"topic": "input", "value": {"i": [0]}}, + {"topic": "input", "value": {"i": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"I": [0]}}, + {"topic": "OUTPUT", "value": {"I": []}} + ] + }, + { + "name": "protobuf defaults - empty struct fills defaults and nulls remain nulls", + "statements": [ + "CREATE STREAM INPUT (s STRUCT) WITH (kafka_topic='input', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT as SELECT * FROM input;" + ], + "inputs": [ + {"topic": "input", "value": {"s": {"foo": 0}}}, + {"topic": 
"input", "value": {"s": {}}}, + {"topic": "input", "value": {"s": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"S": {"FOO": 0}}}, + {"topic": "OUTPUT", "value": {"S": {"FOO": 0}}}, + {"topic": "OUTPUT", "value": {"S": null}} + ] + } + ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/replace.json b/ksql-functional-tests/src/test/resources/query-validation-tests/replace.json index 98475afbcc53..c6dcecb7070c 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/replace.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/replace.json @@ -5,7 +5,7 @@ "tests": [ { "name": "replace", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "properties": { "ksql.functions.substring.legacy.args": false }, diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json b/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json index dc322587c0fe..f4dc072edcb8 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/serdes.json @@ -93,6 +93,21 @@ {"topic": "OUTPUT", "value": null} ] }, + { + "name": "deserialize nested primitive - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": 10}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": 10}}, + {"topic": "OUTPUT", "value": null} + ] + }, { "name": "deserialize anonymous array - value", "comments": [ @@ -166,6 +181,23 @@ {"topic": "OUTPUT", "value": null} ] }, + { + "name": "deserialize nested array - value - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo ARRAY) WITH (WRAP_SINGLE_VALUE=true, kafka_topic='input_topic', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": ["a", "b", "c"]}}, + {"topic": "input_topic", "value": {"FOO": null}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": ["a", "b", "c"]}}, + {"topic": "OUTPUT", "value": {"FOO": []}}, + {"topic": "OUTPUT", "value": null} + ] + }, { "name": "deserialize anonymous array - value - non-nullable - AVRO", "statements": [ @@ -296,6 +328,32 @@ ] } }, + { + "name": "deserialize nested map - value - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo MAP) WITH (kafka_topic='input_topic', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": {"a": 1, "b": 2, "c": 3}}}, + {"topic": "input_topic", "value": {"FOO": null}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": {"a": 1, "b": 2, "c": 3}}}, + {"topic": "OUTPUT", "value": {"FOO": {}}}, + {"topic": "OUTPUT", "value": null} + ], + "post": { + "sources": [ + { + "name": "INPUT", + "type": "stream", + "schema": "ROWKEY STRING KEY, FOO MAP" + } + ] + } + }, { "name": "deserialize anonymous map - value - non-nullable - AVRO", "statements": [ @@ -424,6 +482,23 @@ {"topic": "OUTPUT", "value": null} ] }, + { + "name": "serialize nested primitive - value - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo BOOLEAN) WITH (kafka_topic='input_topic', 
value_format='PROTOBUF');", + "CREATE STREAM OUTPUT WITH (WRAP_SINGLE_VALUE=true) AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": true}}, + {"topic": "input_topic", "value": {"FOO": null}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": true}}, + {"topic": "OUTPUT", "value": {"FOO": false}}, + {"topic": "OUTPUT", "value": null} + ] + }, { "name": "serialize anonymous array - value", "format": ["JSON", "AVRO"], @@ -476,6 +551,23 @@ {"topic": "OUTPUT", "value": null} ] }, + { + "name": "serialize nested array - value - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo ARRAY) WITH (kafka_topic='input_topic', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT WITH (WRAP_SINGLE_VALUE=true) AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": [12, 34, 999]}}, + {"topic": "input_topic", "value": {"FOO": null}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": [12, 34, 999]}}, + {"topic": "OUTPUT", "value": {"FOO": []}}, + {"topic": "OUTPUT", "value": null} + ] + }, { "name": "serialize anonymous map - value", "format": ["JSON", "AVRO"], @@ -528,6 +620,23 @@ {"topic": "OUTPUT", "value": null} ] }, + { + "name": "serialize nested map - value - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo MAP) WITH (kafka_topic='input_topic', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT WITH (WRAP_SINGLE_VALUE=true) AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": {"a": 1.1, "b": 2.2, "c": 3.456}}}, + {"topic": "input_topic", "value": {"FOO": null}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": {"a": 1.1, "b": 2.2, "c": 3.456}}}, + {"topic": "OUTPUT", "value": {"FOO": {}}}, + {"topic": "OUTPUT", "value": null} + ] + }, { "name": "serialize anonymous struct - value", "format": ["JSON", "AVRO"], @@ -582,6 +691,25 @@ {"topic": "OUTPUT", "value": null} ] }, + { + "name": "serialize nested struct - value - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (foo STRUCT) WITH (kafka_topic='input_topic', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT WITH (WRAP_SINGLE_VALUE=true) AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "value": {"FOO": {"F0": 1}}}, + {"topic": "input_topic", "value": {"FOO": {"F0": null}}}, + {"topic": "input_topic", "value": {"FOO": null}}, + {"topic": "input_topic", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"FOO": {"F0": 1}}}, + {"topic": "OUTPUT", "value": {"FOO": {"F0": 0}}}, + {"topic": "OUTPUT", "value": {"FOO": null}}, + {"topic": "OUTPUT", "value": null} + ] + }, { "name": "serialization should pick up value wrapping from config", "statements": [ @@ -613,7 +741,7 @@ }, { "name": "C*AS should throw for multi-field schema if WRAP_SINGLE_VALUE=true", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE STREAM INPUT (f0 STRING, f1 STRING) WITH (kafka_topic='input_topic', value_format='{FORMAT}');", "CREATE STREAM OUTPUT WITH(WRAP_SINGLE_VALUE=true) AS SELECT * FROM INPUT;" @@ -625,7 +753,7 @@ }, { "name": "C*AS should throw for multi-field schema if WRAP_SINGLE_VALUE=false", - "format": ["JSON", "AVRO"], + "format": ["JSON", "AVRO", "PROTOBUF"], "statements": [ "CREATE TABLE INPUT (f0 STRING, f1 STRING) WITH (kafka_topic='input_topic', value_format='{FORMAT}');", "CREATE TABLE OUTPUT 
WITH(WRAP_SINGLE_VALUE=false) AS SELECT * FROM INPUT;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json b/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json index 5c3e7dce6068..994efa11ce3f 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/simple-struct.json @@ -10,7 +10,7 @@ ], "tests": [ { - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM orders (ordertime bigint, orderid bigint, itemid STRUCT< ITEMID BIGINT, NAME VARCHAR, CATEGORY STRUCT< ID BIGINT, NAME VARCHAR>>, ORDERUNITS double, ARRAYCOL array, MAPCOL map, address STRUCT < number bigint, street varchar, city varchar, state varchar, zipcode bigint>) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE STREAM s1 AS SELECT * FROM orders;" @@ -635,7 +635,7 @@ }, { "name": "simple struct select filter", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM orders (ordertime bigint, orderid bigint, itemid STRUCT< ITEMID BIGINT, NAME VARCHAR, CATEGORY STRUCT< ID BIGINT, NAME VARCHAR>>, ORDERUNITS double, ARRAYCOL array, MAPCOL map, address STRUCT < number bigint, street varchar, city varchar, state varchar, zipcode bigint>) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE STREAM S2 AS SELECT itemid->name FROM orders where itemid->name = 'Item_6';" @@ -856,7 +856,7 @@ }, { "name": "simple struct select filter 2", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM orders (ordertime bigint, orderid bigint, itemid STRUCT< ITEMID BIGINT, NAME VARCHAR, CATEGORY STRUCT< ID BIGINT, NAME VARCHAR>>, ORDERUNITS double, ARRAYCOL array, MAPCOL map, address STRUCT < number bigint, street varchar, city varchar, state varchar, zipcode bigint>) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE STREAM S3 AS SELECT itemid->category->id, address->street , address->zipcode as zipcode, address->state as state FROM orders WHERE address->state LIKE '%_9';" @@ -1251,7 +1251,7 @@ } ], "name": "simples struct select filter 3", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM orders (ordertime bigint, orderid bigint, itemid STRUCT< ITEMID BIGINT, NAME VARCHAR, CATEGORY STRUCT< ID BIGINT, NAME VARCHAR>>, ORDERUNITS double, ARRAYCOL array, MAPCOL map, address STRUCT < number bigint, street varchar, city varchar, state varchar, zipcode bigint>) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE STREAM S4 AS SELECT itemid->itemid, itemid as iid, itemid->category->name as catname FROM orders WHERE itemid->itemid = 6 OR itemid->category->name = 'Food';" @@ -1516,7 +1516,7 @@ } ], "name": "simple struct select filter 4", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM orders (ordertime bigint, orderid bigint, itemid STRUCT< ITEMID BIGINT, NAME VARCHAR, CATEGORY STRUCT< ID BIGINT, NAME VARCHAR>>, ORDERUNITS double, ARRAYCOL array, MAPCOL map, address STRUCT < number bigint, street varchar, city varchar, state varchar, zipcode bigint>) WITH (kafka_topic='test_topic', value_format='{FORMAT}');", "CREATE STREAM S5 as SELECT itemid->itemid * 10 as itemid, concat(itemid->category->name, '_HELLO') as cname, len(address->state) as state_length FROM orders WHERE 
address->state LIKE '%1' OR address->state LIKE '%9';" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json b/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json index 8d4ae1e6b813..fa60e62339e4 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/substring.json @@ -8,7 +8,7 @@ "tests": [ { "name": "do substring with just pos", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" @@ -26,7 +26,7 @@ }, { "name": "do substring with pos and length", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" @@ -44,7 +44,7 @@ }, { "name": "should default to current mode for new queries", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6) AS SUBSTRING FROM TEST;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/topk-group-by.json b/ksql-functional-tests/src/test/resources/query-validation-tests/topk-group-by.json index 1d8de0381978..ed62ba8028f2 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/topk-group-by.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/topk-group-by.json @@ -10,7 +10,7 @@ "tests": [ { "name": "topk integer", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic',value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, topk(value, 3) as topk FROM test group by id;" @@ -32,7 +32,7 @@ }, { "name": "topk long", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, topk(value, 3) as topk FROM test group by id;" @@ -54,7 +54,7 @@ }, { "name": "topk double", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE double) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, topk(value, 3) as topk FROM test group by id;" @@ -76,7 +76,7 @@ }, { "name": "topk string", - "format": ["AVRO", "JSON"], + "format": ["AVRO", "JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (ROWKEY BIGINT KEY, ID bigint, NAME varchar, VALUE string) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", "CREATE TABLE S2 as SELECT id, topk(value, 3) as topk FROM test group by id;" diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/url.json 
b/ksql-functional-tests/src/test/resources/query-validation-tests/url.json index ddfcc5ec3a04..7a6ff3b425ea 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/url.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/url.json @@ -6,7 +6,7 @@ "tests": [ { "name": "encode a url parameter using ENCODE_URL_PARAM", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_ENCODE_PARAM(url) as ENCODED FROM TEST;" @@ -24,7 +24,7 @@ }, { "name": "decode a url parameter using DECODE_URL_PARAM", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_DECODE_PARAM(url) as DECODED FROM TEST;" @@ -42,7 +42,7 @@ }, { "name": "extract a fragment from a URL using URL_EXTRACT_FRAGMENT", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_EXTRACT_FRAGMENT(url) as FRAGMENT FROM TEST;" @@ -58,7 +58,7 @@ }, { "name": "extract a host from a URL using URL_EXTRACT_HOST", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_EXTRACT_HOST(url) as HOST FROM TEST;" @@ -74,7 +74,7 @@ }, { "name": "extract a parameter from a URL using URL_EXTRACT_PARAMETER", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_EXTRACT_PARAMETER(url,'one') as PARAM_A, URL_EXTRACT_PARAMETER(url,'two') as PARAM_B FROM TEST;" @@ -88,7 +88,7 @@ }, { "name": "chain a call to URL_EXTRACT_PARAMETER with URL_DECODE_PARAM", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_DECODE_PARAM(URL_EXTRACT_PARAMETER(url,'two')) as PARAM FROM TEST;" @@ -102,7 +102,7 @@ }, { "name": "extract a path from a URL using URL_EXTRACT_PATH", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_EXTRACT_PATH(url) as PATH FROM TEST;" @@ -120,7 +120,7 @@ }, { "name": "extract a port from a URL using URL_EXTRACT_PORT", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_EXTRACT_PORT(url) as PORT FROM TEST;" @@ -136,7 +136,7 @@ }, { "name": "extract a protocol from a URL using URL_EXTRACT_PROTOCOL", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT URL_EXTRACT_PROTOCOL(url) as PROTOCOL FROM TEST;" @@ -154,7 +154,7 @@ }, { "name": "extract a query from a URL using URL_EXTRACT_QUERY", - "format": ["JSON"], + "format": ["JSON", "PROTOBUF"], "statements": [ "CREATE STREAM TEST (url VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE 
STREAM OUTPUT AS SELECT URL_EXTRACT_QUERY(url) as Q FROM TEST;" diff --git a/ksql-functional-tests/src/test/resources/schema-validation-tests/avro-basic.json b/ksql-functional-tests/src/test/resources/schema-validation-tests/avro-basic.json index 712eaaabfb1b..0cd6903fac26 100644 --- a/ksql-functional-tests/src/test/resources/schema-validation-tests/avro-basic.json +++ b/ksql-functional-tests/src/test/resources/schema-validation-tests/avro-basic.json @@ -2,6 +2,7 @@ "tests": [ { "name": "basic avro schema", + "format": "AVRO", "schema": { "type": "record", "name": "test_input_schema", diff --git a/ksql-functional-tests/src/test/resources/schema-validation-tests/real-schema.json b/ksql-functional-tests/src/test/resources/schema-validation-tests/real-schema.json index 50de33d90ca9..4ff9bea0a76b 100644 --- a/ksql-functional-tests/src/test/resources/schema-validation-tests/real-schema.json +++ b/ksql-functional-tests/src/test/resources/schema-validation-tests/real-schema.json @@ -2,6 +2,7 @@ "tests": [ { "name": "connect example schema (twitter)", + "format": "AVRO", "schema": { "type": "record", "name": "Status", diff --git a/ksql-serde/pom.xml b/ksql-serde/pom.xml index d0506adfde36..26b599422333 100644 --- a/ksql-serde/pom.xml +++ b/ksql-serde/pom.xml @@ -50,10 +50,19 @@ ${confluent.version} - - io.confluent - kafka-connect-avro-converter - ${confluent.version} + + io.confluent + kafka-connect-avro-converter + + + + io.confluent + kafka-protobuf-provider + + + + io.confluent + kafka-connect-protobuf-converter diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java index d207ce3ac284..ee1d573b10f7 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java @@ -18,23 +18,28 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.ksql.util.KsqlException; import java.util.Map; import java.util.Set; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.kafka.connect.data.Schema; /** * A {@code Format} is a serialization specification of a Kafka topic * in ksqlDB. The builtin formats are specified in the {@link FormatFactory} * class. + * + * @apiNote implementations are expected to be Thread Safe */ +@ThreadSafe public interface Format { /** * The name of the {@code Format} specification. If this format supports * Confluent Schema Registry integration (either builtin or custom via the * {@code ParsedSchema} plugin support), this should match the value returned - * by {@link io.confluent.kafka.schemaregistry.ParsedSchema#name()}. Note that - * this value is case-sensitive. + * by {@link ParsedSchema#name()}. Note that this value is case-sensitive. * * @return the name of this Format * @see #supportsSchemaInference() @@ -58,8 +63,7 @@ default boolean supportsWrapping() { * omit the table elements and instead determine the schema from a Confluent * Schema Registry query. If this method returns {@code true}, it is expected * that the {@link #name()} corresponds with the schema format name returned - * by {@link io.confluent.kafka.schemaregistry.ParsedSchema#name()} for this - * format. + * by {@link ParsedSchema#name()} for this format. 
* * @return {@code true} if this {@code Format} supports schema inference * through Confluent Schema Registry @@ -68,6 +72,25 @@ default boolean supportsSchemaInference() { return false; } + /** + * Converts the {@link ParsedSchema} returned by Confluent Schema Registry + * into a Connect Schema, which ksqlDB can use to infer the stream or table + * schema. + * + *
<p>If this Format {@link #supportsSchemaInference()}, it is expected that + * this method will be implemented.
+ * + * @param schema the {@code ParsedSchema} returned from Schema Registry + * @return the corresponding Kafka Connect schema for the {@code schema} param + */ + default Schema toConnectSchema(ParsedSchema schema) { + throw new KsqlException("Format does not implement Schema Registry support: " + name()); + } + + default ParsedSchema toParsedSchema(Schema schema) { + throw new KsqlException("Format does not implement Schema Registry support: " + name()); + } + /** * If the format accepts custom properties in the WITH clause of the statement, * then this will take the properties and validate the key-value pairs. diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java index 6f8448d55d5c..18c5f793ad97 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/FormatFactory.java @@ -19,6 +19,7 @@ import io.confluent.ksql.serde.delimited.DelimitedFormat; import io.confluent.ksql.serde.json.JsonFormat; import io.confluent.ksql.serde.kafka.KafkaFormat; +import io.confluent.ksql.serde.protobuf.ProtobufFormat; import io.confluent.ksql.util.KsqlException; /** @@ -28,6 +29,7 @@ public final class FormatFactory { public static final Format AVRO = new AvroFormat(); public static final Format JSON = new JsonFormat(); + public static final Format PROTOBUF = new ProtobufFormat(); public static final Format KAFKA = new KafkaFormat(); public static final Format DELIMITED = new DelimitedFormat(); @@ -46,10 +48,11 @@ public static Format of(final FormatInfo formatInfo) { return format; } - private static Format fromName(final String name) { + public static Format fromName(final String name) { switch (name) { case AvroFormat.NAME: return AVRO; case JsonFormat.NAME: return JSON; + case ProtobufFormat.NAME: return PROTOBUF; case KafkaFormat.NAME: return KAFKA; case DelimitedFormat.NAME: return DELIMITED; default: diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroFormat.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroFormat.java index b704ca731922..e384cdf87aa3 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroFormat.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/avro/AvroFormat.java @@ -15,19 +15,26 @@ package io.confluent.ksql.serde.avro; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.confluent.connect.avro.AvroData; +import io.confluent.connect.avro.AvroDataConfig; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KsqlSerdeFactory; import io.confluent.ksql.util.KsqlConstants; import java.util.Set; +import org.apache.kafka.connect.data.Schema; public final class AvroFormat implements Format { public static final String FULL_SCHEMA_NAME = "fullSchemaName"; public static final String NAME = AvroSchema.TYPE; + private final AvroData avroData = new AvroData(new AvroDataConfig(ImmutableMap.of())); + @Override public String name() { return NAME; @@ -38,6 +45,16 @@ public boolean supportsSchemaInference() { return true; } + @Override + public Schema toConnectSchema(final ParsedSchema schema) { + return avroData.toConnectSchema(((AvroSchema) schema).rawSchema()); + } + + @Override + public ParsedSchema toParsedSchema(final Schema schema) { + 
return new AvroSchema(avroData.fromConnectSchema(schema)); + } + @Override public Set getSupportedProperties() { return ImmutableSet.of(FULL_SCHEMA_NAME); diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java new file mode 100644 index 000000000000..f4a194e7ae47 --- /dev/null +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.serde.protobuf; + +import com.google.common.collect.ImmutableMap; +import io.confluent.connect.protobuf.ProtobufData; +import io.confluent.connect.protobuf.ProtobufDataConfig; +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KsqlSerdeFactory; +import org.apache.kafka.connect.data.Schema; + +public class ProtobufFormat implements Format { + + public static final String NAME = ProtobufSchema.TYPE; + + private final ProtobufData protobufData = + new ProtobufData(new ProtobufDataConfig(ImmutableMap.of())); + + @Override + public String name() { + return NAME; + } + + @Override + public boolean supportsSchemaInference() { + return true; + } + + @Override + public Schema toConnectSchema(final ParsedSchema schema) { + return protobufData.toConnectSchema((ProtobufSchema) schema); + } + + @Override + public ParsedSchema toParsedSchema(final Schema schema) { + return protobufData.fromConnectSchema(schema); + } + + @Override + public KsqlSerdeFactory getSerdeFactory(final FormatInfo info) { + return new ProtobufSerdeFactory(); + } +} diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java new file mode 100644 index 000000000000..82f8d5046a41 --- /dev/null +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java @@ -0,0 +1,115 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.serde.protobuf; + +import io.confluent.connect.protobuf.ProtobufConverter; +import io.confluent.connect.protobuf.ProtobufConverterConfig; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.schema.ksql.PersistenceSchema; +import io.confluent.ksql.serde.KsqlSerdeFactory; +import io.confluent.ksql.serde.connect.ConnectDataTranslator; +import io.confluent.ksql.serde.connect.KsqlConnectDeserializer; +import io.confluent.ksql.serde.connect.KsqlConnectSerializer; +import io.confluent.ksql.serde.tls.ThreadLocalDeserializer; +import io.confluent.ksql.serde.tls.ThreadLocalSerializer; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; + +public class ProtobufSerdeFactory implements KsqlSerdeFactory { + + @Override + public void validate(final PersistenceSchema schema) { + // Supports all types + } + + @Override + public Serde createSerde( + final PersistenceSchema schema, + final KsqlConfig ksqlConfig, + final Supplier schemaRegistryClientFactory + ) { + final Supplier> serializer = () -> createSerializer( + schema, + ksqlConfig, + schemaRegistryClientFactory + ); + final Supplier> deserializer = () -> createDeserializer( + schema, + ksqlConfig, + schemaRegistryClientFactory + ); + + // Sanity check: + serializer.get(); + deserializer.get(); + + return Serdes.serdeFrom( + new ThreadLocalSerializer<>(serializer), + new ThreadLocalDeserializer<>(deserializer) + ); + } + + private KsqlConnectSerializer createSerializer( + final PersistenceSchema schema, + final KsqlConfig ksqlConfig, + final Supplier schemaRegistryClientFactory + ) { + final ProtobufConverter converter = getConverter(schemaRegistryClientFactory.get(), ksqlConfig); + + return new KsqlConnectSerializer( + schema.serializedSchema(), + new ConnectDataTranslator(schema.serializedSchema()), + converter + ); + } + + private KsqlConnectDeserializer createDeserializer( + final PersistenceSchema schema, + final KsqlConfig ksqlConfig, + final Supplier schemaRegistryClientFactory + ) { + final ProtobufConverter converter = getConverter(schemaRegistryClientFactory.get(), ksqlConfig); + + return new KsqlConnectDeserializer( + converter, + new ConnectDataTranslator(schema.serializedSchema()) + ); + } + + private ProtobufConverter getConverter( + final SchemaRegistryClient schemaRegistryClient, + final KsqlConfig ksqlConfig + ) { + final Map protobufConfig = ksqlConfig + .originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX); + + protobufConfig.put( + ProtobufConverterConfig.SCHEMA_REGISTRY_URL_CONFIG, + ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) + ); + + final ProtobufConverter converter = new ProtobufConverter(schemaRegistryClient); + converter.configure(protobufConfig, false); + + return converter; + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 128e61dd0e22..b01482b2c749 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 1.4 3.3.1 24.1.1-jre + 3.11.1 2.0.0 1 3.0.7 @@ -284,6 +285,19 @@ common-utils ${confluent.version} + + + io.confluent + kafka-protobuf-provider + ${confluent.version} + + + + io.confluent + kafka-connect-protobuf-converter + ${confluent.version} + + @@ -304,6 +318,18 @@ ${guava.version} + + com.google.protobuf + protobuf-java + 
${protobuf.version} + + + + com.google.protobuf + protobuf-java-util + ${protobuf.version} + + com.github.rholder guava-retrying