From 92c2e07171f60b977c62661ea6475486a1599b19 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Sun, 9 Sep 2018 18:44:23 +0800 Subject: [PATCH 1/2] no need to synchronize on the whole IndexShuffleBlockResolver for each writeIndexFileAndCommit --- .../shuffle/IndexShuffleBlockResolver.scala | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index d3f1c7ec1bbee..b2b3deae58d35 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -20,6 +20,7 @@ package org.apache.spark.shuffle import java.io._ import java.nio.channels.Channels import java.nio.file.Files +import java.util.concurrent.ConcurrentHashMap import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging @@ -51,6 +52,8 @@ private[spark] class IndexShuffleBlockResolver( private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle") + private val shuffleIdToLocks = new ConcurrentHashMap[Int, Array[Object]]() + def getDataFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) } @@ -76,6 +79,13 @@ private[spark] class IndexShuffleBlockResolver( logWarning(s"Error deleting index ${file.getPath()}") } } + + // This should be called when we unregister shuffle from ShuffleManager, so it's safe to set + null for given shuffleId. 
+ val locks = shuffleIdToLocks.get(shuffleId) + if (locks != null) { + shuffleIdToLocks.put(shuffleId, null) + } } /** @@ -138,13 +148,22 @@ mapId: Int, lengths: Array[Long], dataTmp: File): Unit = { + shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](lengths.length)) + val mapLocks = shuffleIdToLocks.get(shuffleId) + val lock = mapLocks.synchronized { + if (mapLocks(mapId) == null) { + mapLocks(mapId) = new Object() + } + mapLocks(mapId) + } + val indexFile = getIndexFile(shuffleId, mapId) val indexTmp = Utils.tempFileWith(indexFile) try { val dataFile = getDataFile(shuffleId, mapId) - // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure - // the following check and rename are atomic. - synchronized { + // We need to make sure the following check and rename are atomic, and we only need to + // synchronize on the lock tied to the current shuffleId + mapId + lock.synchronized { val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length) if (existingLengths != null) { // Another attempt for the same task has already written our map outputs successfully, @@ -223,7 +242,9 @@ } } - override def stop(): Unit = {} + override def stop(): Unit = { + shuffleIdToLocks.clear() + } } private[spark] object IndexShuffleBlockResolver { From e651ff6577add62a6bc6bf647caaaab23911567d Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Mon, 10 Sep 2018 20:16:39 +0800 Subject: [PATCH 2/2] address comments --- .../shuffle/IndexShuffleBlockResolver.scala | 20 ++++++++++++------- .../shuffle/sort/SortShuffleManager.scala | 2 ++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index b2b3deae58d35..fe6e53b60b907 100644 --- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -54,6 +54,15 @@ private[spark] class IndexShuffleBlockResolver( private val shuffleIdToLocks = new ConcurrentHashMap[Int, Array[Object]]() + /** + * This should be called before calling writeIndexFileAndCommit. + */ + def registerShuffle(shuffleId: Int, numMaps: Int): Unit = { + shuffleIdToLocks.computeIfAbsent(shuffleId, (_: Int) => { + new Array[Object](numMaps) + }) + } + def getDataFile(shuffleId: Int, mapId: Int): File = { blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)) } @@ -80,12 +89,9 @@ private[spark] class IndexShuffleBlockResolver( } } - // This should be called when we unregister shuffle from ShuffleManager, so it's safe to set - null for given shuffleId. - val locks = shuffleIdToLocks.get(shuffleId) - if (locks != null) { - shuffleIdToLocks.put(shuffleId, null) - } + // This should be called when we unregister shuffle from ShuffleManager, so it's safe to remove + // the entry directly. + shuffleIdToLocks.remove(shuffleId) } /** @@ -148,8 +154,8 @@ mapId: Int, lengths: Array[Long], dataTmp: File): Unit = { - shuffleIdToLocks.putIfAbsent(shuffleId, new Array[Object](lengths.length)) val mapLocks = shuffleIdToLocks.get(shuffleId) + require(mapLocks != null, "Shuffle should be registered to IndexShuffleBlockResolver first") val lock = mapLocks.synchronized { if (mapLocks(mapId) == null) { mapLocks(mapId) = new Object() diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 0caf84c6050a8..dc78aaf968e6a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -124,6 +124,8 @@ 
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = { + shuffleBlockResolver.registerShuffle(handle.shuffleId, + handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) numMapsForShuffle.putIfAbsent( handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) val env = SparkEnv.get