
Avoid ClassCastException for the same class when used with Flink #509

Closed
wants to merge 1 commit

@andrikod commented Mar 16, 2017

Fix to avoid the ClassCastException for the same class when used with Flink.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions

It is documented that when Avro is used with Flink, a ClassCastException can be thrown due to a caching issue. The issue occurs when the Confluent Kafka client is configured with "specific.avro.reader" = true and deserialization is performed with the SpecificDatumReader instead of the generic reader.
Based on the Flink documentation, it is recommended to pass a new SpecificData instance to each SpecificDatumReader.

This fix extends the deserializer with an extra configuration option, "force.new.specific.avro.instance", which forces SpecificDatumReader to use a new instance of SpecificData when used together with specific.avro.reader.

TODO: investigate the performance penalty of creating a new instance instead of using the singleton.
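
Roughly, the change on the reader-creation path looks like the sketch below. This is illustrative only (the class and field names here are hypothetical, not the exact diff):

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;

// Hypothetical sketch of the reader-creation path in the deserializer.
class ReaderFactorySketch {

    private final boolean useSpecificAvroReader;  // "specific.avro.reader"
    private final boolean forceNewSpecificData;   // "force.new.specific.avro.instance"

    ReaderFactorySketch(boolean useSpecificAvroReader, boolean forceNewSpecificData) {
        this.useSpecificAvroReader = useSpecificAvroReader;
        this.forceNewSpecificData = forceNewSpecificData;
    }

    DatumReader<Object> getDatumReader(Schema writerSchema, Schema readerSchema) {
        if (useSpecificAvroReader && forceNewSpecificData) {
            // A fresh SpecificData per reader avoids the stale JVM-wide class
            // cache when Flink swaps classloaders on job resubmission.
            return new SpecificDatumReader<>(writerSchema, readerSchema, new SpecificData());
        }
        if (useSpecificAvroReader) {
            return new SpecificDatumReader<>(writerSchema, readerSchema);
        }
        return new org.apache.avro.generic.GenericDatumReader<>(writerSchema, readerSchema);
    }
}

A consumer would then enable both "specific.avro.reader" = true and "force.new.specific.avro.instance" = true in its deserializer configuration.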

Commit: Add files via upload
@ConfluentJenkins (Contributor) commented Mar 16, 2017

Can one of the admins verify this patch?

@ConfluentCLABot commented Mar 16, 2017

It looks like @andrikod hasn't signed our Contributor License Agreement yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@andrikod (Author) commented Apr 12, 2017

[clabot:check]

@ConfluentCLABot commented Apr 12, 2017

@confluentinc It looks like @andrikod just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@andrikod closed this Apr 22, 2017

@andrikod reopened this Apr 22, 2017

@ConfluentJenkins (Contributor) commented Apr 22, 2017

Can one of the admins verify this patch?

@ewencp (Member) commented Apr 24, 2017

ok to test

@ewencp (Member) left a comment

@andrikod Barring one comment on the documentation, the change itself looks reasonable. My main question is whether this is really the right place to fix the problem. This sounds a lot like an issue with the way Flink handles classloading -- case in point, the issue affects not just this deserializer, and not even just Avro, but other libraries as well.

Is anything being done in Flink to address this? Is patching every library that encounters the issue really the right way to address it?


public static final String AVRO_FORCE_NEW_SPECIFIC_DATA_CONFIG = "force.new.specific.avro.instance";
public static final boolean AVRO_FORCE_NEW_SPECIFIC_DATA_DEFAULT = false;
public static final String AVRO_FORCE_NEW_SPECIFIC_DATA_DOC = "If true, it passes a new instance of SpecificData to SpecificDatumReader";

@ewencp (Member) commented Apr 24, 2017

Good to see that this is at Importance.LOW, but the docs could probably be clearer. This is a very specific scenario where you'd need to use this setting. We should probably call out exactly when you'd want to use it.

@sv3ndk commented Aug 17, 2017

Hi all,

We are hitting exactly the same issue while integrating Flink with Confluent's Avro schema registry through this Avro deserializer.

As I understand it, Avro's SpecificData is a JVM-wide cache of Class instances and other reflection-related objects. On the other hand, when Flink is deployed in standalone mode, it is essentially a set of long-running JVMs that load and unload the class definitions of the jobs they execute whenever the fat jars are uploaded.

What happens is that when a new version of a Flink job gets re-submitted, Flink reloads the class definitions accordingly but has no way to invalidate Avro's JVM-wide SpecificData cache, which leads to the "X cannot be cast to X" error mentioned by @andrikod.

I am not aware of other frameworks that do dynamic class reloading and are also impacted by this, although it sounds plausible to me that they would all be impacted in a similar fashion.

May I suggest we try to align the lifespan of the SpecificData cache with the lifespan of the application?

I see the solution proposed by @andrikod is to create one instance of SpecificData for each SpecificDatumReader, though that might be a bit extreme, since it somewhat defeats the purpose of a cache.

I think if we create one instance of SpecificData per instance of AbstractKafkaAvroDeserializer.java, we'll get the same caching benefit, since applications would typically have only one (or a few) instances of the Avro deserializer, and the cache would automatically be destroyed as soon as those instances are garbage collected.
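
A minimal sketch of what I have in mind, assuming we add the field directly to the abstract deserializer (simplified, not the real class body):

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;

abstract class AbstractKafkaAvroDeserializer {

    // One cache per deserializer instance instead of the JVM-wide singleton:
    // it lives and dies with the deserializer, so classes reloaded by Flink
    // never collide with stale cached Class objects.
    private final SpecificData specificData = new SpecificData();

    protected DatumReader<Object> getDatumReader(Schema writerSchema, Schema readerSchema) {
        return new SpecificDatumReader<>(writerSchema, readerSchema, specificData);
    }
}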

@ewencp, does that sound reasonable to you?

PS: I published more details on my blog about how we integrate Flink with Confluent's Schema Registry; it was while doing so that we discovered we hit the issue above.

@ConfluentJenkins (Contributor) commented Aug 17, 2017

Can one of the admins verify this patch?

@eackerm commented Aug 21, 2017

Hi all,

We are also having classloading issues when using KafkaAvroDeserializer inside a Flink program. For the moment we are maintaining a forked kafka-avro-serializer dependency to work around this issue.

I tried sv3nd's approach by adding a field holding a single instance of SpecificData inside AbstractKafkaAvroDeserializer. This cached instance of SpecificData is then reused for the creation of each new SpecificDatumReader inside the AbstractKafkaAvroDeserializer.getDatumReader method.

To make a long story short: this approach works!

So we still benefit from a cached SpecificData instance, but without the classloading issues we encountered in Flink (and potentially in any other dynamically loaded program).

Let me know if you would like us to open a pull request containing this solution.

belo82 added a commit to triviadata/schema-registry that referenced this pull request Oct 13, 2017

Avoid ClassCastException for the same class when used with Flink
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions

It is documented that when Avro is used with Flink, a `ClassCastException`
is thrown due to a caching issue. The issue occurs when the Confluent
Kafka client is configured with `specific.avro.reader = true` and
deserialization is performed with the `SpecificDatumReader` instead of
the generic reader.
Based on the Flink documentation, it is recommended to pass a new
`SpecificData` to `SpecificDatumReader` each time.

There is a JIRA issue in Flink discussing this bug:
https://issues.apache.org/jira/browse/FLINK-5633

This hotfix is a backport of the PR fixing this problem, which is
waiting to be merged here:
confluentinc#509
@rmetzger commented Nov 8, 2017

I'm sorry for all users who ran into this issue. I've been told that the upcoming Flink 1.4 release will fix it.

@arthurandres commented May 10, 2018

@rmetzger, as far as I can see this PR has not been merged. Has the issue been fixed in Flink 1.4, and if so, how?
Thanks.

@tzulitai commented May 14, 2018

Hi @arthurandres,

I think that starting from Flink 1.4, this issue is properly fixed.
For more details, you can take a look at https://issues.apache.org/jira/browse/FLINK-7997 and https://issues.apache.org/jira/browse/FLINK-7420.

In short, starting from Flink 1.4, dependency management for Avro was changed so that Avro classes are always part of the user code and the Avro schema cache is no longer JVM-wide.

@mageshn (Member) commented Nov 9, 2018

@andrikod, @arthurandres, and @tzulitai: Since this is fixed in Flink, I assume it's safe to close this PR. If you think otherwise, please feel free to reopen it.

@mageshn closed this Nov 9, 2018
