Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -159,8 +160,7 @@ protected void serviceInit(Configuration conf) throws Exception {
// If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
Expand Down Expand Up @@ -196,7 +196,7 @@ protected void serviceInit(Configuration conf) throws Exception {

private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);
secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);

// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
Expand Down Expand Up @@ -328,37 +328,59 @@ public void setRecoveryPath(Path recoveryPath) {
}

/**
* Get the recovery path, this will override the default one to get our own maintained
* recovery path.
* Get the path specific to this auxiliary service to use for recovery.
*/
protected Path getRecoveryPath(String fileName) {
return _recoveryPath;
}

/**
* Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
* when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
* it will uses a YARN local dir.
*/
protected Path getRecoveryPath() {
protected File initRecoveryDb(String dbFileName) {
if (_recoveryPath != null) {
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
if (recoveryFile.exists()) {
return recoveryFile;
}
}
// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but I'd swap the logic around and first check whether a DB exists in the recovery path, and if it does, just ignore local dirs altogether. Just to avoid doing unneeded work when we know the recovery path is already being used.

for (String dir : localDirs) {
File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
File f = new File(new Path(dir).toUri().getPath(), dbFileName);
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code.
_recoveryPath = new Path(dir);
return f;
} else {
// If NM recovery is enabled and the recovery file exists in old NM local dirs, which
// means old version of Spark already generated the recovery file, we should copy the
// old file in to a new recovery path for the compatibility.
if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
// Fail to move recovery file to new path
logger.error("Failed to move recovery file {} to the path {}",
RECOVERY_FILE_NAME, _recoveryPath.toString());
// If the recovery path is set then either NM recovery is enabled or another recovery
// DB has been initialized. If NM recovery is enabled and had set the recovery path
// make sure to move all DBs to the recovery path from the old NM local dirs.
// If another DB was initialized first just make sure all the DBs are in the same
// location.
File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
if (!newLoc.equals(f)) {
try {
Files.move(f.toPath(), newLoc.toPath());
} catch (Exception e) {
// Fail to move recovery file to new path, just continue on with new DB location
logger.error("Failed to move recovery file {} to the path {}",
dbFileName, _recoveryPath.toString(), e);
}
}
return newLoc;
}
break;
}
}

if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]);
}

return _recoveryPath;
return new File(_recoveryPath.toUri().getPath(), dbFileName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.stop()
}

test("moving recovery file form NM local dir to recovery path") {
test("moving recovery file from NM local dir to recovery path") {
// This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move
// old recovery file to the new path to keep compatibility

// Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
// dir.
s1 = new YarnShuffleService
// set auth to true to test the secrets recovery
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
val app1Data: ApplicationInitializationContext =
Expand All @@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd

val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
val secretsFile = s1.secretsFile
secretsFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)

Expand All @@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.init(yarnConfig)

val execStateFile2 = s2.registeredExecutorFile
val secretsFile2 = s2.secretsFile

recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
eventually(timeout(10 seconds), interval(5 millis)) {
assert(!execStateFile.exists())
}
eventually(timeout(10 seconds), interval(5 millis)) {
assert(!secretsFile.exists())
}

val handler2 = s2.blockHandler
val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
Expand Down