From 821faacda8382fce1b74e4517dfd6ab704e93d46 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 1 Dec 2020 10:31:45 -0800 Subject: [PATCH] feat: support PROTOBUF keys (#6692) --- .../query-validation-tests/protobuf.json | 305 ++++++++++++++++++ .../confluent/ksql/serde/KeyFormatUtils.java | 1 + .../ksql/serde/protobuf/ProtobufFormat.java | 2 +- .../serde/protobuf/ProtobufSerdeFactory.java | 25 +- .../protobuf/ProtobufSerdeFactoryTest.java | 4 +- 5 files changed, 324 insertions(+), 13 deletions(-) diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json index 7b4753a34f93..6a92337ac20f 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/protobuf.json @@ -293,6 +293,311 @@ ] } }, + { + "name": "STRUCT BOOLEAN - key - no inference", + "comments": [ + "note that null defaults to false in protobuf" + ], + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 BOOLEAN KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": true}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": true}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": false}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT BOOLEAN - key - inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "topics": [ + { + "name": "input_topic", + "keySchema": "syntax = \"proto3\"; message ConnectDefault1 {bool F1 = 1;}", + "keyFormat": "PROTOBUF" + } + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": true}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": true}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": false}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT INT - key - no inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 INT KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 1}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 1}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT INT - key - inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "topics": [ + { + "name": "input_topic", + "keySchema": "syntax = \"proto3\"; message ConnectDefault1 {int32 F1 = 1;}", + "keyFormat": "PROTOBUF" + } + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 1}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 1}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT BIGINT - key - no inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 BIGINT KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT BIGINT - key - inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "topics": [ + { + "name": "input_topic", + "keySchema": "syntax = \"proto3\"; message ConnectDefault1 {int64 F1 = 1;}", + "keyFormat": "PROTOBUF" + } + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT DOUBLE - key - no inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 DOUBLE KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 1.1}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 1.1}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT DOUBLE - key - inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "topics": [ + { + "name": "input_topic", + "keySchema": "syntax = \"proto3\"; message ConnectDefault1 {double F1 = 1;}", + "keyFormat": "PROTOBUF" + } + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 1.1}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 1.1}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT STRING - key - no inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 VARCHAR KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": "foo"}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": "foo"}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": ""}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT STRING - key - inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "topics": [ + { + "name": "input_topic", + "keySchema": "syntax = \"proto3\"; message ConnectDefault1 {string F1 = 1;}", + "keyFormat": "PROTOBUF" + } + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": "foo"}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": "foo"}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": ""}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT ARRAY STRING - key - no inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 ARRAY KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": []}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT ARRAY STRING - key - inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "topics": [ + { + "name": "input_topic", + "keySchema": "syntax = \"proto3\"; message ConnectDefault1 {repeated string F1 = 1;}", + "keyFormat": "PROTOBUF" + } + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": []}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, + { + "name": "STRUCT multi field - key - no inference", + "properties": { + "ksql.key.format.enabled": true + }, + "statements": [ + "CREATE STREAM INPUT (F1 INT KEY, F2 VARCHAR KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": {"F1": 1, "F2": "foo"}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": {"F1": null, "F2": null}, "value": {"FOO": 10}}, + {"topic": "input_topic", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"F1": 1, "F2": "foo"}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": {"F1": 0, "F2": ""}, "value": {"FOO": 10}}, + {"topic": "OUTPUT", "key": null, "value": null} + ] + }, { "name": "should filter out columns with unsupported types", "statements": [ diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java index 359a32e62e15..f68414bf0541 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/KeyFormatUtils.java @@ -30,6 +30,7 @@ public final class KeyFormatUtils { private static final List KEY_FORMATS_UNDER_DEVELOPMENT = ImmutableList.of( FormatFactory.AVRO, + FormatFactory.PROTOBUF, FormatFactory.JSON_SR ); diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java index bed661b296cd..f7b73fc4ca76 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufFormat.java @@ -64,6 +64,6 @@ protected Serde getConnectSerde( final Class targetType, final boolean isKey ) { - return ProtobufSerdeFactory.createSerde(connectSchema, config, srFactory, targetType); + return ProtobufSerdeFactory.createSerde(connectSchema, config, srFactory, targetType, isKey); } } diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java index 928e9f8b4222..1b0a66f45bdd 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactory.java @@ -46,21 +46,23 @@ static Serde createSerde( final ConnectSchema schema, final KsqlConfig ksqlConfig, final Supplier srFactory, - final Class targetType - ) { + final Class targetType, + final boolean isKey) { validate(schema); final Supplier> serializer = () -> createSerializer( schema, ksqlConfig, srFactory, - targetType + targetType, + isKey ); final Supplier> deserializer = () -> createDeserializer( schema, ksqlConfig, srFactory, - targetType + targetType, + isKey ); // Sanity check: @@ -81,9 +83,10 @@ private static KsqlConnectSerializer createSerializer( final ConnectSchema schema, final KsqlConfig ksqlConfig, final Supplier srFactory, - final Class targetType + final Class targetType, + final boolean isKey ) { - final ProtobufConverter converter = getConverter(srFactory.get(), ksqlConfig); + final ProtobufConverter converter = getConverter(srFactory.get(), ksqlConfig, isKey); return new KsqlConnectSerializer<>( schema, @@ -97,9 +100,10 @@ private static KsqlConnectDeserializer createDeserializer( final ConnectSchema schema, final KsqlConfig ksqlConfig, final Supplier srFactory, - final Class targetType + final Class targetType, + final boolean isKey ) { - final ProtobufConverter converter = getConverter(srFactory.get(), ksqlConfig); + final ProtobufConverter converter = getConverter(srFactory.get(), ksqlConfig, isKey); return new KsqlConnectDeserializer<>( converter, @@ -110,7 +114,8 @@ private static KsqlConnectDeserializer createDeserializer( private static ProtobufConverter getConverter( final SchemaRegistryClient schemaRegistryClient, - final KsqlConfig ksqlConfig + final KsqlConfig ksqlConfig, + final boolean isKey ) { final Map protobufConfig = ksqlConfig .originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX); @@ -121,7 +126,7 @@ private static ProtobufConverter getConverter( ); final ProtobufConverter converter = new ProtobufConverter(schemaRegistryClient); - converter.configure(protobufConfig, false); + converter.configure(protobufConfig, isKey); return converter; } diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactoryTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactoryTest.java index 32cdfea309bb..48f59b814206 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactoryTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/protobuf/ProtobufSerdeFactoryTest.java @@ -57,7 +57,7 @@ public void shouldThrowOnDecimal() { // When: final Exception e = assertThrows( KsqlException.class, - () -> ProtobufSerdeFactory.createSerde(schema, config, srClientFactory, Struct.class) + () -> ProtobufSerdeFactory.createSerde(schema, config, srClientFactory, Struct.class, false) ); // Then: @@ -73,7 +73,7 @@ public void shouldNotThrowOnNonDecimal() { .build(); // When: - ProtobufSerdeFactory.createSerde(schema, config, srClientFactory, Struct.class); + ProtobufSerdeFactory.createSerde(schema, config, srClientFactory, Struct.class, false); // Then (did not throw) }