Skip to content

Commit

Permalink
[schema] AutoConsume should use the schema associated with messages a…
Browse files Browse the repository at this point in the history
…s both writer and reader schema (#4325)

* [schema] AutoConsume should use the schema associated with messages for both writer and reader schema

*Motivation*

AutoConsume should use the schema associated with the messages for decoding the messages.

*Modifications*

- provide a flag to enable or disable using the provided schema as the reader schema
- for AUTO_CONSUME schema, disable using the provided schema as the reader schema, so it can use the right
  schema version for decoding messages into the right generic records
- provide a few util methods for displaying schema data

* Handle 64 bytes schema version

* Addressed review comments
  • Loading branch information
sijie authored and srkukarni committed May 22, 2019
1 parent 527995b commit bf06ef3
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 35 deletions.
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Base64;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -52,4 +55,19 @@ public class SchemaInfo {
* Additional properties of the schema definition (implementation defined)
*/
private Map<String, String> properties = Collections.emptyMap();

/**
 * Returns a printable form of the schema definition.
 *
 * <p>Schema bytes of type AVRO, JSON or PROTOBUF are decoded as UTF-8 text. Any other
 * type, or an unset type, is rendered as a Base64 string of the raw bytes.
 *
 * @return the schema definition as a string, or an empty string when no schema bytes are set
 */
public String getSchemaDefinition() {
    if (null == schema) {
        return "";
    }

    // Switching on a null enum reference throws NullPointerException, so an unset
    // type is handled up front and treated like any other opaque binary schema.
    if (null == type) {
        return Base64.getEncoder().encodeToString(schema);
    }

    switch (type) {
        case AVRO:
        case JSON:
        case PROTOBUF:
            // These definitions are stored as text, so show them verbatim.
            return new String(schema, UTF_8);
        default:
            return Base64.getEncoder().encodeToString(schema);
    }
}
}
Expand Up @@ -730,13 +730,16 @@ protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl

if (schema instanceof AutoConsumeSchema) {
SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
if (schemaInfo.getType() != SchemaType.AVRO){
if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
throw new RuntimeException("Currently schema detection only works for topics with avro schemas");

}
GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema());

// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
GenericSchema genericSchema = GenericSchemaImpl.of(
schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
log.info("Auto detected schema for topic {} : {}",
topicName, new String(schemaInfo.getSchema(), UTF_8));
topicName, schemaInfo.getSchemaDefinition());
((AutoConsumeSchema) schema).setSchema(genericSchema);
}
schema.setSchemaInfoProvider(schemaInfoProvider);
Expand Down
Expand Up @@ -102,11 +102,16 @@ public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
return new AvroReader<>(parseAvroSchema(new String(schemaInfo.getSchema())), schema);
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
schemaInfo.getSchemaDefinition());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
this.schemaInfo.getSchemaDefinition());
return reader;
}
}


}
Expand Up @@ -27,7 +27,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.JsonReader;
import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.avro.protobuf.ProtobufData;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -96,7 +95,8 @@ public void accept(Descriptors.FieldDescriptor fieldDescriptor) {

@Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
throw new RuntimeException("ProtobufSchema don't support schema versioning"); }
throw new RuntimeException("ProtobufSchema don't support schema versioning");
}

public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
Expand Down
Expand Up @@ -22,15 +22,17 @@
import io.netty.buffer.ByteBufUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;

/**
* Utils for schemas.
*/
final class SchemaUtils {
public final class SchemaUtils {

private SchemaUtils() {}

Expand Down Expand Up @@ -149,4 +151,22 @@ public static Object toAvroObject(Object value) {
}
}

/**
 * Renders a schema version as a string suitable for log messages.
 *
 * <ul>
 *   <li>{@code null} yields {@code "NULL"};</li>
 *   <li>an empty array yields {@code "EMPTY"};</li>
 *   <li>an array of 8 or 64 bytes yields the decimal value of its leading 8 bytes,
 *       read as a {@code long};</li>
 *   <li>any other length yields the Base64 encoding of the bytes.</li>
 * </ul>
 *
 * @param schemaVersionBytes the raw schema version, may be {@code null}
 * @return the string form of the schema version
 */
public static String getStringSchemaVersion(byte[] schemaVersionBytes) {
    if (schemaVersionBytes == null) {
        return "NULL";
    }

    switch (schemaVersionBytes.length) {
        case 0:
            return "EMPTY";
        // the length of schema version is 8 bytes post 2.4.0
        case Long.BYTES:
        // the length of schema version is 64 bytes before 2.4.0
        case Long.SIZE:
            long version = ByteBuffer.wrap(schemaVersionBytes).getLong();
            return String.valueOf(version);
        default:
            return Base64.getEncoder().encodeToString(schemaVersionBytes);
    }
}

}
Expand Up @@ -131,6 +131,12 @@ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
this.schemaInfoProvider = schemaInfoProvider;
}

/**
* Load the schema reader for reading messages encoded by the given schema version.
*
* <p>Each concrete schema supplies its own strategy. Implementations may look the
* version up and build a reader for it, fall back to a default reader when the
* version cannot be resolved, or throw if schema versioning is not supported.
*
* @param schemaVersion the provided schema version
* @return the schema reader for decoding messages encoded by the provided schema version.
*/
protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);

protected void setWriter(SchemaWriter<T> writer) {
Expand Down
Expand Up @@ -18,18 +18,27 @@
*/
package org.apache.pulsar.client.impl.schema.generic;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
* A generic avro schema.
*/
@Slf4j
public class GenericAvroSchema extends GenericSchemaImpl {

public GenericAvroSchema(SchemaInfo schemaInfo) {
super(schemaInfo);
this(schemaInfo, true);
}

GenericAvroSchema(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo, useProvidedSchemaAsReaderSchema);
setReader(new GenericAvroReader(schema));
setWriter(new GenericAvroWriter(schema));
}
Expand All @@ -48,11 +57,19 @@ public boolean supportSchemaVersioning() {
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
schemaInfo.getSchemaDefinition());
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
return new GenericAvroReader(
parseAvroSchema(new String(schemaInfo.getSchema())),
schema,
writerSchema,
readerSchema,
schemaVersion);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
this.schemaInfo.getSchemaDefinition());
return reader;
}
}
Expand Down
Expand Up @@ -20,21 +20,28 @@

import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.schema.SchemaInfo;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A generic json schema.
*/
@Slf4j
class GenericJsonSchema extends GenericSchemaImpl {

public GenericJsonSchema(SchemaInfo schemaInfo) {
super(schemaInfo);
this(schemaInfo, true);
}

GenericJsonSchema(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo, useProvidedSchemaAsReaderSchema);
setWriter(new GenericJsonWriter());
setReader(new GenericJsonReader(fields));
}
Expand All @@ -43,12 +50,24 @@ public GenericJsonSchema(SchemaInfo schemaInfo) {
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
schemaInfo.getSchemaDefinition());
Schema readerSchema;
if (useProvidedSchemaAsReaderSchema) {
readerSchema = schema;
} else {
readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
}
return new GenericJsonReader(schemaVersion,
(parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)).getFields()
readerSchema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList())));
.collect(Collectors.toList()));
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
this.schemaInfo.getSchemaDefinition());
return reader;
}
}
Expand Down
Expand Up @@ -33,14 +33,20 @@
public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {

protected final List<Field> fields;
// the flag controls whether to use the provided schema as reader schema
// to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
// allows decoding the messages using the schema associated with the messages.
protected final boolean useProvidedSchemaAsReaderSchema;

protected GenericSchemaImpl(SchemaInfo schemaInfo) {
protected GenericSchemaImpl(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo);

this.fields = schema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList());
this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
}

@Override
Expand All @@ -55,11 +61,16 @@ public List<Field> getFields() {
* @return a generic schema instance
*/
public static GenericSchemaImpl of(SchemaInfo schemaInfo) {
// Delegates to the two-argument overload with useProvidedSchemaAsReaderSchema = true,
// i.e. the provided schema is used as the reader schema.
return of(schemaInfo, true);
}

public static GenericSchemaImpl of(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
switch (schemaInfo.getType()) {
case AVRO:
return new GenericAvroSchema(schemaInfo);
return new GenericAvroSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
case JSON:
return new GenericJsonSchema(schemaInfo);
return new GenericJsonSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
default:
throw new UnsupportedOperationException("Generic schema is not supported on schema type "
+ schemaInfo.getType() + "'");
Expand Down

0 comments on commit bf06ef3

Please sign in to comment.