diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java index 61231e3ad8b..5beee1e1372 100644 --- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java +++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java @@ -43,6 +43,7 @@ public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSer private final DecoderFactory decoderFactory = DecoderFactory.get(); protected boolean useSpecificAvroReader = false; + protected boolean forceNewSpecificDataInstance = false; private final Map readerSchemaCache = new ConcurrentHashMap(); @@ -54,6 +55,8 @@ protected void configure(KafkaAvroDeserializerConfig config) { configureClientProperties(config); useSpecificAvroReader = config .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG); + forceNewSpecificDataInstance = config + .getBoolean(KafkaAvroDeserializerConfig.AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG); } protected KafkaAvroDeserializerConfig deserializerConfig(Map props) { @@ -201,7 +204,11 @@ private DatumReader getDatumReader(Schema writerSchema, Schema readerSchema) { if (readerSchema == null) { readerSchema = getReaderSchema(writerSchema); } - return new SpecificDatumReader(writerSchema, readerSchema); + if (useSpecificAvroReader && forceNewSpecificDataInstance) { + return new SpecificDatumReader(writerSchema, readerSchema, new SpecificData()); + } else { + return new SpecificDatumReader(writerSchema, readerSchema); + } } else { if (readerSchema == null) { return new GenericDatumReader(writerSchema); diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java index e98a6b2b191..3cb892227cd 100644 --- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java +++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java @@ -28,13 +28,19 @@ public class KafkaAvroDeserializerConfig extends AbstractKafkaAvroSerDeConfig { public static final boolean SPECIFIC_AVRO_READER_DEFAULT = false; public static final String SPECIFIC_AVRO_READER_DOC = "If true, tries to look up the SpecificRecord class "; + + public static final String AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG = "force.new.specific.avro.instance"; + public static final boolean AVRO_FORCE_NEW_SPECIFIC_DATA_DEFAULT = false; + public static final String AVRO_FORCE_NEW_SPECIFIC_DATA_DOC = "If true, it passes a new instace of SpecificData to SpecificDatumReader "; private static ConfigDef config; static { config = baseConfigDef() .define(SPECIFIC_AVRO_READER_CONFIG, Type.BOOLEAN, SPECIFIC_AVRO_READER_DEFAULT, - Importance.LOW, SPECIFIC_AVRO_READER_DOC); + Importance.LOW, SPECIFIC_AVRO_READER_DOC) + .define(AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG, Type.BOOLEAN, AVRO_FORCE_NEW_SPECIFIC_DATA_DEFAULT, + Importance.LOW, AVRO_FORCE_NEW_SPECIFIC_DATA_DOC); } public KafkaAvroDeserializerConfig(Map props) {