DGS-4172 Bound size of Avro datumReader/Writer caches (#2331)
rayokota committed Jul 12, 2022
1 parent 2cb5b9f commit ed96278
Showing 4 changed files with 113 additions and 43 deletions.
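
The commit replaces the serdes' unbounded weak-key identity maps (built with Guava's MapMaker) with size-bounded Guava caches, so a client that keeps encountering distinct schemas can no longer grow the DatumReader/DatumWriter caches without limit. The sketch below illustrates the bounded-cache pattern the diff adopts; the String/Integer types and class name are placeholders, not code from the commit.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.ExecutionException;

public class BoundedCacheSketch {
  public static void main(String[] args) throws ExecutionException {
    // Once more than maximumSize entries are loaded, older entries are
    // evicted, so memory stays bounded instead of growing per schema.
    LoadingCache<String, Integer> cache = CacheBuilder.newBuilder()
        .maximumSize(1000) // mirrors DEFAULT_CACHE_CAPACITY in the last file below
        .build(new CacheLoader<String, Integer>() {
          @Override
          public Integer load(String key) {
            return key.length(); // stand-in for building a DatumReader/DatumWriter
          }
        });
    // get() invokes the loader on a miss; loader failures surface as
    // ExecutionException, which the new serde code catches and unwraps.
    System.out.println(cache.get("writer-schema|reader-schema"));
  }
}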
AbstractKafkaAvroDeserializer.java
@@ -16,7 +16,10 @@

package io.confluent.kafka.serializers;

-import com.google.common.collect.MapMaker;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericContainer;
@@ -47,8 +50,33 @@ public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaSchemaSerDe {
  private final DecoderFactory decoderFactory = DecoderFactory.get();
  protected boolean useSpecificAvroReader = false;
  private final Map<String, Schema> readerSchemaCache = new ConcurrentHashMap<>();
-  private final Map<Schema, Map<Schema, DatumReader<?>>> datumReaderCache =
-      new MapMaker().weakKeys().makeMap(); // use identity (==) comparison for keys
+  private final LoadingCache<IdentityPair<Schema, Schema>, DatumReader<?>> datumReaderCache;
+
+  public AbstractKafkaAvroDeserializer() {
+    CacheLoader<IdentityPair<Schema, Schema>, DatumReader<?>> cacheLoader =
+        new CacheLoader<IdentityPair<Schema, Schema>, DatumReader<?>>() {
+          @Override
+          public DatumReader<?> load(IdentityPair<Schema, Schema> key) {
+            Schema writerSchema = key.getKey();
+            Schema readerSchema = key.getValue();
+            Schema finalReaderSchema = getReaderSchema(writerSchema, readerSchema);
+            boolean writerSchemaIsPrimitive =
+                AvroSchemaUtils.getPrimitiveSchemas().containsValue(writerSchema);
+            if (writerSchemaIsPrimitive) {
+              return new GenericDatumReader<>(writerSchema, finalReaderSchema);
+            } else if (useSchemaReflection) {
+              return new ReflectDatumReader<>(writerSchema, finalReaderSchema);
+            } else if (useSpecificAvroReader) {
+              return new SpecificDatumReader<>(writerSchema, finalReaderSchema);
+            } else {
+              return new GenericDatumReader<>(writerSchema, finalReaderSchema);
+            }
+          }
+        };
+    datumReaderCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_CACHE_CAPACITY)
+        .build(cacheLoader);
+  }

  /**
   * Sets properties for this deserializer without overriding the schema registry client itself.
@@ -179,26 +207,9 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
    }
  }

-  protected DatumReader<?> getDatumReader(Schema writerSchema, Schema readerSchema) {
-    // normalize reader schema
-    final Schema finalReaderSchema = getReaderSchema(writerSchema, readerSchema);
-
-    Map<Schema, DatumReader<?>> readers = datumReaderCache.computeIfAbsent(writerSchema,
-        schema -> new MapMaker().weakKeys().makeMap()); // use identity (==) comparison for keys
-
-    return readers.computeIfAbsent(finalReaderSchema, schema -> {
-      boolean writerSchemaIsPrimitive =
-          AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
-      if (writerSchemaIsPrimitive) {
-        return new GenericDatumReader<>(writerSchema, finalReaderSchema);
-      } else if (useSchemaReflection) {
-        return new ReflectDatumReader<>(writerSchema, finalReaderSchema);
-      } else if (useSpecificAvroReader) {
-        return new SpecificDatumReader<>(writerSchema, finalReaderSchema);
-      } else {
-        return new GenericDatumReader<>(writerSchema, finalReaderSchema);
-      }
-    });
+  protected DatumReader<?> getDatumReader(Schema writerSchema, Schema readerSchema)
+      throws ExecutionException {
+    return datumReaderCache.get(new IdentityPair<>(writerSchema, readerSchema));
  }

  /**
@@ -224,7 +235,7 @@ private Schema getReaderSchema(Schema writerSchema, Schema readerSchema) {
      return readerSchema;
    }
    boolean writerSchemaIsPrimitive =
-        AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
+        AvroSchemaUtils.getPrimitiveSchemas().containsValue(writerSchema);
    if (writerSchemaIsPrimitive) {
      readerSchema = writerSchema;
    } else if (useSchemaReflection) {
@@ -362,28 +373,75 @@ Object read(Schema writerSchema) {
    }

    Object read(Schema writerSchema, Schema readerSchema) {
-      DatumReader<?> reader = getDatumReader(writerSchema, readerSchema);
-      int length = buffer.limit() - 1 - idSize;
-      if (writerSchema.getType().equals(Schema.Type.BYTES)) {
-        byte[] bytes = new byte[length];
-        buffer.get(bytes, 0, length);
-        return bytes;
-      } else {
-        int start = buffer.position() + buffer.arrayOffset();
-        try {
+      try {
+        DatumReader<?> reader = getDatumReader(writerSchema, readerSchema);
+        int length = buffer.limit() - 1 - idSize;
+        if (writerSchema.getType().equals(Schema.Type.BYTES)) {
+          byte[] bytes = new byte[length];
+          buffer.get(bytes, 0, length);
+          return bytes;
+        } else {
+          int start = buffer.position() + buffer.arrayOffset();
          Object result = reader.read(null, decoderFactory.binaryDecoder(buffer.array(),
              start, length, null));
          if (writerSchema.getType().equals(Schema.Type.STRING)) {
            return result.toString();
          } else {
            return result;
          }
-        } catch (IOException | RuntimeException e) {
-          // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
-          throw new SerializationException("Error deserializing Avro message for id "
-              + schemaId, e);
-        }
        }
+      } catch (ExecutionException ex) {
+        throw new SerializationException("Error deserializing Avro message for id "
+            + schemaId, ex.getCause());
+      } catch (IOException | RuntimeException e) {
+        // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
+        throw new SerializationException("Error deserializing Avro message for id "
+            + schemaId, e);
+      }
    }
  }

+  static class IdentityPair<K, V> {
+    private final K key;
+    private final V value;
+
+    public IdentityPair(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    public V getValue() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      IdentityPair<?, ?> pair = (IdentityPair<?, ?>) o;
+      // Only perform identity check
+      return key == pair.key && value == pair.value;
+    }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(key) + System.identityHashCode(value);
+    }
+
+    @Override
+    public String toString() {
+      return "IdentityPair{"
+          + "key=" + key
+          + ", value=" + value
+          + '}';
+    }
+  }

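The IdentityPair key introduced above compares its members with == and hashes them with System.identityHashCode, preserving the identity semantics of the removed MapMaker weak-key maps: two structurally equal Schema instances remain distinct cache keys, and lookups avoid Avro's deep Schema equals()/hashCode() traversal. This is workable because the deserializer normally resolves a given schema to the same cached instance (via readerSchemaCache and the schema registry client's own caching). A hypothetical demo of the key semantics, reusing the IdentityPair class above with Strings standing in for Schemas:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class IdentityKeyDemo {
  public static void main(String[] args) throws Exception {
    LoadingCache<IdentityPair<String, String>, String> cache = CacheBuilder.newBuilder()
        .maximumSize(1000)
        .build(new CacheLoader<IdentityPair<String, String>, String>() {
          @Override
          public String load(IdentityPair<String, String> key) {
            return "reader for " + key; // stand-in for creating a DatumReader
          }
        });

    String a = new String("schema"); // two keys that are equals() by value...
    String b = new String("schema"); // ...but not the same object
    cache.get(new IdentityPair<>(a, a));
    cache.get(new IdentityPair<>(a, a)); // same references: hits the first entry
    cache.get(new IdentityPair<>(b, b)); // equal value, new reference: new entry
    System.out.println(cache.size());   // prints 2
  }
}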
AbstractKafkaAvroSerializer.java
@@ -16,7 +16,9 @@

package io.confluent.kafka.serializers;

-import com.google.common.collect.MapMaker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericDatumWriter;
@@ -46,8 +48,16 @@ public abstract class AbstractKafkaAvroSerializer extends AbstractKafkaSchemaSerDe {
  protected boolean removeJavaProperties;
  protected boolean useLatestVersion;
  protected boolean latestCompatStrict;
-  private final Map<Schema, DatumWriter<Object>> datumWriterCache =
-      new MapMaker().weakKeys().makeMap(); // use identity (==) comparison for keys
+  private final Cache<Schema, DatumWriter<Object>> datumWriterCache;
+
+  public AbstractKafkaAvroSerializer() {
+    // use identity (==) comparison for keys
+    datumWriterCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_CACHE_CAPACITY)
+        .weakKeys()
+        .build();
+
+  }

  protected void configure(KafkaAvroSerializerConfig config) {
    configureClientProperties(config, new AvroSchemaProvider());
@@ -110,7 +120,7 @@ protected byte[] serializeImpl(
      } else {
        BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
        DatumWriter<Object> writer;
-        writer = datumWriterCache.computeIfAbsent(rawSchema, v -> {
+        writer = datumWriterCache.get(rawSchema, () -> {
          if (value instanceof SpecificRecord) {
            return new SpecificDatumWriter<>(rawSchema);
          } else if (useSchemaReflection) {
@@ -125,6 +135,8 @@
      byte[] bytes = out.toByteArray();
      out.close();
      return bytes;
+    } catch (ExecutionException ex) {
+      throw new SerializationException("Error serializing Avro message", ex.getCause());
    } catch (IOException | RuntimeException e) {
      // avro serialization can throw AvroRuntimeException, NullPointerException,
      // ClassCastException, etc
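The serializer uses a plain Cache rather than a LoadingCache because choosing a DatumWriter depends on the value being serialized, so the loader is supplied per call via Cache.get(key, Callable). Guava documents that weakKeys() also switches key comparison to identity (==), which is why the old "use identity (==) comparison for keys" comment still applies. A brief sketch under those assumptions, with Object standing in for org.apache.avro.Schema:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.ExecutionException;

public class WriterCacheSketch {
  public static void main(String[] args) throws ExecutionException {
    Cache<Object, String> cache = CacheBuilder.newBuilder()
        .maximumSize(1000)
        .weakKeys() // identity comparison; entries can be collected with their keys
        .build();

    Object schema = new Object(); // stand-in for a Schema instance
    boolean isSpecific = true;    // stand-in for `value instanceof SpecificRecord`
    // The Callable runs only on a miss, so the writer type is decided once per
    // schema instance; exceptions it throws surface as ExecutionException.
    String writer = cache.get(schema,
        () -> isSpecific ? "SpecificDatumWriter" : "GenericDatumWriter");
    System.out.println(writer);
  }
}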
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
@@ -28,7 +28,7 @@
files="(AvroData|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStoreReaderThread|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaJsonSchemaDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|SubjectVersionsResource).java"/>

<suppress checks="JavaNCSS"
files="(AvroData|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema).java"/>
files="(AvroData|AbstractKafkaAvroSerializer|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema).java"/>

<suppress checks="MethodLength"
files="AvroData.java"/>
AbstractKafkaSchemaSerDe.java
@@ -50,7 +50,7 @@ public abstract class AbstractKafkaSchemaSerDe {

  protected static final byte MAGIC_BYTE = 0x0;
  protected static final int idSize = 4;
-  private static int DEFAULT_CACHE_CAPACITY = 1000;
+  protected static final int DEFAULT_CACHE_CAPACITY = 1000;

  protected SchemaRegistryClient schemaRegistry;
  protected Object keySubjectNameStrategy = new TopicNameStrategy();
