
fix: Create stream fails when multiple Protobuf schema definitions exist #8933

Merged · 6 commits · Apr 13, 2022

Conversation

spena
Member

@spena spena commented Mar 25, 2022

Description

Fixes #5265

Ignore the 1st commit (58e4703), which is being reviewed separately in #8984.

This PR supports the KEY_SCHEMA_FULL_NAME and VALUE_SCHEMA_FULL_NAME properties in the WITH clause when creating a stream. These properties allow users to specify the full name of a Protobuf schema instead of using a default name (the current default is ConnectDefault1).

When registering a new schema, if these properties are given, the schema is registered in SR (Schema Registry) under the specified name, e.g.:

ksql> create stream t1(id int key, c1 double, c2 int) with (kafka_topic='t1', format='protobuf', key_schema_full_name='ProtobufKey1', value_schema_full_name='ProtobufValue1', partitions=1);

 Message        
----------------
 Stream created 
----------------

ksql> describe t1;

Name                 : T1
 Field | Type                   
--------------------------------
 ID    | INTEGER          (key) 
 C1    | DOUBLE                 
 C2    | INTEGER                
--------------------------------

$ curl -X GET http://localhost:8081/subjects/t1-key/versions/1/schema
syntax = "proto3";

message ProtobufKey1 {
  int32 ID = 1;
}

$ curl -X GET http://localhost:8081/subjects/t1-value/versions/1/schema
syntax = "proto3";

message ProtobufValue1 {
  double C1 = 1;
  int32 C2 = 2;
}
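The name-defaulting behavior shown above can be sketched as follows. This is an illustrative snippet only (the class and method names are hypothetical, not ksqlDB source); it assumes just what the description states: the WITH-clause property, when present, overrides the ConnectDefault1 default.

```java
import java.util.Optional;

public class SchemaNameResolution {
    // Default message name mentioned in the PR description (hypothetical constant name).
    static final String DEFAULT_FULL_NAME = "ConnectDefault1";

    // Returns the configured full name if the WITH-clause property was given,
    // otherwise falls back to the default.
    static String resolveFullName(Optional<String> schemaFullNameProperty) {
        return schemaFullNameProperty.orElse(DEFAULT_FULL_NAME);
    }

    public static void main(String[] args) {
        System.out.println(resolveFullName(Optional.empty()));               // ConnectDefault1
        System.out.println(resolveFullName(Optional.of("ProtobufValue1"))); // ProtobufValue1
    }
}
```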

The KEY/VALUE properties can also be used to select which Protobuf message schema to use when multiple schema definitions are available in SR, e.g.:

$ curl -X GET http://localhost:8081/subjects/s1-key/versions/2/schema
syntax = "proto3";

message ProtobufKey1 {
  int32 ID = 1;
}
message ProtobufKey2 {
  string ID = 1;
}

$ curl -X GET http://localhost:8081/subjects/s1-value/versions/2/schema
syntax = "proto3";

message ProtobufValue1 {
  double C1 = 1;
  int32 C2 = 2;
}
message ProtobufValue2 {
  int32 C1 = 1;
}

ksql> create stream s1 with (kafka_topic='s1', format='protobuf', key_schema_full_name='ProtobufKey2', value_schema_full_name='ProtobufValue2', partitions=1);

 Message        
----------------
 Stream created 
----------------

ksql> describe s1;

Name                 : S1
 Field | Type                   
--------------------------------
 ID    | VARCHAR(STRING)  (key) 
 C1    | INTEGER                
--------------------------------

ksql> drop stream s1;

ksql> create stream s1 with (kafka_topic='s1', format='protobuf', key_schema_full_name='ProtobufKey1', value_schema_full_name='ProtobufValue1', partitions=1);

 Message        
----------------
 Stream created 
----------------

ksql> describe s1;

Name                 : S1
 Field | Type                   
--------------------------------
 ID    | INTEGER          (key) 
 C1    | DOUBLE                 
 C2    | INTEGER                
--------------------------------

The KEY/VALUE properties are already supported for AVRO. This PR assumes those properties were meant for other formats too, but they were never wired up when Protobuf support was implemented.

Testing done

Manually; see the examples above.
I hit limitations with QTT that prevent testing the KEY/VALUE properties.
Reason:

I can't make the multiple-Protobuf-schema case work because QTT prepares all input/output topics and schemas before executing statements, which sets up the Protobuf translators without the properties specified in the CREATE statements.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena requested a review from a team as a code owner March 25, 2022 21:38
@pgaref
Member

pgaref commented Mar 26, 2022

Thanks for this @spena! A more general question: multiple schema definitions can exist and users can specify which one to pick -- however, multiple schema versions can also exist. Are we assuming users always pick the latest?

@spena
Member Author

spena commented Mar 28, 2022

@pgaref Yes, multiple schema versions can exist. There is another property, SCHEMA_ID, to select a specific schema. That property and the ones from this PR may be combined, e.g.

create stream s1 with (schema_id='5', value_schema_full_name='Proto1');

Notice that schema_id takes a schema ID, not a version. Each new schema registered in SR gets a schema ID, and we support lookup by ID only.

@spena spena force-pushed the p0_protobuf_schema branch 2 times, most recently from 1f5aa27 to 975caa7 on April 1, 2022 20:00
@CLAassistant

CLAassistant commented Apr 1, 2022

CLA assistant check
All committers have signed the CLA.

@colinhicks
Member

@spena does the latest commit for insert values also cover this bug? #6091

@spena spena force-pushed the p0_protobuf_schema branch 2 times, most recently from 9b7bca1 to de82ba2 on April 7, 2022 18:37
Member

@lihaosky lihaosky left a comment

Awesome! LGTM overall. Some small nits.

/**
* Returns a list of schema names found in the {@code ParsedSchema}.
*/
default List<String> getSchemaDefinitions(final ParsedSchema schema) {
Member

Is this not applicable to avro or json_sr? They should have at least one definition?

Member Author

True. I just added the method to ConnectFormat and return a single default name there. I still don't know of a way for JSON and Avro to have multiple definitions, so they return a single name. Other formats, such as delimited, still return zero names (the default from Format.java). I also renamed the method to schemaFullNames.
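To illustrate what a schemaFullNames-style method would return for the multi-message schemas shown earlier, here is a hypothetical sketch that scans raw .proto text with a regex. The real method operates on ParsedSchema/ProtobufSchema objects from Schema Registry, not raw text; this version exists only to make the expected result concrete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProtoMessageNames {
    // Matches top-level "message Name {" declarations at the start of a line.
    private static final Pattern MESSAGE =
        Pattern.compile("(?m)^message\\s+(\\w+)\\s*\\{");

    // Returns the message names found in the proto text, in declaration order.
    static List<String> schemaFullNames(String protoText) {
        List<String> names = new ArrayList<>();
        Matcher m = MESSAGE.matcher(protoText);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }

    public static void main(String[] args) {
        String proto = "syntax = \"proto3\";\n"
            + "message ProtobufKey1 { int32 ID = 1; }\n"
            + "message ProtobufKey2 { string ID = 1; }\n";
        System.out.println(schemaFullNames(proto)); // [ProtobufKey1, ProtobufKey2]
    }
}
```

For a schema with a single definition (or a non-SR format), the list would have one entry or be empty, matching the default behavior described above.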

@@ -80,4 +85,18 @@ protected ConnectSchemaTranslator getConnectSchemaTranslator(
return new ProtobufSerdeFactory(new ProtobufProperties(formatProps))
.createSerde(connectSchema, config, srFactory, targetType, isKey);
}

@Override
public List<String> getSchemaDefinitions(final ParsedSchema schema) {
Member

Can you add some test for this?

Member Author

Added tests

Comment on lines 81 to 91
return new ProtobufData(new ProtobufDataConfig(updatedConfigs))
.fromConnectSchema(injectSchemaFullName(schema));
}

private ProtobufSchema withSchemaFullName(final ProtobufSchema origSchema) {
return fullNameSchema.map(origSchema::copy).orElse(origSchema);
}

private Schema injectSchemaFullName(final Schema origSchema) {
return fullNameSchema
.map(fullName -> SchemaFullNameAppender.appendSchemaFullName(origSchema, fullName))
.orElse(origSchema);
}
Member

Can you also add some tests for these?

Member Author

Added tests

@spena spena force-pushed the p0_protobuf_schema branch 4 times, most recently from dab7996 to 45c2aec on April 11, 2022 20:46
@@ -106,6 +106,15 @@ public static boolean subjectExists(
return getLatestSchema(srClient, subject).isPresent();
}

public static Optional<Integer> getSchemaId(
Member

nit: getLatestSchemaId?

Member Author

Done

Comment on lines 505 to 536
if (supportedProperties.contains(ConnectProperties.SCHEMA_ID)) {
if (!formatInfo.getProperties().containsKey(ConnectProperties.SCHEMA_ID)) {
SchemaRegistryUtil.getSchemaId(srClient, topicName, isKey)
.ifPresent(schemaId ->
propertiesBuilder.put(ConnectProperties.SCHEMA_ID, String.valueOf(schemaId)));
}
}
Member

Why do we need this? This seems to change existing logic of data serialization drastically

Member Author

Because we need the Serde factory to fetch the schema from SR and extract the specified schema definition. This happens in ProtobufSerdeFactory.createSerde(), which calls SerdeUtils.getAndTranslateSchema() when the schemaId is present.

Also, by setting the SCHEMA_ID, ProtobufSerdeFactory.getConverter() disables AUTO_REGISTER_SCHEMAS
to prevent failures caused by multiple schema definitions in SR.

The same happens for the Avro and JsonSR factories. I could refactor the interfaces and classes to accept the extracted physical schema, which is what I need, but schemaId seems to do the same, so I reused it.

What else does schemaId do that concerns you?
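The interaction described above can be sketched roughly as follows. This is a hedged illustration, not ksqlDB source: the class and method names are hypothetical, and the config keys `auto.register.schemas` and `use.schema.id` are assumed from Confluent serializer configuration; the sketch only shows the idea that pinning a schema ID turns off auto-registration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SerdeConfigSketch {
    // Builds the converter configs: when a schema ID is pinned, auto-registration
    // is disabled so serialization reuses the existing SR schema rather than
    // attempting to re-register one that has multiple message definitions.
    static Map<String, Object> converterConfigs(Optional<Integer> schemaId) {
        Map<String, Object> configs = new HashMap<>();
        // Assumed Confluent config key; disabled only when a schema ID is supplied.
        configs.put("auto.register.schemas", !schemaId.isPresent());
        // Assumed Confluent config key for pinning a specific registered schema.
        schemaId.ifPresent(id -> configs.put("use.schema.id", id));
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(converterConfigs(Optional.of(5)));
        System.out.println(converterConfigs(Optional.empty()));
    }
}
```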

Member

@lihaosky lihaosky left a comment

Discussed offline. We will only inject the schema id at insertion time when the format is Protobuf and the schema has multiple messages.

One caveat: if the schema in SR has multiple messages, a CREATE OR REPLACE with more columns followed by an insertion will always fail, because the old schema in SR has fewer columns and we don't auto-register schemas to SR.

Approving the PR to unblock, but @spena will merge after fixing the schema id injection logic and doing more testing :)


Successfully merging this pull request may close these issues.

KSQL creates streams with wrong Protobuf schema if the Protobuf definition has multiple messages
6 participants