feat: support PROTOBUF keys #6692

Merged
merged 1 commit into from
Dec 1, 2020
@@ -293,6 +293,311 @@
]
}
},
{
"name": "STRUCT BOOLEAN - key - no inference",
"comments": [
"note that null defaults to false in protobuf"
],
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 BOOLEAN KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
Contributor

How come this works with declaring the key simply as BOOLEAN rather than STRUCT<BOOLEAN>? I thought keys were always forced to be unwrapped?

Contributor Author

Single keys are forced to be unwrapped only if the format supports both wrapping and unwrapping; otherwise ksqlDB uses whatever the format supports:

https://github.com/confluentinc/ksql/blob/master/ksqldb-engine/src/main/java/io/confluent/ksql/serde/SerdeFeaturesFactory.java#L90-L104

  private static Optional<SerdeFeature> getKeyWrapping(
      final boolean singleKey,
      final Format keyFormat
  ) {
    // Until ksqlDB supports WRAP_SINGLE_KEYS in the WITH clause, we explicitly set
    // UNWRAP_SINGLE_KEYS for any key format that supports both wrapping and unwrapping to avoid
    // ambiguity later:
    if (singleKey
        && keyFormat.supportsFeature(SerdeFeature.UNWRAP_SINGLES)
        && keyFormat.supportsFeature(SerdeFeature.WRAP_SINGLES)
    ) {
      return Optional.of(SerdeFeature.UNWRAP_SINGLES);
    }
    return Optional.empty();
  }
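
To make the decision above easier to follow in isolation, here is a minimal, self-contained sketch of the same check. It is not the ksqlDB implementation: `KeyWrappingSketch`, the cut-down `SerdeFeature` enum, and the idea of modelling a format as a plain set of supported features are all assumptions made for illustration, and the feature sets in `main` are hypothetical rather than taken from any real format definition.

```java
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

public final class KeyWrappingSketch {

  // Hypothetical stand-in for ksqlDB's SerdeFeature; only the two values quoted above.
  public enum SerdeFeature { WRAP_SINGLES, UNWRAP_SINGLES }

  // Same condition as the quoted method, with the format reduced to the
  // set of features it supports (an assumption of this sketch).
  public static Optional<SerdeFeature> getKeyWrapping(
      final boolean singleKey,
      final Set<SerdeFeature> supported
  ) {
    if (singleKey
        && supported.contains(SerdeFeature.UNWRAP_SINGLES)
        && supported.contains(SerdeFeature.WRAP_SINGLES)) {
      // Both are possible, so pick unwrapping explicitly to avoid ambiguity.
      return Optional.of(SerdeFeature.UNWRAP_SINGLES);
    }
    // Nothing is forced: the format's own behaviour applies.
    return Optional.empty();
  }

  public static void main(final String[] args) {
    // Assumed feature sets, for illustration only.
    final Set<SerdeFeature> both = EnumSet.allOf(SerdeFeature.class);
    final Set<SerdeFeature> wrapOnly = EnumSet.of(SerdeFeature.WRAP_SINGLES);

    System.out.println(getKeyWrapping(true, both));     // Optional[UNWRAP_SINGLES]
    System.out.println(getKeyWrapping(true, wrapOnly)); // Optional.empty
    System.out.println(getKeyWrapping(false, both));    // Optional.empty
  }
}
```

In this sketch, a format that can only wrap yields `Optional.empty()`, which would explain why a key declared as a bare BOOLEAN still appears as `{"F1": true}` in the test data.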

Contributor

Ah, interesting. We should add tests for converting between protobuf and other key formats, and also joins between protobuf and other key formats. The rest of this PR LGTM, though! (Feel free to add in a separate PR if that's preferable.)

Contributor Author

+1, I'll do this in a separate PR. I think the likelihood of an org using two key formats is pretty low, so it's OK if we release this with some rough edges around that (but it's definitely good to have it working).
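
A side note on this test case's comment that null defaults to false in protobuf: in proto3, a scalar field without explicit presence reads back as its type default, which is why the expected outputs in these tests show `false`, `0`, `""`, and `[]` for a null `F1`, while a key that is null as a whole stays null. The sketch below only models that mapping. `Proto3DefaultsSketch` and `valueOrDefault` are hypothetical names, and nothing here calls the protobuf library or the ksqlDB serde.

```java
import java.util.Collections;
import java.util.List;

public final class Proto3DefaultsSketch {

  // Hypothetical helper: the value a reader would observe for a proto3 field
  // without explicit presence when the writer supplied null.
  public static Object valueOrDefault(final Object value, final Class<?> type) {
    if (value != null) {
      return value;
    }
    if (type == Boolean.class) {
      return false;                    // bool
    }
    if (type == Integer.class) {
      return 0;                        // int32
    }
    if (type == Long.class) {
      return 0L;                       // int64
    }
    if (type == Double.class) {
      return 0.0d;                     // double
    }
    if (type == String.class) {
      return "";                       // string
    }
    if (type == List.class) {
      return Collections.emptyList();  // repeated field
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(final String[] args) {
    System.out.println(valueOrDefault(null, Boolean.class)); // false
    System.out.println(valueOrDefault(true, Boolean.class)); // true
    System.out.println(valueOrDefault(null, Integer.class)); // 0
    System.out.println(valueOrDefault(null, List.class));    // []
  }
}
```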

"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": false}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT BOOLEAN - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {bool F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": true}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": false}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT INT - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 INT KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT INT - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {int32 F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT BIGINT - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 BIGINT KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT BIGINT - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {int64 F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 998877665544332211}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT DOUBLE - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 DOUBLE KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT DOUBLE - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {double F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1.1}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT STRING - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 VARCHAR KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": ""}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT STRING - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {string F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": "foo"}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": ""}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT ARRAY STRING - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 ARRAY<VARCHAR> KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": []}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT ARRAY STRING - key - inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"topics": [
{
"name": "input_topic",
"keySchema": "syntax = \"proto3\"; message ConnectDefault1 {repeated string F1 = 1;}",
"keyFormat": "PROTOBUF"
}
],
"inputs": [
{"topic": "input_topic", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": ["foo", "bar"]}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": []}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "STRUCT multi field - key - no inference",
"properties": {
"ksql.key.format.enabled": true
},
"statements": [
"CREATE STREAM INPUT (F1 INT KEY, F2 VARCHAR KEY, foo INT) WITH (kafka_topic='input_topic', format='PROTOBUF');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": {"F1": 1, "F2": "foo"}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": {"F1": null, "F2": null}, "value": {"FOO": 10}},
{"topic": "input_topic", "key": null, "value": null}
],
"outputs": [
{"topic": "OUTPUT", "key": {"F1": 1, "F2": "foo"}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": {"F1": 0, "F2": ""}, "value": {"FOO": 10}},
{"topic": "OUTPUT", "key": null, "value": null}
]
},
{
"name": "should filter out columns with unsupported types",
"statements": [
@@ -30,6 +30,7 @@ public final class KeyFormatUtils {

private static final List<Format> KEY_FORMATS_UNDER_DEVELOPMENT = ImmutableList.of(
FormatFactory.AVRO,
+ FormatFactory.PROTOBUF,
FormatFactory.JSON_SR
);

@@ -64,6 +64,6 @@ protected <T> Serde<T> getConnectSerde(
final Class<T> targetType,
final boolean isKey
) {
- return ProtobufSerdeFactory.createSerde(connectSchema, config, srFactory, targetType);
+ return ProtobufSerdeFactory.createSerde(connectSchema, config, srFactory, targetType, isKey);
}
}