Skip to content

Commit

Permalink
Add the multi version schema support (#3876)
Browse files Browse the repository at this point in the history
### Motivation

Fix #3742

To decode a message correctly with an AVRO schema, we need to know which schema the message was written with.

### Modification

- Introduced Schema Reader and Schema Writer for StructSchema.
   - Reader is used to decode message
   - Writer is used to encode message
- The implementations of StructSchema provide their own schema reader and writer implementations.
- Introduced a schema reader cache for caching the readers for different schema versions.
  • Loading branch information
congbobo184 authored and sijie committed Apr 28, 2019
1 parent d1d5aba commit d5ff082
Show file tree
Hide file tree
Showing 48 changed files with 1,337 additions and 534 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -80,6 +81,9 @@ default boolean supportSchemaVersioning() {
return false;
}

/**
* Hand a {@link SchemaInfoProvider} to this schema.
*
* <p>The default implementation is a no-op and ignores the argument.
* NOTE(review): presumably schemas that decode by schema version override this
* to look up schema info -- confirm against the implementations.
*
* @param schemaInfoProvider the schema info provider to use
*/
default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
}

/**
* Decode a byte array into an object using the schema definition and deserializer implementation
*
Expand Down Expand Up @@ -183,7 +187,17 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
return DefaultImplementation.newProtobufSchema(clazz);
return DefaultImplementation.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
}

/**
* Create a Protobuf schema type from a schema definition.
*
* <p>Delegates to {@code DefaultImplementation.newProtobufSchema}.
*
* @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.newProtobufSchema(schemaDefinition);
}

/**
Expand Down Expand Up @@ -300,7 +314,7 @@ static Schema<?> getSchema(SchemaInfo schemaInfo) {
* @param schemaInfo schema info
* @return a generic schema instance
*/
static GenericSchema generic(SchemaInfo schemaInfo) {
static GenericSchema<GenericRecord> generic(SchemaInfo schemaInfo) {
return DefaultImplementation.getGenericSchema(schemaInfo);
}
}
Expand Up @@ -24,7 +24,7 @@
/**
* A schema that serializes and deserializes between {@link GenericRecord} and bytes.
*/
public interface GenericSchema extends Schema<GenericRecord> {
public interface GenericSchema<T extends GenericRecord> extends Schema<T> {

/**
* Returns the list of fields.
Expand Down
Expand Up @@ -61,4 +61,11 @@ static <T> SchemaDefinitionBuilder<T> builder() {
* @return pojo schema
*/
public Class<T> getPojo();

/**
* Get the supportSchemaVersioning flag of this schema definition.
*
* @return the supportSchemaVersioning flag
*/
public boolean getSupportSchemaVersioning();
}
Expand Up @@ -49,7 +49,7 @@ public interface SchemaDefinitionBuilder<T> {
* @param key property key
* @param value property value
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> addProperty(String key, String value);

Expand All @@ -58,7 +58,7 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param pojo pojo schema definition
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);

Expand All @@ -67,10 +67,19 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param jsonDefinition json schema definition
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);

/**
* Set whether the schema decodes by schema version.
*
* @param supportSchemaVersioning whether to decode by schema version
*
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning);

/**
* Build the schema definition.
*
Expand Down
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.common.schema.SchemaInfo;

/**
 * Schema provider: exposes schema info by version, the latest schema info,
 * and a topic name.
 */
public interface SchemaInfoProvider {

    /**
     * Look up the schema info for the given {@code schemaVersion}.
     *
     * @param schemaVersion schema version
     * @return schema info of the provided {@code schemaVersion}
     */
    SchemaInfo getSchemaByVersion(byte[] schemaVersion);

    /**
     * Look up the most recent schema info.
     *
     * @return the latest schema
     */
    SchemaInfo getLatestSchema();

    /**
     * Report the topic name.
     *
     * @return the topic name
     */
    String getTopicName();

}
Expand Up @@ -16,21 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.client.api.Schema;

/**
* Schema Provider.
*/
public interface SchemaProvider<T> {
public interface SchemaReader<T> {

/**
* Retrieve the schema instance of a given <tt>schemaVersion</tt>.
* Deserialize the bytes into a pojo.
*
* @param schemaVersion schema version
* @return schema instance of the provided <tt>schemaVersion</tt>
* @param bytes the data
* @return the deserialized object
*/
Schema<T> getSchema(byte[] schemaVersion);

T read(byte[] bytes);
}
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

/**
* Serializes messages of type {@code T} into bytes.
*
* @param <T> the type of message this writer accepts
*/
public interface SchemaWriter<T> {

/**
* Serialize the message into bytes.
*
* @param message the message to encode
* @return the serialized bytes
*/
byte[] write(T message);
}
Expand Up @@ -214,10 +214,10 @@ public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
.invoke(null,schemaDefinition));
}

public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", Class.class)
.invoke(null, clazz));
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", SchemaDefinition.class)
.invoke(null, schemaDefinition));
}

public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
Expand Down Expand Up @@ -262,7 +262,7 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
"getSchema", SchemaInfo.class).invoke(null, schemaInfo));
}

public static GenericSchema getGenericSchema(SchemaInfo schemaInfo) {
public static GenericSchema<GenericRecord> getGenericSchema(SchemaInfo schemaInfo) {
return catchExceptions(
() -> (GenericSchema) getStaticMethod("org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl",
"of", SchemaInfo.class).invoke(null, schemaInfo));
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
Expand All @@ -31,6 +32,7 @@
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@Builder
public class SchemaInfo {

@EqualsAndHashCode.Exclude
Expand Down
Expand Up @@ -230,7 +230,7 @@ public byte[] getData() {

@Override
public byte[] getSchemaVersion() {
if (msgMetadataBuilder.hasSchemaVersion()) {
if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion()) {
return msgMetadataBuilder.getSchemaVersion().toByteArray();
} else {
return null;
Expand All @@ -241,8 +241,9 @@ public byte[] getSchemaVersion() {
public T getValue() {
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
if (schema.supportSchemaVersioning()) {
return schema.decode(getData(), getSchemaVersion());
byte [] schemaVersion = getSchemaVersion();
if (schema.supportSchemaVersioning() && schemaVersion != null) {
return schema.decode(getData(), schemaVersion);
} else {
return schema.decode(getData());
}
Expand Down
Expand Up @@ -742,6 +742,13 @@ private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S

List<CompletableFuture<Consumer<T>>> futureList;

try {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
} catch (Throwable t) {
subscribeResult.completeExceptionally(t);
return;
}

if (numPartitions > 1) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);
Expand Down

0 comments on commit d5ff082

Please sign in to comment.