[SPARK-39072][SHUFFLE]Fast fail the remaining push blocks if shuffle …#36411

Closed
wankunde wants to merge 2 commits into apache:master from wankunde:fast_fail_push_blocks

Conversation

@wankunde
Contributor


What changes were proposed in this pull request?

Limit the number of push blocks in flight and stop pushing the remaining blocks once the shuffle stage is finalized.

Why are the changes needed?

Currently, each map task tries to push all of its map output blocks to the external shuffle services.
After the shuffle stage is finalized, a reducer's fetch-blocks RPC can be blocked if many map output push blocks are still in flight.
We could stop pushing the remaining blocks once the shuffle stage is finalized.
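The fast-fail idea described above can be sketched as a stop flag that is checked before each push. This is a minimal sketch only; the class and method names below are illustrative and are not Spark's actual ShuffleBlockPusher API.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the fast-fail idea: once the shuffle stage is
// finalized, skip pushing any blocks that have not been sent yet.
// (Names are illustrative, not Spark's actual ShuffleBlockPusher API.)
class FastFailPusherSketch {
  private val finalized = new AtomicBoolean(false)

  // Called when the driver finalizes the shuffle merge for this stage.
  def onStageFinalized(): Unit = finalized.set(true)

  // Pushes blocks one by one, stopping as soon as the stage is finalized.
  // Returns the block ids that were actually pushed.
  def pushBlocks(blockIds: Seq[String], push: String => Unit): Seq[String] =
    blockIds.takeWhile { id =>
      if (finalized.get()) {
        false // fast fail: drop this and all remaining blocks
      } else {
        push(id)
        true
      }
    }
}
```

In a real implementation the flag would be flipped by the RPC handler that learns the merge was finalized, so that in-flight iterations over the remaining blocks bail out early instead of queuing more pushes behind reducers' fetch requests.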

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UTs.

@github-actions github-actions bot added the CORE label Apr 29, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@mridulm
Contributor

mridulm commented May 1, 2022

+CC @otterc

@wankunde wankunde changed the title [SPARK-39072][SHUFFLE]Fast Fail the remaining push blocks if shuffle … [SPARK-39072][SHUFFLE]Fast fail the remaining push blocks if shuffle … May 1, 2022
@otterc
Contributor

otterc commented May 2, 2022

@wankunde Can you please provide more details/logs of the problem that you are trying to solve? Specifically, can you provide some logs that exhibit the behavior below:

After the shuffle stage is finalized, the reduce fetch blocks RPC will be blocked if there are still many map output blocks in flight.

ShuffleBlockPusher already uses config maxBlocksInFlightPerAddress. Why do we need a new configuration for this?
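For reference, the existing in-flight limit mentioned above is set through Spark's push-based shuffle configuration. The values below are illustrative, and the full key names are assumed from the push-based shuffle configuration introduced in Spark 3.2:

```
# spark-defaults.conf (illustrative values; full key names assumed
# from Spark's push-based shuffle configuration)
spark.shuffle.push.enabled                      true
spark.shuffle.push.maxBlocksInFlightPerAddress  256
```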


@wankunde
Contributor Author

wankunde commented May 7, 2022

Hi @otterc, you are right, we do not need this PR because ShuffleBlockPusher already uses the maxBlocksInFlightPerAddress config. But I found that some ESS instances received the FinalizeShuffleMerge RPC a few seconds late. I am not sure whether this is because there are many in-flight push blocks going to those ESS instances.

@wankunde
Contributor Author

wankunde commented May 7, 2022

Driver logs

22/05/07 01:00:13,218 INFO [shuffle-client-9-5] scheduler.DAGScheduler:57 : finalizeShuffleMerge success from ESS_NODE in 7763 ms  shuffleId 2 - 0

ESS logs

2022-05-07 01:00:08,891 DEBUG org.apache.spark.network.server.TransportServer: New connection accepted for remote address /DRIVER_IP:41402.
2022-05-07 01:00:12,816 INFO org.apache.spark.network.shuffle.ExternalBlockHandler: receive FinalizeShuffleMerge finalize shuffle merge
2022-05-07 01:00:12,816 INFO org.apache.spark.network.shuffle.RemoteBlockPushResolver: application_1651741622014_0557 attempt -1 shuffle 2 shuffleMerge 0: finalize shuffle merge
2022-05-07 01:00:13,217 TRACE org.apache.spark.network.server.TransportRequestHandler: Sent result RpcResponse[requestId=7587608556323056846,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=818744 cap=818744]]] to client /DRIVER_IP:41402

@wankunde
Contributor Author

wankunde commented May 7, 2022

Closing this PR.

@wankunde wankunde closed this May 7, 2022