Skip to content

[BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode#1613

Closed
aviemzur wants to merge 2 commits intoapache:masterfrom
aviemzur:cnf-deserialize-issue
Closed

[BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode#1613
aviemzur wants to merge 2 commits intoapache:masterfrom
aviemzur:cnf-deserialize-issue

Conversation

@aviemzur
Copy link
Member

@aviemzur aviemzur commented Dec 14, 2016

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@aviemzur
Copy link
Member Author

R: @amitsela @jbonofre

Copy link
Member

@amitsela amitsela left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments. Thanks!

<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21.1</version>
<version>2.21</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

public void registerClasses(Kryo kryo) {
for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
kryo.register(clazz, new JavaSerializer());
kryo.register(clazz, new StatelessJavaSerializer());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

try {
return Class.forName(desc.getName(), false, classLoader);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not find class: " + desc.getName(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about primeClasses ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

primClasses is a private member of ObjectInputStream. In any case, primitives aren't serialized with StatelessJavaSerializer only the classes that were registered to use it (Source and Coder implementations)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/**
* ObjectInputStream with specific ClassLoader.
*/
class ObjectInputStreamWithClassLoader extends ObjectInputStream {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used by StatelessJavaSerializer, why not make it an inner class ?
StatelessJavaSerializer also provides explanations as to way this is needed at all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* Stateless Java Serializer.
*
* <p>
* Solves state re-use issue in Kryo version 2.21 used in Spark 1.x
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* </p>
*
* <p>
* Also, solves class loading issue in cluster caused by ${@link ObjectInputStream}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

for (Class<?> clazz : classesForJavaSerialization) {
Assert.assertThat("Registered serializer for class " + clazz.getName()
+ " was not an instance of " + JavaSerializer.class.getName(),
+ " was not an instance of " + StatelessJavaSerializer.class.getName(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indents are 4 spaces usually.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fixup

@asfbot
Copy link

asfbot commented Dec 14, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5902/
--none--

@asfbot
Copy link

asfbot commented Dec 15, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/5946/
--none--

@aviemzur
Copy link
Member Author

@amitsela made the changes discussed in comments.

@amitsela
Copy link
Member

LGTM from me, @jbonofre PTAL - specifically on StatelessJavaSerializer and it's ObjectInputStreamWithClassLoader.

@jbonofre
Copy link
Member

CC: @jbonofre

Thanks guys ! Let me take a look on ClassLoader.

@aviemzur
Copy link
Member Author

This change exists also on Kryo master: EsotericSoftware/kryo@19a6b5e

@amitsela
Copy link
Member

Thanks @aviemzur this is reassuring.

@jbonofre
Copy link
Member

It sounds good to me, but I'm checking one more thing.

@amitsela
Copy link
Member

amitsela commented Jan 1, 2017

Ping @jbonofre can we merge this ?

@asfbot
Copy link

asfbot commented Jan 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6353/
--none--

Copy link
Member

@jbonofre jbonofre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

I gonna merge.

@amitsela
Copy link
Member

amitsela commented Jan 1, 2017

Thanks!

@asfgit asfgit closed this in e136f12 Jan 2, 2017
@aviemzur aviemzur deleted the cnf-deserialize-issue branch January 2, 2017 13:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants