From 9bf1f9fd5aa08ef19653a4cfa52a8eccc64dc18b Mon Sep 17 00:00:00 2001 From: Raajay Viswanathan Date: Wed, 19 Jul 2017 09:49:25 -0700 Subject: [PATCH] Make num parallel fetches from a reducer configurable --- .../spark/storage/ShuffleBlockFetcherIterator.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 81d822dc8a98f..aef73b4f8c0a7 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -25,7 +25,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashSet, Queue} -import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager} @@ -244,10 +244,13 @@ final class ShuffleBlockFetcherIterator( } private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { - // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them - // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 + val numParallelFetchRequests = SparkEnv.get.conf + .getInt("spark.reducer.numParallelFetchRequests", 5) + // Make remote requests at most maxBytesInFlight / numParallelFetchRequests in length; + // the reason to keep them smaller than maxBytesInFlight is to allow multiple, + // parallel fetches from up to numParallelFetchRequests // nodes, rather than blocking on reading output from one node. - val targetRequestSize = math.max(maxBytesInFlight / 5, 1L) + val targetRequestSize = math.max(maxBytesInFlight / numParallelFetchRequests, 1L) logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize) // Split local and remote blocks. Remote blocks are further split into FetchRequests of size