SPARK-11193 - Use Java ConcurrentHashMap instead of SynchronizedMap trait in order to avoid ClassCastException due to KryoSerializer in KinesisReceiver #10203

Closed
wants to merge 3 commits into from

Conversation

jbonofre
Member

@jbonofre jbonofre commented Dec 8, 2015

No description provided.

@EugenCepoi
Contributor

Why don't you just replace the use of SynchronizedMap in KinesisReceiver with a ConcurrentHashMap instead?

@jbonofre
Member Author

jbonofre commented Dec 8, 2015

There are two things:

  • as you said, we could use a Java ConcurrentHashMap in KinesisReceiver (changing a couple of lines of code)
  • but it still means that users can't use the Kryo serializer with the SynchronizedMap trait (generally speaking). That's why I proposed a more "generic" solution.

Thoughts?

@EugenCepoi
Contributor

That problem actually applies to all types for which Kryo provides a default ser/de, mostly because Kryo will try to deserialize to the type known during registration, and the syntax new Foo with Bar generates a new anonymous class. I wonder whether this can also occur for plain POJOs/POSOs, for which Kryo will use its generic deserialization system. In short, I think it is better to just use the Java implementation and eventually report that kind of problem to the Kryo/chill project. Anyway, SynchronizedMap is deprecated in Scala 2.11.
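
To illustrate the point about new Foo with Bar, here is a minimal sketch. Foo and Bar are hypothetical stand-ins (not Spark or Kryo types), and nothing here calls Kryo itself. It only shows that the mixin syntax produces a distinct runtime class, so an object rebuilt as a plain Foo would no longer carry the Bar trait.

```scala
object AnonymousClassDemo {
  class Foo // hypothetical stand-in for a collection class such as mutable.HashMap
  trait Bar // hypothetical stand-in for a mixed-in trait such as SynchronizedMap

  // A plain instance: its runtime class is Foo itself.
  val plain: Foo = new Foo

  // Mixing in a trait at instantiation makes the compiler emit an anonymous subclass.
  val mixed: Foo with Bar = new Foo with Bar

  def main(args: Array[String]): Unit = {
    println(plain.getClass.getName)  // the Foo class
    println(mixed.getClass.getName)  // a compiler-generated anonymous class name
    println(plain.isInstanceOf[Bar]) // a rebuilt plain Foo does not have the trait
  }
}
```

If a deserializer hands back something equivalent to plain where the calling code expects the mixed type, the trait is gone, which is consistent with the ClassCastException described in this PR.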

@jbonofre
Member Author

jbonofre commented Dec 8, 2015

Good point. Let me upgrade KinesisReceiver to use Java ConcurrentHashMap implementation in this PR. We will see what the others think about this.

@srowen
Member

srowen commented Dec 9, 2015

Yes, the simpler solution is better here. Use ConcurrentHashMap.

@jbonofre
Member Author

jbonofre commented Dec 9, 2015

All right. I'm updating the PR.

Maybe it could make sense to inform people who use Kryo that the SynchronizedMap trait is "lost".

WDYT ?

@jbonofre jbonofre force-pushed the SPARK-11193 branch 2 times, most recently from 313517f to caa4363 Compare December 9, 2015 17:35
@jbonofre
Member Author

jbonofre commented Dec 9, 2015

PR rebased and updated to use Java ConcurrentHashMap.

I removed the change to the KryoSerializer that dealt with the SynchronizedMap trait.

@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T](

   /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
   private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
-    shardIdToLatestStoredSeqNum.get(shardId)
+    return Option[String]{ shardIdToLatestStoredSeqNum.get(shardId) }
Member

This can be simplified to just Option(shardIdToLatestStoredSeqNum.get(shardId))
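
A minimal sketch of why Option(...) is the right wrapper here, assuming a ConcurrentHashMap[String, String] like the one in this PR. The object name, map name, and sample values are hypothetical. ConcurrentHashMap.get returns null for a missing key, and Option.apply turns null into None.

```scala
import java.util.concurrent.ConcurrentHashMap

object OptionWrapDemo {
  // Hypothetical stand-in for shardIdToLatestStoredSeqNum.
  val latestSeqNums = new ConcurrentHashMap[String, String]()
  latestSeqNums.put("shard-0", "42")

  // Option(x) yields None when x is null and Some(x) otherwise,
  // so the null returned by get for a missing key never escapes.
  def latest(shardId: String): Option[String] =
    Option(latestSeqNums.get(shardId))
}
```

By contrast, wrapping the same lookup in Some(...) would produce Some(null) for a missing shard.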

Member Author

Good point Sean. Let me improve this !

Thanks !

@srowen
Member

srowen commented Dec 9, 2015

Aside from the two instances of that comment, looks OK

@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T](

   /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
   private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
-    shardIdToLatestStoredSeqNum.get(shardId)
+    return Option(shardIdToLatestStoredSeqNum.get(shardId))
Contributor

You can drop the return keyword

Member Author

Good catch, I forgot this cleanup. Thanks !

@EugenCepoi
Contributor

Looks good

@@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T](
   private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]

   /** Sequence number ranges of data added to each generated block */
-  private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
-    with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
+  private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
Member

I don't think you need to fix this -- but I think the style convention is to use () when the invocation has a side effect and I'd argue that constructors always do. I should have said it earlier but don't know that it's worth changing as the original call didn't either.

Member Author

Oh, thanks for this reminder Sean. You are right, I used the same syntax as in the original code. Let me know if you want me to change this.

Contributor

It's probably always a good idea to use ConcurrentHashMap instead of the mixed-in trait. The Typesafe people themselves deprecated the trait, said it's unreliable, and recommended that users use Java's map instead.
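
As a rough sketch of that recommendation, assuming only the JDK: two threads write to one ConcurrentHashMap without any external locking. The object name, key scheme, and values below are hypothetical and are not taken from KinesisReceiver.

```scala
import java.util.concurrent.ConcurrentHashMap

object ConcurrentMapDemo {
  // Hypothetical stand-in for blockIdToSeqNumRanges, using String keys and values.
  val blockIdToRange = new ConcurrentHashMap[String, String]()

  // Start a thread that inserts n distinct keys under the given prefix.
  def fill(prefix: String, n: Int): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        var i = 0
        while (i < n) {
          blockIdToRange.put(prefix + "-" + i, "range-" + i)
          i += 1
        }
      }
    })
    t.start()
    t
  }

  // Run two writers concurrently, wait for both, and report the map size.
  def run(): Int = {
    val writers = Seq(fill("a", 1000), fill("b", 1000))
    writers.foreach(_.join())
    blockIdToRange.size()
  }
}
```

Because ConcurrentHashMap is an ordinary concrete class rather than a trait mixed in at instantiation, no anonymous class is generated for the field.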

@SparkQA

SparkQA commented Dec 9, 2015

Test build #2190 has finished for PR 10203 at commit d80cf35.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • public class JavaIndexToStringExample

@srowen
Member

srowen commented Dec 10, 2015

@jbonofre can you update the title to reflect the change? Possibly the description too.

@SparkQA

SparkQA commented Dec 10, 2015

Test build #2194 has started for PR 10203 at commit d80cf35.

@jbonofre
Member Author

Sure. You mean the PR title or also the commit comment ?

@srowen
Member

srowen commented Dec 10, 2015

PR title/description. The squashed commit gets a new message and the squashed commit descriptions look OK anyway.

@jbonofre
Member Author

All right, let me do it.

@jbonofre jbonofre changed the title from "SPARK-11193 - Hack to support SynchronizedMap trait in Kryo serializer" to "SPARK-11193 - Use Java ConcurrentHashMap instead of SynchronizedMap trait in order to avoid ClassCastException due to KryoSerializer in KinesisReceiver" Dec 10, 2015
@jbonofre jbonofre force-pushed the SPARK-11193 branch 2 times, most recently from 9cad42d to 67aa4e6 Compare December 11, 2015 16:44
@andrewor14
Contributor

retest this please. The changes here LGTM

@jbonofre
Member Author

Thanks @andrewor14 (again ;)). Let me retest it.

@jbonofre
Member Author

Tests OK on my box. The Jenkins test failure doesn't look related.

@SparkQA

SparkQA commented Dec 11, 2015

Test build #2208 has finished for PR 10203 at commit 5cec007.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Dec 12, 2015

Merged to master/1.6

asfgit pushed a commit that referenced this pull request Dec 12, 2015
…rait in order to avoid ClassCastException due to KryoSerializer in KinesisReceiver

Author: Jean-Baptiste Onofré <jbonofre@apache.org>

Closes #10203 from jbonofre/SPARK-11193.

(cherry picked from commit 03138b6)
Signed-off-by: Sean Owen <sowen@cloudera.com>
@asfgit asfgit closed this in 03138b6 Dec 12, 2015
5 participants