Skip to content

Commit

Permalink
[#1538] feat(spark): report blockIds to spark driver optionally (#1677)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Support report blockIds from shuffle-servers to spark driver optionally

### Why are the changes needed?

Fix: #1538

### Does this PR introduce _any_ user-facing change?

Yes. `rss.client.blockId.selfManagedEnabled` is introduced, default value is false.

### How was this patch tested?

Integration tests.
  • Loading branch information
zuston committed May 11, 2024
1 parent 7b63177 commit 313d4e0
Show file tree
Hide file tree
Showing 24 changed files with 720 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@

public class RssSparkConfig {

public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
ConfigOptions.key("rss.blockId.selfManagementEnabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable the blockId self management in spark driver side. Default value is false.");

public static final ConfigOption<Long> RSS_CLIENT_SEND_SIZE_LIMITATION =
ConfigOptions.key("rss.client.send.size.limit")
.longType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;

/** The class is to manage the shuffle data blockIds in spark driver side. */
public class BlockIdManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BlockIdManager.class);

// shuffleId -> partitionId -> blockIds
private Map<Integer, Map<Integer, Roaring64NavigableMap>> blockIds;

public BlockIdManager() {
this.blockIds = JavaUtils.newConcurrentMap();
}

public void add(int shuffleId, int partitionId, List<Long> ids) {
if (CollectionUtils.isEmpty(ids)) {
return;
}
Map<Integer, Roaring64NavigableMap> partitionedBlockIds =
blockIds.computeIfAbsent(shuffleId, (k) -> JavaUtils.newConcurrentMap());
partitionedBlockIds.compute(
partitionId,
(id, bitmap) -> {
Roaring64NavigableMap store = bitmap == null ? Roaring64NavigableMap.bitmapOf() : bitmap;
ids.stream().forEach(x -> store.add(x));
return store;
});
}

public Roaring64NavigableMap get(int shuffleId, int partitionId) {
Map<Integer, Roaring64NavigableMap> partitionedBlockIds = blockIds.get(shuffleId);
if (partitionedBlockIds == null || partitionedBlockIds.isEmpty()) {
return Roaring64NavigableMap.bitmapOf();
}

Roaring64NavigableMap idMap = partitionedBlockIds.get(partitionId);
if (idMap == null || idMap.isEmpty()) {
return Roaring64NavigableMap.bitmapOf();
}

return RssUtils.cloneBitMap(idMap);
}

public boolean remove(int shuffleId) {
blockIds.remove(shuffleId);
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.BlockIdLayout;

/**
* This class delegates the blockIds reporting/getting operations from shuffleServer side to Spark
* driver side.
*/
public class BlockIdSelfManagedShuffleWriteClient extends ShuffleWriteClientImpl {
private ShuffleManagerClient shuffleManagerClient;

public BlockIdSelfManagedShuffleWriteClient(
RssShuffleClientFactory.ExtendWriteClientBuilder builder) {
super(builder);

if (builder.getShuffleManagerClient() == null) {
throw new RssException("Illegal empty shuffleManagerClient. This should not happen");
}
this.shuffleManagerClient = builder.getShuffleManagerClient();
}

@Override
public void reportShuffleResult(
Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds,
String appId,
int shuffleId,
long taskAttemptId,
int bitmapNum) {
Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
int partitionId = entry.getKey();
partitionToBlockIds
.computeIfAbsent(partitionId, x -> new ArrayList<>())
.addAll(entry.getValue());
}
}

RssReportShuffleResultRequest request =
new RssReportShuffleResultRequest(
appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
shuffleManagerClient.reportShuffleResult(request);
}

@Override
public Roaring64NavigableMap getShuffleResult(
String clientType,
Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId,
int shuffleId,
int partitionId) {
RssGetShuffleResultRequest request =
new RssGetShuffleResultRequest(appId, shuffleId, partitionId, BlockIdLayout.DEFAULT);
return shuffleManagerClient.getShuffleResult(request).getBlockIdBitmap();
}

@Override
public Roaring64NavigableMap getShuffleResultForMultiPart(
String clientType,
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
String appId,
int shuffleId,
Set<Integer> failedPartitions,
PartitionDataReplicaRequirementTracking replicaRequirementTracking) {
Set<Integer> partitionIds =
serverToPartitions.values().stream().flatMap(x -> x.stream()).collect(Collectors.toSet());
RssGetShuffleResultForMultiPartRequest request =
new RssGetShuffleResultForMultiPartRequest(
appId, shuffleId, partitionIds, BlockIdLayout.DEFAULT);
return shuffleManagerClient.getShuffleResultForMultiPart(request).getBlockIdBitmap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;

public class RssShuffleClientFactory extends ShuffleClientFactory {

private static final RssShuffleClientFactory INSTANCE = new RssShuffleClientFactory();

public static RssShuffleClientFactory getInstance() {
return INSTANCE;
}

public ShuffleWriteClient createShuffleWriteClient(ExtendWriteClientBuilder builder) {
return builder.build();
}

public static ExtendWriteClientBuilder<?> newWriteBuilder() {
return new ExtendWriteClientBuilder();
}

public static class ExtendWriteClientBuilder<T extends ExtendWriteClientBuilder<T>>
extends WriteClientBuilder<T> {
private boolean blockIdSelfManagedEnabled;
private ShuffleManagerClient shuffleManagerClient;

public boolean isBlockIdSelfManagedEnabled() {
return blockIdSelfManagedEnabled;
}

public ShuffleManagerClient getShuffleManagerClient() {
return shuffleManagerClient;
}

public T shuffleManagerClient(ShuffleManagerClient client) {
this.shuffleManagerClient = client;
return self();
}

public T blockIdSelfManagedEnabled(boolean blockIdSelfManagedEnabled) {
this.blockIdSelfManagedEnabled = blockIdSelfManagedEnabled;
return self();
}

@Override
public ShuffleWriteClientImpl build() {
if (blockIdSelfManagedEnabled) {
return new BlockIdSelfManagedShuffleWriteClient(this);
}
return super.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.shuffle.BlockIdManager;

import static org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
Expand All @@ -61,6 +62,28 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
private AtomicBoolean isInitialized = new AtomicBoolean(false);
private Method unregisterAllMapOutputMethod;
private Method registerShuffleMethod;
private volatile BlockIdManager blockIdManager;
private Object blockIdManagerLock = new Object();

public BlockIdManager getBlockIdManager() {
if (blockIdManager == null) {
synchronized (blockIdManagerLock) {
if (blockIdManager == null) {
blockIdManager = new BlockIdManager();
LOG.info("BlockId manager has been initialized.");
}
}
}
return blockIdManager;
}

@Override
public boolean unregisterShuffle(int shuffleId) {
if (blockIdManager != null) {
blockIdManager.remove(shuffleId);
}
return true;
}

/** See static overload of this method. */
public abstract void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;

import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.shuffle.BlockIdManager;

/**
* This is a proxy interface that mainly delegates the un-registration of shuffles to the
Expand Down Expand Up @@ -82,4 +83,6 @@ boolean reassignAllShuffleServersForWholeStage(

MutableShuffleHandleInfo reassignOnBlockSendFailure(
int shuffleId, Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);

BlockIdManager getBlockIdManager();
}

0 comments on commit 313d4e0

Please sign in to comment.