Skip to content

Commit

Permalink
fix: Create stream fails when multiple Protobuf schema definitions exist (#8933)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Apr 13, 2022
1 parent 4b5feb0 commit 93bfe06
Show file tree
Hide file tree
Showing 22 changed files with 1,282 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.expression.tree.Type;
Expand Down Expand Up @@ -490,6 +491,29 @@ private static CreateAsSelect addSchemaFieldsCas(
return statement.copyWith(newProperties);
}

/**
 * Throws if the supplied schema contains more than one schema definition and the user has not
 * disambiguated which one to use.
 *
 * <p>Some formats (e.g. Protobuf) allow a single registered schema to declare multiple
 * definitions. ksqlDB cannot pick one automatically, so the statement must name the definition
 * via the {@code KEY_SCHEMA_FULL_NAME} / {@code VALUE_SCHEMA_FULL_NAME} WITH-clause property.
 *
 * @param schema the raw schema fetched from the Schema Registry
 * @param schemaFormat the format used to extract the schema's full names
 * @param isKey {@code true} when validating the key schema, {@code false} for the value schema
 * @throws KsqlException if the schema declares multiple definitions
 */
private static void throwOnMultiSchemaDefinitions(
    final ParsedSchema schema,
    final Format schemaFormat,
    final boolean isKey
) {
  final List<String> schemaFullNames = schemaFormat.schemaFullNames(schema);
  if (schemaFullNames.size() > 1) {
    final String schemaFullNameConfig = isKey
        ? CommonCreateConfigs.KEY_SCHEMA_FULL_NAME
        : CommonCreateConfigs.VALUE_SCHEMA_FULL_NAME;

    throw new KsqlException(
        (isKey ? "Key" : "Value") + " schema has multiple schema definitions. "
            + System.lineSeparator()
            + System.lineSeparator()
            // Join with the platform separator so the bullet list renders consistently with
            // the surrounding blank lines on all platforms (was a hard-coded "\n").
            + schemaFullNames.stream()
                .map(n -> "- " + n)
                .collect(Collectors.joining(System.lineSeparator()))
            + System.lineSeparator()
            + System.lineSeparator()
            + "Please specify a schema full name in the WITH clause using "
            + schemaFullNameConfig);
  }
}

private static CreateSource addSchemaFields(
final ConfiguredStatement<CreateSource> preparedStatement,
final Optional<SchemaAndId> keySchema,
Expand All @@ -503,6 +527,20 @@ private static CreateSource addSchemaFields(
final Optional<String> keySchemaName;
final Optional<String> valueSchemaName;

if (keySchema.isPresent() && !properties.getKeySchemaFullName().isPresent()) {
final Format keyFormat = FormatFactory.of(
SourcePropertiesUtil.getKeyFormat(properties, statement.getName()));

throwOnMultiSchemaDefinitions(keySchema.get().rawSchema, keyFormat, true);
}

if (valueSchema.isPresent() && !properties.getValueSchemaFullName().isPresent()) {
final Format valueFormat = FormatFactory.of(
SourcePropertiesUtil.getValueFormat(properties));

throwOnMultiSchemaDefinitions(valueSchema.get().rawSchema, valueFormat, false);
}

// Only populate key and value schema names when schema ids are explicitly provided
if (properties.getKeySchemaId().isPresent() && keySchema.isPresent()) {
keySchemaName = Optional.ofNullable(keySchema.get().rawSchema.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,16 @@ public static boolean subjectExists(
return getLatestSchema(srClient, subject).isPresent();
}

public static Optional<ParsedSchema> getParsedSchema(
public static Optional<Integer> getLatestSchemaId(
final SchemaRegistryClient srClient,
final String topic,
final boolean isKey
) {
final String subject = KsqlConstants.getSRSubject(topic, isKey);
return getLatestSchema(srClient, subject).map(SchemaMetadata::getId);
}

public static Optional<ParsedSchema> getLatestParsedSchema(
final SchemaRegistryClient srClient,
final String topic,
final boolean isKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,43 @@ public void shouldReturnParsedSchemaFromSubjectValue() throws Exception {

// When:
final Optional<ParsedSchema> parsedSchema =
SchemaRegistryUtil.getParsedSchema(schemaRegistryClient, "bar", false);
SchemaRegistryUtil.getLatestParsedSchema(schemaRegistryClient, "bar", false);

// Then:
assertThat(parsedSchema.get(), equalTo(AVRO_SCHEMA));
}

@Test
public void shouldReturnSchemaIdFromSubjectKey() throws Exception {
  // Given: the key subject ("bar-key") has a latest schema registered under id 123.
  when(schemaMetadata.getId()).thenReturn(123);
  when(schemaRegistryClient.getLatestSchemaMetadata("bar-key"))
      .thenReturn(schemaMetadata);

  // When: looking up the latest schema id for the topic's key subject.
  final Optional<Integer> result =
      SchemaRegistryUtil.getLatestSchemaId(schemaRegistryClient, "bar", true);

  // Then: the id from the registry metadata is returned.
  assertThat(result.get(), equalTo(123));
}

@Test
public void shouldReturnSchemaIdFromSubjectValue() throws Exception {
  // Given: the value subject ("bar-value") has a latest schema registered under id 123.
  when(schemaMetadata.getId()).thenReturn(123);
  when(schemaRegistryClient.getLatestSchemaMetadata("bar-value"))
      .thenReturn(schemaMetadata);

  // When: looking up the latest schema id for the topic's value subject.
  final Optional<Integer> result =
      SchemaRegistryUtil.getLatestSchemaId(schemaRegistryClient, "bar", false);

  // Then: the id from the registry metadata is returned.
  assertThat(result.get(), equalTo(123));
}


@Test
public void shouldReturnParsedSchemaFromSubjectKey() throws Exception {
// Given:
Expand All @@ -94,7 +125,7 @@ public void shouldReturnParsedSchemaFromSubjectKey() throws Exception {

// When:
final Optional<ParsedSchema> parsedSchema =
SchemaRegistryUtil.getParsedSchema(schemaRegistryClient, "bar", true);
SchemaRegistryUtil.getLatestParsedSchema(schemaRegistryClient, "bar", true);

// Then:
assertThat(parsedSchema.get(), equalTo(AVRO_SCHEMA));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,24 @@

package io.confluent.ksql.test.serde.protobuf;

import static io.confluent.connect.protobuf.ProtobufDataConfig.SCHEMAS_CACHE_SIZE_CONFIG;
import static io.confluent.connect.protobuf.ProtobufDataConfig.WRAPPER_FOR_RAW_PRIMITIVES_CONFIG;

import com.google.common.collect.ImmutableMap;
import io.confluent.connect.protobuf.ProtobufConverter;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.connect.protobuf.ProtobufDataConfig;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.ksql.serde.protobuf.ProtobufProperties;
import io.confluent.ksql.serde.protobuf.ProtobufSchemaTranslator;
import io.confluent.ksql.test.serde.ConnectSerdeSupplier;
import org.apache.kafka.connect.data.Schema;

public class ValueSpecProtobufSerdeSupplier extends ConnectSerdeSupplier<ProtobufSchema> {

private final boolean unwrapPrimitives;
private final ProtobufSchemaTranslator schemaTranslator;

public ValueSpecProtobufSerdeSupplier(final boolean unwrapPrimitives) {
public ValueSpecProtobufSerdeSupplier(final ProtobufProperties protobufProperties) {
super(ProtobufConverter::new);
this.unwrapPrimitives = unwrapPrimitives;
this.schemaTranslator = new ProtobufSchemaTranslator(protobufProperties);
}

@Override
protected Schema fromParsedSchema(final ProtobufSchema schema) {
return new ProtobufData(new ProtobufDataConfig(ImmutableMap.of(
SCHEMAS_CACHE_SIZE_CONFIG, 1,
WRAPPER_FOR_RAW_PRIMITIVES_CONFIG, unwrapPrimitives
))).toConnectSchema(schema);
return schemaTranslator.toConnectSchema(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static SerdeSupplier<?> getSerdeSupplier(
case AvroFormat.NAME: return new ValueSpecAvroSerdeSupplier();
case ProtobufFormat.NAME:
return new ValueSpecProtobufSerdeSupplier(
new ProtobufProperties(formatInfo.getProperties()).getUnwrapPrimitives());
new ProtobufProperties(formatInfo.getProperties()));
case JsonFormat.NAME: return new ValueSpecJsonSerdeSupplier(false, properties);
case JsonSchemaFormat.NAME: return new ValueSpecJsonSerdeSupplier(true, properties);
case DelimitedFormat.NAME: return new StringSerdeSupplier();
Expand Down
Loading

0 comments on commit 93bfe06

Please sign in to comment.