New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the multi version schema support #3876
Add the multi version schema support #3876
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation here is too customized to AvroSchema. I don't think it is a good implementation. Ideally I think multi-versioning should be handled by StructSchema
class. For StructSchema
, you need a writer
schema provider and a reader
schema provider. By default both writer
and reader
schema provider returns the schema generated by the pojo.
When a schema is used in producer, pulsar will fetch the latest schema from broker, and set reader schema to the broker side schema. so it has writer schema (generated from pojo) and reader schema (read from broker) to serialize the data.
When a schema is used in consumer, the writer schema provider will be the multi-version schema provider (which reads and caches schemas from broker), and the reader schema is the schema generated from POJO.
We should try to do this in an abstract way. So all the subclasses of StructSchema
will have a writer schema and a reader schema for serializing and deserializing data.
@@ -83,6 +84,7 @@ | |||
private final ConnectionPool cnxPool; | |||
private final Timer timer; | |||
private final ExecutorProvider externalExecutorProvider; | |||
private Map<String, MultiVersionSchema> multiVersionSchemaCatch = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean Cache
? multiVersionSchemaCache
?
@@ -152,6 +155,17 @@ | |||
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, | |||
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, | |||
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) { | |||
if(schema instanceof AvroSchema && schema.supportSchemaVersioning()){ | |||
Map<String, MultiVersionSchema> multiVersionSchemaCatch = client.getMultiVersionSchemaCatch(); | |||
if(multiVersionSchemaCatch.get(topic) != null){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if(multiVersionSchemaCatch.get(topic) != null){ | |
MultiVersionSchema schema = multiVersionSchemaCache.get(topic); | |
if(schema == null) { | |
... | |
} |
private String name; | ||
private int age; | ||
@AvroDefault("\"man\"") | ||
private String sex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use gender
(male, and female)
@@ -152,6 +154,16 @@ | |||
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, | |||
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, | |||
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) { | |||
if(schema instanceof AvroSchema){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not schema.supportSchemaVersioning() ?
if(schemaFromCache == null){ | ||
((AvroSchema<T>) schema).setSchemaProvider(new MultiVersionGenericSchemaProvider(TopicName.get(topic), client)); | ||
supportSchemaVersioningSchemaCache.put(topic, schema); | ||
}else{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And some spaces
run Integration Tests |
run java8 tests |
1 similar comment
run java8 tests |
@congbobo184 can you rebase your PR to latest master? |
OK,I'will do it |
…chemaversion # Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
run java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@congbobo184 overall looks much clearer now. However I think we still need to push the reader and writer schema logic to StructSchema
.
@@ -31,6 +31,12 @@ | |||
* @param schemaVersion schema version | |||
* @return schema instance of the provided <tt>schemaVersion</tt> | |||
*/ | |||
Schema<T> getSchema(byte[] schemaVersion); | |||
Schema<T> getVersionSchema(byte[] schemaVersion); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getSchema
is probably okay. or we can call it getSchemaByVersion
* | ||
* @return the current schema | ||
*/ | ||
Schema<T> getCurrentSchema() throws InterruptedException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe getLatestSchema
?
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
Outdated
Show resolved
Hide resolved
@@ -83,6 +83,7 @@ | |||
private final ConnectionPool cnxPool; | |||
private final Timer timer; | |||
private final ExecutorProvider externalExecutorProvider; | |||
private Map<String, Schema> supportSchemaVersioningSchemaCache = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either use a concurrent structure or you should synchronize on accessing supportSchemaVersioningSchemaCache
in ConsumerImpl
.
private static final ThreadLocal<BinaryDecoder> decoders = | ||
new ThreadLocal<>(); | ||
private boolean supportSchemaVersioning; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think the code change here should be pushed to StructSchema
. because reader
and writer
schemas should be applied to all struct
schemas, although only Avro can use them now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All right, it should push to StructSchema
@@ -152,6 +154,16 @@ | |||
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, | |||
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, | |||
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) { | |||
if (schema != null && schema.supportSchemaVersioning()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually I am bit confused about the logic here. why do we need to cache this?
Event we want to cache this, we should cache the MultiVersionGenericSchemaProvider
. but we shouldn't cache the schema. because user can pass in different type of schema instances.
I think we need to push the reader and writer, not reader schema and writer schema. Because, from the point of view of shcema, it shouldn't have the concept of the reader schema or writer schema. |
@congbobo184 yes |
run java8 tests |
run cpp tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@congbobo184 Really great contribution!
I think it is almost ready to go. However I think there is one place you didn't handle the exception very well, can you please take a look and fix it?
try { | ||
client.preProcessSchemaBeforeSubscribe(client, schema, topicName); | ||
} catch (Throwable t) { | ||
throw new RuntimeException(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@congbobo184 you should fail subscribeResult
here, no?
run java8 tests |
try { | ||
client.preProcessSchemaBeforeSubscribe(client, schema, topicName); | ||
} catch (Throwable t) { | ||
subscribeResult.completeExceptionally(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@congbobo184 don't you need to return here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, It should be
run java8 Tests |
/** | ||
* serialize the object into bytes | ||
* | ||
* @param pojo the pojo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the parameter name here correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change it named message.
import org.apache.pulsar.client.api.schema.GenericSchema; | ||
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; | ||
import org.apache.pulsar.client.api.schema.SchemaBuilder; | ||
import org.apache.pulsar.client.api.schema.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We should avoid this "*" kind import
import org.testng.annotations.Test; | ||
|
||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Mockito.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same here, avoid * kind import
@@ -230,7 +230,7 @@ public boolean isExpired(int messageTTLInSeconds) { | |||
|
|||
@Override | |||
public byte[] getSchemaVersion() { | |||
if (msgMetadataBuilder.hasSchemaVersion()) { | |||
if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion() ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove the addition space " " between ") )"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, over all lgtm. @congbobo184 Added some minor comments
* modify the test import * * remove space
OK,I will change these |
well done @congbobo184 ! this change makes pulsar be able to handle schema versions correctly! |
@sijie Thank you for giving me a lot of programming advice. |
Motivation Pulsar 2.4.0 Added schema versioning to support multi version messages produce and consume apache#3876 apache#3670 apache#4211 apache#4325 apache#4548. but the doc is not updated accordingly. Modifications Update the schema version in the pulsar registry doc for releases 2.4.0/2.4.1/2.4.2.
Motivation
Fix #3742
In order to decode the message correctly by AVRO schema, we need to know the schema what the message is.
Modification
For structSchema add reader and writer
Modifications
Verifying this change
Add multiVersionSchema
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (yes)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)