Skip to content

Commit

Permalink
feat: Alternative representations for nullable protobuf fields
Browse files Browse the repository at this point in the history
The protobuf support in schema registry by default represents an
optional field (corresponding in KSQL to a nullable SQL column) by
a singular protobuf field. For primitive types, singular protobuf
fields cannot distinguish default values (such as 0 for int32 and
"" for string) from `null`.

This change introduces a new option for KSQL `CREATE` statements
that can enable two alternative representations of nullable fields.
One is to use nullable protobuf wrapper types
(e.g. `google.protobuf.StringValue` for strings); the other
is to use the `optional` keyword. Both options will allow KSQL and
downstream consumers to clearly distinguish between default values
and `null`.

New unit tests are added to cover that the properties are
propagated from the `CREATE` statement to the `Serde`
implementation. We use query translation tests to cover
end-to-end test cases.
  • Loading branch information
lucasbru committed Nov 4, 2022
1 parent d15be74 commit 169b087
Show file tree
Hide file tree
Showing 12 changed files with 505 additions and 8 deletions.
Expand Up @@ -47,6 +47,15 @@ public final class CommonCreateConfigs {
public static final String KEY_FORMAT_PROPERTY = "KEY_FORMAT";
public static final String FORMAT_PROPERTY = "FORMAT";
public static final String WRAP_SINGLE_VALUE = "WRAP_SINGLE_VALUE";
// WITH-clause property names that select how nullable columns are represented
// in generated protobuf schemas — one property for the key format, one for the
// value format.
public static final String KEY_PROTOBUF_NULLABLE_REPRESENTATION =
"KEY_PROTOBUF_NULLABLE_REPRESENTATION";
public static final String VALUE_PROTOBUF_NULLABLE_REPRESENTATION =
"VALUE_PROTOBUF_NULLABLE_REPRESENTATION";

// Accepted values for the *_PROTOBUF_NULLABLE_REPRESENTATION properties:
// OPTIONAL marks all fields with the protobuf `optional` keyword; WRAPPER uses
// the google.protobuf wrapper types (e.g. StringValue) for primitive fields.
public enum ProtobufNullableConfigValues {
OPTIONAL,
WRAPPER
}

public static final String VALUE_DELIMITER_PROPERTY = "VALUE_DELIMITER";
public static final String KEY_DELIMITER_PROPERTY = "KEY_DELIMITER";
Expand Down Expand Up @@ -134,6 +143,32 @@ public static void addToConfigDef(
+ "serialized as a named field within a record. If set to false, KSQL expects the "
+ "field to have been serialized as an anonymous value."
)
.define(
KEY_PROTOBUF_NULLABLE_REPRESENTATION,
ConfigDef.Type.STRING,
null,
ConfigValidators.enumValues(ProtobufNullableConfigValues.class),
Importance.LOW,
"If supplied, protobuf schema generation will use fields that distinguish "
+ "null from default values for primitive values. The value `"
+ ProtobufNullableConfigValues.OPTIONAL.name()
+ "` will enable using the `optional` on all fields, whereas `"
+ ProtobufNullableConfigValues.WRAPPER.name()
+ "` will use wrappers for all primitive value fields, including strings."
)
.define(
VALUE_PROTOBUF_NULLABLE_REPRESENTATION,
ConfigDef.Type.STRING,
null,
ConfigValidators.enumValues(ProtobufNullableConfigValues.class),
Importance.LOW,
"If supplied, protobuf schema generation will use fields that distinguish "
+ "null from default values for primitive values. The value `"
+ ProtobufNullableConfigValues.OPTIONAL.name()
+ "` will enable using the `optional` on all fields, whereas `"
+ ProtobufNullableConfigValues.WRAPPER.name()
+ "` will use wrappers for all primitive value fields, including strings."
)
.define(
VALUE_AVRO_SCHEMA_FULL_NAME,
ConfigDef.Type.STRING,
Expand Down
Expand Up @@ -63,6 +63,13 @@ public Deserializer<Object> getDeserializer(

protected abstract Schema fromParsedSchema(T schema);

/**
 * Default converter configuration hook: supplies only the (dummy) schema
 * registry URL that the serde config requires. Subclasses may override to
 * add format-specific settings.
 */
protected void configureConverter(final Converter c, final boolean isKey) {
final ImmutableMap<String, Object> converterConfig = ImmutableMap.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "foo");
c.configure(converterConfig, isKey);
}

private final class SpecSerializer implements Serializer<Object> {

private final SchemaRegistryClient srClient;
Expand All @@ -72,10 +79,7 @@ private final class SpecSerializer implements Serializer<Object> {
// Creates a serializer backed by the given schema registry client; isKey
// selects whether the converter is configured for key or value data.
SpecSerializer(final SchemaRegistryClient srClient, final boolean isKey) {
this.srClient = Objects.requireNonNull(srClient, "srClient");
this.converter = converterFactory.apply(srClient);
// NOTE(review): both a direct configure call and the configureConverter
// hook appear below, which would configure the converter twice — looks
// like diff残留 of old vs. new code; only the overridable
// configureConverter call should remain — confirm against the repo.
converter.configure(
ImmutableMap.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "foo"),
isKey
);
configureConverter(converter, isKey);
this.isKey = isKey;
}

Expand Down Expand Up @@ -111,10 +115,7 @@ private class SpecDeserializer implements Deserializer<Object> {

// Creates a deserializer backed by the given schema registry client; isKey
// selects whether the converter is configured for key or value data.
SpecDeserializer(final SchemaRegistryClient srClient, final boolean isKey) {
this.converter = converterFactory.apply(srClient);
// NOTE(review): both a direct configure call and the configureConverter
// hook appear below, which would configure the converter twice — only the
// overridable configureConverter call should remain — confirm.
converter.configure(
ImmutableMap.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "foo"),
isKey
);
configureConverter(converter, isKey);
}

@Override
Expand Down
Expand Up @@ -15,12 +15,16 @@

package io.confluent.ksql.test.serde.protobuf;

import com.google.common.collect.ImmutableMap;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.ksql.serde.protobuf.ProtobufProperties;
import io.confluent.ksql.serde.protobuf.ProtobufSchemaTranslator;
import io.confluent.ksql.test.serde.ConnectSerdeSupplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.storage.Converter;

public class ValueSpecProtobufSerdeSupplier extends ConnectSerdeSupplier<ProtobufSchema> {

Expand All @@ -35,4 +39,15 @@ public ValueSpecProtobufSerdeSupplier(final ProtobufProperties protobufPropertie
// Translates the registry's parsed ProtobufSchema into a Connect Schema via
// the configured ProtobufSchemaTranslator.
protected Schema fromParsedSchema(final ProtobufSchema schema) {
return schemaTranslator.toConnectSchema(schema);
}

/**
 * Configures the converter with the schema translator's configs (which carry
 * the protobuf properties, e.g. the nullable-field representation) in
 * addition to the dummy schema registry URL the base supplier requires.
 *
 * <p>Overrides the base {@code ConnectSerdeSupplier} hook — annotated so the
 * compiler verifies the signature stays in sync with the superclass.
 */
@Override
protected void configureConverter(final Converter c, final boolean isKey) {
c.configure(
ImmutableMap.<String, Object>builder()
.putAll(schemaTranslator.getConfigs())
.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "foo")
.build(),
isKey
);
}

}
Expand Up @@ -79,6 +79,51 @@
"inputs": [{"topic": "input", "value": {"i": 1, "l": 1, "d": 1.2, "b": true, "s": "foo"}}],
"outputs": [{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.2, "B": true, "S": "foo"}}]
},
{
"name": "protobuf primitives with null",
"statements": [
"CREATE STREAM INPUT (K STRING KEY, i INTEGER, l BIGINT, d DOUBLE, b BOOLEAN, s STRING) WITH (kafka_topic='input', value_format='PROTOBUF');",
"CREATE STREAM OUTPUT as SELECT * FROM input;"
],
"inputs": [
{"topic": "input", "value": {"i": null, "l": 0, "d": null, "b": false, "s": null}},
{"topic": "input", "value": {"i": 0, "l": null, "d": 0.0, "b": null, "s": ""}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"i": 0, "l": 0, "d": 0.0, "b": false, "s": ""}},
{"topic": "OUTPUT", "value": {"i": 0, "l": 0, "d": 0.0, "b": false, "s": ""}}
]
},
{
"name": "protobuf primitives with null and nullable as optional",
"statements": [
"CREATE STREAM INPUT (K STRING KEY, i INTEGER, l BIGINT, d DOUBLE, b BOOLEAN, s STRING) WITH (kafka_topic='input', value_format='PROTOBUF', value_protobuf_nullable_representation='OPTIONAL');",
"CREATE STREAM OUTPUT WITH (value_format='PROTOBUF', value_protobuf_nullable_representation='OPTIONAL') as SELECT * FROM input;"
],
"inputs": [
{"topic": "input", "value": {"i": null, "l": 0, "d": null, "b": false, "s": null}},
{"topic": "input", "value": {"i": 0, "l": null, "d": 0.0, "b": null, "s": ""}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"i": null, "l": 0, "d": null, "b": false, "s": null}},
{"topic": "OUTPUT", "value": {"i": 0, "l": null, "d": 0.0, "b": null, "s": ""}}
]
},
{
"name": "protobuf primitives with null and nullable as wrapper",
"statements": [
"CREATE STREAM INPUT (K STRING KEY, i INTEGER, l BIGINT, d DOUBLE, b BOOLEAN, s STRING) WITH (kafka_topic='input', value_format='PROTOBUF', value_protobuf_nullable_representation='WRAPPER');",
"CREATE STREAM OUTPUT WITH (value_format='PROTOBUF', value_protobuf_nullable_representation='WRAPPER') as SELECT * FROM input;"
],
"inputs": [
{"topic": "input", "value": {"i": null, "l": 0, "d": null, "b": false, "s": null}},
{"topic": "input", "value": {"i": 0, "l": null, "d": 0.0, "b": null, "s": ""}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"i": null, "l": 0, "d": null, "b": false, "s": null}},
{"topic": "OUTPUT", "value": {"i": 0, "l": null, "d": 0.0, "b": null, "s": ""}}
]
},
{
"name": "protobuf containers",
"statements": [
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.ColumnReferenceParser;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CommonCreateConfigs.ProtobufNullableConfigValues;
import io.confluent.ksql.properties.with.CreateAsConfigs;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
Expand Down Expand Up @@ -154,6 +155,22 @@ public Map<String, String> getKeyFormatProperties(
builder.put(ProtobufProperties.UNWRAP_PRIMITIVES, ProtobufProperties.UNWRAP);
}

final String nullableRep = props.getString(
CommonCreateConfigs.KEY_PROTOBUF_NULLABLE_REPRESENTATION);
if (ProtobufFormat.NAME.equalsIgnoreCase(keyFormat) && nullableRep != null) {
switch (ProtobufNullableConfigValues.valueOf(nullableRep)) {
case WRAPPER:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_WRAPPER);
break;
case OPTIONAL:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_OPTIONAL);
break;
default:
}
}

final Optional<Integer> keySchemaId = getKeySchemaId();
keySchemaId.ifPresent(id -> builder.put(ConnectProperties.SCHEMA_ID, String.valueOf(id)));

Expand All @@ -180,6 +197,22 @@ public Map<String, String> getValueFormatProperties(final String valueFormat) {
builder.put(ProtobufProperties.UNWRAP_PRIMITIVES, ProtobufProperties.UNWRAP);
}

final String nullableRep = props.getString(
CommonCreateConfigs.VALUE_PROTOBUF_NULLABLE_REPRESENTATION);
if (ProtobufFormat.NAME.equalsIgnoreCase(valueFormat) && nullableRep != null) {
switch (ProtobufNullableConfigValues.valueOf(nullableRep)) {
case WRAPPER:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_WRAPPER);
break;
case OPTIONAL:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_OPTIONAL);
break;
default:
}
}

final Optional<Integer> valueSchemaId = getValueSchemaId();
valueSchemaId.ifPresent(id ->
builder.put(ConnectProperties.SCHEMA_ID, String.valueOf(id)));
Expand Down
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.parser.ColumnReferenceParser;
import io.confluent.ksql.parser.DurationParser;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CommonCreateConfigs.ProtobufNullableConfigValues;
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
Expand Down Expand Up @@ -194,6 +195,22 @@ public Map<String, String> getKeyFormatProperties(final String keyFormat, final
builder.put(ProtobufProperties.UNWRAP_PRIMITIVES, ProtobufProperties.UNWRAP);
}

final String nullableRep = props.getString(
CommonCreateConfigs.KEY_PROTOBUF_NULLABLE_REPRESENTATION);
if (ProtobufFormat.NAME.equalsIgnoreCase(keyFormat) && nullableRep != null) {
switch (ProtobufNullableConfigValues.valueOf(nullableRep)) {
case WRAPPER:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_WRAPPER);
break;
case OPTIONAL:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_OPTIONAL);
break;
default:
}
}

final Optional<Integer> keySchemaId = getKeySchemaId();
keySchemaId.ifPresent(id -> builder.put(ConnectProperties.SCHEMA_ID, String.valueOf(id)));

Expand Down Expand Up @@ -231,6 +248,22 @@ public Map<String, String> getValueFormatProperties(final String valueFormat) {
builder.put(ProtobufProperties.UNWRAP_PRIMITIVES, ProtobufProperties.UNWRAP);
}

final String nullableRep = props.getString(
CommonCreateConfigs.VALUE_PROTOBUF_NULLABLE_REPRESENTATION);
if (ProtobufFormat.NAME.equalsIgnoreCase(valueFormat) && nullableRep != null) {
switch (ProtobufNullableConfigValues.valueOf(nullableRep)) {
case WRAPPER:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_WRAPPER);
break;
case OPTIONAL:
builder.put(ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_OPTIONAL);
break;
default:
}
}

final Optional<Integer> valueSchemaId = getValueSchemaId();
valueSchemaId.ifPresent(id ->
builder.put(ConnectProperties.SCHEMA_ID, String.valueOf(id)));
Expand Down
Expand Up @@ -19,11 +19,13 @@
import static io.confluent.ksql.parser.properties.with.CreateSourceAsProperties.from;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.FORMAT_PROPERTY;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.KEY_FORMAT_PROPERTY;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.KEY_PROTOBUF_NULLABLE_REPRESENTATION;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.KEY_SCHEMA_FULL_NAME;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.KEY_SCHEMA_ID;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.TIMESTAMP_FORMAT_PROPERTY;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.VALUE_AVRO_SCHEMA_FULL_NAME;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.VALUE_FORMAT_PROPERTY;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.VALUE_PROTOBUF_NULLABLE_REPRESENTATION;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.VALUE_SCHEMA_FULL_NAME;
import static io.confluent.ksql.properties.with.CommonCreateConfigs.VALUE_SCHEMA_ID;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -43,6 +45,7 @@
import io.confluent.ksql.execution.expression.tree.StringLiteral;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CommonCreateConfigs.ProtobufNullableConfigValues;
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
Expand Down Expand Up @@ -477,4 +480,89 @@ public void shouldThrowIfSourceConnectorPropertyProvided() {
// Then:
assertThat(e.getMessage(), containsString("Invalid config variable(s) in the WITH clause: SOURCED_BY_CONNECTOR"));
}

@Test
public void shouldGetProtobufKeyFormatPropertiesWithNullableAsWrapper() {
// Given: a PROTOBUF source whose key nullable representation is WRAPPER
final StringLiteral wrapper =
new StringLiteral(ProtobufNullableConfigValues.WRAPPER.name());
final CreateSourceAsProperties props = CreateSourceAsProperties.from(
ImmutableMap.of(
FORMAT_PROPERTY, new StringLiteral("PROTOBUF"),
KEY_PROTOBUF_NULLABLE_REPRESENTATION, wrapper));

// When / Then: the serde property maps to NULLABLE_AS_WRAPPER
assertThat(
props.getKeyFormatProperties("foo", "PROTOBUF"),
hasEntry(
ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_WRAPPER));
}

@Test
public void shouldGetProtobufKeyFormatPropertiesWithNullableAsOptional() {
// Given: a PROTOBUF source whose key nullable representation is OPTIONAL
final StringLiteral optional =
new StringLiteral(ProtobufNullableConfigValues.OPTIONAL.name());
final CreateSourceAsProperties props = CreateSourceAsProperties.from(
ImmutableMap.of(
FORMAT_PROPERTY, new StringLiteral("PROTOBUF"),
KEY_PROTOBUF_NULLABLE_REPRESENTATION, optional));

// When / Then: the serde property maps to NULLABLE_AS_OPTIONAL
assertThat(
props.getKeyFormatProperties("foo", "PROTOBUF"),
hasEntry(
ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_OPTIONAL));
}

@Test
public void shouldGetProtobufKeyFormatPropertiesWithoutNullableRepresentation() {
// Given: a PROTOBUF source with no nullable-representation property
final CreateSourceAsProperties props =
CreateSourceAsProperties.from(
ImmutableMap.of(FORMAT_PROPERTY, new StringLiteral("PROTOBUF")));

// When / Then: no NULLABLE_REPRESENTATION serde property is produced
assertThat(
props.getKeyFormatProperties("foo", "PROTOBUF"),
not(hasKey(ProtobufProperties.NULLABLE_REPRESENTATION)));
}

@Test
public void shouldGetProtobufValueFormatPropertiesWithNullableAsWrapper() {
// Given: a PROTOBUF source whose value nullable representation is WRAPPER
final StringLiteral wrapper =
new StringLiteral(ProtobufNullableConfigValues.WRAPPER.name());
final CreateSourceAsProperties props = CreateSourceAsProperties.from(
ImmutableMap.of(
FORMAT_PROPERTY, new StringLiteral("PROTOBUF"),
VALUE_PROTOBUF_NULLABLE_REPRESENTATION, wrapper));

// When / Then: the serde property maps to NULLABLE_AS_WRAPPER
assertThat(
props.getValueFormatProperties("PROTOBUF"),
hasEntry(
ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_WRAPPER));
}

@Test
public void shouldGetProtobufValueFormatPropertiesWithNullableAsOptional() {
// Given: a PROTOBUF source whose value nullable representation is OPTIONAL
final StringLiteral optional =
new StringLiteral(ProtobufNullableConfigValues.OPTIONAL.name());
final CreateSourceAsProperties props = CreateSourceAsProperties.from(
ImmutableMap.of(
FORMAT_PROPERTY, new StringLiteral("PROTOBUF"),
VALUE_PROTOBUF_NULLABLE_REPRESENTATION, optional));

// When / Then: the serde property maps to NULLABLE_AS_OPTIONAL
assertThat(
props.getValueFormatProperties("PROTOBUF"),
hasEntry(
ProtobufProperties.NULLABLE_REPRESENTATION,
ProtobufProperties.NULLABLE_AS_OPTIONAL));
}

@Test
public void shouldGetProtobufValueFormatPropertiesWithoutNullableRepresentation() {
// Given: a PROTOBUF source with no nullable-representation property
final CreateSourceAsProperties props =
CreateSourceAsProperties.from(
ImmutableMap.of(FORMAT_PROPERTY, new StringLiteral("PROTOBUF")));

// When / Then: no NULLABLE_REPRESENTATION serde property is produced
assertThat(
props.getValueFormatProperties("PROTOBUF"),
not(hasKey(ProtobufProperties.NULLABLE_REPRESENTATION)));
}
}

0 comments on commit 169b087

Please sign in to comment.