[SPARK-23524] Big local shuffle blocks should not be checked for corruption. #20685

Closed
jinxing64 wants to merge 3 commits into apache:master from jinxing64:SPARK-23524

Conversation

jinxing64

What changes were proposed in this pull request?

In the current code, all local blocks are checked for corruption regardless of their size, for the following reasons:

1. The size recorded in the FetchResult for a local block is set to 0 (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
2. SPARK-4105 intended to check only small blocks (size < maxBytesInFlight/3), but because of reason 1 the check below never excludes big local blocks from the corruption check: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420

We can fix this so that big local blocks are no longer buffered in memory for the corruption check, avoiding the OOM.
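To make the effect of the zero size concrete, here is a minimal, self-contained Scala sketch of the "small block" gate. It is illustrative only, not the actual Spark code; the names maxBytesInFlight and detectCorrupt are borrowed from ShuffleBlockFetcherIterator purely for readability.

```scala
// Minimal sketch (NOT the actual Spark code) of the "small block" gate that
// decides whether a fetched block is buffered in memory to detect corruption.
object CorruptionCheckGate {
  // spark.reducer.maxSizeInFlight defaults to 48m
  val maxBytesInFlight: Long = 48L * 1024 * 1024
  val detectCorrupt: Boolean = true

  /** Only "small" blocks are eagerly copied into memory and checked. */
  def shouldCheck(reportedSize: Long): Boolean =
    detectCorrupt && reportedSize < maxBytesInFlight / 3

  def main(args: Array[String]): Unit = {
    val actualSize = 2L * 1024 * 1024 * 1024 // a 2 GB local shuffle block
    // Reported size 0 (the current behavior for local blocks): the gate passes,
    // so even a huge block is buffered in memory -> OOM risk.
    println(shouldCheck(0L))         // true
    // Reported actual size: the big block skips the in-memory check.
    println(shouldCheck(actualSize)) // false
  }
}
```

With a reported size of 0, every local block passes the gate and is fully buffered for the corruption check; reporting a meaningful size lets big local blocks skip it.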

How was this patch tested?

A unit test was added.

@jinxing64
Author

Jenkins, test this please

@SparkQA

SparkQA commented Feb 27, 2018

Test build #87714 has finished for PR 20685 at commit 535916c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2018

Test build #87741 has finished for PR 20685 at commit 110c851.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

Jenkins, retest this please

@SparkQA

SparkQA commented Feb 28, 2018

Test build #87755 has finished for PR 20685 at commit 110c851.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Feb 28, 2018

Test build #87770 has finished for PR 20685 at commit 110c851.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

jinxing64 commented Mar 1, 2018

The failed test FileBasedDataSourceSuite is unrelated; it passes in my local environment.

@jinxing64
Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 1, 2018

Test build #87807 has finished for PR 20685 at commit 110c851.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

jinxing64 commented Mar 1, 2018

cc @cloud-fan @zsxwing @squito @jiangxb1987
Could you please help take a look?

Contributor

@squito left a comment

minor cleanup in the test, otherwise lgtm. thanks for catching this and suggesting a fix.

@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
intercept[FetchFailedException] { iterator.next() }
}

test("big corrupt blocks will not be retiried") {
Contributor

typo: retried (or maybe "retired", not sure)
though I think a better name would be "big blocks are not checked for corruption"

Author

I will refine this.

)

val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
Contributor

you can reuse createMockTransfer to simplify this a little.

(actually, a bunch of this test code looks like it could be refactored across these tests -- but we can leave that out of this change.)

Author

Thanks a lot, Imran. I can file another PR for that refactoring :)

Contributor

Sorry, my comment was vague -- I do think you can use createMockTransfer here, since that helper method already exists.

I was just thinking that there may be more we could clean up -- setting up the local & remote BlockManagerId, creating the ShuffleIterator, etc. seems to have a lot of boilerplate in all the tests. But let's not do a pure refactoring of the other tests in this change.
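As a rough sketch of what Imran's suggestion could look like: createMockTransfer and createMockManagedBuffer are assumed to be the helpers already defined in ShuffleBlockFetcherIteratorSuite, with a signature along the lines of Map[BlockId, ManagedBuffer] => BlockTransferService; verify against the actual suite before relying on this.

```scala
// Sketch only: replace the hand-rolled Mockito stubbing above with the suite's
// existing helpers (names assumed from ShuffleBlockFetcherIteratorSuite).
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, ShuffleBlockId}

val blocks: Map[BlockId, ManagedBuffer] = Map(
  ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(),
  ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer())

// Instead of:
//   val transfer = mock(classOf[BlockTransferService])
//   when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer(...)
val transfer = createMockTransfer(blocks)
```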

@cloud-fan
Contributor

We should update the doc of SuccessFetchResult#size; currently it says "estimated size of the block, used to calculate bytesInFlight".

@jinxing64
Author

@squito @cloud-fan
Thank you so much for reviewing. I have refined the code accordingly. Please take another look when you have time.

@SparkQA

SparkQA commented Mar 6, 2018

Test build #88003 has finished for PR 20685 at commit 1cd8653.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

From what I've seen in #15923, this corruption check is mostly there to detect network failures, so it seems like we don't need this check for local blocks at all.

cc @davies @zsxwing

@squito
Contributor

squito commented Mar 7, 2018

it'll also help with disk corruption ... from the stack traces in SPARK-4105 you can't really tell what the source of the problem is. it'll be pretty hard to determine what the source of corruption is if we start seeing it again. anyway, I don't feel that strongly about it either way.

- * @param size estimated size of the block, used to calculate bytesInFlight.
- * Note that this is NOT the exact bytes.
+ * @param size estimated size of the block. Note that this is NOT the exact bytes.
+ * Size of remote block is used to calculate bytesInFlight.
Member

nit: documentation style
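For context, a hedged sketch of where the revised @param wording would live; the field list below is assumed for illustration and may not match the exact definition inside ShuffleBlockFetcherIterator.

```scala
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, BlockManagerId}

/**
 * Result of a successful fetch (sketch only; field list is assumed).
 *
 * @param size estimated size of the block. Note that this is NOT the exact bytes.
 *             Size of a remote block is used to calculate bytesInFlight.
 */
case class SuccessFetchResult(
    blockId: BlockId,
    address: BlockManagerId,
    size: Long,
    buf: ManagedBuffer,
    isNetworkReqDone: Boolean)
```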

@cloud-fan
Contributor

Sounds reasonable. The purpose of this corruption check is to fail fast and retry the stage (re-shuffle), so disk corruption should also be covered.

@cloud-fan
Contributor

LGTM

@jinxing64
Author

jinxing64 commented Mar 7, 2018

@cloud-fan @squito @Ngone51
Thanks a lot!

@SparkQA

SparkQA commented Mar 7, 2018

Test build #88033 has finished for PR 20685 at commit 4e4f075.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 7, 2018

I think #20179 probably already fixed the data corruption issue.

@cloud-fan
Contributor

Yeah, very likely, but I'm not 100% sure. How about we merge this one first to fix the mistake for local shuffle blocks, and then think about whether or not we should remove this corruption check?

@squito
Contributor

squito commented Mar 8, 2018

I agree with @cloud-fan .

@squito
Contributor

squito commented Mar 8, 2018

lgtm

@cloud-fan
Contributor

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Mar 8, 2018
[SPARK-23524] Big local shuffle blocks should not be checked for corruption.

Author: jx158167 <jx158167@antfin.com>

Closes #20685 from jinxing64/SPARK-23524.

(cherry picked from commit 77c91cc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit closed this in 77c91cc Mar 8, 2018
@jinxing64
Author

Thanks for merging!
@cloud-fan @squito @zsxwing @Ngone51

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018