[pulsar-clients]Store key part of a KeyValue schema into pulsar message keys #4117
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@tuteng thank you for working on this feature. However I think this change is taking a wrong approach.
The current problem with the KeyValue schema is that it stores the key and value together in the message payload, so the application cannot leverage the many features that Pulsar builds around the message key (such as compaction and partitioning).
As the title of this PR states, the intention of this task is to provide a mechanism to store the key of the KeyValue pair in the Pulsar message key. It is not about omitting the key.
So I think the right approach should be:
- Introduce an encoding type in KeyValueSchema (we can call it KeyValueEncodingType) so that when people construct a KeyValueSchema, they can specify which encoding type to use.
public enum KeyValueEncodingType {
    // key is stored as message key, while value is stored as message payload
    SEPARATED,
    // key and value are stored as message payload
    INLINE
}
- The KeyValueSchema should store the KeyValueEncodingType in the properties of its schema info, under kv.encoding.type.
- You might need to change KeyValueSchema to provide a method decode(byte[] keyBytes, byte[] valBytes) that decodes bytes into a KeyValue object. Whether keyBytes comes from the message key or from the payload is determined by the KeyValueEncodingType (stored as part of the key/value schema).
- The most tricky part is how to handle the Key generic type. For historical reasons, we didn't provide natural support for <K, V> in the current typed interfaces.
First, you might need to change the encoding path to handle key/value schema in TypedMessageBuilder. You need to do something like below:
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
    KeyValueSchema kvSchema = (KeyValueSchema) schema;
    org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
    if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
        // set key as the message key
        msgMetadataBuilder.setPartitionKey(
            Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
        msgMetadataBuilder.setPartitionKeyB64Encoded(true);
        // set value as the payload
        this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
        return this;
    }
}
Second, you need to change the decode path in MessageImpl. You need to decode the key part from the message key when the SEPARATED encoding type is used in the key/value schema, so you need to do something like this:
if (schema.supportSchemaVersioning()) {
    return schema.decode(getData(), getSchemaVersion());
} else if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
    KeyValueSchema kvSchema = (KeyValueSchema) schema;
    if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
        // key comes from the message key, value from the payload;
        // the two-argument decode is the new method proposed on KeyValueSchema
        return kvSchema.decode(getKeyBytes(), getData());
    } else {
        // INLINE: key and value are both in the payload
        return schema.decode(getData());
    }
} else {
    return schema.decode(getData());
}
I am not sure if this is the best way to implement it, but that's what I can think of based on my current understanding of schema.
Hope this can help you understand the current situation of key/value schema.
@jiazhai since the key/value schema was introduced by you for supporting Kafka Connect and Debezium, please take a look at my comment and see if I understand this correctly or not.
/cc @congbobo184 @codelipenghui this might be related to your multi-version schema support in zhaopin
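To make the two encoding types concrete, here is a minimal, self-contained sketch of the round trip. It does not use the Pulsar classes: KeyValueEncodingSketch, Encoded, and the length-prefixed INLINE layout are hypothetical stand-ins chosen for illustration, and only the SEPARATED/INLINE distinction and the Base64-encoded key come from the proposal above.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class KeyValueEncodingSketch {
    public enum KeyValueEncodingType { SEPARATED, INLINE }

    /** Hypothetical stand-in for a message: an optional Base64 key plus a payload. */
    public static final class Encoded {
        public final String partitionKeyB64; // null when the key travels in the payload
        public final byte[] payload;

        public Encoded(String partitionKeyB64, byte[] payload) {
            this.partitionKeyB64 = partitionKeyB64;
            this.payload = payload;
        }
    }

    public static Encoded encode(byte[] keyBytes, byte[] valBytes, KeyValueEncodingType type) {
        if (type == KeyValueEncodingType.SEPARATED) {
            // key goes to the message key, value is the whole payload
            return new Encoded(Base64.getEncoder().encodeToString(keyBytes), valBytes);
        }
        // INLINE (layout assumed for this sketch): length-prefixed key, then length-prefixed value
        ByteBuffer buf = ByteBuffer.allocate(4 + keyBytes.length + 4 + valBytes.length);
        buf.putInt(keyBytes.length).put(keyBytes).putInt(valBytes.length).put(valBytes);
        return new Encoded(null, buf.array());
    }

    /** Returns {keyBytes, valBytes}; where the key is read from depends on the encoding type. */
    public static byte[][] decode(Encoded msg, KeyValueEncodingType type) {
        if (type == KeyValueEncodingType.SEPARATED) {
            return new byte[][] { Base64.getDecoder().decode(msg.partitionKeyB64), msg.payload };
        }
        ByteBuffer buf = ByteBuffer.wrap(msg.payload);
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        byte[] val = new byte[buf.getInt()];
        buf.get(val);
        return new byte[][] { key, val };
    }
}
```

The point of the sketch is that the consumer must know the encoding type before it can decode, which is why the proposal stores it in the schema info properties.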
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java # pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java Handle conflict
rerun integration tests
Update comment for enum KeyValueEncodingType Add null check for parameter keyValueEncodingType Update Unit Test
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java Handle conflict
Great contribution @tuteng! Now we can naturally support key schema in message key :)
run java8 tests
lgtm. left some minor comments
Overall looks good to me, just a small suggestion:
We need to prohibit users from calling TypedMessageBuilder.key() when they are using KeyValueSchema with KeyValueEncodingType.SEPARATED as the serialization type.
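A guard along these lines could look like the sketch below. It is only an illustration under assumptions: SeparatedKeyGuardSketch and its Builder are hypothetical stand-ins rather than the real TypedMessageBuilderImpl, and the choice of IllegalArgumentException is mine, not something settled in this review.

```java
public class SeparatedKeyGuardSketch {
    public enum KeyValueEncodingType { SEPARATED, INLINE }

    /** Hypothetical, minimal stand-in for a typed message builder. */
    public static final class Builder {
        private final boolean keyValueSchema;
        private final KeyValueEncodingType encodingType;
        private String key;

        public Builder(boolean keyValueSchema, KeyValueEncodingType encodingType) {
            this.keyValueSchema = keyValueSchema;
            this.encodingType = encodingType;
        }

        public Builder key(String key) {
            // With SEPARATED, the message key is derived from the KeyValue itself,
            // so an explicit key would conflict with the encoded key.
            if (keyValueSchema && encodingType == KeyValueEncodingType.SEPARATED) {
                throw new IllegalArgumentException(
                    "key() is not allowed with a KeyValue schema using SEPARATED encoding");
            }
            this.key = key;
            return this;
        }

        public String getKey() {
            return key;
        }
    }
}
```

Failing fast in key() keeps the conflict visible at the call site instead of letting one key silently overwrite the other at send time.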
…ingtype is SEPARATED Add unit test
run java8 tests
Motivation
The current implementation of the KeyValue schema stores the key and value together as part of the message payload. Ideally the key should be stored as the message key.
This can be done by introducing a property in the KeyValue schema that indicates whether to store the key in the payload or as the message key.
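As a rough sketch of such a property, the snippet below writes and reads the encoding type in a plain properties map. The kv.encoding.type name comes from the review discussion; the class name, the helper methods, and the fallback to INLINE when the property is absent are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class EncodingTypePropertySketch {
    public enum KeyValueEncodingType { SEPARATED, INLINE }

    public static final String KV_ENCODING_TYPE = "kv.encoding.type";

    /** Returns a copy of the schema properties with the encoding type recorded. */
    public static Map<String, String> withEncodingType(Map<String, String> props,
                                                       KeyValueEncodingType type) {
        Map<String, String> copy = new HashMap<>(props);
        copy.put(KV_ENCODING_TYPE, type.name());
        return copy;
    }

    /** Reads the encoding type; assumes INLINE when the property is missing. */
    public static KeyValueEncodingType readEncodingType(Map<String, String> props) {
        String raw = props.get(KV_ENCODING_TYPE);
        return raw == null ? KeyValueEncodingType.INLINE : KeyValueEncodingType.valueOf(raw);
    }
}
```

Defaulting to INLINE here is meant to keep schemas created before the property existed decodable as they were; whether that is the right default is a design decision for the actual change.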
Modifications
Verifying this change
Unit tests pass.