[SPARK-32916][SHUFFLE] Implementation of shuffle service that leverages push-based shuffle in YARN deployment mode #30062

Closed
wants to merge 35 commits

Commits (35, changes shown from all commits)
df48e01
LIHADOOP-48527 Magnet shuffle service block transfer netty protocol
Victsm May 9, 2020
6472267
LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush…
otterc May 12, 2020
1c78e1d
LIHADOOP-53496 Not logging all block push exceptions on the client
otterc May 15, 2020
71f3246
LIHADOOP-53700 Separate configuration for caching the merged index fi…
zhouyejoe Jun 1, 2020
221178f
LIHADOOP-53940 Logging the data file and index file path when shuffle…
otterc Jun 10, 2020
55b4a5f
LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc…
otterc Jun 15, 2020
f9d0e86
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
548e2c0
LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of…
otterc Jul 24, 2020
50efba9
LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile
otterc Aug 26, 2020
8a6e01b
LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc…
zhouyejoe Sep 9, 2020
ae5ffac
LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong…
zhouyejoe Sep 12, 2020
e51042b
Further prune changes that should go into a later PR.
Victsm Sep 23, 2020
83aca99
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
04e0efe
LIHADOOP-55022 Disable the merged shuffle file cleanup in stopApplica…
zhouyejoe Aug 11, 2020
71dfd48
Tests and cleanup
otterc Oct 6, 2020
0c411c1
LIHADOOP-55948 Failure in the push stream should not change the curre…
otterc Oct 1, 2020
d029463
Minor style corrections
otterc Oct 15, 2020
8f3839f
Fixed style issues
otterc Oct 15, 2020
1cd2d03
Renamed variables, methods, fixed indentation, addressed other review…
otterc Oct 19, 2020
3356c19
Addressing review comments
otterc Oct 23, 2020
d879beb
Changed the partitions map and addressed other review comments
otterc Oct 26, 2020
48ae819
Added support for subdirs under merge_manager dirs and removed the ya…
otterc Oct 28, 2020
9b031f7
Addressed test failure and other review comments in RemoteBlockPushRe…
otterc Oct 29, 2020
807cc7b
Minor change in finalization
otterc Oct 29, 2020
5b169bc
Removing the partition from inner map after the files are closed
otterc Oct 30, 2020
9ece587
Server side configuration to specify the implementation of MergedShuf…
otterc Oct 30, 2020
d13c7ad
Change the Push block stream to not encode shuffle Id, map index, and…
otterc Nov 2, 2020
63843bb
Fixed typos, address review comments, made NoOp the default impl, and…
otterc Nov 2, 2020
d35aa4b
Addressed review comments
otterc Nov 3, 2020
ba92311
Fix IndexOutOfBoundsException and avoid instantiating AppsPathInfo mu…
otterc Nov 4, 2020
9be25b3
Removed duplicate declaration of shuffle push prefix
otterc Nov 4, 2020
ba51796
Added UT for collision with 2 streams
otterc Nov 4, 2020
1f4fcfe
Removed unnecessary TODOs, marked MergedShuffleFileManager evolving, …
otterc Nov 4, 2020
28edaae
Changed the serialization on chunktracker and removed serializedSizeI…
otterc Nov 6, 2020
cb1881c
Use RandomAccessFile
otterc Nov 8, 2020
@@ -18,6 +18,7 @@
package org.apache.spark.network.protocol;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
@@ -46,7 +47,11 @@ public static String decode(ByteBuf buf) {
}
}

- /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+ /**
+  * Bitmaps are encoded with their serialization length followed by the serialization bytes.
+  *
+  * @since 3.1.0
+  */
public static class Bitmaps {
public static int encodedLength(RoaringBitmap b) {
// Compress the bitmap before serializing it. Note that since BlockTransferMessage
@@ -57,13 +62,20 @@ public static int encodedLength(RoaringBitmap b) {
return b.serializedSizeInBytes();
}

/**
* The input ByteBuf for this encoder should have enough write capacity to fit the serialized
* bitmap. Other encoders which use {@link io.netty.buffer.AbstractByteBuf#writeBytes(byte[])}
* to write can expand the buf as writeBytes calls {@link ByteBuf#ensureWritable} internally.
* However, this encoder doesn't rely on netty's writeBytes and will fail if the input buf
* doesn't have enough write capacity.
*/
public static void encode(ByteBuf buf, RoaringBitmap b) {
int encodedLength = b.serializedSizeInBytes();
// RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
// ByteBuffer. Here, we need to explicitly manage the index so we can write into the
// ByteBuffer, and the write is reflected in the underneath ByteBuf.
- b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
- buf.writerIndex(buf.writerIndex() + encodedLength);
+ ByteBuffer byteBuffer = buf.nioBuffer(buf.writerIndex(), buf.writableBytes());
+ b.serialize(byteBuffer);
+ buf.writerIndex(buf.writerIndex() + byteBuffer.position());
}

public static RoaringBitmap decode(ByteBuf buf) {
@@ -172,7 +184,11 @@ public static long[] decode(ByteBuf buf) {
}
}

- /** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
+ /**
+  * Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding.
+  *
+  * @since 3.1.0
+  */
public static class BitmapArrays {
public static int encodedLength(RoaringBitmap[] bitmaps) {
int totalLength = 4;
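Editor's note: the capacity contract in the new `encode` javadoc above matters in practice. A minimal round-trip sketch, mirroring the `EncodersSuite` test added later in this PR (the wrapper class name is the editor's, not the PR's):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.protocol.Encoders;
import org.roaringbitmap.RoaringBitmap;

public class BitmapRoundTrip {
  public static void main(String[] args) {
    RoaringBitmap bitmap = new RoaringBitmap();
    bitmap.add(1, 2, 3);
    // Size the buffer from encodedLength() up front: encode() writes through a
    // fixed-capacity nio view of the ByteBuf and will not expand it, so an
    // undersized buffer fails (see testRoaringBitmapEncodeShouldFailWhenBufferIsSmall).
    ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
    Encoders.Bitmaps.encode(buf, bitmap);
    RoaringBitmap decoded = Encoders.Bitmaps.decode(buf);
    System.out.println(bitmap.equals(decoded));  // true
  }
}
```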
@@ -363,4 +363,39 @@
return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
}

/**
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
* a cluster level because this configuration is set to
* 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
* To turn on push-based shuffle at a cluster level, set the configuration to
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
*/
public String mergedShuffleFileManagerImpl() {
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
}

/**
* The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
* push-based shuffle.
* A merged shuffle file consists of multiple small shuffle blocks. Fetching the
* complete merged shuffle file in a single response increases the memory requirements for the
* clients. Instead of serving the entire merged file, the shuffle service serves the
* merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this
* configuration controls how big a chunk can get. A corresponding index file for each merged
* shuffle file will be generated indicating chunk boundaries.
*/
public int minChunkSizeInMergedShuffleFile() {
return Ints.checkedCast(JavaUtils.byteStringAsBytes(
conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
}
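Editor's note: to make the chunking described in the javadoc above concrete, here is a small illustrative sketch (the editor's, not the PR's merge logic): boundaries fall on whole-block edges, and a new boundary is recorded once the running chunk size reaches the configured minimum.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkBoundarySketch {
  // Hypothetical illustration: given per-block sizes in a merged shuffle file,
  // record a chunk boundary (a file offset) each time the accumulated size
  // reaches minChunkSize. Offsets like these would go into the index file.
  static List<Long> chunkBoundaries(long[] blockSizes, long minChunkSize) {
    List<Long> boundaries = new ArrayList<>();
    long offset = 0;
    long currentChunk = 0;
    for (long size : blockSizes) {
      offset += size;
      currentChunk += size;
      if (currentChunk >= minChunkSize) {
        boundaries.add(offset);  // boundary lands on a whole-block edge
        currentChunk = 0;
      }
    }
    return boundaries;
  }

  public static void main(String[] args) {
    long mb = 1024 * 1024;
    // Five 1 MiB blocks with the 2m default => boundaries after blocks 2 and 4.
    System.out.println(chunkBoundaries(new long[]{mb, mb, mb, mb, mb}, 2 * mb));
  }
}
```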

/**
* The size of cache in memory which is used in push-based shuffle for storing merged index files.
*/
public long mergedIndexCacheSize() {
return JavaUtils.byteStringAsBytes(
conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
}
}
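Editor's note: putting the three new settings together, a hedged sketch of turning push-based shuffle on at the cluster level. `MapConfigProvider` is the existing config helper in network-common; the class names come from the javadoc above, and the wrapper class is the editor's:

```java
import java.util.Collections;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class EnablePushShuffleSketch {
  public static void main(String[] args) {
    // Override the NoOp default so the shuffle service merges pushed blocks.
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(
        Collections.singletonMap(
            "spark.shuffle.server.mergedShuffleFileManagerImpl",
            "org.apache.spark.network.shuffle.RemoteBlockPushResolver")));
    System.out.println(conf.mergedShuffleFileManagerImpl());
    System.out.println(conf.minChunkSizeInMergedShuffleFile());  // 2m default
    System.out.println(conf.mergedIndexCacheSize());             // 100m default
  }
}
```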
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import org.roaringbitmap.RoaringBitmap;

import static org.junit.Assert.*;

/**
* Tests for {@link Encoders}.
*/
public class EncodersSuite {

@Test
public void testRoaringBitmapEncodeDecode() {
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.add(1, 2, 3);
ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
Encoders.Bitmaps.encode(buf, bitmap);
RoaringBitmap decodedBitmap = Encoders.Bitmaps.decode(buf);
assertEquals(bitmap, decodedBitmap);
}

@Test (expected = java.nio.BufferOverflowException.class)
public void testRoaringBitmapEncodeShouldFailWhenBufferIsSmall() {
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.add(1, 2, 3);
ByteBuf buf = Unpooled.buffer(4);
Encoders.Bitmaps.encode(buf, bitmap);
}

@Test
public void testBitmapArraysEncodeDecode() {
RoaringBitmap[] bitmaps = new RoaringBitmap[] {
new RoaringBitmap(),
new RoaringBitmap(),
new RoaringBitmap(), // empty
new RoaringBitmap(),
new RoaringBitmap()
};
bitmaps[0].add(1, 2, 3);
bitmaps[1].add(1, 2, 4);
bitmaps[3].add(7L, 9L);
bitmaps[4].add(1L, 100L);
ByteBuf buf = Unpooled.buffer(Encoders.BitmapArrays.encodedLength(bitmaps));
Encoders.BitmapArrays.encode(buf, bitmaps);
RoaringBitmap[] decodedBitmaps = Encoders.BitmapArrays.decode(buf);
assertArrayEquals(bitmaps, decodedBitmaps);
}
}
10 changes: 5 additions & 5 deletions common/network-shuffle/pom.xml
@@ -47,6 +47,11 @@
<artifactId>metrics-core</artifactId>
</dependency>

+ <dependency>
+   <groupId>org.apache.spark</groupId>
+   <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
@@ -70,11 +75,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
-   <groupId>org.apache.spark</groupId>
-   <artifactId>spark-tags_${scala.binary.version}</artifactId>
-   <scope>test</scope>
- </dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
@@ -21,14 +21,18 @@

import com.google.common.base.Throwables;

import org.apache.spark.annotation.Evolving;

/**
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
* and logged.
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
* - remaining retries < max retries
* - exception is an IOException
*
* @since 3.1.0
*/

@Evolving
public interface ErrorHandler {

boolean shouldRetryError(Throwable t);
@@ -44,6 +48,8 @@ default boolean shouldLogError(Throwable t) {

/**
* The error handler for pushing shuffle blocks to remote shuffle services.
*
* @since 3.1.0
*/
class BlockPushErrorHandler implements ErrorHandler {
/**
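Editor's note: as a usage sketch of this interface (the editor's example, not code from the PR), a handler that retries only connection failures and keeps them out of the logs might look like this. Per the note above, `RetryingBlockFetcher` consults it only for IOExceptions while retries remain:

```java
import java.net.ConnectException;
import org.apache.spark.network.shuffle.ErrorHandler;

public class ConnectOnlyErrorHandler implements ErrorHandler {
  @Override
  public boolean shouldRetryError(Throwable t) {
    // Only transient connection failures are worth another attempt here.
    return t instanceof ConnectException;
  }

  @Override
  public boolean shouldLogError(Throwable t) {
    // Suppress expected connection noise; log everything else.
    return !(t instanceof ConnectException);
  }
}
```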
@@ -68,7 +68,7 @@ public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile)
throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
- new NoOpMergedShuffleFileManager());
+ new NoOpMergedShuffleFileManager(conf));
}

public ExternalBlockHandler(
@@ -89,7 +89,7 @@ public ExternalShuffleBlockResolver getBlockResolver() {
public ExternalBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockResolver blockManager) {
- this(streamManager, blockManager, new NoOpMergedShuffleFileManager());
+ this(streamManager, blockManager, new NoOpMergedShuffleFileManager(null));
}

/** Enables mocking out the StreamManager, BlockManager, and MergeManager. */
@@ -175,7 +175,7 @@ protected void handleMessage(
RegisterExecutor msg = (RegisterExecutor) msgObj;
checkAuth(client, msg.appId);
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
- mergeManager.registerExecutor(msg.appId, msg.executorInfo.localDirs);
+ mergeManager.registerExecutor(msg.appId, msg.executorInfo);
callback.onSuccess(ByteBuffer.wrap(new byte[0]));
} finally {
responseDelayContext.stop();
@@ -232,6 +232,7 @@ public StreamManager getStreamManager() {
*/
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
mergeManager.applicationRemoved(appId, cleanupLocalDirs);
}

/**
@@ -430,8 +431,15 @@ public ManagedBuffer next() {
/**
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
* is not enabled.
*
* @since 3.1.0
*/
- private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+ public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {

// This constructor is needed because we use this constructor to instantiate an implementation
// of MergedShuffleFileManager using reflection.
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}

@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
@@ -444,18 +452,13 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
}

@Override
- public void registerApplication(String appId, String user) {
-   // No-op. Do nothing.
- }
-
- @Override
- public void registerExecutor(String appId, String[] localDirs) {
+ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
// No-Op. Do nothing.
}

@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
- throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+ // No-Op. Do nothing.
}

@Override
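Editor's note: the `NoOp` constructor comment above references `YarnShuffleService#newMergedShuffleFileManagerInstance`, which is not part of this hunk. A hedged sketch of what that reflective instantiation can look like (the class name comes from `mergedShuffleFileManagerImpl()` in `TransportConf`; the fallback behavior here is an assumption):

```java
import java.lang.reflect.Constructor;
import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.util.TransportConf;

public class MergeManagerFactorySketch {
  static MergedShuffleFileManager newMergedShuffleFileManagerInstance(TransportConf conf) {
    String className = conf.mergedShuffleFileManagerImpl();
    try {
      Class<?> clazz = Class.forName(className);
      // Matches the TransportConf-arg constructor the NoOp comment calls out.
      Constructor<?> ctor = clazz.getDeclaredConstructor(TransportConf.class);
      return (MergedShuffleFileManager) ctor.newInstance(conf);
    } catch (ReflectiveOperationException e) {
      // Assumed fallback: keep the shuffle service running with merging disabled.
      return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
    }
  }
}
```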
@@ -34,6 +34,8 @@
* 1. Number of chunks in a merged shuffle block.
* 2. Bitmaps for each chunk in the merged block. A chunk bitmap contains all the mapIds that were
* merged to that merged block chunk.
*
* @since 3.1.0
*/
public class MergedBlockMeta {
private final int numChunks;
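Editor's note: connecting this class to the `Encoders` changes earlier in the diff (an editor's sketch; the byte-array plumbing is assumed, not the PR's exact code), the per-chunk bitmaps in the meta can be recovered with `BitmapArrays`:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.protocol.Encoders;
import org.roaringbitmap.RoaringBitmap;

public class ChunkBitmapsSketch {
  // Hypothetical helper: given the raw bytes of a merged block meta's bitmap
  // section, recover one RoaringBitmap per chunk; bitmap c holds the mapIds
  // merged into chunk c of the merged block.
  static RoaringBitmap[] readChunkBitmaps(byte[] metaBytes) {
    ByteBuf buf = Unpooled.wrappedBuffer(metaBytes);
    return Encoders.BitmapArrays.decode(buf);
  }
}
```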
@@ -19,21 +19,25 @@

import java.io.IOException;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;


/**
* The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
alongside {@link ExternalBlockHandler} and serves as an RpcHandler for
* {@link org.apache.spark.network.server.RpcHandler#receiveStream}, where it processes the
* remotely pushed streams of shuffle blocks to merge them into merged shuffle files. Right
* now, support for push based shuffle is only implemented for external shuffle service in
* YARN mode.
*
* @since 3.1.0
*/
@Evolving
public interface MergedShuffleFileManager {
/**
* Provides the stream callback used to process a remotely pushed block. The callback is
@@ -56,25 +60,15 @@ public interface MergedShuffleFileManager {
MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException;

- /**
-  * Registers an application when it starts. It also stores the username which is necessary
-  * for generating the host local directories for merged shuffle files.
-  * Right now, this is invoked by YarnShuffleService.
-  *
-  * @param appId application ID
-  * @param user username
-  */
- void registerApplication(String appId, String user);
-
  /**
-  * Registers an executor with its local dir list when it starts. This provides the specific path
-  * so MergedShuffleFileManager knows where to store and look for shuffle data for a
-  * given application. It is invoked by the RPC call when executor tries to register with the
-  * local shuffle service.
+  * Registers an executor with MergedShuffleFileManager. This executor-info provides
+  * the directories and number of sub-dirs per dir so that MergedShuffleFileManager knows where to
+  * store and look for shuffle data for a given application. It is invoked by the RPC call when
+  * executor tries to register with the local shuffle service.
   *
   * @param appId application ID
-  * @param localDirs The list of local dirs that this executor gets granted from NodeManager
+  * @param executorInfo The list of local dirs that this executor gets granted from NodeManager
   */
- void registerExecutor(String appId, String[] localDirs);
+ void registerExecutor(String appId, ExecutorShuffleInfo executorInfo);

/**
* Invoked when an application finishes. This cleans up any remaining metadata associated with
@@ -35,10 +35,13 @@
* be merged instead of for fetching them from remote shuffle services. This is used by
* ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
is used to handle the success or failure in pushing each block.
*
* @since 3.1.0
*/
public class OneForOneBlockPusher {
private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
public static final String SHUFFLE_PUSH_BLOCK_PREFIX = "shufflePush";

private final TransportClient client;
private final String appId;
@@ -115,7 +118,13 @@ public void start() {
for (int i = 0; i < blockIds.length; i++) {
assert buffers.containsKey(blockIds[i]) : "Could not find the block buffer for block "
+ blockIds[i];
- ByteBuffer header = new PushBlockStream(appId, blockIds[i], i).toByteBuffer();
+ String[] blockIdParts = blockIds[i].split("_");
+ if (blockIdParts.length != 4 || !blockIdParts[0].equals(SHUFFLE_PUSH_BLOCK_PREFIX)) {
+   throw new IllegalArgumentException(
+     "Unexpected shuffle push block id format: " + blockIds[i]);
+ }
+ ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]),
+   Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]), i).toByteBuffer();
client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
new BlockPushCallback(i, blockIds[i]));
}
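Editor's note: for reference, a small, hedged example of the block id shape the new parsing above enforces. The meaning of the three numeric parts follows the PushBlockStream header fields written next to it (shuffle id, map index, reduce id); the wrapper class is the editor's:

```java
import org.apache.spark.network.shuffle.OneForOneBlockPusher;

public class PushBlockIdSketch {
  public static void main(String[] args) {
    // Expected shape: shufflePush_<shuffleId>_<mapIndex>_<reduceId>
    String blockId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_2_5_10";
    String[] parts = blockId.split("_");
    // parts[0] = "shufflePush", then shuffleId = 2, mapIndex = 5, reduceId = 10.
    // Anything that is not 4 parts with the expected prefix fails the check in
    // OneForOneBlockPusher#start() with an IllegalArgumentException.
    System.out.println(parts.length == 4);  // true
  }
}
```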