Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the multi version schema support #3876

Merged
merged 49 commits into from Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
54d58c0
add the multi version schema support
congbobo Mar 21, 2019
2776e32
recover the code format
congbobo Mar 21, 2019
9714c9d
recover the code format
congbobo Mar 21, 2019
9447588
Modify the cache name
congbobo Mar 21, 2019
b01f509
set schema provider in struct schema
congbobo Mar 25, 2019
5dae355
Modify the jude type
congbobo Mar 26, 2019
ee531dc
Modify the judge condition
congbobo Mar 26, 2019
cea87d0
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Mar 27, 2019
70ca1fe
add the schema reader and writer
congbobo Apr 1, 2019
c491e42
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 1, 2019
ed51af9
generic schema decode by schema version
congbobo Apr 3, 2019
dd838ce
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 3, 2019
9597bc0
remove the dependency
congbobo Apr 3, 2019
7571d7e
modify the generic schema decode type name
congbobo Apr 3, 2019
c58da0b
modify the consumerImpl
congbobo Apr 9, 2019
a2c45bf
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 10, 2019
d6b908a
struct schema refactor.
congbobo Apr 10, 2019
d6c90bb
modify the ProtobufSchema
congbobo Apr 10, 2019
10af2c2
modify the ProtobufSchema
congbobo Apr 11, 2019
a3b6966
add schemInfo name
congbobo Apr 11, 2019
4295758
delete AutoProduceBytesSchema in producerBase
congbobo Apr 11, 2019
5abb561
return to the original condition of AutoConsumeSchema and AutoProduce…
congbobo Apr 11, 2019
d8a29a0
return to the original condition of AutoConsumeSchema and AutoProduce…
congbobo Apr 11, 2019
3350a80
return to the original condition of AutoConsumeSchema and AutoProduce…
congbobo Apr 11, 2019
18a9170
modify the setSchemaProvider location for create consumer
congbobo Apr 12, 2019
b74beed
modify the setSchemaProvider location for create consumer
congbobo Apr 12, 2019
c939809
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 12, 2019
db9cb75
1. schema provider logic in client
congbobo Apr 12, 2019
0c7f5c4
restore the consumeImpl
congbobo Apr 12, 2019
3c5a03a
delete the redundant /n
congbobo Apr 12, 2019
59a260c
restore the ConsumerImpl
congbobo Apr 12, 2019
468a861
restore the ConsumerImpl
congbobo Apr 12, 2019
ff0aebf
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 13, 2019
c2887f2
Add genericAvroReader test
congbobo Apr 13, 2019
291ab9f
add the license header
congbobo Apr 13, 2019
42407b9
* add the MultiVersionschemaInfoProvider Exception handle
congbobo Apr 14, 2019
5e4a991
* delete the superfluous
congbobo Apr 14, 2019
c30f3f9
* fix the HbaseGenericRecordSinkTest and SolrGenericRecordSinkTest, g…
congbobo Apr 15, 2019
ffeec02
* fix InfluxDBGenericRecordSinkTest
congbobo Apr 15, 2019
4ff3214
* fix the messsageImpl getValue logic
congbobo Apr 15, 2019
8dd3d79
* add schemaInfoProvider into multiTopicsConsumerImpl
congbobo Apr 22, 2019
c2e7724
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 22, 2019
2f87051
* modify the location for schema for preProcessSchemaBeforeSubscribe
congbobo Apr 22, 2019
e210c86
* modify the location of add schemaInfoProvider
congbobo Apr 22, 2019
564dcee
* modify the topic name to get schema Info in MultiTopicsConsumerImpl
congbobo Apr 22, 2019
66819a7
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 22, 2019
cd5b485
modify the exception in multi topic for getting provider
congbobo Apr 24, 2019
0691bf3
* add the return
congbobo Apr 24, 2019
091f69c
* modify the parameter name for schema writer method
congbobo Apr 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -80,6 +81,9 @@ default boolean supportSchemaVersioning() {
return false;
}

default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
}

/**
* Decode a byte array into an object using the schema definition and deserializer implementation
*
Expand Down Expand Up @@ -183,7 +187,17 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
return DefaultImplementation.newProtobufSchema(clazz);
return DefaultImplementation.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
}

/**
* Create a Protobuf schema type with schema definition.
*
* @param schemaDefinition schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.newProtobufSchema(schemaDefinition);
}

/**
Expand Down Expand Up @@ -300,7 +314,7 @@ static Schema<?> getSchema(SchemaInfo schemaInfo) {
* @param schemaInfo schema info
* @return a generic schema instance
*/
static GenericSchema generic(SchemaInfo schemaInfo) {
static GenericSchema<GenericRecord> generic(SchemaInfo schemaInfo) {
return DefaultImplementation.getGenericSchema(schemaInfo);
}
}
Expand Up @@ -24,7 +24,7 @@
/**
* A schema that serializes and deserializes between {@link GenericRecord} and bytes.
*/
public interface GenericSchema extends Schema<GenericRecord> {
public interface GenericSchema<T extends GenericRecord> extends Schema<T> {

/**
* Returns the list of fields.
Expand Down
Expand Up @@ -61,4 +61,11 @@ static <T> SchemaDefinitionBuilder<T> builder() {
* @return pojo schema
*/
public Class<T> getPojo();

/**
* Get supportSchemaVersioning schema definition
*
* @return the flag of supportSchemaVersioning
*/
public boolean getSupportSchemaVersioning();
}
Expand Up @@ -49,7 +49,7 @@ public interface SchemaDefinitionBuilder<T> {
* @param key property key
* @param value property value
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> addProperty(String key, String value);

Expand All @@ -58,7 +58,7 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param pojo pojo schema definition
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);

Expand All @@ -67,10 +67,19 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param jsonDefinition json schema definition
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);

/**
* Set schema whether decode by schema version
*
* @param supportSchemaVersioning decode by version
*
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning);

/**
* Build the schema definition.
*
Expand Down
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.common.schema.SchemaInfo;

/**
* Schema Provider.
*/
public interface SchemaInfoProvider {

/**
* Retrieve the schema info of a given <tt>schemaVersion</tt>.
*
* @param schemaVersion schema version
* @return schema info of the provided <tt>schemaVersion</tt>
*/
SchemaInfo getSchemaByVersion(byte[] schemaVersion);

/**
* Retrieve the latest schema info.
*
* @return the latest schema
*/
SchemaInfo getLatestSchema();

/**
* Retrieve the topic name.
*
* @return the topic name
*/
public String getTopicName();

}
Expand Up @@ -16,21 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.client.api.Schema;

/**
* Schema Provider.
*/
public interface SchemaProvider<T> {
public interface SchemaReader<T> {

/**
* Retrieve the schema instance of a given <tt>schemaVersion</tt>.
* serialize bytes convert pojo
*
* @param schemaVersion schema version
* @return schema instance of the provided <tt>schemaVersion</tt>
* @param bytes the data
* @return the serialized object
*/
Schema<T> getSchema(byte[] schemaVersion);

T read(byte[] bytes);
}
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

public interface SchemaWriter<T> {

/**
* serialize the message into bytes
*
* @param message the message for encode
* @return the serialized bytes
*/
byte[] write(T message);
}
Expand Up @@ -214,10 +214,10 @@ public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
.invoke(null,schemaDefinition));
}

public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", Class.class)
.invoke(null, clazz));
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", SchemaDefinition.class)
.invoke(null, schemaDefinition));
}

public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
Expand Down Expand Up @@ -262,7 +262,7 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
"getSchema", SchemaInfo.class).invoke(null, schemaInfo));
}

public static GenericSchema getGenericSchema(SchemaInfo schemaInfo) {
public static GenericSchema<GenericRecord> getGenericSchema(SchemaInfo schemaInfo) {
return catchExceptions(
() -> (GenericSchema) getStaticMethod("org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl",
"of", SchemaInfo.class).invoke(null, schemaInfo));
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
Expand All @@ -31,6 +32,7 @@
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@Builder
public class SchemaInfo {

@EqualsAndHashCode.Exclude
Expand Down
Expand Up @@ -230,7 +230,7 @@ public byte[] getData() {

@Override
public byte[] getSchemaVersion() {
if (msgMetadataBuilder.hasSchemaVersion()) {
if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion()) {
return msgMetadataBuilder.getSchemaVersion().toByteArray();
} else {
return null;
Expand All @@ -241,8 +241,9 @@ public byte[] getSchemaVersion() {
public T getValue() {
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
if (schema.supportSchemaVersioning()) {
return schema.decode(getData(), getSchemaVersion());
byte [] schemaVersion = getSchemaVersion();
if (schema.supportSchemaVersioning() && schemaVersion != null) {
return schema.decode(getData(), schemaVersion);
} else {
return schema.decode(getData());
}
Expand Down
Expand Up @@ -742,6 +742,13 @@ private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S

List<CompletableFuture<Consumer<T>>> futureList;

try {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
} catch (Throwable t) {
subscribeResult.completeExceptionally(t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@congbobo184 don't you need to return here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, It should be

return;
}

if (numPartitions > 1) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);
Expand Down