Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the multi version schema support #3876

Merged
merged 49 commits into from Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
54d58c0
add the multi version schema support
congbobo Mar 21, 2019
2776e32
recover the code format
congbobo Mar 21, 2019
9714c9d
recover the code format
congbobo Mar 21, 2019
9447588
Modify the cache name
congbobo Mar 21, 2019
b01f509
set schema provider in struct schema
congbobo Mar 25, 2019
5dae355
Modify the jude type
congbobo Mar 26, 2019
ee531dc
Modify the judge condition
congbobo Mar 26, 2019
cea87d0
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Mar 27, 2019
70ca1fe
add the schema reader and writer
congbobo Apr 1, 2019
c491e42
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 1, 2019
ed51af9
generic schema decode by schema version
congbobo Apr 3, 2019
dd838ce
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 3, 2019
9597bc0
remove the dependency
congbobo Apr 3, 2019
7571d7e
modify the generic schema decode type name
congbobo Apr 3, 2019
c58da0b
modify the consumerImpl
congbobo Apr 9, 2019
a2c45bf
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 10, 2019
d6b908a
struct schema refactor.
congbobo Apr 10, 2019
d6c90bb
modify the ProtobufSchema
congbobo Apr 10, 2019
10af2c2
modify the ProtobufSchema
congbobo Apr 11, 2019
a3b6966
add schemInfo name
congbobo Apr 11, 2019
4295758
delete AutoProduceBytesSchema in producerBase
congbobo Apr 11, 2019
5abb561
return to the original condition of AutoConsumeSchema and AutoProduce…
congbobo Apr 11, 2019
d8a29a0
return to the original condition of AutoConsumeSchema and AutoProduce…
congbobo Apr 11, 2019
3350a80
return to the original condition of AutoConsumeSchema and AutoProduce…
congbobo Apr 11, 2019
18a9170
modify the setSchemaProvider location for create consumer
congbobo Apr 12, 2019
b74beed
modify the setSchemaProvider location for create consumer
congbobo Apr 12, 2019
c939809
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 12, 2019
db9cb75
1. schema provider logic in client
congbobo Apr 12, 2019
0c7f5c4
restore the consumeImpl
congbobo Apr 12, 2019
3c5a03a
delete the redundant /n
congbobo Apr 12, 2019
59a260c
restore the ConsumerImpl
congbobo Apr 12, 2019
468a861
restore the ConsumerImpl
congbobo Apr 12, 2019
ff0aebf
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 13, 2019
c2887f2
Add genericAvroReader test
congbobo Apr 13, 2019
291ab9f
add the license header
congbobo Apr 13, 2019
42407b9
* add the MultiVersionschemaInfoProvider Exception handle
congbobo Apr 14, 2019
5e4a991
* delete the superfluous
congbobo Apr 14, 2019
c30f3f9
* fix the HbaseGenericRecordSinkTest and SolrGenericRecordSinkTest, g…
congbobo Apr 15, 2019
ffeec02
* fix InfluxDBGenericRecordSinkTest
congbobo Apr 15, 2019
4ff3214
* fix the messsageImpl getValue logic
congbobo Apr 15, 2019
8dd3d79
* add schemaInfoProvider into multiTopicsConsumerImpl
congbobo Apr 22, 2019
c2e7724
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 22, 2019
2f87051
* modify the location for schema for preProcessSchemaBeforeSubscribe
congbobo Apr 22, 2019
e210c86
* modify the location of add schemaInfoProvider
congbobo Apr 22, 2019
564dcee
* modify the topic name to get schema Info in MultiTopicsConsumerImpl
congbobo Apr 22, 2019
66819a7
Merge remote-tracking branch 'apache/master' into schema_analysis_bys…
congbobo Apr 22, 2019
cd5b485
modify the exception in multi topic for getting provider
congbobo Apr 24, 2019
0691bf3
* add the return
congbobo Apr 24, 2019
091f69c
* modify the parameter name for schema writer method
congbobo Apr 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -61,4 +61,11 @@ static <T> SchemaDefinitionBuilder<T> builder() {
* @return pojo schema
*/
public Class<T> getPojo();

/**
 * Returns whether this schema definition enables schema-version-aware decoding
 * (multi-version schema support).
 *
 * @return true if messages should be decoded using the schema version they were written with
 */
public boolean getSupportSchemaVersioning();
}
Expand Up @@ -49,7 +49,7 @@ public interface SchemaDefinitionBuilder<T> {
* @param key property key
* @param value property value
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> addProperty(String key, String value);

Expand All @@ -58,7 +58,7 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param pojo pojo schema definition
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);

Expand All @@ -67,10 +67,19 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param jsonDefinition json schema definition
*
* @return record schema definition
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);

/**
 * Set whether messages should be decoded using the schema version they were
 * written with (multi-version schema support).
 *
 * @param supportSchemaVersioning true to enable decoding by schema version
 *
 * @return schema definition builder
 */
SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning);

/**
* Build the schema definition.
*
Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;
package org.apache.pulsar.client.api.schema;

import org.apache.pulsar.client.api.Schema;

Expand All @@ -31,6 +31,12 @@ public interface SchemaProvider<T> {
* @param schemaVersion schema version
* @return schema instance of the provided <tt>schemaVersion</tt>
*/
Schema<T> getSchema(byte[] schemaVersion);
Schema<T> getVersionSchema(byte[] schemaVersion);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getSchema is probably okay. or we can call it getSchemaByVersion

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a blank line before /**

* Retrieve the current schema.
*
* @return the current schema
*/
Schema<T> getCurrentSchema() throws InterruptedException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe getLatestSchema?


}
Expand Up @@ -63,6 +63,8 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericSchemaProvider;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
Expand Down Expand Up @@ -152,6 +154,16 @@ enum SubscriptionMode {
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
if (schema != null && schema.supportSchemaVersioning()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I am bit confused about the logic here. why do we need to cache this?

Event we want to cache this, we should cache the MultiVersionGenericSchemaProvider. but we shouldn't cache the schema. because user can pass in different type of schema instances.

Map<String, Schema> supportSchemaVersioningSchemaCache = client.getSupportSchemaVersioningSchemaCache();
Schema<T> schemaFromCache = supportSchemaVersioningSchemaCache.get(topic);
if (schemaFromCache == null) {
((AvroSchema<T>) schema).setSchemaProvider(new MultiVersionGenericSchemaProvider(TopicName.get(topic), client));
supportSchemaVersioningSchemaCache.put(topic, schema);
} else {
schema = schemaFromCache;
}
}
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors);
Expand Down
Expand Up @@ -83,6 +83,7 @@ public class PulsarClientImpl implements PulsarClient {
private final ConnectionPool cnxPool;
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
private Map<String, Schema> supportSchemaVersioningSchemaCache = new HashMap<>();
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either use a concurrent structure or you should synchronize on accessing supportSchemaVersioningSchemaCache in ConsumerImpl.


enum State {
Open, Closing, Closed
Expand Down Expand Up @@ -736,4 +737,8 @@ private static Mode convertRegexSubscriptionMode(RegexSubscriptionMode regexSubs
return null;
}
}

/**
 * Returns the per-topic cache of schemas that support schema versioning.
 *
 * <p>NOTE(review): the backing map is a plain {@code HashMap} and this getter hands
 * it out for external mutation (see ConsumerImpl) — access is not synchronized.
 * Consider a {@code ConcurrentHashMap} or synchronizing callers; TODO confirm the
 * intended thread-safety contract.
 */
public Map<String, Schema> getSupportSchemaVersioningSchemaCache(){
    return this.supportSchemaVersioningSchemaCache;
}
}
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.client.impl.schema;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
Expand All @@ -30,26 +33,42 @@
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericSchemaProvider;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* An AVRO schema implementation.
*/
@Slf4j
public class AvroSchema<T> extends StructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

private ReflectDatumWriter<T> datumWriter;
private ReflectDatumReader<T> reader;
private ReflectDatumReader<T> datumReader;
private BinaryEncoder encoder;
private ByteArrayOutputStream byteArrayOutputStream;

private static final ThreadLocal<BinaryDecoder> decoders =
new ThreadLocal<>();
private boolean supportSchemaVersioning;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think the code change here should be pushed to StructSchema. because reader and writer schemas should be applied to all struct schemas, although only Avro can use them now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right, it should push to StructSchema

// Cache of version-specific datum readers, keyed by the raw schema-version bytes.
// NOTE(review): byte[] keys rely on Object identity for equals()/hashCode() in a
// Guava cache, so two equal-content version arrays will never hit the same entry —
// every decode with a freshly received version array loads a duplicate reader.
// Consider wrapping the key (e.g. ByteBuffer.wrap(schemaVersion) or a dedicated
// schema-version value type) so content equality is used.
private final LoadingCache<byte[], ReflectDatumReader<T>> cache = CacheBuilder.newBuilder().maximumSize(100000)
        .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], ReflectDatumReader<T>>() {
            @Override
            public ReflectDatumReader<T> load(byte[] schemaVersion) throws Exception {
                return loadReader(schemaVersion);
            }
        });

// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
Expand Down Expand Up @@ -85,8 +104,9 @@ private AvroSchema(org.apache.avro.Schema schema,
schemaDefinition.getProperties());
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.datumWriter = new ReflectDatumWriter<>(this.schema);
this.reader = new ReflectDatumReader<>(this.schema);
this.datumWriter = new ReflectDatumWriter<>(schema);
this.datumReader = new ReflectDatumReader<>(schema);
this.supportSchemaVersioning = schemaDefinition.getSupportSchemaVersioning();
}

@Override
Expand All @@ -110,9 +130,27 @@ public T decode(byte[] bytes) {
if (decoderFromCache == null) {
decoders.set(decoder);
}
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
return datumReader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
} catch (IOException e) {
throw new SchemaSerializationException(e);
}
}

/**
 * Decode the given payload using the writer schema identified by
 * {@code schemaVersion}, resolved against the current (reader) schema.
 *
 * @param bytes the serialized payload
 * @param schemaVersion the version of the schema the payload was written with
 * @return the decoded object
 * @throws SchemaSerializationException if the payload cannot be decoded or the
 *         writer schema for {@code schemaVersion} cannot be loaded
 */
@Override
public T decode(byte[] bytes, byte[] schemaVersion) {
    try {
        BinaryDecoder decoderFromCache = decoders.get();
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
        if (decoderFromCache == null) {
            // Remember the decoder so subsequent calls on this thread reuse it.
            decoders.set(decoder);
        }
        // 'decoder' is already initialized over 'bytes' by the call above; the
        // previous code constructed a second, redundant binaryDecoder here.
        return cache.get(schemaVersion).read(null, decoder);
    } catch (IOException e) {
        throw new SchemaSerializationException(e);
    } catch (ExecutionException e) {
        // Previously this logged and returned null, which only surfaces later as an
        // opaque NPE at the caller. Propagate the failure with its cause instead.
        // Also avoid the unchecked cast to MultiVersionGenericSchemaProvider that
        // the old log statement performed — it could itself throw inside this
        // error handler if a different provider implementation is installed.
        LOG.error("Can't load schema reader for schema version {}",
                new String(schemaVersion, StandardCharsets.UTF_8), e);
        throw new SchemaSerializationException(e);
    }
}

Expand All @@ -135,4 +173,12 @@ public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties
return new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition);
}

/**
 * Whether this schema decodes messages using the schema version they were
 * written with.
 *
 * @return true if schema-version-aware (multi-version) decoding is enabled
 */
public boolean supportSchemaVersioning() {
    return this.supportSchemaVersioning;
}

/**
 * Builds a datum reader that resolves data written with the schema identified by
 * {@code schemaVersion} (the writer schema) into the current reader schema.
 *
 * @param schemaVersion the writer schema version to fetch from the schema provider
 * @return a reader performing writer-to-reader schema resolution
 */
private ReflectDatumReader<T> loadReader(byte[] schemaVersion) {
    // The provider returns the generic schema registered under this version;
    // NOTE(review): the cast assumes an Avro-backed provider — confirm for other types.
    GenericAvroSchema writerSchema = (GenericAvroSchema) schemaProvider.getVersionSchema(schemaVersion);
    return new ReflectDatumReader<>(writerSchema.getAvroSchema(), schema);
}

}
Expand Up @@ -55,6 +55,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private String jsonDef;

/**
* The flag of message decode whether by schema version
*/
private boolean supportSchemaVersioning = false;

@Override
public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
this.alwaysAllowNull = alwaysAllowNull;
Expand All @@ -79,6 +84,11 @@ public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
return this;
}

/**
 * Set whether messages should be decoded using the schema version they were
 * written with.
 *
 * @param supportSchemaVersioning true to enable schema-version-aware decoding
 * @return this builder
 */
@Override
public SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning) {
    this.supportSchemaVersioning = supportSchemaVersioning;
    return this;
}

@Override
public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
Expand All @@ -89,7 +99,7 @@ public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties)
/**
 * Builds the {@link SchemaDefinition} from the collected settings.
 *
 * <p>The always-allow-null flag is also mirrored into the schema properties so
 * it travels with the schema info.
 *
 * @return the built schema definition
 */
@Override
public SchemaDefinition<T> build() {
    properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
    // Use the diamond operator — the raw 'new SchemaDefinitionImpl(...)' produced
    // an unchecked-conversion warning.
    return new SchemaDefinitionImpl<>(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
}
}
Expand Up @@ -21,7 +21,6 @@

import org.apache.pulsar.client.api.schema.SchemaDefinition;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -48,19 +47,21 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{

private String jsonDef;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties) {
private boolean supportSchemaVersioning;

/**
 * Creates a schema definition.
 *
 * @param pojo the POJO class backing the schema
 * @param jsonDef the JSON schema definition, if one was supplied
 * @param alwaysAllowNull whether null values are always allowed
 * @param properties additional schema properties
 * @param supportSchemaVersioning whether messages are decoded by their schema version
 */
public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties, boolean supportSchemaVersioning) {
    this.pojo = pojo;
    this.jsonDef = jsonDef;
    this.alwaysAllowNull = alwaysAllowNull;
    this.properties = properties;
    this.supportSchemaVersioning = supportSchemaVersioning;
}
/**
 * Returns whether fields may always be null under this schema.
 *
 * @return true if null values are always allowed
 */
public boolean getAlwaysAllowNull() {

    return alwaysAllowNull;
}

Expand All @@ -70,7 +71,6 @@ public boolean getAlwaysAllowNull() {
* @return the JSON schema definition
*/
public String getJsonDef() {

return jsonDef;
}
/**
Expand All @@ -83,16 +83,18 @@ public Class<T> getPojo() {
return pojo;
}

/**
 * Returns whether messages should be decoded using the schema version they were
 * written with (multi-version schema support).
 *
 * @return true if schema-version-aware decoding is enabled
 */
@Override
public boolean getSupportSchemaVersioning() {
    return supportSchemaVersioning;
}

/**
 * Get the schema properties.
 *
 * @return the schema properties map
 */
public Map<String, String> getProperties() {

    return properties;
}



}
Expand Up @@ -25,6 +25,7 @@
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaProvider;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand All @@ -42,6 +43,7 @@ abstract class StructSchema<T> implements Schema<T> {

protected final org.apache.avro.Schema schema;
protected final SchemaInfo schemaInfo;
protected SchemaProvider schemaProvider;

protected StructSchema(SchemaType schemaType,
org.apache.avro.Schema schema,
Expand All @@ -54,7 +56,7 @@ protected StructSchema(SchemaType schemaType,
this.schemaInfo.setProperties(properties);
}

protected org.apache.avro.Schema getAvroSchema() {
/**
 * Returns the underlying Avro schema backing this struct schema.
 *
 * <p>Widened from protected to public so version-aware components (e.g.
 * {@code AvroSchema#loadReader}) can obtain the writer schema for resolution.
 */
public org.apache.avro.Schema getAvroSchema() {
    return schema;
}

Expand All @@ -73,4 +75,8 @@ protected static org.apache.avro.Schema parseAvroSchema(String jsonDef) {
return parser.parse(jsonDef);
}

/**
 * Attach the provider used to look up schemas by version for this struct schema.
 *
 * <p>NOTE(review): the parameter and field are the raw {@code SchemaProvider}
 * type; consider parameterizing as {@code SchemaProvider<T>} to avoid unchecked use.
 */
public void setSchemaProvider(SchemaProvider schemaProvider){
    this.schemaProvider = schemaProvider;
}

}
Expand Up @@ -36,7 +36,7 @@
/**
* A generic avro schema.
*/
class GenericAvroSchema extends GenericSchemaImpl {
public class GenericAvroSchema extends GenericSchemaImpl {

private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
private BinaryEncoder encoder;
Expand Down
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaProvider;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.common.schema.SchemaInfo;

Expand All @@ -46,12 +47,12 @@ public boolean supportSchemaVersioning() {

@Override
public GenericRecord decode(byte[] bytes) {
return provider.getSchema(null).decode(bytes);
return provider.getVersionSchema(null).decode(bytes);
}

@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
return provider.getSchema(schemaVersion).decode(bytes, schemaVersion);
return provider.getVersionSchema(schemaVersion).decode(bytes, schemaVersion);
}

@Override
Expand Down