feat: Support timestamp protobuf serde (#6927)
* feat: Support timestamp protobuf serde

* add historical plans, address review comments

* make qtt schema parser include timestamp dependency

* undo previous commit; schema registry adds timestamp dependencies now!
Zara Lim committed Feb 10, 2021
1 parent ef32ae2 commit 5ea1ce4
Showing 11 changed files with 385 additions and 9 deletions.
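In short, a statement such as CREATE STREAM TEST (ID STRING KEY, TIME TIMESTAMP) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF') now makes the schema injector register google/protobuf/timestamp.proto as a schema reference before registering the value schema itself; a hedged sketch of that path follows the first file's diff below.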
@@ -20,6 +20,9 @@
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
@@ -158,7 +161,8 @@ private void registerSchemas(
config,
statementText,
registerIfSchemaExists,
getSRSubject(kafkaTopic, true)
getSRSubject(kafkaTopic, true),
true
);

registerSchema(
@@ -169,7 +173,8 @@ private void registerSchemas(
config,
statementText,
registerIfSchemaExists,
getSRSubject(kafkaTopic, false)
getSRSubject(kafkaTopic, false),
false
);
}

@@ -181,7 +186,8 @@ private void registerSchema(
final KsqlConfig config,
final String statementText,
final boolean registerIfSchemaExists,
final String subject
final String subject,
final boolean isKey
) {
final Format format = FormatFactory.of(formatInfo);
if (!format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
@@ -205,8 +211,22 @@ private void registerSchema(
final ParsedSchema parsedSchema = translator.toParsedSchema(
PersistenceSchema.from(schema, serdeFeatures)
);

srClient.register(subject, parsedSchema);
if (parsedSchema instanceof ProtobufSchema) {
final ProtobufSchema resolved = AbstractKafkaProtobufSerializer.resolveDependencies(
srClient,
true,
false,
true,
null,
new DefaultReferenceSubjectNameStrategy(),
topic,
isKey,
(ProtobufSchema) parsedSchema
);
srClient.register(subject, resolved);
} else {
srClient.register(subject, parsedSchema);
}
}
} catch (IOException | RestClientException e) {
throw new KsqlStatementException(
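For context, the new registration path can be exercised on its own. A minimal sketch, assuming a MockSchemaRegistryClient resolves references the same way the real client does; the boolean flags are copied verbatim from the call site above, and their meanings are inferred rather than documented here:

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;

public final class ResolveDependenciesSketch {
  public static void main(final String[] args) throws Exception {
    // Hypothetical in-memory client, standing in for the injector's srClient.
    final SchemaRegistryClient client = new MockSchemaRegistryClient();

    // Value schema for a stream with one TIMESTAMP column; it imports the
    // well-known google.protobuf.Timestamp type.
    final ProtobufSchema valueSchema = new ProtobufSchema(
        "syntax = \"proto3\"; import \"google/protobuf/timestamp.proto\";"
            + "message ConnectDefault1 {google.protobuf.Timestamp F1 = 1;}");

    // Registers google/protobuf/timestamp.proto under its own subject and
    // returns a schema whose reference list points at the registered version.
    final ProtobufSchema resolved = AbstractKafkaProtobufSerializer.resolveDependencies(
        client,
        true,   // flags copied from the injector call site; meanings assumed
        false,
        true,
        null,
        new DefaultReferenceSubjectNameStrategy(),
        "expectedName",  // hypothetical topic name
        false,           // isKey: this is the value schema
        valueSchema);

    client.register("expectedName-value", resolved);
  }
}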
@@ -45,6 +45,7 @@ static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {
.forward("testCompatibility",
methodParams(String.class, ParsedSchema.class), delegate)
.swallow("deleteSubject", methodParams(String.class), Collections.emptyList())
.forward("getVersion", methodParams(String.class, ParsedSchema.class), delegate)
.build();
}

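A note on the proxy change above: register and deleteSubject are swallowed because sandboxed execution must not mutate the shared registry, while read-only calls such as testCompatibility and the newly added getVersion are presumably safe to forward to the real client.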
@@ -35,7 +35,9 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlPlan;
@@ -69,6 +71,8 @@
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import io.confluent.ksql.util.KsqlStatementException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
@@ -96,6 +100,17 @@ public class SchemaRegisterInjectorTest {
+ "\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":"
+ "[{\"name\":\"F1\",\"type\":[\"null\",\"string\"],\"default\":null}],"
+ "\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}");
private static final ProtobufSchema TIMESTAMP_SCHEMA = new ProtobufSchema(
"syntax = \"proto3\"; package google.protobuf;"
+ "option java_package = \"com.google.protobuf\";"
+ "option java_outer_classname = \"TimestampProto\";\n"
+ "option java_multiple_files = true;"
+ "message Timestamp {int64 seconds = 1; int32 nanos = 2;}");
private static final List<SchemaReference> REFERENCE_LIST =
Arrays.asList(new SchemaReference("google/protobuf/timestamp.proto", "google/protobuf/timestamp.proto", 0));
private static final ProtobufSchema PROTOBUF_SCHEMA_WITH_REFS = new ProtobufSchema(
"syntax = \"proto3\"; import \"google/protobuf/timestamp.proto\";"
+ "message ConnectDefault1 {google.protobuf.Timestamp F1 = 1;}").copy(REFERENCE_LIST);

@Mock
private ServiceContext serviceContext;
@@ -360,6 +375,25 @@ public void shouldSupportPrimitiveValueSchemasInCreateAsStmts() throws Exception
verify(schemaRegistryClient).register("SINK-value", AVRO_UNWRAPPED_VALUE_SCHEMA);
}

@Test
public void shouldRegisterDependenciesForProtobuf() throws Exception {
// Given:
givenStatement("CREATE STREAM source (f1 TIMESTAMP) "
+ "WITH ("
+ " kafka_topic='expectedName', "
+ " key_format='KAFKA', "
+ " value_format='PROTOBUF', "
+ " partitions=1 "
+ ");");

// When:
injector.inject(statement);

// Then:
verify(schemaRegistryClient).register("google/protobuf/timestamp.proto", TIMESTAMP_SCHEMA);
verify(schemaRegistryClient).register("expectedName-value", PROTOBUF_SCHEMA_WITH_REFS);
}
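// The two verifications above mirror the two-step registration: the dependency
// is registered under its import path ("google/protobuf/timestamp.proto"),
// which appears to be how DefaultReferenceSubjectNameStrategy names reference
// subjects, and the referencing value schema under "expectedName-value".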

private void givenStatement(final String sql) {
final PreparedStatement<?> preparedStatement =
parser.prepare(parser.parse(sql).get(0), metaStore);
@@ -63,6 +63,7 @@ public static Collection<TestCase<SchemaRegistryClient>> getMethodsToTest() {
.ignore("testCompatibility", String.class, ParsedSchema.class)
.ignore("deleteSubject", String.class)
.ignore("getAllSubjects")
.ignore("getVersion", String.class, ParsedSchema.class)
.build();
}

@@ -159,5 +160,17 @@ public void shouldSwallowRegister() throws Exception {
// Then:
verifyZeroInteractions(delegate);
}

@Test
public void shouldGetVersion() throws Exception {
// Given:
when(delegate.getVersion("some subject", schema)).thenReturn(6);

// When:
final int version = sandboxedClient.getVersion("some subject", schema);

// Then:
assertThat(version, is(6));
}
}
}
@@ -98,7 +98,7 @@ public static Optional<ParsedSchema> buildSchema(final JsonNode schema, final String format) {
return Optional.of(new JsonSchema(schemaString));
} else if (format.equalsIgnoreCase(ProtobufFormat.NAME)) {
// since Protobuf schemas are not valid JSON, the schema JsonNode in
// this case is just a string
// this case is just a string.
final String schemaString = schema.textValue();
return Optional.of(new ProtobufSchema(schemaString));
}
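To illustrate the comment in buildSchema above: Avro and JSON schemas arrive as JSON objects, but raw .proto source is not valid JSON, so the spec file stores it as a single JSON string. A minimal sketch, assuming Jackson's ObjectMapper and a throwaway message type:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public final class ProtobufNodeSketch {
  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    // The schema node is a JSON text node whose value is the .proto source.
    final JsonNode node = mapper.readTree(
        "\"syntax = \\\"proto3\\\"; message M {int32 A = 1;}\"");

    // textValue() recovers the raw source, which ProtobufSchema can parse.
    final ProtobufSchema schema = new ProtobufSchema(node.textValue());
    System.out.println(schema.canonicalString());
  }
}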
@@ -0,0 +1,145 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ID STRING KEY, TIME TIMESTAMP) WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ID` STRING KEY, `TIME` TIMESTAMP",
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST2 AS SELECT *\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST2",
"schema" : "`ID` STRING KEY, `TIME` TIMESTAMP",
"topicName" : "TEST2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "TEST2",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "TEST2"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"sourceSchema" : "`ID` STRING KEY, `TIME` TIMESTAMP"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "TIME AS TIME" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "PROTOBUF"
}
},
"topicName" : "TEST2"
},
"queryId" : "CSAS_TEST2_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
