[SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throws NPE #18343
Conversation
I'm not sure that's a valid fix. This makes the field serializable when it wasn't intended to be. Either it's supposed to be recreated on demand, or the code needs to handle it not existing.
cc @jinxing64
Test build #78224 has finished for PR 18343 at commit
Thanks for the ping.
Could you give some examples of how to reproduce this NPE in the two situations above?
@jinxing64

```
spark-sql -e "
set spark.sql.shuffle.partitions=2001;
drop table if exists spark_hcms_npe;
create table spark_hcms_npe as select id, count(*) from big_table group by id;
"
```

@srowen I think this issue should be fixed before the 2.2.0 release; a big query throws this exception every time:
Test build #78230 has finished for PR 18343 at commit
Is this still reproducible in the current codebase? In the error message above, there is the call to
@wangyum Are you using kyro serializer? I think it is why you hit this issue. Once you use kyro, I think the |
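The failure mode described here can be sketched outside Spark. Below is a hedged illustration with a hypothetical `Holder` class (not Spark's actual `HighlyCompressedMapStatus`): default field-level serialization skips a `@transient` field, so it comes back `null`, and any later custom write logic that iterates the map then throws an NPE. Java serialization stands in for Kryo's default field serializer here, which skips `@transient` fields the same way.

```scala
import java.io._

// Hypothetical stand-in class: the @transient map is skipped by default
// field-level serialization, so it deserializes as null.
class Holder(@transient var hugeBlockSizes: Map[Int, Byte]) extends Serializable

val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(new Holder(Map(0 -> 1.toByte)))
oos.close()

val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[Holder]
println(restored.hugeBlockSizes)  // null: iterating it later would throw NullPointerException
```

This matches the stack trace in the PR description: `writeExternal` iterates `hugeBlockSizes`, which is `null` after a round trip that ignored the transient field.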
I think this should be addressed before 2.2. I already asked notice of other committers on dev mailing list. |
@viirya Yes, I'm using
Because we write/read ...
LGTM cc @cloud-fan
@wangyum Thanks for updating. Can you try disabling Kryo and running it again, so we can verify it?
@wangyum Can you also add a test for this? |
```diff
@@ -141,7 +141,7 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    @transient private var hugeBlockSizes: Map[Int, Byte])
+    private[this] var hugeBlockSizes: Map[Int, Byte])
```
We do want to serialize `hugeBlockSizes`, but with customized logic; that's why we marked it `@transient`. I think the correct fix is to make this class implement `KryoSerializable`, and copy the customized serialization logic of `hugeBlockSizes` into the Kryo serialization hooks.
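The customized logic in question can be sketched like this, using a simplified, hypothetical `Status` class rather than the real `HighlyCompressedMapStatus`: the `@transient` map is written out and rebuilt by hand in `writeExternal`/`readExternal`, which is exactly why default field serialization must not touch it. A `KryoSerializable` fix would mirror the same logic in Kryo's `write(Kryo, Output)`/`read(Kryo, Input)` hooks.

```scala
import java.io._

// Hypothetical stand-in for HighlyCompressedMapStatus: the transient map is
// serialized with hand-written logic instead of default field serialization.
class Status(@transient var hugeBlockSizes: Map[Int, Byte])
    extends Externalizable {
  def this() = this(null)  // Externalizable requires a no-arg constructor

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(hugeBlockSizes.size)  // throws NPE if the map is null
    hugeBlockSizes.foreach { case (block, size) =>
      out.writeInt(block)
      out.writeByte(size)
    }
  }

  override def readExternal(in: ObjectInput): Unit = {
    val n = in.readInt()
    hugeBlockSizes = (0 until n).map(_ => in.readInt() -> in.readByte()).toMap
  }
}

// Round-trip through the custom hooks directly.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
new Status(Map(3 -> 7.toByte)).writeExternal(oos)
oos.close()

val back = new Status()
back.readExternal(new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)))
println(back.hugeBlockSizes)  // Map(3 -> 7)
```

The comment in `writeExternal` marks the exact spot where the reported NPE originates when the field has come back `null` from a serializer that skipped it.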
Sounds good to me. However, the customized serialization logic looks similar to Kryo's map serializer, so I'm not sure it's worth duplicating the customized logic for Kryo.
If you can figure out a way to make it serializable with Kryo and still keep the customized serialization logic for the Java serializer, I'm OK with it.
Oh, it seems it is now; then LGTM.
Test build #78239 has finished for PR 18343 at commit
```diff
@@ -175,6 +175,7 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(None.getClass)
     kryo.register(Nil.getClass)
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
+    kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
```
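A quick way to see why the registration above targets `Map$EmptyMap$` specifically: Scala's empty immutable `Map` is a singleton of a different concrete class than the small non-empty maps, and Kryo registration is per concrete class.

```scala
// The empty immutable Map and a one-entry Map are different concrete classes,
// so each must be registered with Kryo separately where registration matters.
println(Map.empty[Int, Byte].getClass.getName)  // scala.collection.immutable.Map$EmptyMap$
println(Map(1 -> 1.toByte).getClass.getName)    // scala.collection.immutable.Map$Map1
```

An empty `hugeBlockSizes` map is a perfectly normal case (no huge blocks), which is presumably how the unregistered `Map$EmptyMap$` class surfaces in practice.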
Why `Map$EmptyMap$`?
Test build #78248 has finished for PR 18343 at commit
```diff
-    @transient private var hugeBlockSizes: Map[Int, Byte])
-    extends MapStatus with Externalizable {
+    @transient private[this] var hugeBlockSizes: Map[Int, Byte])
+    extends MapStatus with Externalizable with KryoSerializable {
```
I think the previous version already worked... When we have `writeExternal` and `readExternal`, `@transient` doesn't matter for the Java serializer, so removing `@transient` to make it work with Kryo is a valid fix.
OK, I manually tested both removing `@transient` and extending `KryoSerializable`; both worked fine. I'm writing a unit test.
OK, I get it. Hm, I wonder why some classes in the code extend ... I imagine Kryo understands what to do with ... That seems like the right fix? Because this seems to just cause default serialization to take over, and that's not desirable, apparently. If so, then we might have a similar problem with ...
It seems ... For this patch, the Kryo serializer seems to have the same logic to serialize a map: #18343 (comment), so it's OK to just remove the ...
I don't quibble with custom serialization logic, but you can do that with ...
Test build #78251 has finished for PR 18343 at commit
Test build #78254 has finished for PR 18343 at commit
It's obvious it will reduce data size with custom serialization, since the custom logic doesn't need to write out the full class name, which the Java default does. I don't think Kryo knows what to do with `Serializable` either, unless we specifically register it, so we'd need to implement both either way whenever we implement custom serialization logic.
Hmmm... from http://docs.oracle.com/javase/8/docs/api/java/io/Externalizable.html: So the custom code doesn't need to write the class name because the JRE libs do that for you, so that's not really a win. It's mostly a cleaner version of the private ... But re: Kryo, yeah, nothing changes.
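The class-name point can be checked with a toy example (the `Plain` class below is hypothetical, purely for illustration): `ObjectOutputStream` writes a class descriptor, including the class name, for the top-level object whether or not the class customizes its serialization, so `Externalizable` only lets you control the field payload, not the descriptor overhead.

```scala
import java.io._

// Hypothetical toy class: its serialized stream contains the class name even
// though the payload is a single int, because ObjectOutputStream writes a
// class descriptor for the top-level object either way.
class Plain(var n: Int) extends Serializable

def serializedBytes(o: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(o)
  oos.close()
  bos.toByteArray
}

val bytes = serializedBytes(new Plain(42))
val streamText = new String(bytes, "ISO-8859-1")
println(streamText.contains("Plain"))  // true: the class name is in the stream
println(bytes.length)                  // far more than the 4 payload bytes
```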
I was talking about the class names for the internal members.
True. I guess since the same serializer instance is not reused, you don't get the benefit of the optimization that avoids re-sending the class name after it first shows up. But back to the change, it feels like the correct thing would be to implement ...
I ran a local test and ... LGTM.
Agreed. The ... LGTM.
…s NPE ## What changes were proposed in this pull request? Fix HighlyCompressedMapStatus#writeExternal NPE: ``` 17/06/18 15:00:27 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 
17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException java.io.IOException: java.lang.NullPointerException at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at 
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) ... 17 more 17/06/18 15:00:27 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.17.47.20:50188 17/06/18 15:00:27 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` ## How was this patch tested? manual tests Author: Yuming Wang <wgyumg@gmail.com> Closes #18343 from wangyum/SPARK-21133. (cherry picked from commit 9b57cd8) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Thanks, merging to master/2.2!
## What changes were proposed in this pull request?
Fix HighlyCompressedMapStatus#writeExternal NPE.
## How was this patch tested?
Manual tests.