Skip to content

Commit

Permalink
fix: disable SR ID_COMPATIBILITY_STRICT for Protobuf to allow seriali…
Browse files Browse the repository at this point in the history
…zation with external references (#9369)
  • Loading branch information
spena committed Aug 31, 2022
1 parent 338b2fb commit 72608fe
Show file tree
Hide file tree
Showing 15 changed files with 1,556 additions and 4 deletions.
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
Expand Down Expand Up @@ -167,25 +166,34 @@ public static ParsedSchema withSchemaReferences(
ref -> ref.getSchema().canonicalString()
));

final List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference>
schemaReferences = references.stream()
.map(qttSchemaRef ->
new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(
qttSchemaRef.getName(),
qttSchemaRef.getName(),
firstVersion))
.collect(Collectors.toList());

switch (schema.schemaType()) {
case ProtobufFormat.NAME:
return new ProtobufSchema(
schema.canonicalString(),
ImmutableList.of(),
schemaReferences,
resolvedReferences,
firstVersion,
schema.name());
case AvroFormat.NAME:
return new AvroSchema(
schema.canonicalString(),
ImmutableList.of(),
schemaReferences,
resolvedReferences,
firstVersion
);
case JsonSchemaFormat.NAME:
return new JsonSchema(
schema.canonicalString(),
ImmutableList.of(),
schemaReferences,
resolvedReferences,
firstVersion
);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.function.TestFunctionRegistry;
Expand Down Expand Up @@ -137,6 +138,33 @@ public class RestTestExecutor implements Closeable {
this.topicInfoCache = new TopicInfoCache(engine, serviceContext.getSchemaRegistryClient());
}

private void maybeRegisterTopicSchemas(final Collection<Topic> topics) {
final SchemaRegistryClient schemaRegistryClient = serviceContext.getSchemaRegistryClient();
final int firstVersion = 1;

for (final Topic topic : topics) {
try {
if (topic.getKeySchemaId().isPresent() && topic.getKeySchema().isPresent()) {
schemaRegistryClient.register(
KsqlConstants.getSRSubject(topic.getName(), true),
topic.getKeySchema().get(),
firstVersion /* QTT does not support subjects versions yet */,
topic.getKeySchemaId().get());
}

if (topic.getValueSchemaId().isPresent() && topic.getValueSchema().isPresent()) {
schemaRegistryClient.register(
KsqlConstants.getSRSubject(topic.getName(), false),
topic.getValueSchema().get(),
firstVersion /* QTT does not support subjects versions yet */,
topic.getValueSchemaId().get());
}
} catch (final Exception e) {
throw new KsqlException(e);
}
}
}

void buildAndExecuteQuery(final RestTestCase testCase) {
topicInfoCache.clear();

Expand All @@ -154,6 +182,7 @@ void buildAndExecuteQuery(final RestTestCase testCase) {
+ "responsesCount: " + expectedResponseSize);
}

maybeRegisterTopicSchemas(testCase.getTopics());
initializeTopics(testCase);
testCase.getProperties().forEach(restClient::setProperty);

Expand Down

0 comments on commit 72608fe

Please sign in to comment.