Skip to content

Commit

Permalink
[SPARK-14731][shuffle]Revert SPARK-12130 to make 2.0 shuffle service …
Browse files Browse the repository at this point in the history
…compatible with 1.x

## What changes were proposed in this pull request?
SPARK-12130 make 2.0 shuffle service incompatible with 1.x. So from discussion: [http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html](url) we should maintain compatibility between Spark 1.x and Spark 2.x's shuffle service.
I put string comparison into executor's register at first avoid string comparison in getBlockData every time.

## How was this patch tested?
N/A

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #12568 from lianhuiwang/SPARK-14731.
  • Loading branch information
lianhuiwang authored and Marcelo Vanzin committed Apr 25, 2016
1 parent 425f691 commit 6bfe42a
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class ExternalShuffleBlockResolver {
@VisibleForTesting
final DB db;

private final List<String> knownManagers = Arrays.asList(
"org.apache.spark.shuffle.sort.SortShuffleManager",
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
throws IOException {
this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
Expand Down Expand Up @@ -149,6 +153,10 @@ public void registerExecutor(
ExecutorShuffleInfo executorInfo) {
AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
if (!knownManagers.contains(executorInfo.shuffleManager)) {
throw new UnsupportedOperationException(
"Unsupported shuffle manager of executor: " + executorInfo);
}
try {
if (db != null) {
byte[] key = dbAppExecKey(fullId);
Expand Down Expand Up @@ -183,12 +191,7 @@ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
}

if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
} else {
throw new UnsupportedOperationException(
"Unsupported shuffle manager: " + executor.shuffleManager);
}
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public void onBlockFetchFailure(String blockId, Throwable t) {

// Register an executor so that the next steps work.
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
new String[] { System.getProperty("java.io.tmpdir") }, 1,
"org.apache.spark.shuffle.sort.SortShuffleManager");
RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public class ExternalShuffleBlockResolverSuite {
private static final String sortBlock0 = "Hello!";
private static final String sortBlock1 = "World!";
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";

private static TestShuffleDataContext dataContext;

Expand Down Expand Up @@ -71,8 +72,8 @@ public void testBadRequests() throws IOException {
}

// Invalid shuffle manager
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
try {
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
fail("Should have failed");
} catch (UnsupportedOperationException e) {
Expand All @@ -81,7 +82,7 @@ public void testBadRequests() throws IOException {

// Nonexistent shuffle block
resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo("sort"));
dataContext.createExecutorInfo(SORT_MANAGER));
try {
resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
fail("Should have failed");
Expand All @@ -94,7 +95,7 @@ public void testBadRequests() throws IOException {
public void testSortShuffleBlocks() throws IOException {
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("sort"));
dataContext.createExecutorInfo(SORT_MANAGER));

InputStream block0Stream =
resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
Expand All @@ -120,7 +121,7 @@ public void jsonSerializationOfExecutorRegistration() throws IOException {
assertEquals(parsedAppId, appId);

ExecutorShuffleInfo shuffleInfo =
new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER);
String shuffleJson = mapper.writeValueAsString(shuffleInfo);
ExecutorShuffleInfo parsedShuffleInfo =
mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
Expand All @@ -131,7 +132,7 @@ public void jsonSerializationOfExecutorRegistration() throws IOException {
String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
"\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
"\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}";
assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,20 @@ public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";

@Test
public void noCleanupAndCleanup() throws IOException {
TestShuffleDataContext dataContext = createSomeData();

ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
resolver.applicationRemoved("app", false /* cleanup */);

assertStillThere(dataContext);

resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER));
resolver.applicationRemoved("app", true /* cleanup */);

assertCleanedUp(dataContext);
Expand All @@ -69,7 +70,7 @@ public void cleanupUsesExecutor() throws IOException {
ExternalShuffleBlockResolver manager =
new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);

manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
manager.applicationRemoved("app", true);

assertTrue(cleanupCalled.get());
Expand All @@ -87,8 +88,8 @@ public void cleanupMultipleExecutors() throws IOException {
ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);

resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
resolver.applicationRemoved("app", true);

assertCleanedUp(dataContext0);
Expand All @@ -103,8 +104,8 @@ public void cleanupOnlyRemovedApp() throws IOException {
ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);

resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER));

resolver.applicationRemoved("app-nonexistent", true);
assertStillThere(dataContext0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@

public class ExternalShuffleIntegrationSuite {

static String APP_ID = "app-id";
static String SORT_MANAGER = "sort";
private static final String APP_ID = "app-id";
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";

// Executor 0 is sort-based
static TestShuffleDataContext dataContext0;
Expand Down Expand Up @@ -184,12 +184,9 @@ public void testFetchThreeSort() throws Exception {
exec0Fetch.releaseBuffers();
}

@Test
public void testFetchInvalidShuffle() throws Exception {
@Test (expected = RuntimeException.class)
public void testRegisterInvalidExecutor() throws Exception {
registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ private void validate(String appId, String secretKey, boolean encrypt) throws IO
client.init(appId);
// Registration either succeeds or throws an exception.
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
new ExecutorShuffleInfo(new String[0], 0, ""));
new ExecutorShuffleInfo(new String[0], 0,
"org.apache.spark.shuffle.sort.SortShuffleManager"));
client.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ import org.apache.spark.{ShuffleDependency, TaskContext}
*/
private[spark] trait ShuffleManager {

/** Return short name for the ShuffleManager */
val shortName: String

/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
*/
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

override val shortName: String = "sort"

override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private[spark] class BlockManager(
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirs.map(_.toString),
diskBlockManager.subDirsPerLocalDir,
shuffleManager.shortName)
shuffleManager.getClass.getName)

val MAX_ATTEMPTS = 3
val SLEEP_TIME_SECS = 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo

class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"

override def beforeEach(): Unit = {
super.beforeEach()
Expand Down Expand Up @@ -87,8 +88,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd

val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)

val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
Expand Down Expand Up @@ -158,8 +159,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd

val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)

val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
Expand All @@ -186,7 +187,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s1.initializeApplication(app1Data)

val execStateFile = s1.registeredExecutorFile
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)

val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
Expand Down Expand Up @@ -218,7 +219,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val app2Data: ApplicationInitializationContext =
new ApplicationInitializationContext("user", app2Id, null)
s2.initializeApplication(app2Data)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
s2.stop()
Expand Down

0 comments on commit 6bfe42a

Please sign in to comment.