
[SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled #28911

Closed
wants to merge 48 commits into from
Changes from all commits
48 commits
0815e75
init
Ngone51 Jun 6, 2020
dc6e7b8
update
Ngone51 Jun 9, 2020
b1c6ac0
fix
Ngone51 Jun 22, 2020
64037cf
update
Ngone51 Jun 23, 2020
285fd70
add test
Ngone51 Jun 23, 2020
904335a
update
Ngone51 Jun 23, 2020
9e3c875
remove spark
Ngone51 Jun 23, 2020
5e12b68
remove spark
Ngone51 Jun 23, 2020
3960b44
update getHostLocalDirs
Ngone51 Jun 23, 2020
27c5f5c
add comment
Ngone51 Jun 23, 2020
8a5811f
fix build
Ngone51 Jun 30, 2020
05598f8
update test
Ngone51 Jun 30, 2020
23181b6
update conf doc
Ngone51 Jul 2, 2020
eb6eb9a
rename to getLocalDirs
Ngone51 Jul 10, 2020
5fdf1bc
update comment
Ngone51 Jul 10, 2020
d192db8
correct to bmId
Ngone51 Jul 10, 2020
db2500a
simplify to immutableHostLocalBlocksWithoutDirs
Ngone51 Jul 10, 2020
5c767da
rename to blockInfos
Ngone51 Jul 10, 2020
0d25492
add assert
Ngone51 Jul 21, 2020
f6cfcbc
refactor
Ngone51 Jul 21, 2020
0a144e3
fix NettyBlockTransferServiceSuite
Ngone51 Jul 23, 2020
b44f1cb
fix private def mockBlockManager(): BlockManager = {
Ngone51 Jul 23, 2020
342ae60
add comment
Ngone51 Jul 23, 2020
613ffab
update
Ngone51 Jul 23, 2020
cd3e30d
simplify the requirement
Ngone51 Jul 24, 2020
da8d78a
fix log
Ngone51 Jul 24, 2020
7d9036a
fix s
Ngone51 Jul 24, 2020
0372bd8
fix Matcher
Ngone51 Aug 26, 2020
17f1b60
fix java style
Ngone51 Aug 26, 2020
530d63c
don't close client
Ngone51 Aug 27, 2020
b14d611
comment style
Ngone51 Aug 27, 2020
3f7ea0b
add comment
Ngone51 Aug 27, 2020
2aa71f6
refactor
Ngone51 Aug 27, 2020
054bf69
reorg import
Ngone51 Aug 30, 2020
5fbd6bb
should be
Ngone51 Aug 31, 2020
8c5fdb3
move checkInit to BlockStoreClient
Ngone51 Aug 31, 2020
9aa6974
fix indent
Ngone51 Aug 31, 2020
09665e2
improve error message
Ngone51 Aug 31, 2020
6d21906
remove k8s tip
Ngone51 Aug 31, 2020
5ea8f24
resply with error
Ngone51 Aug 31, 2020
57f3e1d
improve comment
Ngone51 Aug 31, 2020
d70e757
allFetchSucceed -> allFetchSucceeded
Ngone51 Aug 31, 2020
6b97be5
combine the tests
Ngone51 Aug 31, 2020
5e98eca
remove unnecesary brackets
Ngone51 Sep 1, 2020
8f16c17
check executorId
Ngone51 Sep 1, 2020
7c38190
use _
Ngone51 Sep 1, 2020
a35807b
indent
Ngone51 Sep 1, 2020
a23ab17
update test
Ngone51 Sep 1, 2020
@@ -18,15 +18,33 @@
package org.apache.spark.network.shuffle;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import com.codahale.metrics.MetricSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;

/**
* Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
* or external service.
*/
public abstract class BlockStoreClient implements Closeable {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());

protected volatile TransportClientFactory clientFactory;
protected String appId;

/**
* Fetch a sequence of blocks from a remote node asynchronously,
@@ -61,4 +79,60 @@ public MetricSet shuffleMetrics() {
// Return an empty MetricSet by default.
return () -> Collections.emptyMap();
}

protected void checkInit() {
assert appId != null : "Called before init()";
}

/**
* Request the local disk directories of the executors that are located on the same host as
* the current BlockStoreClient (which can be an ExternalBlockStoreClient or a
* NettyBlockTransferService).
*
* @param host the host of the BlockManager or ExternalShuffleService. It should be the same
* host as the current BlockStoreClient.
* @param port the port of the BlockManager or ExternalShuffleService.
* @param execIds a collection of executor ids identifying the target executors whose local
* directories we want. There can be multiple executor ids when the BlockStoreClient is an
* ExternalBlockStoreClient, since the request handler, ExternalShuffleService, can serve
* multiple executors on the same node. There is only one executor id when the
* BlockStoreClient is a NettyBlockTransferService.
* @param hostLocalDirsCompletable a CompletableFuture completed with a map from executor id
* to its local directories if the request handler replies successfully, or completed
* exceptionally with the specific error otherwise.
*/
public void getHostLocalDirs(
String host,
int port,
String[] execIds,
CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
checkInit();
GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
try {
TransportClient client = clientFactory.createClient(host, port);
client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
try {
BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
hostLocalDirsCompletable.complete(
((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
} catch (Throwable t) {
logger.warn("Error while trying to get the host local dirs for " +
Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
hostLocalDirsCompletable.completeExceptionally(t);
}
}

@Override
public void onFailure(Throwable t) {
logger.warn("Error while trying to get the host local dirs for " +
Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
hostLocalDirsCompletable.completeExceptionally(t);
}
});
} catch (IOException | InterruptedException e) {
hostLocalDirsCompletable.completeExceptionally(e);
}
}
}
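
For reference, here is a minimal caller-side sketch (hypothetical, not part of this diff) of how the new getHostLocalDirs API can be consumed from Scala once the client has been initialized via init(appId); the helper name and the fallback behavior are illustrative assumptions only:

import java.util.concurrent.CompletableFuture
import java.util.{Map => JMap}

import org.apache.spark.network.shuffle.BlockStoreClient

// Hypothetical helper: `client` is any initialized BlockStoreClient
// (ExternalBlockStoreClient or NettyBlockTransferService).
def fetchHostLocalDirs(
    client: BlockStoreClient,
    host: String,
    port: Int,
    execIds: Array[String]): Unit = {
  val hostLocalDirs = new CompletableFuture[JMap[String, Array[String]]]()
  client.getHostLocalDirs(host, port, execIds, hostLocalDirs)
  hostLocalDirs.whenComplete { (dirsByExec, error) =>
    if (error != null) {
      // A real caller would fall back to fetching the blocks over the network.
      System.err.println(s"Failed to get host-local dirs: ${error.getMessage}")
    } else {
      dirsByExec.forEach((execId, dirs) => println(s"$execId -> ${dirs.mkString(",")}"))
    }
  }
}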
@@ -21,7 +21,6 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@@ -30,10 +29,7 @@
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.crypto.AuthClientBootstrap;
@@ -47,16 +43,11 @@
* (via BlockTransferService), which has the downside of losing the data if we lose the executors.
*/
public class ExternalBlockStoreClient extends BlockStoreClient {
private static final Logger logger = LoggerFactory.getLogger(ExternalBlockStoreClient.class);

private final TransportConf conf;
private final boolean authEnabled;
private final SecretKeyHolder secretKeyHolder;
private final long registrationTimeoutMs;

protected volatile TransportClientFactory clientFactory;
protected String appId;

/**
* Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
* then secretKeyHolder may be null.
@@ -72,10 +63,6 @@ public ExternalBlockStoreClient(
this.registrationTimeoutMs = registrationTimeoutMs;
}

protected void checkInit() {
assert appId != null : "Called before init()";
}

/**
* Initializes the BlockStoreClient, specifying this Executor's appId.
* Must be called before any other method on the BlockStoreClient.
@@ -188,43 +175,6 @@ public void onFailure(Throwable e) {
return numRemovedBlocksFuture;
}

public void getHostLocalDirs(
String host,
int port,
String[] execIds,
CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
checkInit();
GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
try {
TransportClient client = clientFactory.createClient(host, port);
client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
try {
BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
hostLocalDirsCompletable.complete(
((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
} catch (Throwable t) {
logger.warn("Error trying to get the host local dirs for " +
Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
t.getCause());
hostLocalDirsCompletable.completeExceptionally(t);
}
}

@Override
public void onFailure(Throwable t) {
logger.warn("Error trying to get the host local dirs for " +
Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
t.getCause());
hostLocalDirsCompletable.completeExceptionally(t);
}
});
} catch (IOException | InterruptedException e) {
hostLocalDirsCompletable.completeExceptionally(e);
}
}

@Override
public void close() {
checkInit();
@@ -1415,10 +1415,9 @@ package object config {

private[spark] val SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED =
ConfigBuilder("spark.shuffle.readHostLocalDisk")
.doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled and external " +
s"shuffle `${SHUFFLE_SERVICE_ENABLED.key}` is enabled), shuffle " +
"blocks requested from those block managers which are running on the same host are read " +
"from the disk directly instead of being fetched as remote blocks over the network.")
.doc(s"If enabled (and `${SHUFFLE_USE_OLD_FETCH_PROTOCOL.key}` is disabled, shuffle " +
"blocks requested from those block managers which are running on the same host are " +
"read from the disk directly instead of being fetched as remote blocks over the network.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
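
As a usage note (a hedged sketch; the application name below is arbitrary), host-local disk reading can be controlled through the usual SparkConf mechanism, together with the new-protocol requirement documented above:

import org.apache.spark.SparkConf

// Read shuffle blocks written by other executors on the same host directly
// from disk; requires the new fetch protocol (useOldFetchProtocol = false).
val conf = new SparkConf()
  .setAppName("host-local-shuffle-demo")  // hypothetical app name
  .set("spark.shuffle.useOldFetchProtocol", "false")
  .set("spark.shuffle.readHostLocalDisk", "true")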
@@ -27,6 +27,11 @@ import org.apache.spark.storage.{BlockId, ShuffleBlockId, StorageLevel}
private[spark]
trait BlockDataManager {

/**
* Get the local directories used by the BlockManager to save blocks to disk.
*/
def getLocalDiskDirs: Array[String]

/**
* Interface to get host-local shuffle block data. Throws an exception if the block cannot be
* found or cannot be read successfully.
@@ -34,7 +34,7 @@ import org.apache.spark.util.ThreadUtils
* BlockTransferService contains both client and server inside.
*/
private[spark]
abstract class BlockTransferService extends BlockStoreClient with Logging {
abstract class BlockTransferService extends BlockStoreClient {

/**
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -29,7 +29,7 @@ import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithI
import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
import org.apache.spark.network.shuffle.protocol._
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, ShuffleBlockBatchId, ShuffleBlockId, StorageLevel}
import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockBatchId, ShuffleBlockId, StorageLevel}

/**
* Serves requests to open blocks by simply registering one chunk per block requested.
@@ -113,6 +113,26 @@ class NettyBlockRpcServer(
s"when there is not sufficient space available to store the block.")
responseContext.onFailure(exception)
}

case getLocalDirs: GetLocalDirsForExecutors =>
val isIncorrectAppId = getLocalDirs.appId != appId
val execNum = getLocalDirs.execIds.length
if (isIncorrectAppId || execNum != 1) {
val errorMsg = "Invalid GetLocalDirsForExecutors request: " +
s"${if (isIncorrectAppId) s"incorrect application id: ${getLocalDirs.appId};"}" +
s"${if (execNum != 1) s"incorrect executor number: $execNum (expected 1);"}"
responseContext.onFailure(new IllegalStateException(errorMsg))
} else {
val expectedExecId = blockManager.asInstanceOf[BlockManager].executorId
val actualExecId = getLocalDirs.execIds.head
if (actualExecId != expectedExecId) {
responseContext.onFailure(new IllegalStateException(
s"Invalid executor id: $actualExecId, expected $expectedExecId."))
} else {
responseContext.onSuccess(new LocalDirsForExecutors(
Map(actualExecId -> blockManager.getLocalDiskDirs).asJava).toByteBuffer)
}
}
}
}

Expand Down
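
To make the request/response shape concrete, a small sketch (using only the protocol classes that appear in this diff; the application and executor ids are made up) of what the client sends and how it decodes the reply:

import java.nio.ByteBuffer

import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, GetLocalDirsForExecutors, LocalDirsForExecutors}

// Client side: NettyBlockRpcServer serves only its own executor, so the
// request carries exactly one executor id (hypothetical ids below).
val request = new GetLocalDirsForExecutors("app-20200901000000-0001", Array("7"))
val requestBytes: ByteBuffer = request.toByteBuffer  // handed to TransportClient.sendRpc

// Client side, on success: the reply decodes to a LocalDirsForExecutors,
// i.e. a map from executor id to that executor's local directories.
def decodeReply(response: ByteBuffer): java.util.Map[String, Array[String]] =
  BlockTransferMessage.Decoder.fromByteBuffer(response)
    .asInstanceOf[LocalDirsForExecutors]
    .getLocalDirsByExec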
@@ -19,7 +19,9 @@ package org.apache.spark.network.netty

import java.io.IOException
import java.nio.ByteBuffer
import java.util
import java.util.{HashMap => JHashMap, Map => JMap}
import java.util.concurrent.CompletableFuture

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
@@ -33,11 +35,11 @@ import org.apache.spark.ExecutorDeadException
import org.apache.spark.internal.config
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientBootstrap, TransportClientFactory}
import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
import org.apache.spark.network.server._
import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher}
import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream}
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, GetLocalDirsForExecutors, LocalDirsForExecutors, UploadBlock, UploadBlockStream}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.serializer.JavaSerializer
@@ -65,8 +67,6 @@ private[spark] class NettyBlockTransferService(

private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
private[this] var clientFactory: TransportClientFactory = _
private[this] var appId: String = _

override def init(blockDataManager: BlockDataManager): Unit = {
val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
@@ -80,7 +80,7 @@
clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
logInfo(s"Server created on ${hostName}:${server.getPort}")
logger.info(s"Server created on $hostName:${server.getPort}")
}

/** Creates and binds the TransportServer, possibly trying multiple ports. */
@@ -113,7 +113,9 @@ private[spark] class NettyBlockTransferService(
blockIds: Array[String],
listener: BlockFetchingListener,
tempFileManager: DownloadFileManager): Unit = {
logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
if (logger.isTraceEnabled) {
logger.trace(s"Fetch blocks from $host:$port (executor id $execId)")
}
try {
val maxRetries = transportConf.maxIORetries()
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
@@ -146,7 +148,7 @@
}
} catch {
case e: Exception =>
logError("Exception while beginning fetchBlocks", e)
logger.error("Exception while beginning fetchBlocks", e)
blockIds.foreach(listener.onBlockFetchFailure(_, e))
}
}
@@ -174,12 +176,14 @@
blockId.isShuffle)
val callback = new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
if (logger.isTraceEnabled) {
logger.trace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
}
result.success((): Unit)
}

override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)
logger.error(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)
result.failure(e)
}
}