[BEAM-6021] Registered more internal classes for kryo serialization#6998
mareksimunek wants to merge 2 commits into apache:master
Run Spark ValidatesRunner |
...src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
|
There have been issues in the past with using Kryo as the serializer. This is a discussion about it from just over a year ago: https://lists.apache.org/thread.html/a36d68b568377f8064463b7fc374e8304d59a26412360050333bb2aa@%3Cdev.beam.apache.org%3E It may not be an issue anymore, but I wanted to provide context as to why this may not have worked in the past.
5c6222f to
59e4a18
Compare
|
I investigated, and the problems Lukasz mentioned still persist, so I only added more classes to register. That way, if you set KryoSerializer, some of the internally used objects are registered by default.
|
@iemejia pls code review |
aviemzur
left a comment
M2C: Everywhere we need to serialize data, we made sure not to let Spark do it with its own serializer, which defaults to Java unless configured otherwise through Spark's spark.serializer configuration. Spark users often set this to Kryo instead of Java, so we encode the data using Beam's coders and then pass a byte[]. When Spark serializes data before it is transmitted to other machines, its serializer (Kryo or Java) therefore only has to serialize a byte[], which is always serializable by either one.
If other classes, which are not the user's data, are serialized by Kryo and we're sure that they are always serializable by Kryo, that's fine.
I'm not sure this change actually makes the classes registered to be serialized by Kryo. Do we have tests to show this? IIRC this is controlled by the spark.serializer configuration.
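To make the approach described above concrete, here is a minimal, dependency-free sketch of the idea: the value is encoded to a byte[] by code that knows its layout, and the engine's serializer only ever sees that byte[]. Everything here is hypothetical and for illustration only: `UserRecord`, `encode`/`decode` (a stand-in for a Beam coder), and `engineSerialize`/`engineDeserialize` (a stand-in for Spark's configured serializer, implemented with plain Java serialization). It is not the runner's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/** Hypothetical sketch: encode with our own coder so the engine serializer only sees byte[]. */
class EncodeBeforeShuffleSketch {

  /** A user type that is deliberately NOT java.io.Serializable. */
  static final class UserRecord {
    final String name;
    final int count;

    UserRecord(String name, int count) {
      this.name = name;
      this.count = count;
    }
  }

  /** Stand-in for a coder: the only code that knows UserRecord's layout. */
  static byte[] encode(UserRecord r) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(r.name);
      out.writeInt(r.count);
    }
    return bytes.toByteArray();
  }

  static UserRecord decode(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String name = in.readUTF();
      int count = in.readInt();
      return new UserRecord(name, count);
    }
  }

  /** Stand-in for the engine's serializer; here plain Java serialization of a byte[]. */
  static byte[] engineSerialize(byte[] payload) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(payload);
    }
    return bytes.toByteArray();
  }

  static byte[] engineDeserialize(byte[] wire) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
      return (byte[]) in.readObject();
    }
  }

  /** Full path: encode, hand byte[] to the engine serializer, then reverse both steps. */
  static UserRecord roundTrip(UserRecord r) {
    try {
      return decode(engineDeserialize(engineSerialize(encode(r))));
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    UserRecord back = roundTrip(new UserRecord("a", 3));
    System.out.println(back.name + " " + back.count);
  }
}
```

Note that `UserRecord` never needs to be serializable by the engine's serializer, which is the point of the design: the choice of spark.serializer does not affect user data as long as only the encoded bytes cross that boundary.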
59e4a18 to
875d716
Compare
|
@aviemzur Hi, you are influenced by this change only if you use |
|
Run Java PreCommit |
25d0332 to
51fec74
Compare
|
Run Java PreCommit |
iemejia
left a comment
Left some comments. Please ping me to re-review, and sorry for the delay.
conf.setAppName("test");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// register immutable collections serializers because the SDK uses them.
conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName());
There seems to be an initialization error, because the test does not appear to be exercising the BeamSparkRunnerRegistrator. It is probably a good idea to assert that MicrobatchSource really is serialized with StatelessJavaSerializer.
There was no easy way to hook into whether BeamSparkRunnerRegistrator's registration is called, so I created a wrapper around the registrator and a simple pipeline, and wrote the desired test checking that MicrobatchSource is registered.
import scala.reflect.ClassTag$;

/** Testing of beam registrar. */
public class BeamSparkRunnerRegistratorTest {
It is probably worth adding an extra test that asserts that the Registrator is not called by default.
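The two test ideas in this thread (checking that the registrator runs when it is configured, and that it does not run by default) can be sketched without any Spark or Kryo dependency. This is only one possible shape, not the test that was merged: `FakeKryo`, `Registrator`, `InternalClassesRegistrator`, `RecordingRegistrator`, and `start` are all hypothetical stand-ins, and the string "MicrobatchSource" is just a label for the class the real registrator is expected to register.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch of a recording wrapper around a registrator. */
class RegistratorWrapperSketch {

  /** Stand-in for a Kryo instance: it only records registered class names. */
  static final class FakeKryo {
    final Set<String> registered = new LinkedHashSet<>();

    void register(String className) {
      registered.add(className);
    }
  }

  /** Stand-in for a registrator callback interface. */
  interface Registrator {
    void registerClasses(FakeKryo kryo);
  }

  /** Stand-in for the registrator under test. */
  static final class InternalClassesRegistrator implements Registrator {
    @Override
    public void registerClasses(FakeKryo kryo) {
      kryo.register("MicrobatchSource");
    }
  }

  /** Wrapper that delegates to the real registrator and remembers that it was invoked. */
  static final class RecordingRegistrator implements Registrator {
    private final Registrator delegate;
    private boolean called;

    RecordingRegistrator(Registrator delegate) {
      this.delegate = delegate;
    }

    @Override
    public void registerClasses(FakeKryo kryo) {
      called = true;
      delegate.registerClasses(kryo);
    }

    boolean wasCalled() {
      return called;
    }
  }

  /** Stand-in for engine startup: the registrator runs only if one was configured. */
  static FakeKryo start(Registrator configuredOrNull) {
    FakeKryo kryo = new FakeKryo();
    if (configuredOrNull != null) {
      configuredOrNull.registerClasses(kryo);
    }
    return kryo;
  }

  public static void main(String[] args) {
    RecordingRegistrator wrapper = new RecordingRegistrator(new InternalClassesRegistrator());
    FakeKryo configured = start(wrapper);
    FakeKryo byDefault = start(null);
    System.out.println(wrapper.wasCalled() + " " + configured.registered + " " + byDefault.registered);
  }
}
```

In a real test the wrapper would implement Spark's registrator interface and be named in spark.kryo.registrator; how the recorded state is read back from the test depends on how Spark instantiates the registrator, which this sketch does not model.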
51fec74 to
ff1c700
Compare
|
Rebased, fixed the test, and added a new one testing the default behavior.
…n using kryo's BeamSparkRunnerRegistrator
|
Thanks a lot @mareksimunek and sorry for the extra long time to review. Already merged manually because I did some final touches, so closing it here. |
Registered more internal classes for kryo serialization