
[#134] improvement(spark3): Use taskId and attemptNo as taskAttemptId #1529

Merged
merged 5 commits on Feb 20, 2024

Conversation

@EnricoMi
Contributor

EnricoMi commented Feb 15, 2024

What changes were proposed in this pull request?

Use map index and task attempt number as the task attempt id in Spark3.

This requires reworking the bit layout of the blockId to maximize bit utilization for Spark3:

// BlockId is a long and consists of partitionId, taskAttemptId and atomicInt;
// their combined length is ATOMIC_INT_MAX_LENGTH + PARTITION_ID_MAX_LENGTH +
// TASK_ATTEMPT_ID_MAX_LENGTH = 63
public static final int PARTITION_ID_MAX_LENGTH = 24;
public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
public static final int ATOMIC_INT_MAX_LENGTH = 18;

Ideally, TASK_ATTEMPT_ID_MAX_LENGTH is set to PARTITION_ID_MAX_LENGTH plus the number of bits required to store the largest task attempt number. The largest task attempt number is maxFailures - 1, or maxFailures if speculative execution is enabled (configured via spark.speculation, disabled by default). maxFailures itself is configured via spark.task.maxFailures and defaults to 4. So by default, two bits are required to store the largest attempt number, and TASK_ATTEMPT_ID_MAX_LENGTH should be set to PARTITION_ID_MAX_LENGTH + 2.

Example (see the packing sketch below):

  • with PARTITION_ID_MAX_LENGTH = 20, Uniffle supports 1,048,576 partitions
  • requiring TASK_ATTEMPT_ID_MAX_LENGTH = 22
  • allowing for ATOMIC_INT_MAX_LENGTH = 21.
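
To make the layout concrete, here is a minimal packing sketch. The name packBlockId is hypothetical; the field order follows the code comment above (partitionId, taskAttemptId, atomicInt), but the actual packing lives in ClientUtils.getBlockId and may differ in detail:

// hypothetical sketch: pack the three components into the 63 usable bits of a
// positive long, using the example layout above (20 + 22 + 21 = 63 bits);
// partitionId occupies the remaining 20 high bits after the two shifts below
static long packBlockId(long partitionId, long taskAttemptId, long atomicInt) {
  final int TASK_ATTEMPT_ID_MAX_LENGTH = 22;
  final int ATOMIC_INT_MAX_LENGTH = 21;
  return (partitionId << (TASK_ATTEMPT_ID_MAX_LENGTH + ATOMIC_INT_MAX_LENGTH))
      | (taskAttemptId << ATOMIC_INT_MAX_LENGTH)
      | atomicInt;
}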

Why are the changes needed?

The map index (map partition id) is limited by the number of partitions of the shuffle. The task attempt number is limited by the maximum number of failures configured via spark.task.maxFailures, which defaults to 4. This gives us an id that is unique per shuffle while not growing arbitrarily large the way context.taskAttemptId does.

Fix: #134

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit and integration tests.

@codecov-commenter

codecov-commenter commented Feb 15, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (7fbe7c9) 54.15% compared to head (60a6931) 55.14%.
Report is 3 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1529      +/-   ##
============================================
+ Coverage     54.15%   55.14%   +0.98%     
- Complexity     2803     2804       +1     
============================================
  Files           430      410      -20     
  Lines         24417    22056    -2361     
  Branches       2081     2081              
============================================
- Hits          13224    12163    -1061     
+ Misses        10361     9133    -1228     
+ Partials        832      760      -72     

☔ View full report in Codecov by Sentry.


github-actions bot commented Feb 15, 2024

Test Results

2 438 files  + 9  2 438 suites  +9   4h 43m 12s ⏱️ + 3m 32s
  822 tests + 3    821 ✅ + 3   1 💤 ±0  0 ❌ ±0 
9 736 runs  +23  9 722 ✅ +23  14 💤 ±0  0 ❌ ±0 

Results for commit 60a6931. ± Comparison against base commit d120f4b.

♻️ This comment has been updated with latest results.

@EnricoMi
Contributor Author

EnricoMi commented Feb 16, 2024

Re #1514 (comment):

AttemptNo will waste some bits. If we increase the bits, the bitmap will occupy more memory.

The current config is not optimal:

public static final int PARTITION_ID_MAX_LENGTH = 24;
public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
public static final int ATOMIC_INT_MAX_LENGTH = 18;

PARTITION_ID_MAX_LENGTH = 24 supports 16,777,216 partitions; with an assumed optimal partition size of 200 MB, this would easily support a 3 PB dataset (2^24 × 200 MB ≈ 3 PB). I think that can be reduced a bit.

Further, a TASK_ATTEMPT_ID_MAX_LENGTH that is smaller than PARTITION_ID_MAX_LENGTH does not make sense. A single stage with 2^PARTITION_ID_MAX_LENGTH partitions would create at least as many task attempt ids, which immediately exhausts TASK_ATTEMPT_ID_MAX_LENGTH. So there is room for improvement.

With the improvement in #1529 you would set TASK_ATTEMPT_ID_MAX_LENGTH = PARTITION_ID_MAX_LENGTH + 2 (for the default max failures of 4).

If you would like to support 2 million partitions and a maxFailures of 4, then you would use:

public static final int PARTITION_ID_MAX_LENGTH = 21;
public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 23;
public static final int ATOMIC_INT_MAX_LENGTH = 19;

I think 2 million partitions is quite a lot (it supports 400 TB datasets), and ATOMIC_INT_MAX_LENGTH would even increase with that.

In other words, 2^PARTITION_ID_MAX_LENGTH (2^24) partitions have never been supported; at most 2^TASK_ATTEMPT_ID_MAX_LENGTH (2^21) partitions were. That is still the case, with more room left for the sequence number.
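
For illustration, a hypothetical helper (not part of Uniffle) that derives such a layout from the desired partition count and Spark's maxFailures, following the rule TASK_ATTEMPT_ID_MAX_LENGTH = PARTITION_ID_MAX_LENGTH + attempt bits:

// hypothetical: derive {partitionBits, taskAttemptIdBits, atomicIntBits} for a
// 63-bit block id from the desired partition count and maxFailures
static int[] deriveBlockIdLayout(int maxPartitions, int maxFailures, boolean speculation) {
  int partitionBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1);
  int maxAttemptNo = (maxFailures < 1 ? 0 : maxFailures - 1) + (speculation ? 1 : 0);
  int attemptBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
  int taskAttemptIdBits = partitionBits + attemptBits;
  int atomicIntBits = 63 - partitionBits - taskAttemptIdBits;
  return new int[] {partitionBits, taskAttemptIdBits, atomicIntBits};
}

Here deriveBlockIdLayout(1 << 21, 4, false) yields {21, 23, 19}, matching the constants above.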

@EnricoMi
Contributor Author

Re #1514 (comment):

#1529 looks fine; can we add an integration test to cover the case of speculative execution?

The added FailingTasksTest simulates speculative execution quite well because the failing tasks actually write shuffle data but do not register the shuffle result, so they represent the killed slow tasks. The succeeding attempts represent the speculative task.

A speculative execution setup requires workers / executors with different host names. I think that has not been done in Uniffle before, so that would require some significant work with little extra benefit.

@zuston
Member

zuston commented Feb 16, 2024

The added FailingTasksTest simulates speculative execution quite well because the failing tasks actually write shuffle data but do not register the shuffle result, so they represent the killed slow tasks. The succeeding attempts represent the speculative task.

Makes sense.

Further, a TASK_ATTEMPT_ID_MAX_LENGTH that is smaller than PARTITION_ID_MAX_LENGTH does not make sense. A single stage with 2^PARTITION_ID_MAX_LENGTH partitions would create at least as many task attempt ids, which immediately exhausts TASK_ATTEMPT_ID_MAX_LENGTH. So there is room for improvement.

Thanks for your explanation, makes sense to me. cc @jerqi

* practically reach LONG.MAX_VALUE. That would overflow the bits in the block id.
*
* <p>Here we use the map index or task id, appended by the attempt number per task. The map index
* is limited by the number of partitions of a stage. The attempt number per task is limited /
Contributor

The attempt number may be larger than maxFailures.
If we fail 3 times, Spark may trigger a speculative task while the fourth attempt runs. We will have 5 attempts.

Contributor Author

EnricoMi Feb 16, 2024

That is a bit surprising, but looking at the relevant code, maxFailures is not considered when resubmitting a task as speculative:
https://github.com/apache/spark/blob/2abd3a2f445e86337ad94da19f301cb2b8bc232f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L1226-L1227

Contributor Author

EnricoMi Feb 16, 2024

Good catch, accounted for: 2f5bccd

@VisibleForTesting
protected static long getTaskAttemptId(int mapIndex, int attemptNo, int maxFailures) {
  int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
  if (attemptNo > maxAttemptNo) {
Contributor

Maybe it's OK if we have this check, but we should consider the case above.

@jerqi
Contributor

jerqi commented Feb 16, 2024

How does the reader process the new taskAttemptId?

* @return a task attempt id unique for a shuffle stage
*/
@VisibleForTesting
protected static long getTaskAttemptId(int mapIndex, int attemptNo, int maxFailures, boolean speculationEnabled) {
Contributor

Do we need to consider the case that mapIndex exceeds the limit?

Contributor Author

Passing the returned long as taskAttemptId to ClientUtils.getBlockId will raise an error that the taskAttemptId is too large for the block id bit layout.

Method getTaskAttemptId will not overflow a long because both mapIndex and maxFailures are ints. Even with maxFailures = Integer.MAX_VALUE, attemptBits is 31 and the returned value is still a valid positive long.
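
For illustration, a quick check of that worst case (a hypothetical snippet, mirroring the sketch further below):

// worst case: mapIndex = Integer.MAX_VALUE, attemptBits = 31,
// attemptNo = Integer.MAX_VALUE - 1
long worst = ((long) Integer.MAX_VALUE << 31) | (Integer.MAX_VALUE - 1L);
assert worst > 0; // 0x3FFFFFFFFFFFFFFE, a valid positive long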

Contributor Author

If you prefer to fail-fast here, I can add an assertion.

Contributor Author

EnricoMi Feb 16, 2024

Fixed in 7b745ed as it is always good to fail-fast with a meaningful error message.
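
A minimal sketch of such a fail-fast variant, assuming the behavior discussed in this thread; the exact bounds checks and exception type in 2f5bccd and 7b745ed may differ:

// sketch: compose a per-stage-unique task attempt id from map index and
// attempt number, failing fast on an unexpectedly large attempt number
protected static long getTaskAttemptId(
    int mapIndex, int attemptNo, int maxFailures, boolean speculationEnabled) {
  // speculation can add one attempt beyond maxFailures (see the discussion above)
  int maxAttemptNo = (maxFailures < 1 ? 0 : maxFailures - 1) + (speculationEnabled ? 1 : 0);
  if (attemptNo > maxAttemptNo) {
    throw new IllegalArgumentException(
        "Attempt number " + attemptNo + " exceeds expected maximum " + maxAttemptNo);
  }
  // bits needed to store maxAttemptNo (zero bits when it is 0)
  int attemptBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
  return ((long) mapIndex << attemptBits) | attemptNo;
}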

@EnricoMi
Contributor Author

How does the reader process the new taskAttemptId?

The reader uses the taskAttemptIds as-is: they are opaque, just unique long values.

EnricoMi force-pushed the blockid-with-mapindex-attemptno branch from dcd03bc to 3abe7ae on February 16, 2024 15:50
EnricoMi force-pushed the blockid-with-mapindex-attemptno branch from 3abe7ae to 7b745ed on February 16, 2024 16:10
@jerqi
Contributor

jerqi commented Feb 16, 2024

How does the reader process the new taskAttemptId?

The reader uses the taskAttemptIds as-is: they are opaque, just unique long values.

The reader will retrieve the taskAttemptIds which it needs to read from the MapOutputTracker. So they are not opaque.

@EnricoMi
Contributor Author

How does the reader process the new taskAttemptId?

The reader uses the taskAttemptIds as-is: they are opaque, just unique long values.

The reader will retrieve the taskAttemptIds which it needs to read from the MapOutputTracker. So they are not opaque.

The RssShuffleWriter adds the taskAttemptId (now generated by RssShuffleManager.getTaskAttemptId) to the MapOutputTracker (by RssShuffleWriter.stop returning a MapStatus):

final BlockManagerId blockManagerId =
    BlockManagerId.apply(
        appId + "_" + taskId,
        DUMMY_HOST,
        DUMMY_PORT,
        Option.apply(Long.toString(taskAttemptId)));
MapStatus mapStatus = MapStatus.apply(blockManagerId, partitionLengths, taskAttemptId);

The RssShuffleManager retrieves that id from the MapOutputTracker to create the RssShuffleReader, as you pointed out. The ids are used to filter for blocks that come from those task attempts. The ids are compared as-is; they are treated as plain longs, and no inner information is (or has to be) extracted from them. This is what opaque means. Block ids, in contrast, are not opaque, as inner information like the partition id is extracted from them.
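
To illustrate the distinction, a sketch of a reader-side filter; the method name is hypothetical, and the assumption that taskAttemptId sits in the lowest TASK_ATTEMPT_ID_MAX_LENGTH bits of the block id is for illustration only:

// block ids are NOT opaque: a field (taskAttemptId) is extracted from their
// bits; the extracted taskAttemptId IS opaque: it is only compared as a long
static boolean blockBelongsToSucceededAttempt(
    long blockId, java.util.Set<Long> succeededTaskAttemptIds) {
  final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
  long taskAttemptId = blockId & ((1L << TASK_ATTEMPT_ID_MAX_LENGTH) - 1);
  return succeededTaskAttemptIds.contains(taskAttemptId);
}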

@jerqi
Contributor

jerqi commented Feb 17, 2024

How does the reader process the new taskAttemptId?

The reader uses the taskAttemptIds as-is: they are opaque, just unique long values.

The reader will retrieve the taskAttemptIds which it needs to read from the MapOutputTracker. So they are not opaque.

The RssShuffleWriter adds the taskAttemptId (now generated by RssShuffleManager.getTaskAttemptId) to the MapOutputTracker (by RssShuffleWriter.stop returning a MapStatus):

final BlockManagerId blockManagerId =
    BlockManagerId.apply(
        appId + "_" + taskId,
        DUMMY_HOST,
        DUMMY_PORT,
        Option.apply(Long.toString(taskAttemptId)));
MapStatus mapStatus = MapStatus.apply(blockManagerId, partitionLengths, taskAttemptId);

The RssShuffleManager retrieves that id from the MapOutputTracker to create the RssShuffleReader, as you pointed out. The ids are used to filter for blocks that come from those task attempts. The ids are compared as-is; they are treated as plain longs, and no inner information is (or has to be) extracted from them. This is what opaque means. Block ids, in contrast, are not opaque, as inner information like the partition id is extracted from them.

OK. I got it.

@jerqi
Contributor

jerqi commented Feb 17, 2024

@zuston Do you have another suggestion?

@zuston
Member

zuston commented Feb 17, 2024

It looks like this PR is a compatible change, but I'm not sure whether it will affect the local order mechanism. I will review this carefully next week during working days.

Member

zuston left a comment

LGTM. I have checked the LocalOrderSegmentSplitter, and I think the task id is now composed in a way that still maintains the sequence required by local order.

@zuston
Member

zuston commented Feb 20, 2024

@jerqi PTAL again.

zuston merged commit 44eb4e5 into apache:master on Feb 20, 2024
38 of 40 checks passed
@zuston
Member

zuston commented Feb 20, 2024

Thanks @EnricoMi . This is a great improvement! 🎉

@EnricoMi
Contributor Author

Thank you for incorporating this, it greatly helps us in migrating to Uniffle!

EnricoMi deleted the blockid-with-mapindex-attemptno branch on February 20, 2024 08:59
jerqi pushed a commit that referenced this pull request Feb 29, 2024
…kAttemptId (#1544)

### What changes were proposed in this pull request?
Use map index and task attempt number as the task attempt id in Spark2.

### Why are the changes needed?

This aligns Spark2 taskAttemptId of the blockId with Spark3.

See #1529

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing integration tests.
EnricoMi mentioned this pull request on Jul 2, 2024
Development

Successfully merging this pull request may close these issues.

[Improvement] Support more tasks of the application