From 32ad0d57becd216f1ad9e835ee6e4db527b3ab20 Mon Sep 17 00:00:00 2001 From: xukun Date: Tue, 22 Mar 2016 20:44:34 +0800 Subject: [PATCH 1/5] Increase probability of using cached serialized status --- .../scala/org/apache/spark/MapOutputTracker.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 3a5caa3510eb0..9e65619cad6ca 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -443,13 +443,12 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]()) epochGotten = epoch } - } - // If we got here, we failed to find the serialized locations in the cache, so we pulled - // out a snapshot of the locations as "statuses"; let's serialize and return that - val bytes = MapOutputTracker.serializeMapStatuses(statuses) - logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length)) - // Add them into the table only if the epoch hasn't changed while we were working - epochLock.synchronized { + + // If we got here, we failed to find the serialized locations in the cache, so we pulled + // out a snapshot of the locations as "statuses"; let's serialize and return that + val bytes = MapOutputTracker.serializeMapStatuses(statuses) + logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length)) + // Add them into the table only if the epoch hasn't changed while we were working if (epoch == epochGotten) { cachedSerializedStatuses(shuffleId) = bytes } From 2e66866e7e1bf3538471817ff3912842db68f43d Mon Sep 17 00:00:00 2001 From: xukun Date: Wed, 23 Mar 2016 23:27:13 +0800 Subject: [PATCH 2/5] fix bug --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 9e65619cad6ca..149661fd29d47 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -452,8 +452,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) if (epoch == epochGotten) { cachedSerializedStatuses(shuffleId) = bytes } + bytes } - bytes } override def stop() { From a856995b914d0cd8a9e3cf69b8cb116344cbc07b Mon Sep 17 00:00:00 2001 From: xukun Date: Mon, 11 Apr 2016 22:05:11 +0800 Subject: [PATCH 3/5] move serializeMapStatuses into cachedSerializedStatuses.get(shuffleId) match { case None => } --- .../scala/org/apache/spark/MapOutputTracker.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 149661fd29d47..1b9d079a2a23e 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -431,6 +431,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = { var statuses: Array[MapStatus] = null var epochGotten: Long = -1 + var byteArr: Array[Byte] = null epochLock.synchronized { if (epoch > cacheEpoch) { cachedSerializedStatuses.clear() @@ -442,17 +443,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) case None => statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]()) epochGotten = epoch + // If we got here, we failed to find the serialized locations in the cache, so we pulled + // out a snapshot of the locations as "statuses"; let's serialize and return that + byteArr = MapOutputTracker.serializeMapStatuses(statuses) + logInfo("Size of output statuses for shuffle %d is %d 
bytes".format(shuffleId, byteArr.length)) } - // If we got here, we failed to find the serialized locations in the cache, so we pulled - // out a snapshot of the locations as "statuses"; let's serialize and return that - val bytes = MapOutputTracker.serializeMapStatuses(statuses) - logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length)) // Add them into the table only if the epoch hasn't changed while we were working if (epoch == epochGotten) { - cachedSerializedStatuses(shuffleId) = bytes + cachedSerializedStatuses(shuffleId) = byteArr } - bytes + byteArr } } From 3b2e8e9853c1ba9be5010aff368e39fe67727bbe Mon Sep 17 00:00:00 2001 From: xukun Date: Mon, 11 Apr 2016 22:08:26 +0800 Subject: [PATCH 4/5] Update MapOutputTracker.scala fix bug --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1b9d079a2a23e..da0f7d069f0a9 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -453,8 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) if (epoch == epochGotten) { cachedSerializedStatuses(shuffleId) = byteArr } - byteArr } + byteArr } override def stop() { From 8f8133c52dfdbfdd1176f9e2764d3b9e3f921e70 Mon Sep 17 00:00:00 2001 From: x00228947 Date: Sat, 23 Apr 2016 14:05:37 +0800 Subject: [PATCH 5/5] fix style --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index da0f7d069f0a9..d4e31640e78c6 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -446,9 +446,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) // If we got here, we failed to find the serialized locations in the cache, so we pulled // out a snapshot of the locations as "statuses"; let's serialize and return that byteArr = MapOutputTracker.serializeMapStatuses(statuses) - logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, byteArr.length)) + logInfo("Size of output statuses for shuffle %d is %d bytes" + .format(shuffleId, byteArr.length)) } - + // Add them into the table only if the epoch hasn't changed while we were working if (epoch == epochGotten) { cachedSerializedStatuses(shuffleId) = byteArr