From 32063006c28e4e94c6005e559e03465a1ce41e81 Mon Sep 17 00:00:00 2001 From: Sanket Date: Tue, 19 Jan 2016 15:38:51 -0600 Subject: [PATCH 1/9] Limit number of concurrent outbound connections --- .../org/apache/spark/SecurityManager.scala | 2 +- .../shuffle/BlockStoreShuffleReader.scala | 3 +- .../storage/ShuffleBlockFetcherIterator.scala | 41 ++++++++++++++----- .../ShuffleBlockFetcherIteratorSuite.scala | 9 ++-- docs/configuration.md | 11 +++++ 5 files changed, 51 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 64e483e384772..3d9695662e725 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -81,7 +81,7 @@ import org.apache.spark.util.Utils * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty * for the HttpServer. Jetty supports multiple authentication mechanisms - * Basic, Digest, Form, Spengo, etc. It also supports multiple different login - * services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService + * services - Hash, JAAS, Spengo, JDBC, etc. Spark currently uses the HashLoginService * to authenticate using DIGEST-MD5 via a single user and the shared secret. * Since we are using DIGEST-MD5, the shared secret is not passed on the wire * in plaintext. diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index b0abda4a81b8d..e7174a9c24f63 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -46,7 +46,8 @@ private[spark] class BlockStoreShuffleReader[K, C]( blockManager, mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility - SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024) + SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, + SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)) // Wrap the streams for compression based on configuration val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 037bec1d9c33b..006afd1bf7c2f 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -46,6 +46,7 @@ import org.apache.spark.util.Utils * For each block we also require the size (in bytes as a long field) in * order to throttle the memory usage. * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point. + * @param maxReqsInFlight max number of remote blocks to fetch at any given point. */ private[spark] final class ShuffleBlockFetcherIterator( @@ -53,7 +54,8 @@ final class ShuffleBlockFetcherIterator( shuffleClient: ShuffleClient, blockManager: BlockManager, blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], - maxBytesInFlight: Long) + maxBytesInFlight: Long, + maxReqsInFlight: Int) extends Iterator[(BlockId, InputStream)] with Logging { import ShuffleBlockFetcherIterator._ @@ -101,6 +103,9 @@ final class ShuffleBlockFetcherIterator( /** Current bytes in flight from our requests */ private[this] var bytesInFlight = 0L + /** Current number of requests in flight */ + private[this] var reqsInFlight = 0 + private[this] val shuffleMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() /** @@ -116,7 +121,7 @@ final class ShuffleBlockFetcherIterator( private[storage] def releaseCurrentResultBuffer(): Unit = { // Release the current buffer if necessary currentResult match { - case SuccessFetchResult(_, _, _, buf) => buf.release() + case SuccessFetchResult(_, _, _, buf, _) => buf.release() case _ => } currentResult = null @@ -133,7 +138,7 @@ final class ShuffleBlockFetcherIterator( while (iter.hasNext) { val result = iter.next() result match { - case SuccessFetchResult(_, _, _, buf) => buf.release() + case SuccessFetchResult(_, _, _, buf, _) => buf.release() case _ => } } @@ -143,9 +148,11 @@ final class ShuffleBlockFetcherIterator( logDebug("Sending request for %d blocks (%s) from %s".format( req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort)) bytesInFlight += req.size + reqsInFlight += 1 // so we can look up the size of each blockID val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap + val remainingBlocks = new ArrayBuffer[String]() ++ sizeMap.keys val blockIds = req.blocks.map(_._1.toString) val address = req.address @@ -158,9 +165,12 @@ final class ShuffleBlockFetcherIterator( // Increment the ref count because we need to pass this to a different thread. // This needs to be released after use. buf.retain() - results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf)) + remainingBlocks -= blockId + results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, + remainingBlocks.isEmpty)) shuffleMetrics.incRemoteBytesRead(buf.size) shuffleMetrics.incRemoteBlocksFetched(1) + logDebug("remainingBlocks" + remainingBlocks) } logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } @@ -239,7 +249,7 @@ final class ShuffleBlockFetcherIterator( shuffleMetrics.incLocalBlocksFetched(1) shuffleMetrics.incLocalBytesRead(buf.size) buf.retain() - results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf)) + results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false)) } catch { case e: Exception => // If we see an exception, stop immediately. @@ -258,6 +268,8 @@ final class ShuffleBlockFetcherIterator( val remoteRequests = splitLocalRemoteBlocks() // Add the remote requests into our queue in a random order fetchRequests ++= Utils.randomize(remoteRequests) + assert ((0 == reqsInFlight) == (0 == bytesInFlight), + "reqsInFlight = " + reqsInFlight + ", bytesInFlight = " + bytesInFlight) // Send out initial requests for blocks, up to our maxBytesInFlight fetchUpToMaxBytes() @@ -289,7 +301,13 @@ final class ShuffleBlockFetcherIterator( shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) result match { - case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size + case SuccessFetchResult(_, _, size, _, isNetworkReqDone) => { + bytesInFlight -= size + if (isNetworkReqDone) { + reqsInFlight -= 1 + logInfo("Number of requests in flight " + reqsInFlight) + } + } case _ => } // Send fetch requests up to maxBytesInFlight @@ -299,7 +317,7 @@ final class ShuffleBlockFetcherIterator( case FailureFetchResult(blockId, address, e) => throwFetchFailedException(blockId, address, e) - case SuccessFetchResult(blockId, address, _, buf) => + case SuccessFetchResult(blockId, address, _, buf, _) => try { (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this)) } catch { @@ -312,7 +330,9 @@ final class ShuffleBlockFetcherIterator( private def fetchUpToMaxBytes(): Unit = { // Send fetch requests up to maxBytesInFlight while (fetchRequests.nonEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { + (bytesInFlight == 0 || + (reqsInFlight + 1 <= maxReqsInFlight && + bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) { sendRequest(fetchRequests.dequeue()) } } @@ -390,13 +410,14 @@ object ShuffleBlockFetcherIterator { * @param size estimated size of the block, used to calculate bytesInFlight. * Note that this is NOT the exact bytes. * @param buf [[ManagedBuffer]] for the content. + * @param isNetworkReqDone Is this the last network request for this host in this fetch request. */ private[storage] case class SuccessFetchResult( blockId: BlockId, address: BlockManagerId, size: Long, - buf: ManagedBuffer) - extends FetchResult { + buf: ManagedBuffer, + isNetworkReqDone: Boolean) extends FetchResult { require(buf != null) require(size >= 0) } diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 828153bdbfc44..c0d8aa0d8d566 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -99,7 +99,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT transfer, blockManager, blocksByAddress, - 48 * 1024 * 1024) + 48 * 1024 * 1024, + Int.MaxValue) // 3 local blocks fetched in initialization verify(blockManager, times(3)).getBlockData(any()) @@ -171,7 +172,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT transfer, blockManager, blocksByAddress, - 48 * 1024 * 1024) + 48 * 1024 * 1024, + Int.MaxValue) verify(blocks(ShuffleBlockId(0, 0, 0)), times(0)).release() iterator.next()._2.close() // close() first block's input stream @@ -233,7 +235,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT transfer, blockManager, blocksByAddress, - 48 * 1024 * 1024) + 48 * 1024 * 1024, + Int.MaxValue) // Continue only after the mock calls onBlockFetchFailure sem.acquire() diff --git a/docs/configuration.md b/docs/configuration.md index 08392c39187b9..c02f25cc6dc36 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -391,6 +391,17 @@ Apart from these, the following properties are also available, and may be useful overhead per reduce task, so keep it small unless you have a large amount of memory. + + spark.reducer.maxReqsInFlight + 20 + + spark.reducer.maxMbInFlight puts a bound on the in flight data in terms of size. + But this is not always sufficient when the number of hosts in the cluster increase, + this can lead to very large number of in-bound connections to one + or more nodes, causing workers to fail under the load. This configuration + limits the number of remote blocks to fetches at any given point. + + spark.shuffle.compress true From 9761809f5129fd4a5f593a4904f9b086f46c9f76 Mon Sep 17 00:00:00 2001 From: Sanket Date: Tue, 19 Jan 2016 15:48:41 -0600 Subject: [PATCH 2/9] Changed info level to debug level --- .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 6125644114f3e..af02208b1a636 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -305,7 +305,7 @@ final class ShuffleBlockFetcherIterator( bytesInFlight -= size if (isNetworkReqDone) { reqsInFlight -= 1 - logInfo("Number of requests in flight " + reqsInFlight) + logDebug("Number of requests in flight " + reqsInFlight) } } case _ => From 8143b053b185549fcd44508649812ad8b60fec78 Mon Sep 17 00:00:00 2001 From: Sanket Date: Tue, 19 Jan 2016 20:39:12 -0600 Subject: [PATCH 3/9] changed default and typo --- .../main/scala/org/apache/spark/SecurityManager.scala | 4 ++-- docs/configuration.md | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 3d9695662e725..89b743ab921a3 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -80,8 +80,8 @@ import org.apache.spark.util.Utils * * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty * for the HttpServer. Jetty supports multiple authentication mechanisms - - * Basic, Digest, Form, Spengo, etc. It also supports multiple different login - * services - Hash, JAAS, Spengo, JDBC, etc. Spark currently uses the HashLoginService + * Basic, Digest, Form, Spnego, etc. It also supports multiple different login + * services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService * to authenticate using DIGEST-MD5 via a single user and the shared secret. * Since we are using DIGEST-MD5, the shared secret is not passed on the wire * in plaintext. diff --git a/docs/configuration.md b/docs/configuration.md index c02f25cc6dc36..c3ecfe18e8dfa 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -392,14 +392,13 @@ Apart from these, the following properties are also available, and may be useful - spark.reducer.maxReqsInFlight - 20 + Int.MaxValue - spark.reducer.maxMbInFlight puts a bound on the in flight data in terms of size. + "spark.reducer.maxMbInFlight" puts a bound on the in flight data in terms of size. But this is not always sufficient when the number of hosts in the cluster increase, this can lead to very large number of in-bound connections to one - or more nodes, causing workers to fail under the load. This configuration - limits the number of remote blocks to fetches at any given point. + or more nodes, causing workers to fail under the load. "spark.reducer.maxReqsInFlight" + configuration limits the number of remote blocks to fetches at any given point. From 35a8aa63f78fb7659886f787021c93cf4576897c Mon Sep 17 00:00:00 2001 From: Sanket Date: Wed, 20 Jan 2016 09:54:28 -0600 Subject: [PATCH 4/9] changed wording and sentence --- docs/configuration.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index c3ecfe18e8dfa..a98b6bc6844a0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -392,13 +392,13 @@ Apart from these, the following properties are also available, and may be useful + spark.reducer.maxReqsInFlight Int.MaxValue - "spark.reducer.maxMbInFlight" puts a bound on the in flight data in terms of size. - But this is not always sufficient when the number of hosts in the cluster increase, - this can lead to very large number of in-bound connections to one - or more nodes, causing workers to fail under the load. "spark.reducer.maxReqsInFlight" - configuration limits the number of remote blocks to fetches at any given point. + "spark.reducer.maxReqsInFlight" configuration limits the number of remote blocks to fetches at any given point. + Although "spark.reducer.maxSizeInFlight" puts a bound on the in flight data in terms of size, this is not always sufficient + when the number of hosts in the cluster increase, this can lead to very large number of in-bound connections to one + or more nodes, causing workers to fail under the load. Hence, "spark.reducer.maxReqsInFlight" is required. From cbdd253cf57f0f8e95ec3dc06956dc8966198562 Mon Sep 17 00:00:00 2001 From: Sanket Date: Wed, 20 Jan 2016 10:50:09 -0600 Subject: [PATCH 5/9] added around configs --- docs/configuration.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index a98b6bc6844a0..f15c9eaca4ebd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -392,11 +392,11 @@ Apart from these, the following properties are also available, and may be useful - spark.reducer.maxReqsInFlight + spark.reducer.maxReqsInFlight Int.MaxValue - "spark.reducer.maxReqsInFlight" configuration limits the number of remote blocks to fetches at any given point. - Although "spark.reducer.maxSizeInFlight" puts a bound on the in flight data in terms of size, this is not always sufficient + spark.reducer.maxReqsInFlight configuration limits the number of remote blocks to fetches at any given point. + Although spark.reducer.maxSizeInFlight puts a bound on the in flight data in terms of size, this is not always sufficient when the number of hosts in the cluster increase, this can lead to very large number of in-bound connections to one or more nodes, causing workers to fail under the load. Hence, "spark.reducer.maxReqsInFlight" is required. From 38f3e1fcc08614c44fc2fcedd744ade878025232 Mon Sep 17 00:00:00 2001 From: Sanket Date: Wed, 20 Jan 2016 12:32:35 -0600 Subject: [PATCH 6/9] rewording config info --- docs/configuration.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index f15c9eaca4ebd..a58ac3ce0eea3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -395,10 +395,10 @@ Apart from these, the following properties are also available, and may be useful spark.reducer.maxReqsInFlight Int.MaxValue - spark.reducer.maxReqsInFlight configuration limits the number of remote blocks to fetches at any given point. - Although spark.reducer.maxSizeInFlight puts a bound on the in flight data in terms of size, this is not always sufficient - when the number of hosts in the cluster increase, this can lead to very large number of in-bound connections to one - or more nodes, causing workers to fail under the load. Hence, "spark.reducer.maxReqsInFlight" is required. + This configuration limits the number of remote blocks to fetch at any given point. + When the number of hosts in the cluster increase, it might lead to very large number + of in-bound connections to one or more nodes, causing the workers to fail under load. + By allowing it to limit the number of fetch requests, this scenario can be mitigated. From 2aacd4d656fae3e78a234a90fab73be7f51871e8 Mon Sep 17 00:00:00 2001 From: Sanket Date: Fri, 22 Jan 2016 13:04:07 -0600 Subject: [PATCH 7/9] improved assert information --- .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index af02208b1a636..9b80512b34de8 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -269,7 +269,8 @@ final class ShuffleBlockFetcherIterator( // Add the remote requests into our queue in a random order fetchRequests ++= Utils.randomize(remoteRequests) assert ((0 == reqsInFlight) == (0 == bytesInFlight), - "reqsInFlight = " + reqsInFlight + ", bytesInFlight = " + bytesInFlight) + "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight + + ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight) // Send out initial requests for blocks, up to our maxBytesInFlight fetchUpToMaxBytes() From f0778af55271266c78451c0447f9a3c50bb49aa6 Mon Sep 17 00:00:00 2001 From: Sanket Date: Thu, 11 Feb 2016 09:17:27 -0600 Subject: [PATCH 8/9] changing array buffer to hashset for faster deletes --- .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index e37253d0d1c00..690e933db02da 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -162,7 +162,7 @@ final class ShuffleBlockFetcherIterator( // so we can look up the size of each blockID val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap - val remainingBlocks = new ArrayBuffer[String]() ++ sizeMap.keys + val remainingBlocks = new HashSet[String]() ++ sizeMap.keys val blockIds = req.blocks.map(_._1.toString) val address = req.address From c70e76fc0c0d3cdb91ac86d8408d5c02efe30dcc Mon Sep 17 00:00:00 2001 From: Sanket Date: Thu, 11 Feb 2016 15:19:57 -0600 Subject: [PATCH 9/9] fixing minor nits, sentence change and optimizing hashset insertion --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 6 +++--- docs/configuration.md | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 690e933db02da..478a928acd03c 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -47,7 +47,7 @@ import org.apache.spark.util.Utils * For each block we also require the size (in bytes as a long field) in * order to throttle the memory usage. * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point. - * @param maxReqsInFlight max number of remote blocks to fetch at any given point. + * @param maxReqsInFlight max number of remote requests to fetch blocks at any given point. */ private[spark] final class ShuffleBlockFetcherIterator( @@ -162,7 +162,7 @@ final class ShuffleBlockFetcherIterator( // so we can look up the size of each blockID val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap - val remainingBlocks = new HashSet[String]() ++ sizeMap.keys + val remainingBlocks = new HashSet[String]() ++= sizeMap.keys val blockIds = req.blocks.map(_._1.toString) val address = req.address @@ -179,7 +179,7 @@ final class ShuffleBlockFetcherIterator( remainingBlocks -= blockId results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, remainingBlocks.isEmpty)) - logDebug("remainingBlocks" + remainingBlocks) + logDebug("remainingBlocks: " + remainingBlocks) } } logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) diff --git a/docs/configuration.md b/docs/configuration.md index 2c409f2e95433..080e0ef4b8f06 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -395,7 +395,7 @@ Apart from these, the following properties are also available, and may be useful spark.reducer.maxReqsInFlight Int.MaxValue - This configuration limits the number of remote blocks to fetch at any given point. + This configuration limits the number of remote requests to fetch blocks at any given point. When the number of hosts in the cluster increase, it might lead to very large number of in-bound connections to one or more nodes, causing the workers to fail under load. By allowing it to limit the number of fetch requests, this scenario can be mitigated.