
CC-3741 - Remove use of IdentityHashMap #1009

Merged (6 commits) on Feb 8, 2019

Conversation

rayokota
Member

There are 2 reasons why we currently use IdentityHashMap:

  1. Performance
  2. Avro schemas are used as keys, and we mutate them by adding a version property. An IdentityHashMap allows for mutable keys.

Regarding #1 (performance), the real gain comes from the client using the same object reference with the CachedSchemaRegistryClient. If a client does not use the same object reference, then a new Map entry is created on each lookup, which can eventually lead to "Too many schema objects". Our clients use the same object reference, but customers often do not.

If we replace the IdentityHashMap with a HashMap, clients that use the same object reference will still be performant. This is because the Avro Schema class caches its hashCode, and its equals method first checks for reference equality.

If we replace the IdentityHashMap with a HashMap, clients that do NOT use the same object reference will not be quite as performant; however, they will correctly get the same lookup from the Map and will not keep adding new entries to it (thus more likely averting "Too many schema objects").
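The reference-equality fast path can be illustrated with a small self-contained sketch. The key class below is hypothetical (it only mimics how org.apache.avro.Schema behaves, it is not the actual Avro implementation): hashCode is computed once and cached, and equals short-circuits on ==, so same-reference lookups in a HashMap are about as cheap as in an IdentityHashMap, while equal-but-distinct instances still resolve to the same entry instead of piling up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key class mimicking how org.apache.avro.Schema behaves:
// it caches its hashCode and short-circuits equals on reference equality.
final class CachingKey {
    private final String definition;
    private int cachedHash; // 0 means "not computed yet"

    CachingKey(String definition) {
        this.definition = definition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true; // fast path: same reference, no field comparison
        }
        if (!(o instanceof CachingKey)) {
            return false;
        }
        return definition.equals(((CachingKey) o).definition);
    }

    @Override
    public int hashCode() {
        if (cachedHash == 0) {
            cachedHash = definition.hashCode(); // computed once, then reused
        }
        return cachedHash;
    }
}

public class HashMapKeyDemo {
    public static void main(String[] args) {
        Map<CachingKey, Integer> ids = new HashMap<>();
        CachingKey original = new CachingKey("{\"type\":\"record\",\"name\":\"User\"}");
        ids.put(original, 42);

        // Same reference: found via the == fast path.
        System.out.println(ids.get(original)); // 42

        // Equal but distinct instance: still resolves to the same entry,
        // so duplicate entries do not accumulate ("Too many schema objects").
        CachingKey copy = new CachingKey("{\"type\":\"record\",\"name\":\"User\"}");
        System.out.println(ids.get(copy));     // 42
        System.out.println(ids.size());        // 1
    }
}
```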

Regarding #2 (mutable keys), the fact that we store the version on the schema is somewhat of a hack. The version can vary by subject, so there is not really a unique version per schema. A cleaner solution would be to not store the version on the schema, but to pass it along with the schema so that the schema does not have to be mutated.

@cyrusv
Contributor

cyrusv commented Jan 31, 2019

How is this related to #827?

@rayokota
Member Author

@cyrusv, it's indirectly related to #827 in the sense that with this fix we are less likely to overpopulate the local schema cache, so an LRU policy on the cache becomes less critical. We can still add an eviction policy to the cache in the future, but the cache does not need to be identity (reference) based. In that sense, this fix is orthogonal to an eviction policy on the cache.

@rayokota
Member Author

@YuvalItzchakov, here is a fix for #894

Member

@mageshn mageshn left a comment


@rayokota Thanks for fixing this long-standing problem with SR clients. I'm wondering if it might be useful to have some sort of JMH benchmark to assert that there is no performance degradation when using the same schema instance. Also, I believe the new change with the version is potentially not backward compatible. I really like the new approach, but does it need to be coupled with this PR? Can the migration to HashMap go in without those changes?

if (schema.getType().equals(Schema.Type.RECORD)) {
return result;
return new GenericContainerWithVersion((GenericContainer) result, version);
Member


@rayokota I believe this is potentially an incompatible change. If there are connectors that are looking for the version property, they could potentially break with this change?

@rayokota
Member Author

rayokota commented Feb 1, 2019

Thanks @mageshn !

Yes, I can do some microbenchmarking to verify performance is acceptable.

Regarding not setting the version property, I believe it is required to move to HashMap. The reason is that when an Avro schema is first registered, only the ID is returned from the Schema Registry. The CachedSchemaRegistryClient caches the Avro schema without any version information. Later, AvroConverter calls the following two methods:

GenericContainer deserialized = deserializer.deserialize(topic, isKey, value);
avroData.toConnectData(deserialized.getSchema(), deserialized);

The GenericContainer has a reference to the Avro schema returned from CachedSchemaRegistryClient. The toConnectData() method above wants the version, so deserialize() will look up the version and set it on the Avro schema in the GenericContainer. Thus the Avro schema is mutated. If the Avro schema is a key in an IdentityHashMap this is fine, but if it is a key in a HashMap, the mutation invalidates the key's position in the map and lookups break.
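The mutable-key hazard described above can be reproduced with any HashMap whose key's hashCode changes after insertion. The sketch below is illustrative only (a hypothetical key class, not the Schema Registry code): once the "version" is set, the key's hash no longer matches the bucket it was stored in, so even a same-reference lookup fails.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative mutable key; setting a version after insertion changes
// its hashCode, so the HashMap can no longer find the entry.
final class MutableKey {
    private final String name;
    private Integer version; // mutated later, like the version set on the schema

    MutableKey(String name) {
        this.name = name;
    }

    void setVersion(int version) {
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MutableKey)) {
            return false;
        }
        MutableKey that = (MutableKey) o;
        return name.equals(that.name) && Objects.equals(version, that.version);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, version);
    }
}

public class MutableKeyDemo {
    public static void main(String[] args) {
        Map<MutableKey, Integer> cache = new HashMap<>();
        MutableKey key = new MutableKey("User");
        cache.put(key, 1);

        key.setVersion(3); // mutation after insertion

        // The entry is still in the map, but its stored hash no longer
        // matches the key's current hash, so the lookup fails.
        System.out.println(cache.get(key));  // null
        System.out.println(cache.size());    // 1
    }
}
```

An IdentityHashMap masks this because it ignores hashCode/equals entirely, which is exactly why the mutation seemed harmless before this change.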

I looked through all our connectors and did not find a reference to the schema.registry.schema.version property on the Avro schema anywhere in our connectors. It would be unusual for a connector to reference this property as it is very specific to Avro (which some connectors like HDFS and S3 do support), and also the logic is that the connect.version is supposed to take precedence over schema.registry.schema.version (I also did not find any connectors that refer to connect.version.) So I believe it's safe to not set the schema.registry.schema.version property any longer. It seems it was only used to pass information from the KafkaAvroDeserializer to AvroData in the GenericContainer.

I can do a further scan of third-party connectors and preview connectors to ensure that no connectors are referring to this property if it would make you feel more comfortable.

@rayokota
Member Author

rayokota commented Feb 5, 2019

@mageshn , I added a SR client performance test and used it to test the difference between IdentityHashMap and HashMap. There was no performance degradation.

IdentityHashMap:
10000 records processed, 60.466438 records/sec (0.00 MB/sec), 16.47 ms avg latency, 442.00 ms max latency, 16 ms 50th, 28 ms 95th, 32 ms 99th, 47 ms 99.9th.

HashMap:
10000 records processed, 60.923602 records/sec (0.00 MB/sec), 16.35 ms avg latency, 426.00 ms max latency, 15 ms 50th, 28 ms 95th, 32 ms 99th, 48 ms 99.9th.

The HashMap was a little faster on average, perhaps because of the difference in collision resolution (IdentityHashMap uses linear probing, HashMap uses separate chaining) and the nature of the data used during the test.

@rhauch
Member

rhauch commented Feb 6, 2019

@rayokota, I'm also concerned about this change being exposed to clients. Have you considered just changing how the client caches the entries, perhaps using a wrapper class for keys to achieve the precise comparison semantics we want?

@rayokota
Member Author

rayokota commented Feb 6, 2019

@rhauch @mageshn I did some more investigation into whether it's safe to not set the schema.registry.schema.version property, and it appears it is never exposed publicly. Here is the flow:

AbstractKafkaAvroDeserializer.deserialize() is the method which sets this property, only if passed includeSchemaAndVersion as true. This is a protected method. The only client that passes true is AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion, another protected method. The only client of AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion is a private class of AvroConverter called only by AvroConverter.toConnectData, which gets the Avro schema and converts it to a Connect schema. It does not return the Avro schema but just uses it in the method. It sets a version on the Connect schema by first checking for the existence of the property connect.version, and if that does not exist, then schema.registry.schema.version. Here again is the flow from the top down:

AvroConverter.toConnectData(), which calls
AvroConverter.Deserializer.deserialize() (private), which calls
AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion() (protected), which calls
AbstractKafkaAvroDeserializer.deserialize() (protected) with includeSchemaAndVersion as true

To summarize:

  1. When using the KafkaAvroDeserializer directly by clients, the property schema.registry.schema.version is never set.
  2. The property is only set by the AvroConverter.
  3. The property is only used when converting an Avro schema to a Connect schema, to get the version.
  4. The Avro schema is not returned by the converter, which instead returns a Connect schema.

Also, I did try a wrapper approach at one point (by trying to manipulate both hashCode and equals) and could not get something to work that was both clean and efficient.

Member

@rhauch rhauch left a comment


A few more comments/questions.

}

public SchemaAndValue toConnectData(org.apache.avro.Schema avroSchema, Object value, Integer version) {
Member


We would need JavaDoc for this new public method, with a clear definition of the behavior if/when version is null.

Member Author


Thanks, I'll add.

return false;
}
AvroSchemaAndVersion that = (AvroSchemaAndVersion) o;
return Objects.equals(schema, that.schema) && Objects.equals(version, that.version);
Member


version can be null, so this is only a match when both versions are null or the versions are equal. Is that sufficient?

Member Author


Yes, that is the desired behavior.
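For context, the wrapper-key pattern from the diff above can be sketched in a self-contained form. This is only an illustration: the real AvroSchemaAndVersion wraps org.apache.avro.Schema, and a String stands in here so the example runs without the Avro dependency. Objects.equals and Objects.hash are null-safe, which is what makes two keys with no version compare equal.

```java
import java.util.Objects;

// Self-contained sketch of the wrapper-key pattern from the diff.
// A String stands in for org.apache.avro.Schema in this illustration.
final class SchemaAndVersion {
    private final String schema;
    private final Integer version; // may be null when the version is unknown

    SchemaAndVersion(String schema, Integer version) {
        this.schema = schema;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SchemaAndVersion that = (SchemaAndVersion) o;
        // Objects.equals is null-safe: two null versions compare equal.
        return Objects.equals(schema, that.schema)
            && Objects.equals(version, that.version);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schema, version);
    }
}

public class WrapperKeyDemo {
    public static void main(String[] args) {
        SchemaAndVersion a = new SchemaAndVersion("User", null);
        SchemaAndVersion b = new SchemaAndVersion("User", null);
        SchemaAndVersion c = new SchemaAndVersion("User", 3);

        System.out.println(a.equals(b)); // true: both versions null
        System.out.println(a.equals(c)); // false: versions differ
    }
}
```

Because the wrapper is immutable, it avoids the mutable-key problem entirely while keeping plain HashMap semantics.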

} else if (version != null) {
versionInt = version.intValue();
}
if (versionInt >= 0) {
Member


Is it possible to not assume that -1 will never be used in a schema version? This seems brittle.

Member Author


Yes, the version must be a positive integer. The assumption is documented elsewhere; I'll add a comment here as well.

import java.util.Objects;

/**
* Wrapper for GenericContainer along with a version number.
Member


We probably need to be much more explicit about whether a version is required, when it is expected, and what the behavior is when the version is null. If it is not expected to be null, then we should consider using int rather than Integer, or add asserts or checks accordingly.

Member Author


Yes, I'll add a comment that it can be null.

@@ -0,0 +1,60 @@
/*
* Copyright 2018 Confluent Inc.
Contributor


nit: year

@rayokota
Member Author

rayokota commented Feb 6, 2019

Thanks for the reviews @rhauch , @avocader ! I've incorporated your review feedback.

Member

@mageshn mageshn left a comment


Thanks, @rayokota for driving this change. This is a super important change and will be appreciated by many users who hit the cache limit issue.

@pranavoyo

Newbie here. Can anyone tell me how to bring these changes into my existing confluent-5.1.1 setup? I am hitting this issue consistently. I am running Kafka, Connect, ZooKeeper, and Schema Registry.

@rr-sucheth-shivakumar

Hello, I am using group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.3.0' and am still running into the "too many schema objects created" issue. Any leads on how to resolve this?
