New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the multi version schema support #3876
Changes from 47 commits
54d58c0
2776e32
9714c9d
9447588
b01f509
5dae355
ee531dc
cea87d0
70ca1fe
c491e42
ed51af9
dd838ce
9597bc0
7571d7e
c58da0b
a2c45bf
d6b908a
d6c90bb
10af2c2
a3b6966
4295758
5abb561
d8a29a0
3350a80
18a9170
b74beed
c939809
db9cb75
0c7f5c4
3c5a03a
59a260c
468a861
ff0aebf
c2887f2
291ab9f
42407b9
5e4a991
c30f3f9
ffeec02
4ff3214
8dd3d79
c2e7724
2f87051
e210c86
564dcee
66819a7
cd5b485
0691bf3
091f69c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.client.api.schema; | ||
|
||
import org.apache.pulsar.common.schema.SchemaInfo; | ||
|
||
/** | ||
* Schema Provider. | ||
*/ | ||
public interface SchemaInfoProvider { | ||
|
||
/** | ||
* Retrieve the schema info of a given <tt>schemaVersion</tt>. | ||
* | ||
* @param schemaVersion schema version | ||
* @return schema info of the provided <tt>schemaVersion</tt> | ||
*/ | ||
SchemaInfo getSchemaByVersion(byte[] schemaVersion); | ||
|
||
/** | ||
* Retrieve the latest schema info. | ||
* | ||
* @return the latest schema | ||
*/ | ||
SchemaInfo getLatestSchema(); | ||
|
||
/** | ||
* Retrieve the topic name. | ||
* | ||
* @return the topic name | ||
*/ | ||
public String getTopicName(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.client.api.schema; | ||
|
||
public interface SchemaWriter<T> { | ||
|
||
/** | ||
* serialize the object into bytes | ||
* | ||
* @param pojo the pojo | ||
* @return the serialized bytes | ||
*/ | ||
byte[] write(T pojo); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -230,7 +230,7 @@ public byte[] getData() { | |
|
||
@Override | ||
public byte[] getSchemaVersion() { | ||
if (msgMetadataBuilder.hasSchemaVersion()) { | ||
if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion() ) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove the addition space " " between ") )" |
||
return msgMetadataBuilder.getSchemaVersion().toByteArray(); | ||
} else { | ||
return null; | ||
|
@@ -241,8 +241,9 @@ public byte[] getSchemaVersion() { | |
public T getValue() { | ||
// check if the schema passed in from client supports schema versioning or not | ||
// this is an optimization to only get schema version when necessary | ||
if (schema.supportSchemaVersioning()) { | ||
return schema.decode(getData(), getSchemaVersion()); | ||
byte [] schemaVersion = getSchemaVersion(); | ||
if (schema.supportSchemaVersioning() && schemaVersion != null) { | ||
return schema.decode(getData(), schemaVersion); | ||
} else { | ||
return schema.decode(getData()); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -742,6 +742,12 @@ private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S | |
|
||
List<CompletableFuture<Consumer<T>>> futureList; | ||
|
||
try { | ||
client.preProcessSchemaBeforeSubscribe(client, schema, topicName); | ||
} catch (Throwable t) { | ||
subscribeResult.completeExceptionally(t); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @congbobo184 don't you need to return here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, It should be |
||
} | ||
|
||
if (numPartitions > 1) { | ||
this.topics.putIfAbsent(topicName, numPartitions); | ||
allTopicPartitionsNumber.addAndGet(numPartitions); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the parameter name here correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change it named message.