Skip to content

Commit

Permalink
[SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throw…
Browse files Browse the repository at this point in the history
…s NPE

## What changes were proposed in this pull request?

Fix HighlyCompressedMapStatus#writeExternal NPE:
```
17/06/18 15:00:27 ERROR Utils: Exception encountered
java.lang.NullPointerException
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
        at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException
java.io.IOException: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
        at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        ... 17 more
17/06/18 15:00:27 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.17.47.20:50188
17/06/18 15:00:27 ERROR Utils: Exception encountered
java.lang.NullPointerException
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619)
        at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562)
        at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

## How was this patch tested?

manual tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #18343 from wangyum/SPARK-21133.

(cherry picked from commit 9b57cd8)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
wangyum authored and cloud-fan committed Jun 20, 2017
1 parent cf10fa8 commit 8bf7f1e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
Expand Up @@ -141,7 +141,7 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
@transient private var hugeBlockSizes: Map[Int, Byte])
private var hugeBlockSizes: Map[Int, Byte])
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
Expand Down
Expand Up @@ -175,6 +175,7 @@ class KryoSerializer(conf: SparkConf)
kryo.register(None.getClass)
kryo.register(Nil.getClass)
kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
kryo.register(classOf[ArrayBuffer[Any]])

kryo.setClassLoader(classLoader)
Expand Down
Expand Up @@ -24,9 +24,9 @@ import scala.util.Random
import org.mockito.Mockito._
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.storage.BlockManagerId

class MapStatusSuite extends SparkFunSuite {
Expand Down Expand Up @@ -154,4 +154,18 @@ class MapStatusSuite extends SparkFunSuite {
case part => assert(status2.getSizeForBlock(part) >= sizes(part))
}
}

test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
val conf = new SparkConf()
.set("spark.serializer", classOf[KryoSerializer].getName)
.setMaster("local")
.setAppName("SPARK-21133")
val sc = new SparkContext(conf)
try {
val count = sc.parallelize(0 until 3000, 10).repartition(2001).collect().length
assert(count === 3000)
} finally {
sc.stop()
}
}
}

0 comments on commit 8bf7f1e

Please sign in to comment.