
[CELEBORN-955] Re-run Spark Stage for Celeborn Shuffle Fetch Failure#1924

Closed
ErikFang wants to merge 8 commits into apache:main from ErikFang:Re-run-Spark-Stage-for-Celeborn-Shuffle-Fetch-Failure

Conversation

@ErikFang
Contributor

What changes were proposed in this pull request?

Currently, Celeborn uses replication to handle shuffle data loss for the Celeborn shuffle reader. This PR implements an alternative solution via Spark stage resubmission.

Design doc:
https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8/edit

Why are the changes needed?

Spark stage resubmission uses fewer resources than replication, and some Celeborn users have been asking for it.

Does this PR introduce any user-facing change?

A new config, celeborn.client.fetch.throwsFetchFailure, is introduced to enable this feature.

How was this patch tested?

Two UTs are attached, and we also tested it in Ant Group's dev Spark cluster.

@ErikFang
Contributor Author

In my local Spark 3.2 build, the code snippet below is added to trigger DAGScheduler to clean up mapOutputTracker status:

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f97c426348..fef6013d98 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1844,6 +1844,12 @@ private[spark] class DAGScheduler(
             }
           }

+          // ensure retry of all MapTasks of resubmitted stage attempt
+          // TODO: change to ShuffleManagerIndicator.isRemoteShuffle
+          if (failureMessage.contains("Celeborn FetchFailure: ")) {
+            mapOutputTracker.unregisterAllMapAndMergeOutput(shuffleId)
+          }
+
           if (failedStage.rdd.isBarrier()) {
             failedStage match {
               case failedMapStage: ShuffleMapStage =>

This is also the reason why CelebornFetchFailureSuite fails with community Spark in this PR.
I will update a commit soon to include the RPC-related changes for LifecycleManager to clean up mapOutputTracker.

commitManager.handleGetReducerFileGroup(context, shuffleId)
}

private def handleGetShuffleId(
Contributor


Another option:

  1. For the writer: add a new field appShuffleId to registerShuffleRequest and generate a uniqueShuffleId (used as the Celeborn shuffleId) on the client side (Spark). Then the LifecycleManager doesn't have to compute the shuffleId according to Spark's stage-rerun logic, and the writer needn't introduce a new RPC. Note that shuffleIdMapping is still needed in the LifecycleManager.
  2. For the reader: the shuffleClient can directly get the latest related shuffleId. This way we don't need the getShuffleId RPC, but we need to change the meaning of shuffleId in GetReducerFileGroup to appShuffleId, and respond with the latest shuffle id in GetReducerFileGroupResponse. @ErikFang @waitinfuture

Contributor Author

@ErikFang ErikFang Oct 17, 2023


Another option:

  1. For the writer: add a new field appShuffleId to registerShuffleRequest and generate a uniqueShuffleId (used as the Celeborn shuffleId) on the client side (Spark). Then the LifecycleManager doesn't have to compute the shuffleId according to Spark's stage-rerun logic, and the writer needn't introduce a new RPC. Note that shuffleIdMapping is still needed in the LifecycleManager.

This is good, I've made the change locally, will update the PR soon.

  2. For the reader: the shuffleClient can directly get the latest related shuffleId. This way we don't need the getShuffleId RPC, but we need to change the meaning of shuffleId in GetReducerFileGroup to appShuffleId, and respond with the latest shuffle id in GetReducerFileGroupResponse. @ErikFang @waitinfuture

I checked the code, but hit a problem with "respond with the latest shuffle id in GetReducerFileGroupResponse".
The call hierarchy is:

ShuffleClientImpl.readPartition(int appShuffleId, ....)
            |
ShuffleClientImpl.loadFileGroup(int appShuffleId, ...)
            |
ShuffleClientImpl.updateFileGroup(int appShuffleId, ...)
            |
ShuffleClientImpl.loadFileGroupInternal(int appShuffleId)

and reduceFileGroupsMap is of type

 protected final Map<Integer, ReduceFileGroups> reduceFileGroupsMap =
      JavaUtils.newConcurrentHashMap();

, which maps shuffleId -> ReduceFileGroups and is updated with
reduceFileGroupsMap.computeIfAbsent(shuffleId, (id) -> loadFileGroupInternal(shuffleId))
If the shuffleId is obtained from GetReducerFileGroupResponse, the current computeIfAbsent() logic must change, and every updateFileGroup() call would send a GetReducerFileGroup RPC to the LifecycleManager, which may impact performance.

How about keeping the getShuffleId RPC for the reader, or any advice? @RexXiong @waitinfuture
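The performance concern above hinges on computeIfAbsent caching: as long as the map is keyed by the id the reader already knows, the loader (one RPC) runs only on the first lookup. A minimal standalone sketch of that behavior, with `loadFileGroupInternal` replaced by a hypothetical stand-in that just counts invocations:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class FileGroupCacheSketch {
    static final Map<Integer, String> reduceFileGroupsMap = new ConcurrentHashMap<>();
    static final AtomicInteger rpcCalls = new AtomicInteger();

    // Stand-in for loadFileGroupInternal: each invocation models one
    // GetReducerFileGroup RPC to the LifecycleManager.
    static String loadFileGroupInternal(int shuffleId) {
        rpcCalls.incrementAndGet();
        return "fileGroups-for-" + shuffleId;
    }

    public static void main(String[] args) {
        // Keyed by the shuffleId the reader already knows: repeated lookups
        // hit the cache and no further RPCs are sent.
        for (int i = 0; i < 5; i++) {
            reduceFileGroupsMap.computeIfAbsent(0, id -> loadFileGroupInternal(id));
        }
        System.out.println("rpc calls: " + rpcCalls.get()); // prints "rpc calls: 1"
    }
}
```

If the cache key were the appShuffleId but the value's identity could change after a fetch failure, every updateFileGroup() would have to bypass this cache and ask the LifecycleManager again, which is exactly the extra-RPC cost described above.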

shuffleIdMapping.put(appShuffleId, shuffleIds :+ newShuffleId)
}

val shuffleId = shuffleIdMapping.get(appShuffleId).last
Contributor


For shuffle unregister / cleaning up useless shuffleIds:

  1. When registering a new shuffleId with the same appShuffleId, we can clear all outdated info in the LifecycleManager, such as shuffleIdMapping (appShuffleId -> shuffleId) and other shuffleId-related info.
  2. When a shuffleId is unregistered, the LifecycleManager can also clear the useless shuffleId from shuffleIdMapping.

Contributor Author

@ErikFang ErikFang Nov 10, 2023


Shuffle cleanup has been added:

  1. A new function unregisterAppShuffle() is added to LifecycleManager, which will be called by SparkShuffleManager.unregisterShuffle() with the appShuffleId.
  2. On the executor side, a shuffleIdTracker is added to track and clean up the related shuffle ids in the shuffleClient.

@SteNicholas
Member

@ErikFang, could you use dev/reformat to format the changes?


private void registerCallback() {
MapOutputTrackerMaster mapOutputTracker = (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
Consumer<Integer> c = (shuffleId) -> mapOutputTracker.unregisterAllMapAndMergeOutput(shuffleId);
Contributor

@waitinfuture waitinfuture Oct 17, 2023


For an INDETERMINATE shuffle, I think we should also guarantee that the reduce stage is recomputed, if I understand correctly.

Contributor Author


https://lists.apache.org/thread/t8z8pc2dw6ooclk57y4sn1loof68tthq
Thanks to @mridulm for giving a detailed explanation on the mailing list.

.getOrElse(context::applicationId);
}

public static int celebornShuffleId(ShuffleClient client, CelebornShuffleHandle<?, ? ,?> handle, TaskContext context, Boolean isWriter) {
Contributor


We need to distinguish between appShuffleId and celebornShuffleId, as this PR introduces a mapping from Spark's shuffleId to Celeborn's shuffleId.

Member

@SteNicholas SteNicholas left a comment


@ErikFang, I left some comments for the updates. PTAL.

/*
In order to support Spark Stage resubmit with ShuffleReader FetchFails, Celeborn shuffleId has to be distinguished
from Spark shuffleId. Spark shuffleId is assigned at ShuffleDependency construction time, and all Attempt of
a Spark Stage has the same ShuffleId. When Celeborn ShuffleReader fails to fetch shuffle data from worker and
Member


Suggested change
a Spark Stage has the same ShuffleId. When Celeborn ShuffleReader fails to fetch shuffle data from worker and
a Spark Stage have the same ShuffleId. When Celeborn ShuffleReader fails to fetch shuffle data from worker and

@SteNicholas
Member

@ErikFang, could you rebase the latest main branch to resolve conflicts for this pull request?

@ErikFang
Contributor Author

I've updated the PR with reformat & rebase.

@codecov

codecov bot commented Oct 18, 2023

Codecov Report

Attention: 17 lines in your changes are missing coverage. Please review.

Comparison is base (788b0c3) 46.50% compared to head (3226459) 46.46%.
Report is 1 commit behind head on main.

Files Patch % Lines
...born/common/protocol/message/ControlMessages.scala 0.00% 12 Missing ⚠️
...born/common/network/protocol/TransportMessage.java 0.00% 4 Missing ⚠️
...cala/org/apache/celeborn/common/CelebornConf.scala 85.72% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1924      +/-   ##
==========================================
- Coverage   46.50%   46.46%   -0.04%     
==========================================
  Files         166      166              
  Lines       10758    10781      +23     
  Branches      984      988       +4     
==========================================
+ Hits         5002     5008       +6     
- Misses       5430     5447      +17     
  Partials      326      326              


@waitinfuture
Contributor

I've updated the PR with reformat & rebase.

Hi @ErikFang , please run ./dev/reformat to auto format the code style.

@ErikFang
Contributor Author

Pushed a new commit to address comments related to code style, comments, and variable names.

@waitinfuture
Contributor

Hi @ErikFang, please add license headers for newly added files.

throws {@link FetchFailedException}, Spark DAGScheduler resubmits the failed ResultStage and corresponding ShuffleMapStage
, but Celeborn can't differentiate shuffle data from previous failed/resubmitted ShuffleMapStage with the same shuffleId.
Current solution takes Stage retry in account, and set
celeborn shuffleId = spark shuffleId * maxStageAttemptsNum + stageAttemptsId
Contributor


I probably mentioned this in the discussion in dev list - we cannot assume spark shuffleId * maxStageAttemptsNum + stageAttemptsId does not lead to conflicts.

With shuffle id reuse across jobs, this will result in conflicts.
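To make the conflict concrete, here is a tiny sketch of the encoding quoted above. The bound value is hypothetical (Spark's default spark.stage.maxConsecutiveAttempts is 4, but the exact number doesn't matter): once a shuffle accumulates more attempts than the bound, its derived id collides with another Spark shuffle id's range.

```java
public class ShuffleIdConflictSketch {
    // Hypothetical bound, standing in for maxStageAttemptsNum in the PR's formula.
    static final int MAX_STAGE_ATTEMPTS = 4;

    // The encoding under discussion: celeborn shuffleId = spark shuffleId * bound + attempt.
    static int celebornShuffleId(int sparkShuffleId, int stageAttemptId) {
        return sparkShuffleId * MAX_STAGE_ATTEMPTS + stageAttemptId;
    }

    public static void main(String[] args) {
        // An attempt of shuffle 0 beyond the assumed bound (possible across jobs,
        // since the bound is per stage, not per shuffle id) collides with shuffle 1:
        int a = celebornShuffleId(0, 4); // shuffle 0, 5th attempt
        int b = celebornShuffleId(1, 0); // shuffle 1, 1st attempt
        System.out.println(a == b);      // prints "true": both map to 4
    }
}
```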

Contributor Author

@ErikFang ErikFang Oct 29, 2023


@mridulm

As I explained on the mailing list, the Spark shuffle id is a static concept bound to the shuffle dependency, i.e., to the shuffle data itself. I prefer not to call it "shuffle id reuse", since it is not the same id pointing to different data; it is just that shuffle data's id. I don't think there will be any conflict.
If Spark shuffle data 0 needs to be accessed, no matter whether it will be submitted to DAGScheduler soon or was computed in a previous job, Spark shuffle id 0 should be used. In the second case, Spark reads the shuffle data directly if it doesn't need to be recomputed. You can search for 'spark_shuffleId 0' in the logs pasted in my mail to see that shuffle data 0 is written once and read twice with Spark shuffle id 0 / Celeborn shuffle id 0.

Contributor


Spark uses shuffle ids, stage re-execution, and job re-execution (particularly for SQL) in a specific way, and that does not change because Celeborn is being used.

In this specific context, the assumption being made is that for a given Spark shuffle id there are up to maxStageAttemptNums stage attempts possible - if there are more, the generated id will conflict with some other Spark shuffle id's Celeborn shuffle id. Given this, the question is: can this happen? As I have illustrated before, it can, and actually does.

Please let me know if I am missing something, or if I can help clarify.

Contributor


To add: spark.stage.maxConsecutiveAttempts is for a specific stage, not a shuffle id - so the total number of times a particular shuffle id is computed by stage attempts has no bound (across jobs).

Contributor Author


Thanks for the clarification, you are correct.
I now feel the shuffle id mapping by (maxStageAttemptsNum, stageAttemptsId) is a bad idea.
The bounded shuffle id range enforced by maxStageAttemptsNum conflicts with the unbounded number of stage attempts generated by Spark stage reruns, which may happen when shuffle data gets recomputed over several rounds of shuffle fetch failures.

Let me review the solution of singleton shuffle id generation in LifecycleManager, keeping track of the correct shuffleId with
private val shuffleIdMapping = JavaUtils.newConcurrentHashMap[Int, scala.collection.mutable.LinkedHashMap[Int, Boolean]]()

Contributor

@mridulm mridulm Oct 30, 2023


Please do let me know if I can help or give clarity ... I don't understand Celeborn well yet (particularly given it supports multiple engines), but I can help explain how Spark might behave for your proposals !

Contributor Author


The basic idea of shuffle id generation in LifecycleManager:

  1. The shuffle writer sends the app shuffle id to LifecycleManager, which may create a new shuffle id for writing:
  • If the app shuffle id is not recorded in shuffleIdMapping, generate a new shuffle id with an AtomicInteger, put appShuffleId -> (shuffleId -> true) into shuffleIdMapping, and return shuffleId to the shuffle writer.
  • If the app shuffle id is mapped to (shuffleId -> true), return shuffleId to the shuffle writer.
  • If the app shuffle id is mapped to (shuffleId -> false), generate a new (shuffleId' -> true), put it into shuffleIdMapping, and return shuffleId' to the shuffle writer. // fetch failure happened
  2. The shuffle reader sends the app shuffle id to LifecycleManager and gets the latest shuffle id associated with it.
  3. During a FetchFailure, the shuffle reader sends ReportFetchFailure with (appShuffleId, shuffleId) to LifecycleManager, which updates shuffleIdMapping to appShuffleId -> (shuffleId -> false).

@mridulm
Do you see any issue here?

Contributor


To clarify, the incoming request to LifecycleManager will include app-shuffle-id, stage-id, stage-attempt-id ?
If yes, agree, this should address all the concerns I was having.

One small nit, if we can generate this at driver and ship the value to executor (so that we can avoid the invocation from executor to driver to fetch this id), that would be great - I have not thought through how we can do this, but adding as a comment in case there is an existing solution we can leverage !

Contributor Author


It is definitely a good idea to use app-shuffle-id + stage-id + stage-attempt-id as an identifier for one execution of shuffle computing, which maps to a Celeborn shuffle id. Let's call it appShuffleIdentifier:
val appShuffleIdentifier = s"$appShuffleId-$stageId-$stageAttemptId"

The solution is updated as follows. Define:

  // app shuffle id -> LinkedHashMap of (app shuffle identifier, (shuffle id, fetch status))
  private val shuffleIdMapping = JavaUtils.newConcurrentHashMap[Int, scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]()

  1. The shuffle writer sends (appShuffleId, appShuffleIdentifier, isWriter=true) to LifecycleManager:
  • If appShuffleId or appShuffleIdentifier is not recorded, generate a new shuffle id with an AtomicInteger, save the mapping appShuffleId -> appShuffleIdentifier -> (shuffleId, true), and return the shuffleId to the writer.
  • If appShuffleIdentifier is found, return its shuffleId to the writer.
  2. The shuffle reader sends (appShuffleId, null, isWriter=false) to LifecycleManager:
  • Find the latest finished (all shuffle write tasks done in Celeborn) shuffle id associated with appShuffleId and return it to the reader.
  3. During a FetchFailure, the shuffle reader sends ReportFetchFailure with (appShuffleId, shuffleId) to LifecycleManager, which cleans up status on MapOutputTracker (exactly once) and updates the fetch status of the shuffle id to false, doing nothing if the fetch status is already false.

Regarding 2, the reader needs to get the latest finished shuffleId, to prevent a reader from a previous job from incorrectly getting the running resubmitted ShuffleMapStage's (unfinished) shuffleId and reporting a fetch failure by mistake.
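The three rules above can be modeled as a small state machine over the mapping. This is an illustrative standalone sketch, not Celeborn's actual implementation: the flag models the fetch status, the identifier strings follow the `appShuffleId-stageId-stageAttemptId` shape proposed above, and for simplicity the reader here picks the latest entry whose fetch status is still good rather than tracking write completion.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ShuffleIdMappingSketch {
    // appShuffleId -> insertion-ordered (appShuffleIdentifier -> [shuffleId, fetchStatus])
    static final Map<Integer, LinkedHashMap<String, int[]>> shuffleIdMapping = new ConcurrentHashMap<>();
    static final AtomicInteger shuffleIdGenerator = new AtomicInteger();

    // isWriter=true: create-or-return the id for this exact stage attempt.
    // isWriter=false: return the latest id whose fetch status is still good.
    static synchronized int getShuffleId(int appShuffleId, String identifier, boolean isWriter) {
        LinkedHashMap<String, int[]> attempts =
            shuffleIdMapping.computeIfAbsent(appShuffleId, k -> new LinkedHashMap<>());
        if (isWriter) {
            return attempts.computeIfAbsent(
                identifier, k -> new int[] {shuffleIdGenerator.getAndIncrement(), 1})[0];
        }
        int latest = -1;
        for (int[] v : attempts.values()) {
            if (v[1] == 1) latest = v[0]; // latest good entry wins (insertion order)
        }
        return latest;
    }

    // Fetch failure: mark the reported shuffle id bad, exactly once.
    static synchronized boolean reportFetchFailure(int appShuffleId, int shuffleId) {
        for (int[] v : shuffleIdMapping.getOrDefault(appShuffleId, new LinkedHashMap<>()).values()) {
            if (v[0] == shuffleId && v[1] == 1) { v[1] = 0; return true; }
        }
        return false; // already reported or unknown
    }

    public static void main(String[] args) {
        int s0 = getShuffleId(5, "5-10-0", true);         // stage 10, attempt 0 writes
        System.out.println(getShuffleId(5, null, false)); // reader sees s0
        reportFetchFailure(5, s0);                        // fetch failure reported
        getShuffleId(5, "5-10-1", true);                  // resubmitted attempt gets a new id
        System.out.println(getShuffleId(5, null, false)); // reader now sees the new id
    }
}
```

The exactly-once return value of reportFetchFailure is what lets the LifecycleManager clean up MapOutputTracker only for the first report, even if many reduce tasks fail at the same time.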

Contributor

@mridulm mridulm left a comment


Left a few comments, thanks for fixing this !

@ErikFang
Contributor Author

MapOutputTrackerMaster.unregisterAllMapAndMergeOutput() was added in Spark 3.2 by SPARK-32920, so checks with Spark 3.0/3.1 fail.

Can you suggest how I should handle this Spark 3.x minor-version issue in Celeborn? @waitinfuture

@mridulm
Contributor

mridulm commented Oct 22, 2023

unregisterAllMapAndMergeOutput was added when we contributed push-based shuffle to Apache Spark.
It replaced the earlier unregisterAllMapOutput, which should work for 3.1 and older.
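One common way to bridge such a minor-version difference (and the approach the DynMethods import later in this PR suggests) is to resolve the method reflectively, trying the newer name first. A hedged sketch using plain reflection; the two method names are real Spark APIs, but the `OldTracker` target here is a stand-in, not Spark's MapOutputTrackerMaster:

```java
import java.lang.reflect.Method;

public class UnregisterMapOutputCompat {
    // Tries the Spark 3.2+ method first, then falls back to the pre-3.2 name.
    static void unregisterAllOutputs(Object tracker, int shuffleId) throws Exception {
        Method m;
        try {
            m = tracker.getClass().getMethod("unregisterAllMapAndMergeOutput", int.class);
        } catch (NoSuchMethodException e) {
            m = tracker.getClass().getMethod("unregisterAllMapOutput", int.class);
        }
        m.invoke(tracker, shuffleId);
    }

    // Stand-in mimicking a pre-3.2 MapOutputTrackerMaster (only the old method exists).
    public static class OldTracker {
        public int lastUnregistered = -1;
        public void unregisterAllMapOutput(int shuffleId) { lastUnregistered = shuffleId; }
    }

    public static void main(String[] args) throws Exception {
        OldTracker t = new OldTracker();
        unregisterAllOutputs(t, 7); // falls back to the old method name
        System.out.println(t.lastUnregistered); // prints 7
    }
}
```

Resolving the Method once at startup and caching it (as DynMethods does) avoids paying the reflection lookup on every fetch failure.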

@mridulm
Contributor

mridulm commented Oct 29, 2023

+CC @otterc as well.

@waitinfuture
Contributor

Hi @mridulm, I still have a question about determinism. DAGScheduler#submitMissingTasks will only call unregisterAllMapAndMergeOutput if the current ShuffleMapStage is indeterminate. What if the current stage is determinate, but its upstream stage is indeterminate, and the current stage throws FetchFailed? It seems only its upstream stage will be fully rerun; the current stage will only rerun its missing tasks. If that is the case, the current stage's output can be incorrect.

@waitinfuture
Contributor

Hi @mridulm , I still have a question about the deterministic. DAGScheduler#submitMissingTasks will only call unregisterAllMapAndMergeOutput if the current ShuffleMapStage is Indeterminate. What if the current stage is determinate, but its upstream stage is Indeterminate, and current stage throws FetchFailed, seems only its upstream stage will be fully rerun, the current stage will only rerun its missing tasks. If this is the case, the current stage's output can be incorrect.

I checked RDD#getOutputDeterministicLevel and found that if an RDD's upstream is INDETERMINATE,
then it's also INDETERMINATE.

@mridulm
Contributor

mridulm commented Nov 4, 2023

Note that RDD can override getOutputDeterministicLevel - but the default behavior is what you described.
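The default propagation discussed here can be captured in a simplified model (it omits Spark's shuffle-specific UNORDERED handling and any overrides): the worst determinism level among an RDD's parents flows downstream.

```java
import java.util.Arrays;
import java.util.List;

public class DeterministicLevelSketch {
    enum Level { DETERMINATE, UNORDERED, INDETERMINATE }

    // Simplified model of RDD.getOutputDeterministicLevel's default behavior:
    // any INDETERMINATE parent makes the child INDETERMINATE.
    static Level defaultLevel(List<Level> parentLevels) {
        if (parentLevels.contains(Level.INDETERMINATE)) return Level.INDETERMINATE;
        if (parentLevels.contains(Level.UNORDERED)) return Level.UNORDERED;
        return Level.DETERMINATE;
    }

    public static void main(String[] args) {
        // A stage whose upstream is INDETERMINATE is itself INDETERMINATE by default.
        System.out.println(defaultLevel(Arrays.asList(Level.DETERMINATE, Level.INDETERMINATE)));
    }
}
```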

@waitinfuture
Contributor

Note that RDD can override getOutputDeterministicLevel - but the default behavior is what you described.

Yeah, so it can happen that the upstream stage is indeterminate while the downstream stage is determinate (via an override), and the incorrectness I described before can happen, right?

import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.reflect.DynMethods;

/*
Member


Suggested change
/*
/**
* In order to support Spark stage resubmission when ShuffleReader's fetch fails, Celeborn's shuffle id should be distinguished from Spark's shuffle id.
* Spark's shuffle id is assigned when ShuffleDependency is constructed. Meanwhile, all attempts of a Spark Stage have the same shuffle id.
* When ShuffleReader fails to fetch shuffle data from worker and throws {@link FetchFailedException}, Spark DAGScheduler resubmits the failed ResultStage and corresponding ShuffleMapStage.
* But Celeborn couldn't make shuffle data any different from previous failed or resubmitted ShuffleMapStage with the same shuffle id.
* Celeborn takes retry of stage in consideration in current implementation. LifecycleManager generates and tracks usage of shuffle id for Spark and Celeborn.
* Then, Spark shuffle Reader/Writer gets shuffle id from LifecycleManager with GetShuffleId request.
*/


public abstract int getShuffleId(int appShuffleId, String appShuffleIdentifier, boolean isWriter);

/**
Member


Suggested change
/**
/**
* Reports the failure of shuffle data fetch given shuffle id of Spark and Celeborn.
*/

.createWithDefault(3)

val CLIENT_FETCH_THROWS_FETCH_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.client.fetch.throwsFetchFailure")
Member


Is this config option suitable for all engine clients?

Contributor Author


Changed to "celeborn.client.spark.fetch.throwsFetchFailure".

Contributor

@waitinfuture waitinfuture left a comment


The current code mostly LGTM except for some small comments. Thanks!

ret = false
}
shuffleIds.put(appShuffleIdentifier, (shuffleId, false))
unregisterShuffle(shuffleId)
Contributor


I'm not sure whether we should call unregisterShuffle here. Consider this scenario:
one out of 100 workers is down, 1000 reduce tasks are currently running, and about 10 of them encounter fetch failures. If we unregister here, the other 900 tasks will also fail. If we delay the unregister shuffle to unregisterAppShuffle, we can avoid this.

Contributor Author


I suppose the other 900 tasks failing with fetch failures won't change anything in the current design.
Anyway, I don't have any problem with delaying the unregister shuffle; will change it.

Contributor


If the reduce stage is determinate, then if the 900 reduce tasks succeed, they will not be rerun.

.getOrElse(context::applicationId);
}

public static int celebornShuffleId(
Contributor


IMO it's better to move this method to ShuffleClient for potentially other engines. We can change the argument list to appShuffleId, appShuffleIdentifier, isWriter.

Contributor Author


public abstract int getShuffleId(int appShuffleId, String appShuffleIdentifier, boolean isWriter);
was already added to ShuffleClient.

I prefer to keep the SparkUtils.celebornShuffleId() method as the place to check whether it needs to get the shuffleId from the LifecycleManager:

  public static int celebornShuffleId(
      ShuffleClient client,
      CelebornShuffleHandle<?, ?, ?> handle,
      TaskContext context,
      Boolean isWriter) {
    if (handle.throwsFetchFailure()) {
      String appShuffleIdentifier = getAppShuffleIdentifier(handle.shuffleId(), context);
      return client.getShuffleId(handle.shuffleId(), appShuffleIdentifier, isWriter);
    } else {
      return handle.shuffleId();
    }
  }

Otherwise, it would be hard to implement the check/fallback logic in places like this constructor:

  public SortBasedShuffleWriter(
      CelebornShuffleHandle<K, V, C> handle,
      TaskContext taskContext,
      CelebornConf conf,
      ShuffleClient client,
      ShuffleWriteMetricsReporter metrics,
      ExecutorService executorService,
      SendBufferPool sendBufferPool)
      throws IOException {
    this(
        SparkUtils.celebornShuffleId(client, handle, taskContext, true),
        handle.dependency(),
        handle.numMappers(),
        taskContext,
        conf,
        client,
        metrics,
        executorService,
        sendBufferPool);
  }

Contributor


We can add shuffleId as an argument to the constructor, as HashBasedShuffleWriter does. SparkUtils.celebornShuffleId() is a small wrapper for ShuffleClient.getShuffleId. I'm fine with this anyway.

val CLIENT_FETCH_THROWS_FETCH_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.fetch.throwsFetchFailure")
.categories("client")
.version("0.4.0")
Member


Suggested change
.version("0.4.0")
.version("0.3.2")

Contributor


Hi @SteNicholas, since this is a big feature, I suggest not merging it into branch-0.3.

Contributor

@waitinfuture waitinfuture left a comment


The current code LGTM. I tested without replication and killed a worker in both the write stage and the read stage; the results are good. cc @mridulm @otterc, do you have more comments on this PR?

Contributor

@otterc otterc left a comment


Just have a few nits.

@waitinfuture
Contributor

Here is my test. Note: one should first cherry-pick another fix: #2104.
Command:

spark-shell --conf spark.executor.instances=100 --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.celeborn.master.endpoints=master-1-1:9097 --conf spark.shuffle.service.enabled=false --conf spark.celeborn.client.push.replicate.enabled=false --conf spark.celeborn.client.spark.fetch.throwsFetchFailure=true

Test case:

spark.sparkContext.parallelize(1 to 1000, 1000).flatMap( _ => (1 to 1000000).iterator.map(num => num)).repartition(1000).count

When I killed a worker during shuffle write stage:
(screenshot)

When I killed a worker during shuffle read stage:
(screenshot)

I'm going to test TPCDS.

Contributor

@mridulm mridulm left a comment


Just some nits and queries. Thanks for working on this @ErikFang, excited to see this as part of next release :-)

(MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
lifecycleManager.registerShuffleTrackerCallback(
shuffleId -> mapOutputTracker.unregisterAllMapOutput(shuffleId));
}
Contributor


QQ: Do we want to make this a constructor param?
Technically this has MT-safety concerns (since an update to appShuffleTrackerCallback is not guaranteed to be visible across threads).

Contributor Author

@ErikFang ErikFang Nov 21, 2023


Yeah, I think the MT concerns are reasonable, but I'm afraid changing the constructor signature may affect other engines.
How about adding volatile to appShuffleTrackerCallback?
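The volatile suggestion can be sketched as follows. This is an illustrative standalone model, not the PR's LifecycleManager code: volatile guarantees that a callback registered on one thread is visible to the RPC-handler thread that later fires it, and reading the field once into a local avoids a check-then-act race with a concurrent re-registration.

```java
import java.util.function.IntConsumer;

public class CallbackRegistrationSketch {
    // volatile: writes to this field happen-before subsequent reads on other threads.
    private volatile IntConsumer appShuffleTrackerCallback = null;

    public void registerShuffleTrackerCallback(IntConsumer callback) {
        appShuffleTrackerCallback = callback;
    }

    public void onFetchFailureReported(int appShuffleId) {
        IntConsumer cb = appShuffleTrackerCallback; // read once into a local
        if (cb != null) cb.accept(appShuffleId);    // safe even if re-registered concurrently
    }

    public static void main(String[] args) {
        CallbackRegistrationSketch lm = new CallbackRegistrationSketch();
        int[] seen = new int[1];
        lm.registerShuffleTrackerCallback(id -> seen[0] = id);
        lm.onFetchFailureReported(42);
        System.out.println(seen[0]); // prints 42
    }
}
```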

private long sendBufferPoolCheckInterval;
private long sendBufferPoolExpireTimeout;

private ExecutorShuffleIdTracker shuffleIdTracker = new ExecutorShuffleIdTracker();
Contributor


QQ: Do we want a NoOpShuffleIdTracker, which simply preserves the existing behavior when clientFetchThrowsFetchFailure is false?

class NoOpShuffleIdTracker extends ShuffleIdTracker {
  // nothing to track
  def track(appShuffleId: Int, shuffleId: Int): Unit = {}

  def unregisterAppShuffleId(client: ShuffleClient, appShuffleId: Int): Unit = {
    client.cleanupShuffle(appShuffleId)
  }
}

@waitinfuture
Contributor

waitinfuture commented Nov 19, 2023

There is another issue to be solved. Say a determinate ShuffleMapStage A, which also generates shuffle output, finishes 100 tasks with another 100 tasks still to run, then encounters a fetch failure. It triggers the upstream stage rerun successfully and resumes. At this point, since it uses a new stage attempt id, it has a new shuffleIdentifier, so it creates a new shuffle id. Since it's determinate, it only runs the remaining 100 tasks. However, Celeborn only triggers StageEnd for a shuffleId when it receives MapperEnd for the number of partitions of this stage, which is 200, so it will not trigger StageEnd. But Spark thinks all 200 tasks are successful, so it schedules the downstream stage; the downstream stage can't find a finished shuffleId, which finally causes job failure.

IMO we need to get the determinism information and only create a new shuffleId for a writer when the stage is indeterminate. WDYT @ErikFang @mridulm?

@ErikFang
Contributor Author

There is another issue to be solved. Say a determinate shuffleMapStage A also generates shuffle out, at some time it finishes 100 tasks, with another 100 tasks to be run, then it encounters fetch failure. It triggers upstream stage rerun successfully and went back. At this time, since it uses a new stage attemptid, it has a new shuffleIdentifier, so it creates a new shuffle id. Since it's determinate, it only runs the remaining 100 tasks. However, Celeborn only trigger StageEnd for a shuffleId when it receives MapperEnd for number of partitions of this stage, which is 200, so it will not trigger StageEnd. But Spark thinks all 200 tasks are successful, so it schedules the downstream stage, the downstream stage can't find a finished shuffleId, and finally causes job failure.

IMO we need to get the deterministic information and only create a new shuffleId for a writer when it's nondeterminate, WDYT @ErikFang @mridulm ?

I suppose it is doable.

https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L468
DAGScheduler creates the ShuffleMapStage with the rdd obtained from the ShuffleDependency, and ShuffleManager gets the ShuffleDependency object in registerShuffle(...); then we can have registerShuffle(...) pass (appShuffleId, rdd.outputDeterministicLevel == INDETERMINATE) to LifecycleManager.
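The resulting writer-side decision can be summarized in a small hedged sketch (names like `candidateShuffleId` are hypothetical, not the PR's actual identifiers): a determinate stage's retry reuses the previous Celeborn shuffle id so the remaining map tasks append to it and MapperEnd counts can still reach the full partition count, while an indeterminate stage always gets a fresh id because all of its tasks rerun.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DeterminateShuffleIdSketch {
    static final AtomicInteger shuffleIdGenerator = new AtomicInteger();

    // Determinate retry with an existing candidate id: reuse it.
    // Otherwise (first attempt, or indeterminate stage): allocate a new id.
    static int shuffleIdForWriter(boolean determinate, Integer candidateShuffleId) {
        if (determinate && candidateShuffleId != null) {
            return candidateShuffleId;
        }
        return shuffleIdGenerator.getAndIncrement();
    }

    public static void main(String[] args) {
        int first = shuffleIdForWriter(true, null);   // initial attempt: new id
        int retry = shuffleIdForWriter(true, first);  // determinate retry: reuse
        int indet = shuffleIdForWriter(false, first); // indeterminate retry: new id
        System.out.println(first + " " + retry + " " + indet); // prints "0 0 1"
    }
}
```

This matches the `determinate && candidateShuffle.isDefined` check visible in the merged code context further below.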

@waitinfuture
Contributor

waitinfuture commented Nov 21, 2023

I tested with the latest code (43e4ee2) together with CELEBORN-1130, and I'm good with the result.

I tested TPC-DS q11 at 1TB with 8 workers and replication off. I successively killed 6 workers out of 8, and the query succeeded with a correct result (I set spark.stage.maxConsecutiveAttempts=100). We can see lots of retried stages. (screenshot)

@ErikFang please rebase onto the latest code, I'm happy to merge this PR :)
cc @mridulm @otterc @SteNicholas @RexXiong

@waitinfuture
Contributor

I also tested q23a/b and the results are also good.

appShuffleTrackerCallback = Some(callback)
}

def registerAppShuffleDeterminate(appShuffleId: Int, determinate: Boolean): Unit = {
Contributor


Nit: determinate is an argument to the method, so shouldn't we just call this method registerAppShuffle?

Contributor Author


From my perspective, the whole point of this method is to pass the determinism information of the ShuffleMapStage to LifecycleManager, so it is a better name than registerAppShuffle.
Actually, it is the handleGetShuffleIdForApp method that handles the job of "registerAppShuffle".

scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]()
private val shuffleIdGenerator = new AtomicInteger(0)
// app shuffle id -> whether shuffle is determinate; rerun of an indeterminate shuffle gets a different result
private val appShuffleDeterminateMap = JavaUtils.newConcurrentHashMap[Int, Boolean]();
Contributor


Do we need to maintain this state? Can the GetShuffleId message not include whether the shuffle is determinate or not?

Contributor Author

@ErikFang ErikFang Nov 21, 2023


I like the idea of reducing state!
However, it seems hard to get the rdd/stage DeterministicLevel in the task/executor, which is where the GetShuffleId RPC is called.

@SteNicholas
Member

SteNicholas commented Nov 22, 2023

@waitinfuture, @ErikFang, @mridulm, @otterc, the test result of an internal Spark SQL job is successful. (screenshots)

@ErikFang
Contributor Author

Rebased the code against the latest main branch, renamed HookedCelebornShuffleManager to TestCelebornShuffleManager, added the shuffle id to FETCH_FAILURE_ERROR_MSG to help debugging, and decorated appShuffleTrackerCallback with volatile.

I favor adding NoOpShuffleIdTracker and UTs for PbGetShuffleId/PbReportShuffleFetchFailure message handling in a follow-up PR.
Gentle ping @waitinfuture to take a look.

Contributor

@waitinfuture waitinfuture left a comment


LGTM, thanks! Merging to main (v0.4.0). Thanks to all reviewers, especially @mridulm for giving valuable insights into Spark's rerun internals!

None

val shuffleId: Integer =
if (determinate && candidateShuffle.isDefined) {
Contributor


nit: no need to check determinate here
