[SPARK-31034][CORE] ShuffleBlockFetcherIterator should always create request for last block group #27786

Closed
wants to merge 7 commits into from

Member

@Ngone51 Ngone51 commented Mar 4, 2020

What changes were proposed in this pull request?

This is a bug fix for #27280. This PR fixes the bug where ShuffleBlockFetcherIterator may fail to create a request for the last block group.

Why are the changes needed?

When (all blocks).sum < targetRemoteRequestSize, (all blocks).length > maxBlocksInFlightPerAddress, and (last block group).size < maxBlocksInFlightPerAddress,
ShuffleBlockFetcherIterator does not create a request for the last group, so the reduce task loses data.
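
To make the three conditions concrete, here is a minimal, self-contained sketch of the grouping step. It is not the Spark implementation: blocks are reduced to their byte sizes, batch merging is omitted, and `buildRequests`, `maxBlocksPerRequest`, and `flushLastGroup` are hypothetical names introduced only for illustration. With `flushLastGroup = false` the short trailing group is carried over on the final call and never requested; with `flushLastGroup = true` it always becomes a request.

```scala
import scala.collection.mutable.ArrayBuffer

object LastGroupSketch {
  // Hypothetical simplification: each block is represented only by its size in bytes.
  def buildRequests(
      blockSizes: Seq[Long],
      targetRequestSize: Long,
      maxBlocksPerRequest: Int,
      flushLastGroup: Boolean): Seq[Seq[Long]] = {
    val requests = ArrayBuffer.empty[Seq[Long]]
    var cur = Vector.empty[Long]

    def createRequests(isLast: Boolean): Unit = {
      val pending = cur
      cur = Vector.empty[Long]
      if (pending.length <= maxBlocksPerRequest) {
        // Few enough blocks: everything goes into one request.
        requests += pending
      } else {
        pending.grouped(maxBlocksPerRequest).foreach { group =>
          if (group.length == maxBlocksPerRequest || isLast) {
            requests += group
          } else {
            // Short trailing group: carry it over so later blocks can join it.
            cur = group
          }
        }
      }
    }

    blockSizes.foreach { size =>
      cur = cur :+ size
      if (cur.sum >= targetRequestSize) createRequests(isLast = false)
    }
    // Final call. Passing isLast = false here reproduces the lost-group behaviour.
    if (cur.nonEmpty) createRequests(isLast = flushLastGroup)
    requests.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq.fill(5)(100L)
    val withoutFix = buildRequests(sizes, 1000L, 2, flushLastGroup = false)
    val withFix = buildRequests(sizes, 1000L, 2, flushLastGroup = true)
    println(s"without fix: ${withoutFix.flatten.length} blocks requested")
    println(s"with fix: ${withFix.flatten.length} blocks requested")
  }
}
```

With five 100-byte blocks, a 1000-byte target, and at most 2 blocks per request, the first variant produces requests covering 4 blocks, while the second covers all 5.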

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Updated test.

@Ngone51
Member Author

Ngone51 commented Mar 4, 2020

cc @cloud-fan @xuanyuanking

@@ -339,14 +339,14 @@ final class ShuffleBlockFetcherIterator(
       + s"with ${blocks.size} blocks")
   }

-  def createFetchRequests(): Unit = {
+  def createFetchRequests(hasMore: Boolean): Unit = {
Contributor

nit: isLast?

@@ -367,12 +367,12 @@ final class ShuffleBlockFetcherIterator(
       // For batch fetch, the actual block in flight should count for merged block.
       val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
       if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        createFetchRequests()
+        createFetchRequests(true)
Contributor

let's write down the parameter name.
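
A minimal sketch of the readability point, using a hypothetical stand-in for the method (the real one mutates the iterator's state, and the parameter name `isLast` is taken from the suggestion earlier in this review, not from the diff shown above):

```scala
object NamedArgumentSketch {
  // Hypothetical stand-in: it only reports what the flag would control.
  def createFetchRequests(isLast: Boolean): String =
    if (isLast) "flush every remaining group" else "carry a short trailing group over"

  def main(args: Array[String]): Unit = {
    // A bare literal gives the reader no hint about what `true` controls.
    println(createFetchRequests(true))
    // Naming the parameter documents the intent at the call site.
    println(createFetchRequests(isLast = false))
  }
}
```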

@SparkQA

SparkQA commented Mar 4, 2020

Test build #119285 has finished for PR 27786 at commit fc36eb1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


var numResults = 0
// After initialize(), there will be 6 FetchRequests, and the each of the first 5
// includes 3 merged blocks and the last one has 1 merged block. So, only the
Contributor

There are 6 merged blocks in total, so how can each request include 3 merged blocks?

Contributor

or do you mean shuffle blocks?

Member Author

Yes, it means shuffle blocks in this case. But that is inconsistent with the comment in the test below, so I need to reword it.

Member Author

note: 3 merged blocks -> 3 shuffle blocks (not batched)

Contributor

ok let's update

var numResults = 0
// After initialize(), there will be 2 FetchRequests that one has 2 merged blocks and another
Member Author

note: 2 merged blocks -> 2 ShuffleBlockBatch

@SparkQA

SparkQA commented Mar 4, 2020

Test build #119294 has finished for PR 27786 at commit 778be33.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 4, 2020

Test build #119287 has finished for PR 27786 at commit 28df4ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// After initialize(), there will be 6 FetchRequests. And each of the first 5 requests
// includes 1 merged block which is merged from 3 shuffle blocks. The last request has 1 merged
// block which merged from 2 shuffle blocks. So, only the first 5 requests(5 * 3 * 100 >= 1500)
// can be sent. The second FetchRequest will hit maxBlocksInFlightPerAddress so it won't
Contributor

The second -> The 6th?

@SparkQA
Copy link

SparkQA commented Mar 4, 2020

Test build #119308 has finished for PR 27786 at commit 6766c7c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 4, 2020

Test build #119319 has finished for PR 27786 at commit dc94365.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/3.0!

@cloud-fan cloud-fan closed this in 2257ce2 Mar 5, 2020
cloud-fan pushed a commit that referenced this pull request Mar 5, 2020
…request for last block group

### What changes were proposed in this pull request?

This is a bug fix for #27280. This PR fixes the bug where `ShuffleBlockFetcherIterator` may fail to create a request for the last block group.

### Why are the changes needed?

When (all blocks).sum < `targetRemoteRequestSize`, (all blocks).length > `maxBlocksInFlightPerAddress`, and (last block group).size < `maxBlocksInFlightPerAddress`,
`ShuffleBlockFetcherIterator` does not create a request for the last group, so the reduce task loses data.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Updated test.

Closes #27786 from Ngone51/fix_no_request_bug.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 2257ce2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@Ngone51
Member Author

Ngone51 commented Mar 5, 2020

thanks all!

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…request for last block group

Closes apache#27786 from Ngone51/fix_no_request_bug.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>