Skip to content

Commit

Permalink
Fixing HDFS state-store. Contributed by Arun Suresh.
Browse files Browse the repository at this point in the history
  • Loading branch information
vinoduec committed May 14, 2015
1 parent 6b710a4 commit 9a2a955
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 45 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

import org.apache.commons.logging.Log;
Expand All @@ -38,6 +39,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -84,7 +86,8 @@ public class FileSystemRMStateStore extends RMStateStore {
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
"AMRMTokenSecretManagerNode";

@VisibleForTesting
private static final String UNREADABLE_BY_SUPERUSER_XATTRIB =
"security.hdfs.unreadable.by.superuser";
protected FileSystem fs;
@VisibleForTesting
protected Configuration fsConf;
Expand All @@ -97,6 +100,7 @@ public class FileSystemRMStateStore extends RMStateStore {
private Path dtSequenceNumberPath = null;
private int fsNumRetries;
private long fsRetryInterval;
private boolean isHDFS;

@VisibleForTesting
Path fsWorkingPath;
Expand Down Expand Up @@ -141,11 +145,17 @@ protected synchronized void startInternal() throws Exception {
}

fs = fsWorkingPath.getFileSystem(fsConf);
isHDFS = fs.getScheme().toLowerCase().contains("hdfs");
mkdirsWithRetries(rmDTSecretManagerRoot);
mkdirsWithRetries(rmAppRoot);
mkdirsWithRetries(amrmTokenSecretManagerRoot);
}

/**
 * Overrides the {@code isHDFS} flag of this store.
 *
 * <p>The flag is otherwise assigned in {@code startInternal()} from the
 * file system scheme, and it is read by
 * {@code setUnreadableBySuperuserXattrib} before any extended attribute is
 * touched. This setter lets tests force either value.
 *
 * @param isHDFS the value to assign to the {@code isHDFS} field
 */
@VisibleForTesting
void setIsHDFS(boolean isHDFS) {
this.isHDFS = isHDFS;
}

@Override
protected synchronized void closeInternal() throws Exception {
closeWithRetries();
Expand Down Expand Up @@ -175,9 +185,9 @@ protected synchronized void storeVersion() throws Exception {
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
if (existsWithRetries(versionNodePath)) {
updateFile(versionNodePath, data);
updateFile(versionNodePath, data, false);
} else {
writeFileWithRetries(versionNodePath, data);
writeFileWithRetries(versionNodePath, data, false);
}
}

Expand All @@ -194,12 +204,12 @@ public synchronized long getAndIncrementEpoch() throws Exception {
// increment epoch and store it
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
updateFile(epochNodePath, storeData);
updateFile(epochNodePath, storeData, false);
} else {
// initialize epoch file with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
writeFileWithRetries(epochNodePath, storeData);
writeFileWithRetries(epochNodePath, storeData, false);
}
return currentEpoch;
}
Expand Down Expand Up @@ -253,7 +263,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
continue;
}
byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
childNodeStatus.getLen());
childNodeStatus.getLen());
// Set attribute if not already set
setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -326,7 +338,7 @@ public boolean accept(Path path) {
assert newChildNodeStatus.isFile();
String newChildNodeName = newChildNodeStatus.getPath().getName();
String childNodeName = newChildNodeName.substring(
0, newChildNodeName.length() - ".new".length());
0, newChildNodeName.length() - ".new".length());
Path childNodePath =
new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
replaceFile(newChildNodeStatus.getPath(), childNodePath);
Expand Down Expand Up @@ -394,7 +406,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
writeFileWithRetries(nodeCreatePath, appStateData);
writeFileWithRetries(nodeCreatePath, appStateData, true);
} catch (Exception e) {
LOG.info("Error storing info for app: " + appId, e);
throw e;
Expand All @@ -412,7 +424,7 @@ public synchronized void updateApplicationStateInternal(ApplicationId appId,
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
updateFile(nodeCreatePath, appStateData);
updateFile(nodeCreatePath, appStateData, true);
} catch (Exception e) {
LOG.info("Error updating info for app: " + appId, e);
throw e;
Expand All @@ -433,7 +445,7 @@ public synchronized void storeApplicationAttemptStateInternal(
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
writeFileWithRetries(nodeCreatePath, attemptStateData);
writeFileWithRetries(nodeCreatePath, attemptStateData, true);
} catch (Exception e) {
LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e;
Expand All @@ -454,7 +466,7 @@ public synchronized void updateApplicationAttemptStateInternal(
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
updateFile(nodeCreatePath, attemptStateData);
updateFile(nodeCreatePath, attemptStateData, true);
} catch (Exception e) {
LOG.info("Error updating info for attempt: " + appAttemptId, e);
throw e;
Expand Down Expand Up @@ -483,7 +495,7 @@ public synchronized void storeRMDelegationTokenState(
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier identifier) throws Exception {
Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
deleteFileWithRetries(nodeCreatePath);
}
Expand All @@ -505,10 +517,10 @@ private void storeOrUpdateRMDelegationTokenState(
new RMDelegationTokenIdentifierData(identifier, renewDate);
if (isUpdate) {
LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
updateFile(nodeCreatePath, identifierData.toByteArray());
updateFile(nodeCreatePath, identifierData.toByteArray(), true);
} else {
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
writeFileWithRetries(nodeCreatePath, identifierData.toByteArray(), true);

// store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
Expand Down Expand Up @@ -539,7 +551,7 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
try (DataOutputStream fsOut = new DataOutputStream(os)) {
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
masterKey.write(fsOut);
writeFileWithRetries(nodeCreatePath, os.toByteArray());
writeFileWithRetries(nodeCreatePath, os.toByteArray(), true);
}
}

Expand Down Expand Up @@ -572,6 +584,16 @@ private Path getAppDir(Path root, ApplicationId appId) {
return getNodePath(root, appId.toString());
}

/**
 * Returns the directory of the given application under {@code rmAppRoot}.
 *
 * @param appId the application whose directory is requested
 * @return the result of {@code getAppDir(rmAppRoot, appId)}
 */
@VisibleForTesting
protected Path getAppDir(ApplicationId appId) {
return getAppDir(rmAppRoot, appId);
}

/**
 * Returns the path of the given application attempt, located inside the
 * directory of the application it belongs to.
 *
 * @param appAttId the application attempt whose path is requested
 * @return the node path formed from the owning application's directory and
 *         the string form of {@code appAttId}
 */
@VisibleForTesting
protected Path getAppAttemptDir(ApplicationAttemptId appAttId) {
  // Resolve the parent (application) directory first, then append the
  // attempt's own name to it.
  Path owningAppDir = getAppDir(appAttId.getApplicationId());
  String attemptNodeName = appAttId.toString();
  return getNodePath(owningAppDir, attemptNodeName);
}
// FileSystem related code

private boolean checkAndRemovePartialRecordWithRetries(final Path record)
Expand All @@ -594,12 +616,13 @@ public Void run() throws Exception {
}.runWithRetries();
}

private void writeFileWithRetries(final Path outputPath,final byte[] data)
throws Exception {
private void writeFileWithRetries(final Path outputPath, final byte[] data,
final boolean makeUnreadableByAdmin)
throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws Exception {
writeFile(outputPath, data);
writeFile(outputPath, data, makeUnreadableByAdmin);
return null;
}
}.runWithRetries();
Expand Down Expand Up @@ -746,14 +769,18 @@ private FileStatus getFileStatus(Path path) throws Exception {
* data to a .tmp file and then rename it. Here we are assuming that rename is
* atomic for the underlying file system.
*/
private void writeFile(Path outputPath, byte[] data) throws Exception {
protected void writeFile(Path outputPath, byte[] data, boolean
makeUnradableByAdmin) throws Exception {
Path tempPath =
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
FSDataOutputStream fsOut = null;
// This file will be overwritten when app/attempt finishes for saving the
// final status.
try {
fsOut = fs.create(tempPath, true);
if (makeUnradableByAdmin) {
setUnreadableBySuperuserXattrib(tempPath);
}
fsOut.write(data);
fsOut.close();
fsOut = null;
Expand All @@ -768,10 +795,11 @@ private void writeFile(Path outputPath, byte[] data) throws Exception {
* data to a .new file and then rename it. Here we are assuming that rename is
* atomic for the underlying file system.
*/
protected void updateFile(Path outputPath, byte[] data) throws Exception {
protected void updateFile(Path outputPath, byte[] data, boolean
makeUnradableByAdmin) throws Exception {
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
// use writeFileWithRetries to make sure .new file is created atomically
writeFileWithRetries(newPath, data);
writeFileWithRetries(newPath, data, makeUnradableByAdmin);
replaceFile(newPath, outputPath);
}

Expand Down Expand Up @@ -810,9 +838,9 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
if (isUpdate) {
updateFile(nodeCreatePath, stateData);
updateFile(nodeCreatePath, stateData, true);
} else {
writeFileWithRetries(nodeCreatePath, stateData);
writeFileWithRetries(nodeCreatePath, stateData, true);
}
}

Expand All @@ -825,4 +853,13 @@ public int getNumRetries() {
public long getRetryInterval() {
return fsRetryInterval;
}

/**
 * Sets the {@code UNREADABLE_BY_SUPERUSER_XATTRIB} extended attribute on the
 * given path, if it is not set already.
 *
 * <p>Nothing is done when {@code isHDFS} is false; in that case the file
 * system is not queried at all.
 *
 * @param p the path to mark
 * @throws IOException if reading or setting the extended attributes fails
 */
private void setUnreadableBySuperuserXattrib(Path p)
    throws IOException {
  if (!isHDFS) {
    return;
  }
  if (fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
    // Attribute already present; CREATE would be rejected for an existing
    // attribute, so skip the call.
    return;
  }
  // The attribute carries no value; only its presence is recorded.
  fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
      EnumSet.of(XAttrSetFlag.CREATE));
}
}
Expand Up @@ -112,6 +112,12 @@ public EventHandler getEventHandler() {

}

/**
 * Hook object passed to {@code testRMAppStateStore} so that a test can
 * inspect the store at fixed points of the scenario. Both methods are no-ops
 * here; subclasses override the ones they need.
 */
public static class StoreStateVerifier {
/** Called by {@code testRMAppStateStore} right after an application is stored. */
void afterStoreApp(RMStateStore store, ApplicationId appId) {}
/** Called by {@code testRMAppStateStore} right after an application attempt is stored. */
void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId
appAttId) {}
}

interface RMStateStoreHelper {
RMStateStore getRMStateStore() throws Exception;
boolean isFinalStateValid() throws Exception;
Expand Down Expand Up @@ -173,14 +179,20 @@ protected ContainerId storeAttempt(RMStateStore store,
when(mockAttempt.getRMAppAttemptMetrics())
.thenReturn(mockRmAppAttemptMetrics);
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
.thenReturn(new AggregateAppResourceUsage(0,0));
.thenReturn(new AggregateAppResourceUsage(0, 0));
dispatcher.attemptId = attemptId;
store.storeNewApplicationAttempt(mockAttempt);
waitNotify(dispatcher);
return container.getId();
}

/**
 * Runs the application state-store scenario with a freshly created
 * {@code StoreStateVerifier}, whose hooks do nothing.
 *
 * @param stateStoreHelper helper forwarded unchanged to the two-argument
 *        overload
 * @throws Exception propagated from the two-argument overload
 */
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
    throws Exception {
  StoreStateVerifier noOpVerifier = new StoreStateVerifier();
  testRMAppStateStore(stateStoreHelper, noOpVerifier);
}

void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
StoreStateVerifier verifier)
throws Exception {
long submitTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis() + 1234;
Expand All @@ -205,6 +217,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
.toApplicationAttemptId("appattempt_1352994193343_0001_000001");
ApplicationId appId1 = attemptId1.getApplicationId();
storeApp(store, appId1, submitTime, startTime);
verifier.afterStoreApp(store, appId1);

// create application token and client token key for attempt1
Token<AMRMTokenIdentifier> appAttemptToken1 =
Expand Down Expand Up @@ -236,6 +249,7 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
storeApp(store, appIdRemoved, submitTime, startTime);
storeAttempt(store, attemptIdRemoved,
"container_1352994193343_0002_01_000001", null, null, dispatcher);
verifier.afterStoreAppAttempt(store, attemptIdRemoved);

RMApp mockRemovedApp = mock(RMApp.class);
RMAppAttemptMetrics mockRmAppAttemptMetrics =
Expand Down

0 comments on commit 9a2a955

Please sign in to comment.