Skip to content

Commit

Permalink
[#1608][part-5] feat(spark3): always use the available assignment (#1652
Browse files Browse the repository at this point in the history
)

### What changes were proposed in this pull request?

1. make the write client always use the latest available assignment for subsequent writes when a block reassignment happens.
2. support multiple retries for partition reassignment
3. limit the max reassign server num of one partition
4. refactor the reassign rpc
5. rename the faultyServer -> receivingFailureServer. 

#### Reassign whole process
![image](https://github.com/apache/incubator-uniffle/assets/8609142/8afa5386-be39-4ccb-9c10-95ffb3154939)

#### Always using the latest assignment

To achieve always using the latest assignment, I introduce the `TaskAttemptAssignment` to get the latest assignment for the current task. The creating process of AddBlockEvent will also apply the latest assignment by `TaskAttemptAssignment`.

And it will be updated by the `reassignOnBlockSendFailure` rpc. 
That means the original reassign rpc response will be refactored and replaced by the whole latest `shuffleHandleInfo`.

### Why are the changes needed?

This PR is the subtask for #1608.

Leveraging #1615 / #1610 / #1609, we have implemented the reassign-servers mechanism for when the write client encounters a server failure or an unhealthy server. But this is not good enough, because it does not share the faulty server state with unstarted tasks and later `AddBlockEvent`s.

### Does this PR introduce _any_ user-facing change?

Yes. 

### How was this patch tested?

Unit and integration tests.

Integration tests as follows:
1. `PartitionBlockDataReassignBasicTest` validates that the basic reassign mechanism works
2. `PartitionBlockDataReassignMultiTimesTest` tests the partition reassign mechanism with multiple retries.

---------

Co-authored-by: Enrico Minack <github@enrico.minack.dev>
  • Loading branch information
zuston and EnricoMi committed May 9, 2024
1 parent 43bd09c commit 30bf8dc
Show file tree
Hide file tree
Showing 38 changed files with 1,730 additions and 626 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.spark.ShuffleDependency;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;

import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
Expand All @@ -31,14 +32,14 @@ public class RssShuffleHandle<K, V, C> extends ShuffleHandle {
private String appId;
private int numMaps;
private ShuffleDependency<K, V, C> dependency;
// Broadcast of the immutable shuffle handle metadata shared with executors.
// NOTE(review): the scraped diff showed both the old Broadcast<ShuffleHandleInfo>
// line and this one; only the post-commit declaration is kept here.
private Broadcast<SimpleShuffleHandleInfo> handlerInfoBd;

public RssShuffleHandle(
int shuffleId,
String appId,
int numMaps,
ShuffleDependency<K, V, C> dependency,
Broadcast<ShuffleHandleInfo> handlerInfoBd) {
Broadcast<SimpleShuffleHandleInfo> handlerInfoBd) {
super(shuffleId);
this.appId = appId;
this.numMaps = numMaps;
Expand Down Expand Up @@ -67,6 +68,6 @@ public RemoteStorageInfo getRemoteStorage() {
}

/**
 * Returns the mapping from partition id to the shuffle servers currently available
 * to the writer, as carried by the broadcast {@code SimpleShuffleHandleInfo}.
 *
 * @return map of partition id to its available shuffle server list
 */
public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
  // Post-commit behavior: use the available (possibly reassigned) servers for the
  // writer rather than the original static assignment. The stale pre-commit return
  // statement from the rendered diff (which made this method invalid Java due to an
  // unreachable statement) has been dropped.
  return handlerInfoBd.value().getAvailablePartitionServersForWriter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ public class RssSparkConfig {
.withDescription(
"The memory spill switch triggered by Spark TaskMemoryManager, default value is false.");

// Upper bound on how many replacement servers one partition may accumulate
// through the partition reassign mechanism (default 10). Limits unbounded
// growth of a partition's server list under repeated failures.
public static final ConfigOption<Integer> RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM =
ConfigOptions.key("rss.client.reassign.maxReassignServerNum")
.intType()
.defaultValue(10)
.withDescription(
"The max reassign server num for one partition when using partition reassign mechanism.");

// Maximum number of times a failed block send is retried when partition
// reassign is enabled (default 1).
public static final ConfigOption<Integer> RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES =
ConfigOptions.key("rss.client.reassign.blockRetryMaxTimes")
.intType()
.defaultValue(1)
.withDescription("The block retry max times when partition reassign is enabled.");

public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";

public static final ConfigEntry<Integer> RSS_PARTITION_NUM_PER_RANGE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.storage.BlockManagerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,8 +63,8 @@ public class RssSparkShuffleUtils {

private static final Logger LOG = LoggerFactory.getLogger(RssSparkShuffleUtils.class);

// ClassTag used when broadcasting the shuffle handle metadata via SparkContext.
// NOTE(review): the scraped diff showed both the old ShuffleHandleInfo tag and
// this one; only the post-commit declaration is kept here.
public static final ClassTag<SimpleShuffleHandleInfo> DEFAULT_SHUFFLE_HANDLER_INFO_CLASS_TAG =
    scala.reflect.ClassTag$.MODULE$.apply(SimpleShuffleHandleInfo.class);
// ClassTag for broadcasting raw byte arrays.
public static final ClassTag<byte[]> BYTE_ARRAY_CLASS_TAG =
    scala.reflect.ClassTag$.MODULE$.apply(byte[].class);

Expand Down Expand Up @@ -256,22 +257,22 @@ public static SparkContext getActiveSparkContext() {
}

/**
* create broadcast variable of {@link ShuffleHandleInfo}
* create broadcast variable of {@link SimpleShuffleHandleInfo}
*
* @param sc expose for easy unit-test
* @param shuffleId
* @param partitionToServers
* @param storageInfo
* @return Broadcast variable registered for auto cleanup
*/
public static Broadcast<ShuffleHandleInfo> broadcastShuffleHdlInfo(
public static Broadcast<SimpleShuffleHandleInfo> broadcastShuffleHdlInfo(
SparkContext sc,
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
RemoteStorageInfo storageInfo) {
ShuffleHandleInfo handleInfo =
new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
return sc.broadcast(handleInfo, SHUFFLE_HANDLER_INFO_CLASS_TAG);
SimpleShuffleHandleInfo handleInfo =
new SimpleShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
return sc.broadcast(handleInfo, DEFAULT_SHUFFLE_HANDLER_INFO_CLASS_TAG);
}

private static <T> T instantiateFetchFailedException(
Expand Down

This file was deleted.

0 comments on commit 30bf8dc

Please sign in to comment.