Skip to content

Commit 79a2721

Browse files
Allow specifying readerSchema on KafkaAvroDeserializer (#2366)
* Allow specifying readerSchema on KafkaAvroDeserializer. Adds two new configs: `specific.avro.key.type` and `specific.avro.value.type`. These configs are only applicable when `specific.avro.reader` is true. If unset, the deserializer defaults to the current behavior of using the writerSchema. Fixes #670. * Minor change to logger name. Co-authored-by: Robert Yokota <rayokota@gmail.com>
1 parent fc4a26c commit 79a2721

File tree

6 files changed

+180
-14
lines changed

6 files changed

+180
-14
lines changed

avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java

Lines changed: 26 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -45,11 +45,13 @@
4545
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
4646
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
4747
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
48+
import org.apache.kafka.common.config.ConfigException;
4849

4950
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaSchemaSerDe {
5051
private final DecoderFactory decoderFactory = DecoderFactory.get();
5152
protected boolean isKey;
5253
protected boolean useSpecificAvroReader = false;
54+
protected Schema specificAvroReaderSchema = null;
5355
protected boolean avroReflectionAllowNull = false;
5456
protected boolean avroUseLogicalTypeConverters = false;
5557
private final Map<String, Schema> readerSchemaCache = new ConcurrentHashMap<>();
@@ -91,14 +93,34 @@ public DatumReader<?> load(IdentityPair<Schema, Schema> key) {
9193
.build(cacheLoader);
9294
}
9395

96+
protected void configure(KafkaAvroDeserializerConfig config) {
97+
configure(config, null);
98+
}
99+
94100
/**
95101
* Sets properties for this deserializer without overriding the schema registry client itself.
96102
* Useful for testing, where a mock client is injected.
97103
*/
98-
protected void configure(KafkaAvroDeserializerConfig config) {
104+
protected void configure(KafkaAvroDeserializerConfig config, Class<?> type) {
99105
configureClientProperties(config, new AvroSchemaProvider());
100106
useSpecificAvroReader = config
101107
.getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
108+
109+
if (useSpecificAvroReader && type != null) {
110+
try {
111+
specificAvroReaderSchema = ((SpecificRecord)type.getDeclaredConstructor().newInstance())
112+
.getSchema();
113+
} catch (Exception e) {
114+
throw new ConfigException(
115+
String.format(
116+
"Error getting specificAvroReaderSchema from '%s'",
117+
type.getName()
118+
),
119+
e
120+
);
121+
}
122+
}
123+
102124
avroReflectionAllowNull = config
103125
.getBoolean(KafkaAvroDeserializerConfig.AVRO_REFLECTION_ALLOW_NULL_CONFIG);
104126
avroUseLogicalTypeConverters = config
@@ -123,7 +145,7 @@ protected KafkaAvroDeserializerConfig deserializerConfig(Properties props) {
123145
* @return the deserialized object
124146
*/
125147
protected Object deserialize(byte[] payload) throws SerializationException {
126-
return deserialize(null, isKey, payload, null);
148+
return deserialize(null, isKey, payload, specificAvroReaderSchema);
127149
}
128150

129151
/**
@@ -201,7 +223,7 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
201223
// explicit from the Connector).
202224
DeserializationContext context = new DeserializationContext(topic, isKey, payload);
203225
AvroSchema schema = context.schemaForDeserialize();
204-
Object result = context.read(schema.rawSchema(), null);
226+
Object result = context.read(schema.rawSchema(), specificAvroReaderSchema);
205227

206228
try {
207229
Integer version = schemaVersion(topic, isKey, context.getSchemaId(),
@@ -396,7 +418,7 @@ int getSchemaId() {
396418
}
397419

398420
Object read(Schema writerSchema) {
399-
return read(writerSchema, null);
421+
return read(writerSchema, specificAvroReaderSchema);
400422
}
401423

402424
Object read(Schema writerSchema, Schema readerSchema) {

avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,15 +29,16 @@ public KafkaAvroDecoder(SchemaRegistryClient schemaRegistry) {
2929
}
3030

3131
public KafkaAvroDecoder(SchemaRegistryClient schemaRegistry, VerifiableProperties props) {
32+
3233
this.schemaRegistry = schemaRegistry;
33-
configure(deserializerConfig(props.props()));
34+
configure(deserializerConfig(props.props()), null);
3435
}
3536

3637
/**
3738
* Constructor used by Kafka consumer.
3839
*/
3940
public KafkaAvroDecoder(VerifiableProperties props) {
40-
configure(new KafkaAvroDeserializerConfig(props.props()));
41+
configure(new KafkaAvroDeserializerConfig(props.props()), null);
4142
}
4243

4344
@Override

avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java

Lines changed: 51 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,10 +22,19 @@
2222
import java.util.Map;
2323

2424
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
25+
import static io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.SPECIFIC_AVRO_KEY_TYPE_CONFIG;
26+
import static io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG;
27+
import static io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG;
28+
import org.apache.avro.specific.SpecificRecord;
29+
import org.apache.kafka.common.config.ConfigException;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
2532

2633
public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
2734
implements Deserializer<Object> {
2835

36+
private static final Logger log = LoggerFactory.getLogger(KafkaAvroDeserializer.class);
37+
2938
/**
3039
* Constructor used by Kafka consumer.
3140
*/
@@ -38,19 +47,56 @@ public KafkaAvroDeserializer(SchemaRegistryClient client) {
3847
}
3948

4049
public KafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
41-
schemaRegistry = client;
42-
configure(deserializerConfig(props));
50+
this(client, props, false);
4351
}
4452

4553
@Override
46-
public void configure(Map<String, ?> configs, boolean isKey) {
54+
public void configure(Map<String, ?> props, boolean isKey) {
55+
this.isKey = isKey;
56+
configure(deserializerConfig(props), null);
57+
}
58+
59+
public KafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props, boolean isKey) {
60+
schemaRegistry = client;
4761
this.isKey = isKey;
48-
configure(new KafkaAvroDeserializerConfig(configs));
62+
63+
final String specificAvroClassLookupKey = isKey
64+
? SPECIFIC_AVRO_KEY_TYPE_CONFIG :
65+
SPECIFIC_AVRO_VALUE_TYPE_CONFIG;
66+
67+
final KafkaAvroDeserializerConfig config = deserializerConfig(props);
68+
69+
final Class<?> type = config.getClass(specificAvroClassLookupKey);
70+
71+
if (type != null && !config.getBoolean(SPECIFIC_AVRO_READER_CONFIG)) {
72+
if (log.isWarnEnabled()) {
73+
log.warn(
74+
String.format(
75+
"'%s' value of '%s' is ignored because '%s' is false",
76+
specificAvroClassLookupKey,
77+
type.getName(),
78+
SPECIFIC_AVRO_READER_CONFIG
79+
)
80+
);
81+
}
82+
}
83+
84+
if (type != null && !SpecificRecord.class.isAssignableFrom(type)) {
85+
throw new ConfigException(
86+
String.format("Value '%s' specified for '%s' is not a '%s'",
87+
type.getName(),
88+
specificAvroClassLookupKey,
89+
SpecificRecord.class.getName()
90+
)
91+
);
92+
}
93+
94+
configure(deserializerConfig(props), type);
4995
}
5096

5197
@Override
5298
public Object deserialize(String topic, byte[] bytes) {
53-
return deserialize(topic, isKey, bytes, null);
99+
return deserialize(topic, isKey, bytes, specificAvroReaderSchema);
54100
}
55101

56102
/**

avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,18 @@ public class KafkaAvroDeserializerConfig extends AbstractKafkaSchemaSerDeConfig
3030
public static final String SPECIFIC_AVRO_READER_DOC =
3131
"If true, tries to look up the SpecificRecord class ";
3232

33+
public static final String SPECIFIC_AVRO_KEY_TYPE_CONFIG = "specific.avro.key.type";
34+
public static final String SPECIFIC_AVRO_KEY_TYPE_DOC =
35+
"A class generated by Avro that the message key should be deserialized to";
36+
37+
public static final String SPECIFIC_AVRO_VALUE_TYPE_CONFIG = "specific.avro.value.type";
38+
public static final String SPECIFIC_AVRO_VALUE_TYPE_DOC =
39+
"A class generated by Avro that the message value should be deserialized to";
40+
41+
public static final String AVRO_READER_SCHEMA_CONFIG = "avro.reader.schema";
42+
public static final String AVRO_READER_SCHEMA_DOC =
43+
"Reader schema to use when deserializing. Defaults to using the writer schema.";
44+
3345
public static final String AVRO_REFLECTION_ALLOW_NULL_CONFIG = "avro.reflection.allow.null";
3446
public static final boolean AVRO_REFLECTION_ALLOW_NULL_DEFAULT = false;
3547
public static final String AVRO_REFLECTION_ALLOW_NULL_DOC =
@@ -47,6 +59,14 @@ public class KafkaAvroDeserializerConfig extends AbstractKafkaSchemaSerDeConfig
4759
config = baseConfigDef()
4860
.define(SPECIFIC_AVRO_READER_CONFIG, Type.BOOLEAN, SPECIFIC_AVRO_READER_DEFAULT,
4961
Importance.LOW, SPECIFIC_AVRO_READER_DOC)
62+
.define(SPECIFIC_AVRO_KEY_TYPE_CONFIG,
63+
Type.CLASS,
64+
null,
65+
Importance.MEDIUM, SPECIFIC_AVRO_KEY_TYPE_DOC)
66+
.define(SPECIFIC_AVRO_VALUE_TYPE_CONFIG,
67+
Type.CLASS,
68+
null,
69+
Importance.MEDIUM, SPECIFIC_AVRO_VALUE_TYPE_DOC)
5070
.define(AVRO_REFLECTION_ALLOW_NULL_CONFIG, Type.BOOLEAN, AVRO_REFLECTION_ALLOW_NULL_DEFAULT,
5171
Importance.MEDIUM, AVRO_REFLECTION_ALLOW_NULL_DOC)
5272
.define(AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, ConfigDef.Type.BOOLEAN,

avro-serializer/src/test/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializerTest.java

Lines changed: 54 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@
66
import static org.junit.Assert.assertThat;
77
import static org.junit.Assert.fail;
88

9-
import io.confluent.kafka.schemaregistry.ParsedSchema;
109
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
1110
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
1211
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -21,6 +20,7 @@
2120
import java.util.Map;
2221

2322
import com.google.common.collect.ImmutableMap;
23+
import java.util.HashMap;
2424
import org.apache.avro.Schema;
2525
import org.apache.avro.generic.GenericData;
2626
import org.apache.avro.generic.GenericRecord;
@@ -77,7 +77,7 @@ public void assertSchemaNotCopiedWhenDeserializedWithVersion(
7777
String subject = subjectNameStrategy.subjectName(topic, false,
7878
new AvroSchema(avroRecord.getSchema()));
7979
avroSerializer.configure(configs, false);
80-
deserializer.configure(new KafkaAvroDeserializerConfig(configs));
80+
deserializer.configure(new KafkaAvroDeserializerConfig(configs), null);
8181
schemaRegistry.register(subject, new AvroSchema(avroRecord.getSchema()));
8282
byte[] bytes = avroSerializer.serialize(topic, avroRecord);
8383
IndexedRecord deserialized
@@ -219,4 +219,56 @@ public void testMixedUrlsAreRejected() {
219219

220220
Assert.assertNull(kafkaAvroSerializer.schemaRegistry);
221221
}
222+
223+
@Test
224+
public void testInvalidSpecificSchemaKeyTypeConfig() {
225+
HashMap<String, String> props = new HashMap<String, String>();
226+
// Intentionally invalid schema registry URL to satisfy the config class's requirement that
227+
// it be set.
228+
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
229+
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
230+
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG,
231+
Object.class.getName()
232+
);
233+
234+
try {
235+
new KafkaAvroDeserializer(
236+
new MockSchemaRegistryClient(),
237+
props
238+
);
239+
fail();
240+
}
241+
catch (ConfigException e) {
242+
Assert.assertEquals(
243+
"Value 'java.lang.Object' specified for " +
244+
"'specific.avro.value.type' is not a " +
245+
"'org.apache.avro.specific.SpecificRecord'",
246+
e.getMessage()
247+
);
248+
}
249+
250+
251+
props.remove(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG);
252+
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_KEY_TYPE_CONFIG,
253+
Object.class.getName()
254+
);
255+
256+
try {
257+
new KafkaAvroDeserializer(
258+
new MockSchemaRegistryClient(),
259+
props,
260+
true
261+
);
262+
fail();
263+
}
264+
catch (ConfigException e) {
265+
Assert.assertEquals(
266+
"Value 'java.lang.Object' specified for " +
267+
"'specific.avro.key.type' is not a " +
268+
"'org.apache.avro.specific.SpecificRecord'",
269+
e.getMessage()
270+
);
271+
}
272+
273+
}
222274
}

avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,6 @@
5858
import static org.junit.Assert.assertArrayEquals;
5959
import static org.junit.Assert.assertEquals;
6060
import static org.junit.Assert.assertNotNull;
61-
import static org.junit.Assert.assertNull;
6261
import static org.junit.Assert.assertTrue;
6362
import static org.junit.Assert.fail;
6463

@@ -895,6 +894,32 @@ public void testKafkaAvroSerializerSpecificRecordWithProjection() {
895894
assertEquals("testUser", ((User) obj).getName().toString());
896895
}
897896

897+
@Test
898+
public void testKafkaAvroSerializerSpecificRecordWithValueTypeConfig() {
899+
HashMap<String, String> specificDeserializerProps = new HashMap<String, String>();
900+
// Intentionally invalid schema registry URL to satisfy the config class's requirement that
901+
// it be set.
902+
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
903+
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
904+
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_VALUE_TYPE_CONFIG,
905+
User.class.getName()
906+
);
907+
908+
final KafkaAvroDeserializer specificAvroDeserializerWithReaderSchema = new KafkaAvroDeserializer(
909+
schemaRegistry, specificDeserializerProps
910+
);
911+
912+
IndexedRecord avroRecord = createExtendedSpecificAvroRecord();
913+
final byte[] bytes = avroSerializer.serialize(topic, avroRecord);
914+
915+
final Object obj = specificAvroDeserializerWithReaderSchema.deserialize(topic, bytes);
916+
assertTrue(
917+
"Full object should be a io.confluent.kafka.example.User",
918+
User.class.isInstance(obj)
919+
);
920+
assertEquals("testUser", ((User) obj).getName().toString());
921+
}
922+
898923
@Test
899924
public void testKafkaAvroSerializerReflectionRecord() {
900925
byte[] bytes;

0 commit comments

Comments
 (0)