
Conversation

@exmy (Contributor) commented Jun 1, 2021

What changes were proposed in this pull request?

Make HighlyCompressedMapStatus accurately record the size of skewed shuffle blocks that are smaller than spark.shuffle.accurateBlockThreshold.

Why are the changes needed?

HighlyCompressedMapStatus currently cannot accurately record the size of shuffle blocks that are much larger than the other blocks but still smaller than spark.shuffle.accurateBlockThreshold, which is likely to lead to OOM when fetching shuffle blocks. We have to tune extra properties such as spark.reducer.maxReqsInFlight to prevent it, so it is better to fix this in HighlyCompressedMapStatus.

Does this PR introduce any user-facing change?

Yes, a new config spark.shuffle.accurateSkewedBlockThreshold is added.

How was this patch tested?

Added a new unit test.
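
For reference, a rough sketch of what the proposed config entry probably looks like, reconstructed from the diff hunk quoted later in this thread; the constant name and most of the doc text are assumptions, not the PR's actual code:

```scala
// Hypothetical reconstruction; this entry would live inside the
// org.apache.spark.internal.config package object (package.scala),
// alongside the existing SHUFFLE_ACCURATE_BLOCK_THRESHOLD.
private[spark] val SHUFFLE_ACCURATE_SKEWED_BLOCK_THRESHOLD =
  ConfigBuilder("spark.shuffle.accurateSkewedBlockThreshold")
    .doc("Threshold in bytes above which a skewed shuffle block (much larger than the " +
      "average block) is accurately recorded in HighlyCompressedMapStatus, even if it is " +
      "smaller than spark.shuffle.accurateBlockThreshold. This helps to prevent OOM " +
      "when fetch shuffle blocks.")
    .version("3.2.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(350 * 1024)
```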

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions github-actions bot added the CORE label Jun 1, 2021
@dongjoon-hyun (Member) left a comment:

Thank you for making a PR, @exmy.

"when fetch shuffle blocks.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(350 * 1024)
@dongjoon-hyun (Member):

Could you describe the background of this value?

@exmy (Contributor, Author) replied:

> Could you describe the background of this value?

Thanks for the review. This is described in the reply to @mridulm below; I hope to get your opinion.

@mridulm (Contributor) commented Jun 2, 2021

I am missing something here: if a block is < spark.shuffle.accurateBlockThreshold, it is recorded as a small block - else it is marked as a huge block. The point of spark.shuffle.accurateBlockThreshold is that it is not very high, so it should not cause OOM - are you configuring it to a very high value?
Note that this gets triggered when (typically) you have > 2k partitions - so the benefit of using HighlyCompressedMapStatus is to prevent the other issues that more accurate tracking runs into for a very large number of partitions.

// Huge blocks are not included in the calculation for average size, thus size for smaller
// blocks is more accurate.
if (size < threshold) {
if ((size >= 5 * overallNonEmptyAvgSize && size >= minThreshold) || size >= threshold) {
@mridulm (Contributor):

Echoing @dongjoon-hyun's comment above - what is the background of this change?

@exmy (Contributor, Author) replied:

We first compute an average size over the non-empty uncompressedSizes; if a block is > N * this average size, it is marked as a huge block. To avoid mistakenly marking a block that is > N * this average size but not actually big, the new config spark.shuffle.accurateSkewedBlockThreshold is introduced: only a block that is both > N * this average size and > accurateSkewedBlockThreshold can be marked as a huge block.
The reason accurateSkewedBlockThreshold defaults to 350K is that, assuming 3k partitions, this situation only matters when the amount of data fetched by a reduce task is greater than 3000 * 350K ≈ 1G.
I'm not sure whether N = 5 and accurateSkewedBlockThreshold = 350K are appropriate here, and I really hope to get your opinion.
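
To make the proposed condition concrete, here is a minimal standalone sketch of the classification; the variable names follow the quoted diff, but the object, the helper, and the example values are illustrative, not code from the PR:

```scala
object SkewedBlockClassificationSketch {

  /** Returns true if the block size should be recorded accurately ("huge"). */
  def isHugeBlock(
      size: Long,
      overallNonEmptyAvgSize: Long,
      minThreshold: Long,  // spark.shuffle.accurateSkewedBlockThreshold (proposed, default 350K)
      threshold: Long      // spark.shuffle.accurateBlockThreshold (existing)
  ): Boolean = {
    (size >= 5 * overallNonEmptyAvgSize && size >= minThreshold) || size >= threshold
  }

  def main(args: Array[String]): Unit = {
    val avgSize      = 20L * 1024          // most blocks are ~20K
    val skewedSize   = 2L * 1024 * 1024    // a skewed block of ~2M
    val minThreshold = 350L * 1024         // proposed default
    val threshold    = 100L * 1024 * 1024  // existing accurateBlockThreshold default
    // Before: 2M < threshold, so the block is folded into the average.
    // After:  2M >= 5 * 20K and 2M >= 350K, so it is recorded accurately.
    println(isHugeBlock(skewedSize, avgSize, minThreshold, threshold)) // true
  }
}
```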

@exmy (Contributor, Author) commented Jun 2, 2021

> I am missing something here: if a block is < spark.shuffle.accurateBlockThreshold, it is recorded as a small block - else it is marked as a huge block. The point of spark.shuffle.accurateBlockThreshold is that it is not very high, so it should not cause OOM - are you configuring it to a very high value?
> Note that this gets triggered when (typically) you have > 2k partitions - so the benefit of using HighlyCompressedMapStatus is to prevent the other issues that more accurate tracking runs into for a very large number of partitions.

Thanks for the review. In our production, spark.shuffle.accurateBlockThreshold is set to 10M. We encountered an OOM case: for example, assume the map stage has 3k tasks and there are also 3k shuffle partitions. Most output blocks of each task are small (around 20K), but some blocks (around 2M, still smaller than accurateBlockThreshold) are much bigger than the others. In this situation HighlyCompressedMapStatus also reports the average size for these bigger blocks, but the reduce task processing them will actually fetch 3000 * 2M ≈ 5.8G of data, which is pretty likely to cause OOM. Of course we can tune accurateBlockThreshold to prevent it, but that approach is application-specific and cannot be used in the common case, so I think we should solve it in HighlyCompressedMapStatus.
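
A back-of-the-envelope sketch of the underestimation in this scenario; the numbers come from the comment above, and the helper object itself is illustrative only:

```scala
object SkewedFetchEstimateSketch {
  def main(args: Array[String]): Unit = {
    val mapTasks     = 3000L
    val typicalBlock = 20L * 1024        // most blocks are ~20K
    val skewedBlock  = 2L * 1024 * 1024  // the skewed reducer's blocks are ~2M each

    // HighlyCompressedMapStatus reports roughly the average block size for
    // non-huge blocks, so the skewed reducer's fetch is estimated near the
    // typical size ...
    val estimated = mapTasks * typicalBlock
    // ... while it actually pulls a ~2M block from every map task.
    val actual = mapTasks * skewedBlock

    println(f"estimated ≈ ${estimated / 1024.0 / 1024}%.1f MiB")      // ~58.6 MiB
    println(f"actual    ≈ ${actual / 1024.0 / 1024 / 1024}%.2f GiB")  // ~5.86 GiB
  }
}
```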

@exmy exmy closed this Jun 2, 2021
@exmy exmy reopened this Jun 2, 2021
@mridulm (Contributor) commented Jun 2, 2021

The size estimation helps make a judgement about how many concurrent fetches to make, and whether that many concurrent in-flight requests can be handled in parallel - it would not result in all of them being fetched concurrently.
Please see the spark.reducer.maxSizeInFlight, spark.reducer.maxReqsInFlight, etc. configs to tune the behavior.
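
For completeness, a hedged example of setting the configs mentioned above; the values are arbitrary illustrations, not recommendations, and must be tuned to the resources actually available:

```scala
import org.apache.spark.SparkConf

object ReducerFetchTuningSketch {
  // Illustrative caps on concurrent fetch work for a memory-constrained application.
  val conf: SparkConf = new SparkConf()
    .set("spark.reducer.maxSizeInFlight", "24m")             // shuffle data in flight per reduce task
    .set("spark.reducer.maxReqsInFlight", "64")              // outstanding fetch requests per reduce task
    .set("spark.reducer.maxBlocksInFlightPerAddress", "64")  // blocks fetched from one executor address at a time
}
```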

@shuai-xu commented Jun 7, 2021

@mridulm Tuning spark.reducer.maxSizeInFlight and spark.reducer.maxReqsInFlight is useful for a single job, but a cluster has thousands of jobs and it's difficult to set these two configs for every job. Maybe #32287 can solve this problem, but recording the size of blocks more precisely may still be helpful.

@mridulm (Contributor) commented Jun 7, 2021

These parameters are not for individual stages or jobs - they model behavior based on how many resources are available and what cost is acceptable for an application (memory, number of concurrent IOs, etc.). I would suggest looking more into tuning these for the specifics of the resources available.
Making map status more expensive has nontrivial consequences for large shuffles.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 16, 2021
@github-actions github-actions bot closed this Sep 17, 2021