Skip to content

Commit

Permalink
feat: support PROTOBUF keys (#6692)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Dec 1, 2020
1 parent 72bc27e commit 821faac
Show file tree
Hide file tree
Showing 5 changed files with 324 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,311 @@
]
}
},
{
"name": "STRUCT BOOLEAN - key - no inference",
"comments": [
"note that null defaults to false in protobuf"
],
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 BOOLEAN KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": false}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT BOOLEAN - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {bool F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": false}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT INT - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 INT KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT INT - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {int32 F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT BIGINT - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 BIGINT KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT BIGINT - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {int64 F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT DOUBLE - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 DOUBLE KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT DOUBLE - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {double F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT STRING - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 VARCHAR KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": ""}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT STRING - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {string F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": ""}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT ARRAY STRING - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 ARRAY<VARCHAR> KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": []}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT ARRAY STRING - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {repeated string F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": []}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT multi field - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 INT KEY, F2 VARCHAR KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1, "F2": "foo"}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null, "F2": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1, "F2": "foo"}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0, "F2": ""}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "should filter out columns with unsupported types",
"statements": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class KeyFormatUtils {

private static final List<Format> KEY_FORMATS_UNDER_DEVELOPMENT = ImmutableList.of(
FormatFactory.AVRO,
FormatFactory.PROTOBUF,
FormatFactory.JSON_SR
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ protected <T> Serde<T> getConnectSerde(
final Class<T> targetType,
final boolean isKey
) {
return ProtobufSerdeFactory.createSerde(connectSchema, config, srFactory, targetType);
return ProtobufSerdeFactory.createSerde(connectSchema, config, srFactory, targetType, isKey);
}
}

0 comments on commit 821faac

Please sign in to comment.