
Conversation

@Victsm
Contributor

@Victsm Victsm commented May 22, 2018

What changes were proposed in this pull request?

Right now, the default number of server-side Netty handler threads is 2 * the number of cores, and it can be further configured with the parameter spark.shuffle.io.serverThreads.
Processing a client request requires one available server Netty handler thread.
However, when the server Netty handler threads start to process ChunkFetchRequests, they become blocked on disk I/O, mostly due to disk contention from the random reads initiated by all the ChunkFetchRequests received from clients.
As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server-side Netty handler threads could all be blocked on reading shuffle files, leaving no handler thread available to process other types of requests, which should all be very quick to process.

This issue could be fixed by limiting the number of Netty handler threads that can get blocked while processing ChunkFetchRequests. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequests. This lets the shuffle server reserve Netty handler threads for non-ChunkFetchRequest messages, giving consistent processing time for those requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeouts in either executor registration with the local shuffle server or shuffle clients establishing connections with remote shuffle servers.
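To illustrate the idea, here is a minimal sketch in plain Netty (handler and pool names are illustrative, not the exact code in this patch): the chunk fetch handler is bound to its own EventLoopGroup, so the main worker threads stay free for the cheap control-plane requests.

```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class DedicatedChunkFetchPoolSketch {

  // Dedicated pool for ChunkFetchRequest handling; its size would be driven by
  // a config such as spark.shuffle.server.chunkFetchHandlerThreads.
  private final EventLoopGroup chunkFetchWorkers = new NioEventLoopGroup(40);

  void configurePipeline(ChannelPipeline pipeline,
                         ChannelHandler mainHandler,         // e.g. TransportChannelHandler
                         ChannelHandler chunkFetchHandler) { // e.g. ChunkFetchRequestHandler
    // The main handler runs on the channel's own EventLoop (a thread from the
    // pool sized by spark.shuffle.io.serverThreads) and is expected to let
    // ChunkFetchRequest messages fall through to the next handler.
    pipeline.addLast("handler", mainHandler);

    // The chunk fetch handler is bound to the dedicated group, so its
    // channelRead callbacks run on chunkFetchWorkers threads rather than on
    // the main worker threads.
    pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
  }
}
```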

How was this patch tested?

Added unit test for this patch.
In addition, we built an internal tool to stress test Spark shuffle service and have observed significant improvement after applying this patch.


@Victsm Victsm changed the title from "SPARK-24335 Spark external shuffle server improvement to better handle block fetch requests." to "SPARK-24355 Spark external shuffle server improvement to better handle block fetch requests." on May 22, 2018
@cloud-fan
Contributor

to confirm, StreamRequest will not block the server netty handler thread, right?

@cloud-fan
Contributor

What's the strategy here? If the number of block fetch requests has reached the limit, shall we fail subsequent block fetch requests immediately?

@Victsm
Contributor Author

Victsm commented May 23, 2018

StreamRequest will not block the server Netty handler thread. Only ChunkFetchRequest does, since that is the request that actually triggers seeking the disk to return a shuffle block to the remote shuffle client, and it can block server Netty handler threads due to concurrent random reads when the shuffle files are written to a limited number of hard disk drives.

The strategy in this patch is to use a separate EventLoopGroup, associated with a dedicated Netty channel handler, to handle ChunkFetchRequest. The config introduced in this PR, spark.shuffle.server.chunkFetchHandlerThreads, specifies the number of threads this dedicated EventLoopGroup has. Assume the shuffle server is configured with spark.shuffle.io.serverThreads = 45 and spark.shuffle.server.chunkFetchHandlerThreads = 40. This creates two separate Netty thread pools in the shuffle server, one with 45 threads (main thread pool) and one with 40 threads (chunk fetch thread pool). All requests except ChunkFetchRequest are processed by one of the threads in the main thread pool. When a ChunkFetchRequest comes in, it is processed by one of the threads in the chunk fetch thread pool.

Processing the ChunkFetchRequest itself does not block a Netty thread; it's writing the response to the underlying Channel that blocks a Netty thread, since that is where the shuffle files are read. Although ChunkFetchRequest is handled in the separate chunk fetch thread pool, writing the response on the underlying Channel is always processed by the thread (EventLoop) that the Channel is registered with. Since the Channel is always registered with an EventLoop (thread) from the main thread pool, the blocking I/O operation blocks a thread in the main thread pool while the ChunkFetchRequest is processed by a thread in the chunk fetch thread pool. These two threads act as a producer/consumer pair: the thread in the chunk fetch thread pool produces a blocking I/O task, and the thread in the main thread pool consumes this task and gets blocked.

In this PR, the ChunkFetchRequestHandler performs a blocking operation with channel.writeAndFlush(result).sync(), thus synchronizing the producer and consumer threads. This way, when the ChunkFetchRequestHandler processes 40 ChunkFetchRequests and blocks 40 threads from the main thread pool, the 40 threads in the chunk fetch thread pool are also blocked. Thus, when a new ChunkFetchRequest is received by the shuffle server, it has to wait for one of the threads in the chunk fetch thread pool to become available before it can be processed. In the meantime, there are still 5 threads in the main thread pool that are not blocked by ChunkFetchRequests, and they remain available to handle all other types of requests, which are much quicker to process.
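As a rough illustration of that producer/consumer coupling (not the exact code in this PR; respond is a hypothetical helper called from the chunk fetch handler):

```java
import io.netty.channel.Channel;

public final class SyncWriteSketch {

  private SyncWriteSketch() {}

  /**
   * Called from a chunk-fetch-pool thread. The write itself is executed by the
   * EventLoop the channel is registered with (a main-pool thread), and that is
   * where the blocking shuffle-file read happens. sync() parks the calling
   * chunk-fetch thread until the write completes, so at most
   * chunkFetchHandlerThreads fetches are being written at any given time.
   */
  static void respond(Channel channel, Object response) throws InterruptedException {
    channel.writeAndFlush(response).sync();
  }
}
```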

If the number of ChunkFetchRequests has reached the configured limit, subsequent ChunkFetchRequests will not fail immediately. These requests simply wait for a thread in the chunk fetch thread pool to become available. This is the same behavior as before this PR; the difference is that, previously, these ChunkFetchRequests could block all threads in the main thread pool, causing both ChunkFetchRequests and other types of requests to wait for a thread in the main thread pool and thus time out on executor registration or SASL bootstrap.

@cloud-fan
Contributor

I think we should still respect spark.shuffle.io.serverThreads. Can we set a percentage of the server threads as the upper bound for handling chunk fetch requests?

@cloud-fan
Contributor

cc @zsxwing @jiangxb1987

@Victsm
Contributor Author

Victsm commented May 23, 2018

Using a percentage to configure the number of threads that handle chunk fetch requests does make sense. I will update the PR with this change.
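Roughly something like the following (the helper and the exact rounding are illustrative, not the final implementation):

```java
public final class ChunkFetchPoolSizing {

  private ChunkFetchPoolSizing() {}

  /**
   * Illustrative only: derive the chunk fetch pool size as a percentage of the
   * main pool so it can never exceed spark.shuffle.io.serverThreads.
   */
  static int chunkFetchHandlerThreads(int serverThreads, double percent) {
    int threads = (int) Math.ceil(serverThreads * percent / 100.0);
    return Math.max(1, Math.min(threads, serverThreads));
  }

  public static void main(String[] args) {
    // e.g. serverThreads = 45, percent = 90 -> 41 chunk fetch handler threads,
    // leaving a few main-pool threads free for other request types.
    System.out.println(chunkFetchHandlerThreads(45, 90));
  }
}
```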

@felixcheung
Member

While we are here, could we also add, or at least propose, some metrics around this, such as the number of open block failures, or even the number of block threads?

We have suffered a lot from the lack of visibility.

@Victsm
Contributor Author

Victsm commented May 24, 2018

@felixcheung
Adding these metrics is indeed something we have been working on recently.
I'd prefer to propose it in a separate ticket.

@cloud-fan
Contributor

I'm fine with adding the metrics in another PR; please add a TODO in a code comment.

@hvanhovell
Contributor

ok to test

@SparkQA

SparkQA commented Jun 4, 2018

Test build #91444 has finished for PR 21402 at commit d862a2d.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest>
  • public class TransportChannelHandler extends SimpleChannelInboundHandler<Message>

@vanzin
Contributor

vanzin commented Jun 8, 2018

StreamRequest will not block the server netty handler thread.

Hmm, I'm not so sure that's accurate. I think the main difference is that there is currently no code path that sends a StreamRequest to the shuffle service. But, for example, if many executors request files from the driver simultaneously, you could potentially end up in the same situation. It's a less serious issue, since I think it's a lot less common for large files to be transferred that way, at least after startup.

I took a look at the code, and it seems the actual change that avoids the disk thrashing is the synchronization done in the chunk fetch handler; it only allows a certain number of threads to actually do disk reads simultaneously. That's already an improvement, but a couple of questions popped into my head when I read your comment:

  • how does that relate to maxChunksBeingTransferred()? Aren't both settings effectively a limit on the number of requests being serviced, making the existing one a little redundant?

  • would there be benefits in trying to add some sort of disk affinity to these threads? e.g., sending fetch requests that hit different disks to different queues (a rough sketch of that idea follows below).
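Purely to illustrate the disk-affinity idea (not something in this PR; the directory-to-disk mapping and the single-threaded queues are assumptions):

```java
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DiskAffinityDispatchSketch {

  // One single-threaded queue per local shuffle directory, assuming each
  // directory maps to a distinct physical disk (illustrative only).
  private final ExecutorService[] perDiskExecutors;
  private final File[] localDirs;

  DiskAffinityDispatchSketch(File[] localDirs) {
    this.localDirs = localDirs;
    this.perDiskExecutors = new ExecutorService[localDirs.length];
    for (int i = 0; i < localDirs.length; i++) {
      perDiskExecutors[i] = Executors.newSingleThreadExecutor();
    }
  }

  /** Route the read for shuffleFile to the queue of the disk that holds it. */
  void submitRead(File shuffleFile, Runnable readTask) {
    for (int i = 0; i < localDirs.length; i++) {
      if (shuffleFile.getPath().startsWith(localDirs[i].getPath())) {
        perDiskExecutors[i].submit(readTask);
        return;
      }
    }
    // Fallback if no directory matches (assumes at least one local dir).
    perDiskExecutors[0].submit(readTask);
  }
}
```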

@felixcheung
Member

ok to test

@SparkQA

SparkQA commented Jun 9, 2018

Test build #91593 has finished for PR 21402 at commit d862a2d.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest>
  • public class TransportChannelHandler extends SimpleChannelInboundHandler<Message>

@vanzin
Contributor

vanzin commented Jun 13, 2018

Another question before I forget about it again: with the current code, if chunkFetchHandlerThreads() is equal to or greater than the size of the main thread pool, won't that allow the current bad behavior to still exist?

I think Wenchen suggested that the value should be a percentage of the number of threads in the other pool, which would avoid that problem (as long as the percentage is < 100). But a check that results in an error, or at least a noisy warning log, would help users detect a misconfiguration.
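Something along these lines would be enough (illustrative only; the method and message are not from this PR):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ChunkFetchConfigCheck {

  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchConfigCheck.class);

  private ChunkFetchConfigCheck() {}

  /** Warn loudly if the chunk fetch pool could block every main-pool thread. */
  static void validate(int chunkFetchHandlerThreads, int serverThreads) {
    if (chunkFetchHandlerThreads >= serverThreads) {
      logger.warn("spark.shuffle.server.chunkFetchHandlerThreads ({}) is >= "
          + "spark.shuffle.io.serverThreads ({}); chunk fetches may block all "
          + "server threads and starve other requests.",
          chunkFetchHandlerThreads, serverThreads);
    }
  }
}
```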

@redsanket

@Victsm @vanzin I want to get this going; would it be better if I put up a PR with the requested changes and concerns addressed?

@cloud-fan
Contributor

shall we close it since #22173 is merged?

@redsanket

@cloud-fan yes we can close this

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

@redsanket you should close it by yourself.

@asfgit closed this in a3ba3a8 on Nov 11, 2018
Willymontaz pushed a commit to Willymontaz/spark that referenced this pull request Feb 12, 2019
…dle block fetch requests.

Closes apache#22173 from redsanket/SPARK-24335.

Authored-by: Sanket Chintapalli <schintap@yahoo-inc.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Willymontaz added a commit to criteo-forks/spark that referenced this pull request Feb 12, 2019
…dle block fetch requests. (#89)

cfmcgrady pushed a commit to cfmcgrady/spark that referenced this pull request Jul 31, 2019
…dle block fetch requests.

prakharjain09 pushed a commit to prakharjain09/spark that referenced this pull request Nov 29, 2019
…dle block fetch requests.
