From 243eeb53357120db3637b1d2ce9d5b1dc665b6fe Mon Sep 17 00:00:00 2001 From: andrikod Date: Thu, 16 Mar 2017 12:28:37 +0100 Subject: [PATCH] Add files via upload Fix to avoid the ClassCastException for same class when used with Flink. https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions It is documented that when Avro is used with Flink, a ClassCastException is thrown, due to a caching issue. The issue occurs when the Confluent Kafka client is configured with "specific.avro.reader" = true, and the deserialization is performed with the SpecificDatumReader, instead of the generic reader. Based on Flink documentation, it is recommended to pass a new SpecificData to SpecificDatumReader each time. This fix extends the deserializer with an extra configuration option "force.new.specific.avro.instance" to force SpecificDatumReader to use a new instance of SpecificData when used together with specific.avro.reader. TODO: investigate performance penalty of generating a new instance instead of using Singleton --- .../kafka/serializers/AbstractKafkaAvroDeserializer.java | 9 ++++++++- .../kafka/serializers/KafkaAvroDeserializerConfig.java | 8 +++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java index 61231e3ad8b..5beee1e1372 100644 --- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java +++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java @@ -43,6 +43,7 @@ public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSer private final DecoderFactory decoderFactory = DecoderFactory.get(); protected boolean useSpecificAvroReader = false; + protected boolean forceNewSpecificDataInstance 
= false; private final Map readerSchemaCache = new ConcurrentHashMap(); @@ -54,6 +55,8 @@ protected void configure(KafkaAvroDeserializerConfig config) { configureClientProperties(config); useSpecificAvroReader = config .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG); + forceNewSpecificDataInstance = config + .getBoolean(KafkaAvroDeserializerConfig.AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG); } protected KafkaAvroDeserializerConfig deserializerConfig(Map props) { @@ -201,7 +204,11 @@ private DatumReader getDatumReader(Schema writerSchema, Schema readerSchema) { if (readerSchema == null) { readerSchema = getReaderSchema(writerSchema); } - return new SpecificDatumReader(writerSchema, readerSchema); + if (useSpecificAvroReader && forceNewSpecificDataInstance) { + return new SpecificDatumReader(writerSchema, readerSchema, new SpecificData()); + } else { + return new SpecificDatumReader(writerSchema, readerSchema); + } } else { if (readerSchema == null) { return new GenericDatumReader(writerSchema); diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java index e98a6b2b191..3cb892227cd 100644 --- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java +++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java @@ -28,13 +28,19 @@ public class KafkaAvroDeserializerConfig extends AbstractKafkaAvroSerDeConfig { public static final boolean SPECIFIC_AVRO_READER_DEFAULT = false; public static final String SPECIFIC_AVRO_READER_DOC = "If true, tries to look up the SpecificRecord class "; + + public static final String AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG = "force.new.specific.avro.instance"; + public static final boolean AVRO_FORCE_NEW_SPECIFIC_DATA_DEFAULT = false; + public static final String AVRO_FORCE_NEW_SPECIFIC_DATA_DOC = "If true, it 
passes a new instance of SpecificData to SpecificDatumReader "; private static ConfigDef config; static { config = baseConfigDef() .define(SPECIFIC_AVRO_READER_CONFIG, Type.BOOLEAN, SPECIFIC_AVRO_READER_DEFAULT, - Importance.LOW, SPECIFIC_AVRO_READER_DOC); + Importance.LOW, SPECIFIC_AVRO_READER_DOC) + .define(AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG, Type.BOOLEAN, AVRO_FORCE_NEW_SPECIFIC_DATA_DEFAULT, + Importance.LOW, AVRO_FORCE_NEW_SPECIFIC_DATA_DOC); } public KafkaAvroDeserializerConfig(Map props) {