diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
index 6bd1a39eea414..9dbdc41481a3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
@@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@@ -62,7 +64,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
@Private
@Idempotent
- public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException;
@Private
@@ -72,7 +74,7 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
@Private
@Idempotent
- public RefreshSuperUserGroupsConfigurationResponse
+ public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException;
@@ -94,16 +96,16 @@ public RefreshAdminAclsResponse refreshAdminAcls(
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request)
throws YarnException, IOException;
-
+
/**
*
The interface used by admin to update nodes' resources to the
* ResourceManager
.
- *
- * The admin client is required to provide details such as a map from
- * {@link NodeId} to {@link ResourceOption} required to update resources on
+ *
+ *
The admin client is required to provide details such as a map from
+ * {@link NodeId} to {@link ResourceOption} required to update resources on
* a list of RMNode in ResourceManager etc.
* via the {@link UpdateNodeResourceRequest}.
- *
+ *
* @param request request to update resource for a node in cluster.
* @return (empty) response on accepting update.
* @throws YarnException exceptions from yarn servers.
@@ -124,17 +126,17 @@ public RefreshNodesResourcesResponse refreshNodesResources(
@Idempotent
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException;
-
+
@Private
@Idempotent
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException;
-
+
@Private
@Idempotent
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException;
-
+
@Private
@Idempotent
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
@@ -153,4 +155,14 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
NodesToAttributesMappingResponse mapAttributesToNodes(
NodesToAttributesMappingRequest request) throws YarnException,
IOException;
+
+ /**
+ * Access the RM database(s) from Admin CLI
+ * @param request - Database access request
+ * @return Database access response
+ */
+ @Private
+ @Idempotent
+ DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request)
+ throws IOException, YarnException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DBRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DBRecord.java
new file mode 100644
index 0000000000000..c14b8e64b18dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DBRecord.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * DBRecord is setup as a Key Value pair because RM data is stored in this way
+ */
+public abstract class DBRecord {
+
+ public static DBRecord newInstance(String key, String value) {
+ DBRecord record = Records.newRecord(DBRecord.class);
+ record.setKey(key);
+ record.setValue(value);
+ return record;
+ }
+
+ public abstract void setKey(String key);
+
+ public abstract void setValue(String value);
+
+ public abstract String getKey();
+
+ public abstract String getValue();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DatabaseAccessRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DatabaseAccessRequest.java
new file mode 100644
index 0000000000000..1a81c396e1fe8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DatabaseAccessRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * The request used by Admins to access the underlying RM database
+ * The request contains details of the operation (get / set / delete), database (RMStateStore or YarnConfigurationStore), key and value
+ */
+public abstract class DatabaseAccessRequest {
+
+ /**
+ *
+ * @param operation - get / set / del
+ * @param database - unique identifier for the database to query
+ * @param key - key to query
+ * @param value - value to set
+ *
+ * @return DatabaseAccessRequest
+ *
+ * Can add dataStore later if required to access other datastores like ZK / mysql etc
+ */
+ public static DatabaseAccessRequest newInstance(String operation, String database, String key, String value) {
+ DatabaseAccessRequest request = Records.newRecord(DatabaseAccessRequest.class);
+ request.setOperation(operation);
+ request.setDatabase(database);
+ request.setKey(key);
+ request.setValue(value);
+ return request;
+ }
+
+ public abstract void setOperation(String operation);
+
+ public abstract void setDatabase(String database);
+
+ public abstract void setKey(String key);
+
+ public abstract void setValue(String value);
+
+ public abstract String getOperation();
+
+ public abstract String getDatabase();
+
+ public abstract String getKey();
+
+ public abstract String getValue();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DatabaseAccessResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DatabaseAccessResponse.java
new file mode 100644
index 0000000000000..71694ef70877a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DatabaseAccessResponse.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.List;
+import org.apache.hadoop.yarn.util.Records;
+
+
+public abstract class DatabaseAccessResponse {
+
+ public static DatabaseAccessResponse newInstance(List records) {
+ DatabaseAccessResponse response = Records.newRecord(DatabaseAccessResponse.class);
+ response.setRecords(records);
+ return response;
+ }
+
+ public abstract void setRecords(List records);
+
+ public abstract List getRecords();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
index 2ec08aa1b268e..4f14d5cadc0df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
@@ -47,4 +47,5 @@ service ResourceManagerAdministrationProtocolService {
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto);
+ rpc accessDatabase(DatabaseAccessRequestProto) returns (DatabaseAccessResponseProto);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index f2145ca73d0b5..a379e9214d238 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -165,3 +165,19 @@ message ActiveRMInfoProto {
optional string clusterId = 1;
optional string rmId = 2;
}
+
+message DatabaseAccessRequestProto {
+ required string operation = 1;
+ required string database = 2;
+ required string key = 3;
+ optional string value = 4;
+}
+
+message DatabaseAccessResponseProto {
+ repeated DBRecordProto records = 1;
+}
+
+message DBRecordProto {
+ required string key = 1;
+ required string value = 2;
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 19d03a7da7341..72d768d160643 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -29,10 +29,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -105,8 +105,8 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
-import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -147,11 +147,10 @@
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
-
-import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
@Private
@Unstable
public class YarnClientImpl extends YarnClient {
@@ -337,20 +336,20 @@ public YarnClientApplication createApplication()
int pollCount = 0;
long startTime = System.currentTimeMillis();
- EnumSet waitingStates =
+ EnumSet waitingStates =
EnumSet.of(YarnApplicationState.NEW,
YarnApplicationState.NEW_SAVING,
YarnApplicationState.SUBMITTED);
- EnumSet failToSubmitStates =
+ EnumSet failToSubmitStates =
EnumSet.of(YarnApplicationState.FAILED,
- YarnApplicationState.KILLED);
+ YarnApplicationState.KILLED);
while (true) {
try {
ApplicationReport appReport = getApplicationReport(applicationId);
YarnApplicationState state = appReport.getYarnApplicationState();
if (!waitingStates.contains(state)) {
if(failToSubmitStates.contains(state)) {
- throw new YarnException("Failed to submit " + applicationId +
+ throw new YarnException("Failed to submit " + applicationId +
" to YARN : " + appReport.getDiagnostics());
}
LOG.info("Submitted application {}", applicationId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index 465f1ad7c3b7d..c33ce0fb6948e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -62,9 +62,12 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -90,7 +93,7 @@
@Unstable
public class RMAdminCLI extends HAAdmin {
- private final RecordFactory recordFactory =
+ private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
static CommonNodeLabelsManager localNodeLabelsManager = null;
private static final String NO_MAPPING_ERR_MSG =
@@ -166,6 +169,23 @@ public class RMAdminCLI extends HAAdmin {
+ " \n\t\tor\n\t\t[NodeID] [resourcetypes] "
+ "([OvercommitTimeout]). ",
"Update resource on specific node."))
+ .put("-accessDataStore",
+ new UsageInfo("get|set|del [database] [key] [value]",
+ "Access the underlying levelDB data store through CRUD APIs.\n\t\t"
+ + "'key' should be a complete key which will return / update / delete 1 or 0 records. \n\t\t"
+ + "'value' is optional & ignored for get & del calls. It is mandatory for set calls\n\t\t"
+ + "If 'database' is 'yarn-rm-state'"
+ + " it will provide access to apps, app attempts, delegation tokens, master keys, reservations, AM RM tokens.\n\t\t\t"
+ + "'key' for this database is 'RMAppRoot/', RMAppRoot//, ReservationSystemRoot//,"
+ + " RMDTSecretManagerRoot/RMDelegationToken_, RMDTSecretManagerRoot/DelegationKey_,"
+ + " RMDTSecretManagerRoot/RMDTSequentialNumber, AMRMTokenSecretManagerRoot."
+ + " Additional metadata keys are 'RMVersionNode', 'EpochNode' \n\t\t\t"
+ + "Most of the values for these keys are binary data & hence get / set will not provide any useful info. But keys can be deleted.\n\t\t"
+ + "If 'database' is 'yarn-conf-store' "
+ + " it will provide access to the scheduler conf & its related internal keys.\n\t\t\t"
+ + "'key' for this database is the config name which is seen in /scheduler-conf or scheduler xml file."
+ + " Additional metadata keys are 'version', 'log'"
+ ))
.build();
public RMAdminCLI() {
@@ -286,7 +306,8 @@ private static void printHelp(String cmd, boolean isHAEnabled) {
+ " [-refreshClusterMaxPriority]"
+ " [-updateNodeResource [NodeID] [MemSize] [vCores]"
+ " ([OvercommitTimeout]) or -updateNodeResource [NodeID] "
- + "[ResourceTypes] ([OvercommitTimeout])]");
+ + "[ResourceTypes] ([OvercommitTimeout])]"
+ + " [-accessDataStore get|set|del [database] [key] [value]]");
if (isHAEnabled) {
appendHAUsage(summary);
}
@@ -329,7 +350,7 @@ private static void printUsage(String cmd, boolean isHAEnabled) {
ToolRunner.printGenericCommandUsage(System.err);
}
-
+
protected ResourceManagerAdministrationProtocol createAdminProtocol()
throws IOException {
// Get the current configuration
@@ -341,7 +362,7 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol()
private int refreshQueues() throws IOException, YarnException {
// Refresh the queue properties
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
- RefreshQueuesRequest request =
+ RefreshQueuesRequest request =
recordFactory.newRecordInstance(RefreshQueuesRequest.class);
adminProtocol.refreshQueues(request);
return 0;
@@ -433,35 +454,35 @@ private int refreshUserToGroupsMappings() throws IOException,
YarnException {
// Refresh the user-to-groups mappings
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
- RefreshUserToGroupsMappingsRequest request =
+ RefreshUserToGroupsMappingsRequest request =
recordFactory.newRecordInstance(RefreshUserToGroupsMappingsRequest.class);
adminProtocol.refreshUserToGroupsMappings(request);
return 0;
}
-
+
private int refreshSuperUserGroupsConfiguration() throws IOException,
YarnException {
// Refresh the super-user groups
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
- RefreshSuperUserGroupsConfigurationRequest request =
+ RefreshSuperUserGroupsConfigurationRequest request =
recordFactory.newRecordInstance(RefreshSuperUserGroupsConfigurationRequest.class);
adminProtocol.refreshSuperUserGroupsConfiguration(request);
return 0;
}
-
+
private int refreshAdminAcls() throws IOException, YarnException {
// Refresh the admin acls
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
- RefreshAdminAclsRequest request =
+ RefreshAdminAclsRequest request =
recordFactory.newRecordInstance(RefreshAdminAclsRequest.class);
adminProtocol.refreshAdminAcls(request);
return 0;
}
-
+
private int refreshServiceAcls() throws IOException, YarnException {
// Refresh the service acls
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
- RefreshServiceAclsRequest request =
+ RefreshServiceAclsRequest request =
recordFactory.newRecordInstance(RefreshServiceAclsRequest.class);
adminProtocol.refreshServiceAcls(request);
return 0;
@@ -505,7 +526,7 @@ private int getGroups(String[] usernames) throws IOException {
if (usernames.length == 0) {
usernames = new String[] { UserGroupInformation.getCurrentUser().getUserName() };
}
-
+
for (String username : usernames) {
StringBuilder sb = new StringBuilder();
sb.append(username + " :");
@@ -515,10 +536,10 @@ private int getGroups(String[] usernames) throws IOException {
}
System.out.println(sb);
}
-
+
return 0;
}
-
+
// Make it protected to make unit test can change it.
protected static synchronized CommonNodeLabelsManager
getNodeLabelManagerInstance(Configuration conf) {
@@ -607,7 +628,7 @@ private int handleRemoveFromClusterNodeLabels(String[] args, String cmd,
return 0;
}
-
+
private Map> buildNodeLabelsMapFromStr(String args) {
Map> map = new HashMap>();
@@ -644,7 +665,7 @@ private Map> buildNodeLabelsMapFromStr(String args) {
map.get(nodeId).add(splits[i].trim());
}
}
-
+
int nLabels = map.get(nodeId).size();
Preconditions.checkArgument(nLabels <= 1, "%s labels specified on host=%s"
+ ", please note that we do not support specifying multiple"
@@ -778,6 +799,8 @@ public int run(String[] args) throws Exception {
exitCode = handleRemoveFromClusterNodeLabels(args, cmd, isHAEnabled);
} else if ("-replaceLabelsOnNode".equals(cmd)) {
exitCode = handleReplaceLabelsOnNodes(args, cmd, isHAEnabled);
+ } else if ("-accessDataStore".equals(cmd)) {
+ exitCode = handleDBAccess(args, cmd);
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
@@ -787,6 +810,7 @@ public int run(String[] args) throws Exception {
} catch (IllegalArgumentException arge) {
exitCode = -1;
System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+ arge.printStackTrace();
printUsage(cmd, isHAEnabled);
} catch (RemoteException e) {
//
@@ -796,16 +820,16 @@ public int run(String[] args) throws Exception {
try {
String[] content;
content = e.getLocalizedMessage().split("\n");
- System.err.println(cmd.substring(1) + ": "
- + content[0]);
+ System.err.println(cmd.substring(1) + ": " + content[0]);
+ e.printStackTrace();
} catch (Exception ex) {
- System.err.println(cmd.substring(1) + ": "
- + ex.getLocalizedMessage());
+ System.err.println(cmd.substring(1) + ": " + ex.getLocalizedMessage());
+ ex.printStackTrace();
}
} catch (Exception e) {
exitCode = -1;
- System.err.println(cmd.substring(1) + ": "
- + e.getLocalizedMessage());
+ System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
+ e.printStackTrace();
}
if (null != localNodeLabelsManager) {
localNodeLabelsManager.stop();
@@ -956,6 +980,61 @@ private Resource parseCommandAndCreateResource(String resourceTypes) {
return resource;
}
+ private int handleDBAccess(String[] args, String cmd)
+ throws IOException, YarnException {
+ boolean invalidParam = false;
+ String operation = null;
+ String database = null;
+ String key = null;
+ String value = null;
+
+ if (args.length < 4) {
+ invalidParam = true;
+ } else {
+ operation = args[1];
+ if (operation.equals("set")) {
+ if (args.length != 5 || StringUtils.isEmpty(args[4])) {
+ invalidParam = true;
+ } else {
+ value = args[4];
+ }
+ } else if (operation.equals("get") || operation.equals("del")) {
+ if (args.length != 4) {
+ invalidParam = true;
+ }
+ } else {
+ invalidParam = true;
+ }
+
+ database = args[2];
+ key = args[3];
+ if (StringUtils.isEmpty(key) || StringUtils.isEmpty(database)) {
+ invalidParam = true;
+ }
+ }
+
+ if (invalidParam) {
+ System.err.println("Number of parameters specified for accessDataStore is wrong. " + Arrays.toString(args));
+ StringBuilder helpMsg = new StringBuilder();
+ buildHelpMsg(cmd, helpMsg);
+ System.out.println(helpMsg);
+ return -1;
+ }
+ return accessDB(operation, database, key, value);
+ }
+
+ private int accessDB(String operation, String database, String key, String value)
+ throws IOException, YarnException {
+ ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
+ DatabaseAccessRequest request = DatabaseAccessRequest.newInstance(operation, database, key, value);
+ DatabaseAccessResponse response = adminProtocol.accessDatabase(request);
+ List records = response.getRecords();
+ for(DBRecord record : records) {
+ System.out.println(record.getKey() + " = " + record.getValue());
+ }
+ return 0;
+ }
+
private int validateTimeout(String strTimeout) {
int timeout;
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index 6eb1f2fc986a6..f3120b2359269 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -18,38 +18,25 @@
package org.apache.hadoop.yarn.client.cli;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
-import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -65,6 +52,9 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -81,12 +71,14 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
public class TestRMAdminCLI {
@@ -588,6 +580,51 @@ public void testGetGroups() throws Exception {
}
}
+ @Test
+ public void testDatabaseAccess() throws Exception {
+ when(admin.accessDatabase(Matchers.any())).thenReturn(
+ DatabaseAccessResponse.newInstance(new ArrayList<>())
+ );
+ String[] args = {"-accessDataStore", "get", "db", "key"};
+ assertEquals(0, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "set", "db", "key", "value"};
+ assertEquals(0, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "del", "db", "key"};
+ assertEquals(0, rmAdminCLI.run(args));
+
+ verify(admin, times(3)).accessDatabase(Matchers.any());
+ }
+
+ @Test
+ public void testDatabaseAccessInvalidInput() throws Exception {
+
+ when(admin.accessDatabase(Matchers.any())).thenReturn(
+ DatabaseAccessResponse.newInstance(new ArrayList())
+ );
+
+ String[] args = {"-accessDataStore", "get", "db"};
+ assertEquals(-1, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "set", "db", "key"};
+ assertEquals(-1, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "del", "db"};
+ assertEquals(-1, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "get", "db", ""};
+ assertEquals(-1, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "set", "", "key", "val"};
+ assertEquals(-1, rmAdminCLI.run(args));
+
+ args = new String[]{"-accessDataStore", "invalidcmd", "db", "key", "val"};
+ assertEquals(-1, rmAdminCLI.run(args));
+
+ verify(admin, never()).accessDatabase(Matchers.any());
+ }
+
@Test
public void testTransitionToActive() throws Exception {
String[] args = {"-transitionToActive", "rm1"};
@@ -708,7 +745,8 @@ public void testHelp() throws Exception {
"[-directlyAccessNodeLabelStore] [-refreshClusterMaxPriority] " +
"[-updateNodeResource [NodeID] [MemSize] [vCores] "
+ "([OvercommitTimeout]) or -updateNodeResource "
- + "[NodeID] [ResourceTypes] ([OvercommitTimeout])] "
+ + "[NodeID] [ResourceTypes] ([OvercommitTimeout])] " +
+ "[-accessDataStore get|set|del [database] [key] [value]] "
+ "[-help [cmd]]"));
assertTrue(dataOut
.toString()
@@ -802,6 +840,7 @@ public void testHelp() throws Exception {
+ "([OvercommitTimeout]) "
+ "or -updateNodeResource [NodeID] [ResourceTypes] "
+ "([OvercommitTimeout])] "
+ + "[-accessDataStore get|set|del [database] [key] [value]] "
+ "[-transitionToActive [--forceactive] ] "
+ "[-transitionToStandby ] "
+ "[-getServiceState ] [-getAllServiceState] "
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
index 20729a3cc8a73..6866fe6961940 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
@@ -21,16 +21,17 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
-
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DatabaseAccessRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodesToAttributesMappingRequestProto;
@@ -51,6 +52,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -79,6 +82,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DatabaseAccessRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DatabaseAccessResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
@@ -104,16 +109,14 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
-
@Private
public class ResourceManagerAdministrationProtocolPBClientImpl implements ResourceManagerAdministrationProtocol, Closeable {
private ResourceManagerAdministrationProtocolPB proxy;
-
- public ResourceManagerAdministrationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
+
+ public ResourceManagerAdministrationProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
Configuration conf) throws IOException {
- RPC.setProtocolEngine(conf, ResourceManagerAdministrationProtocolPB.class,
+ RPC.setProtocolEngine(conf, ResourceManagerAdministrationProtocolPB.class,
ProtobufRpcEngine2.class);
proxy = (ResourceManagerAdministrationProtocolPB)RPC.getProxy(
ResourceManagerAdministrationProtocolPB.class, clientVersion, addr, conf);
@@ -129,7 +132,7 @@ public void close() {
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException, IOException {
- RefreshQueuesRequestProto requestProto =
+ RefreshQueuesRequestProto requestProto =
((RefreshQueuesRequestPBImpl)request).getProto();
try {
return new RefreshQueuesResponsePBImpl(
@@ -143,7 +146,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException, IOException {
- RefreshNodesRequestProto requestProto =
+ RefreshNodesRequestProto requestProto =
((RefreshNodesRequestPBImpl)request).getProto();
try {
return new RefreshNodesResponsePBImpl(
@@ -158,7 +161,7 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws YarnException, IOException {
- RefreshSuperUserGroupsConfigurationRequestProto requestProto =
+ RefreshSuperUserGroupsConfigurationRequestProto requestProto =
((RefreshSuperUserGroupsConfigurationRequestPBImpl)request).getProto();
try {
return new RefreshSuperUserGroupsConfigurationResponsePBImpl(
@@ -173,7 +176,7 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request) throws YarnException,
IOException {
- RefreshUserToGroupsMappingsRequestProto requestProto =
+ RefreshUserToGroupsMappingsRequestProto requestProto =
((RefreshUserToGroupsMappingsRequestPBImpl)request).getProto();
try {
return new RefreshUserToGroupsMappingsResponsePBImpl(
@@ -187,7 +190,7 @@ public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
- RefreshAdminAclsRequestProto requestProto =
+ RefreshAdminAclsRequestProto requestProto =
((RefreshAdminAclsRequestPBImpl)request).getProto();
try {
return new RefreshAdminAclsResponsePBImpl(
@@ -202,7 +205,7 @@ public RefreshAdminAclsResponse refreshAdminAcls(
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException,
IOException {
- RefreshServiceAclsRequestProto requestProto =
+ RefreshServiceAclsRequestProto requestProto =
((RefreshServiceAclsRequestPBImpl)request).getProto();
try {
return new RefreshServiceAclsResponsePBImpl(proxy.refreshServiceAcls(
@@ -215,7 +218,7 @@ public RefreshServiceAclsResponse refreshServiceAcls(
@Override
public String[] getGroupsForUser(String user) throws IOException {
- GetGroupsForUserRequestProto requestProto =
+ GetGroupsForUserRequestProto requestProto =
GetGroupsForUserRequestProto.newBuilder().setUser(user).build();
try {
GetGroupsForUserResponseProto responseProto =
@@ -343,4 +346,16 @@ public NodesToAttributesMappingResponse mapAttributesToNodes(
return null;
}
}
+
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request)
+ throws IOException, YarnException {
+ DatabaseAccessRequestProto requestProto = ((DatabaseAccessRequestPBImpl) request).getProto();
+ try {
+ return new DatabaseAccessResponsePBImpl(proxy.accessDatabase(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
index 3c2f1619cbf6a..09b31b6d6d827 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
@@ -57,6 +58,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
@@ -75,6 +78,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DatabaseAccessRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DatabaseAccessResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodesToAttributesMappingResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
@@ -107,11 +112,11 @@
public class ResourceManagerAdministrationProtocolPBServiceImpl implements ResourceManagerAdministrationProtocolPB {
private ResourceManagerAdministrationProtocol real;
-
+
public ResourceManagerAdministrationProtocolPBServiceImpl(ResourceManagerAdministrationProtocol impl) {
this.real = impl;
}
-
+
@Override
public RefreshQueuesResponseProto refreshQueues(RpcController controller,
RefreshQueuesRequestProto proto) throws ServiceException {
@@ -130,7 +135,7 @@ public RefreshQueuesResponseProto refreshQueues(RpcController controller,
public RefreshAdminAclsResponseProto refreshAdminAcls(
RpcController controller, RefreshAdminAclsRequestProto proto)
throws ServiceException {
- RefreshAdminAclsRequestPBImpl request =
+ RefreshAdminAclsRequestPBImpl request =
new RefreshAdminAclsRequestPBImpl(proto);
try {
RefreshAdminAclsResponse response = real.refreshAdminAcls(request);
@@ -157,15 +162,15 @@ public RefreshNodesResponseProto refreshNodes(RpcController controller,
}
@Override
- public RefreshSuperUserGroupsConfigurationResponseProto
+ public RefreshSuperUserGroupsConfigurationResponseProto
refreshSuperUserGroupsConfiguration(
RpcController controller,
RefreshSuperUserGroupsConfigurationRequestProto proto)
throws ServiceException {
- RefreshSuperUserGroupsConfigurationRequestPBImpl request =
+ RefreshSuperUserGroupsConfigurationRequestPBImpl request =
new RefreshSuperUserGroupsConfigurationRequestPBImpl(proto);
try {
- RefreshSuperUserGroupsConfigurationResponse response =
+ RefreshSuperUserGroupsConfigurationResponse response =
real.refreshSuperUserGroupsConfiguration(request);
return ((RefreshSuperUserGroupsConfigurationResponsePBImpl)response).getProto();
} catch (YarnException e) {
@@ -179,10 +184,10 @@ public RefreshNodesResponseProto refreshNodes(RpcController controller,
public RefreshUserToGroupsMappingsResponseProto refreshUserToGroupsMappings(
RpcController controller, RefreshUserToGroupsMappingsRequestProto proto)
throws ServiceException {
- RefreshUserToGroupsMappingsRequestPBImpl request =
+ RefreshUserToGroupsMappingsRequestPBImpl request =
new RefreshUserToGroupsMappingsRequestPBImpl(proto);
try {
- RefreshUserToGroupsMappingsResponse response =
+ RefreshUserToGroupsMappingsResponse response =
real.refreshUserToGroupsMappings(request);
return ((RefreshUserToGroupsMappingsResponsePBImpl)response).getProto();
} catch (YarnException e) {
@@ -196,10 +201,10 @@ public RefreshUserToGroupsMappingsResponseProto refreshUserToGroupsMappings(
public RefreshServiceAclsResponseProto refreshServiceAcls(
RpcController controller, RefreshServiceAclsRequestProto proto)
throws ServiceException {
- RefreshServiceAclsRequestPBImpl request =
+ RefreshServiceAclsRequestPBImpl request =
new RefreshServiceAclsRequestPBImpl(proto);
try {
- RefreshServiceAclsResponse response =
+ RefreshServiceAclsResponse response =
real.refreshServiceAcls(request);
return ((RefreshServiceAclsResponsePBImpl)response).getProto();
} catch (YarnException e) {
@@ -226,11 +231,11 @@ public GetGroupsForUserResponseProto getGroupsForUser(
throw new ServiceException(e);
}
}
-
+
@Override
public UpdateNodeResourceResponseProto updateNodeResource(RpcController controller,
UpdateNodeResourceRequestProto proto) throws ServiceException {
- UpdateNodeResourceRequestPBImpl request =
+ UpdateNodeResourceRequestPBImpl request =
new UpdateNodeResourceRequestPBImpl(proto);
try {
UpdateNodeResourceResponse response = real.updateNodeResource(request);
@@ -359,4 +364,18 @@ public NodesToAttributesMappingResponseProto mapAttributesToNodes(
throw new ServiceException(e);
}
}
+
+ @Override
+ public YarnServerResourceManagerServiceProtos.DatabaseAccessResponseProto accessDatabase(
+ RpcController controller,
+ YarnServerResourceManagerServiceProtos.DatabaseAccessRequestProto proto)
+ throws ServiceException {
+ DatabaseAccessRequest request = new DatabaseAccessRequestPBImpl(proto);
+ try {
+ DatabaseAccessResponse response = real.accessDatabase(request);
+ return ((DatabaseAccessResponsePBImpl) response).getProto();
+ } catch (IOException | YarnException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DBRecordPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DBRecordPBImpl.java
new file mode 100644
index 0000000000000..ab2da8d1553e6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DBRecordPBImpl.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DBRecordProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DBRecordProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
+
+public class DBRecordPBImpl extends DBRecord {
+
+ DBRecordProto proto = DBRecordProto.getDefaultInstance();
+ DBRecordProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public DBRecordPBImpl() {
+ builder = DBRecordProto.newBuilder();
+ }
+
+ public DBRecordPBImpl(DBRecordProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public DBRecordProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public void setKey(String key) {
+ maybeInitBuilder();
+ if (key != null) {
+ builder.setKey(key);
+ } else {
+ builder.clearKey();
+ }
+ }
+
+ @Override
+ public void setValue(String value) {
+ maybeInitBuilder();;
+ if (value != null) {
+ builder.setValue(value);
+ } else {
+ builder.clearValue();
+ }
+ }
+
+ @Override
+ public String getKey() {
+ DBRecordProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasKey() ? p.getKey() : null;
+ }
+
+ @Override
+ public String getValue() {
+ DBRecordProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasValue() ? p.getValue() : null;
+ }
+
+ private synchronized void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = DBRecordProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DatabaseAccessRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DatabaseAccessRequestPBImpl.java
new file mode 100644
index 0000000000000..a94550666f54c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DatabaseAccessRequestPBImpl.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DatabaseAccessRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DatabaseAccessRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+
+public class DatabaseAccessRequestPBImpl extends DatabaseAccessRequest {
+
+ DatabaseAccessRequestProto proto = DatabaseAccessRequestProto.getDefaultInstance();
+ DatabaseAccessRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ public DatabaseAccessRequestPBImpl() {
+ builder = DatabaseAccessRequestProto.newBuilder();
+ }
+
+ public DatabaseAccessRequestPBImpl(DatabaseAccessRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public DatabaseAccessRequestProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public void setOperation(String operation) {
+ maybeInitBuilder();
+ if (operation != null) {
+ builder.setOperation(operation);
+ } else {
+ builder.clearOperation();
+ }
+ }
+
+ @Override
+ public void setDatabase(String database) {
+ maybeInitBuilder();
+ if (database != null) {
+ builder.setDatabase(database);
+ } else {
+ builder.clearDatabase();
+ }
+ }
+
+ @Override
+ public void setKey(String key) {
+ maybeInitBuilder();
+ if (key != null) {
+ builder.setKey(key);
+ } else {
+ builder.clearKey();
+ }
+ }
+
+ @Override
+ public void setValue(String value) {
+ maybeInitBuilder();
+ if (value != null) {
+ builder.setValue(value);
+ } else {
+ builder.clearValue();
+ }
+ }
+
+ @Override
+ public String getOperation() {
+ DatabaseAccessRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasOperation() ? p.getOperation() : null;
+ }
+
+ @Override
+ public String getDatabase() {
+ DatabaseAccessRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasDatabase() ? p.getDatabase() : null;
+ }
+
+ @Override
+ public String getKey() {
+ DatabaseAccessRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasKey() ? p.getKey() : null;
+ }
+
+ @Override
+ public String getValue() {
+ DatabaseAccessRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.hasValue() ? p.getValue() : null;
+ }
+
+ private synchronized void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = DatabaseAccessRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DatabaseAccessResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DatabaseAccessResponsePBImpl.java
new file mode 100644
index 0000000000000..6954d00c44f81
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DatabaseAccessResponsePBImpl.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DBRecordProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DatabaseAccessResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DatabaseAccessResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
+
+public class DatabaseAccessResponsePBImpl extends DatabaseAccessResponse {
+
+ DatabaseAccessResponseProto proto = DatabaseAccessResponseProto.getDefaultInstance();
+ DatabaseAccessResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ List dbRecords;
+
+ public DatabaseAccessResponsePBImpl() {
+ builder = DatabaseAccessResponseProto.newBuilder();
+ }
+
+ public DatabaseAccessResponsePBImpl(DatabaseAccessResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public DatabaseAccessResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ private void initDBRecords() {
+ if (this.dbRecords != null) {
+ return;
+ }
+ DatabaseAccessResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List dbRecordPBS = p.getRecordsList();
+ this.dbRecords = new ArrayList<>();
+ for (YarnServerResourceManagerServiceProtos.DBRecordProto record : dbRecordPBS) {
+ this.dbRecords.add(new DBRecordPBImpl(record));
+ }
+ }
+
+ @Override
+ public void setRecords(List records) {
+ if (this.dbRecords != null) {
+ dbRecords.clear();
+ } else {
+ dbRecords = new ArrayList<>();
+ }
+ dbRecords.addAll(records);
+ }
+
+ @Override
+ public List getRecords() {
+ initDBRecords();
+ return this.dbRecords;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.dbRecords != null) {
+ addDbRecordsToProto();
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = DatabaseAccessResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void addDbRecordsToProto() {
+ maybeInitBuilder();
+ builder.clearRecords();
+ if (dbRecords == null) {
+ return;
+ }
+ Iterable iterable = new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = dbRecords.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public DBRecordProto next() {
+ DBRecord record = iter.next();
+ return ((DBRecordPBImpl)record).getProto();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllRecords(iterable);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 40ddbf6b56c29..fb1ed621349a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -144,6 +144,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@@ -904,6 +906,12 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
return RefreshClusterMaxPriorityResponse.newInstance();
}
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request)
+ throws IOException {
+ return null;
+ }
+
@Override
public String[] getGroupsForUser(String user) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 0d04f9b55aafb..13de0394fde11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -28,10 +28,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
@@ -52,6 +51,7 @@
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -70,6 +70,9 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
@@ -96,16 +99,20 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.KVStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.LevelDbStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.LeveldbConfigurationStore;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
-
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.thirdparty.protobuf.BlockingService;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AdminService extends CompositeService implements
HAServiceProtocol, ResourceManagerAdministrationProtocol {
@@ -133,6 +140,9 @@ public class AdminService extends CompositeService implements
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
+ // Map of database name to corresponding KVStore instance
+ Map kvStoreMap = new ConcurrentHashMap<>();
+
public AdminService(ResourceManager rm) {
super(AdminService.class.getName());
this.rm = rm;
@@ -223,6 +233,9 @@ protected void stopServer() throws Exception {
if (this.server != null) {
this.server.stop();
}
+ for (KVStore store : kvStoreMap.values()) {
+ store.close();
+ }
}
private UserGroupInformation checkAccess(String method) throws IOException {
@@ -977,6 +990,76 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
}
}
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request)
+ throws IOException {
+
+ checkAccess("accessDatabase");
+
+ // Do not enable access if RM is started.
+ // This is to prevent inconsistencies between in memory data structures & the database
+ // in classes like RMStateStore & MutableCSConfigurationProvider
+ if (!rm.isInState(STATE.NOTINITED)) {
+ throw new IllegalStateException("RM is started & accessing DB is prohibited in this state");
+ }
+
+ if (request.getKey() == null || request.getDatabase() == null) {
+ throw new IllegalArgumentException("Invalid Request " + request);
+ }
+
+ KVStore store = getKVStore(request);
+
+ String charsetName = "UTF-8";
+ byte[] key = request.getKey().getBytes(charsetName);
+
+ List records = new ArrayList<>();
+ switch (request.getOperation()) {
+ case "get":
+ byte[] val = store.get(key);
+ if (val != null) {
+ DBRecord record = DBRecord.newInstance(request.getKey(), new String(val, charsetName));
+ records.add(record);
+ }
+ break;
+ case "set":
+ if (request.getValue() == null) {
+ throw new IllegalArgumentException("Value can't be null when inserting to database");
+ }
+ store.set(key, request.getValue().getBytes(charsetName));
+ break;
+ case "del":
+ store.del(key);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid Operation");
+ }
+
+ return DatabaseAccessResponse.newInstance(records);
+ }
+
+ private String getActualDatabase(DatabaseAccessRequest request) {
+ if (request.getDatabase().equals(LeveldbConfigurationStore.DB_NAME)) {
+ return getConfig().get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH) + "/" + request.getDatabase();
+ } else if (request.getDatabase().equals(LeveldbRMStateStore.DB_NAME)) {
+ return getConfig().get(YarnConfiguration.RM_LEVELDB_STORE_PATH) + "/" + request.getDatabase();
+ } else {
+ throw new IllegalArgumentException("Invalid Database " + request.getDatabase());
+ }
+ }
+
+ private KVStore getKVStore(DatabaseAccessRequest request) throws IOException {
+ String database = getActualDatabase(request);
+ if (!kvStoreMap.containsKey(database)) {
+ // Currently it enables access to only Level DB
+ Options options = new Options();
+ options.createIfMissing(false);
+ KVStore store = new LevelDbStore(database, options);
+ store.init();
+ kvStoreMap.putIfAbsent(database, store);
+ }
+ return kvStoreMap.get(database);
+ }
+
private void refreshClusterMaxPriority() throws IOException, YarnException {
Configuration conf =
getConfiguration(new Configuration(false),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index da500add4cbf6..09798b44f8f47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1756,6 +1756,8 @@ public static void main(String argv[]) {
} else if (argv[0].equals("-remove-application-from-state-store")
&& argv.length == 2) {
removeApplication(conf, argv[1]);
+ } else if (argv[0].equals("-safe-mode")) {
+ ResourceManager.startRMInSafeMode();
} else {
printUsage(System.err);
}
@@ -1773,6 +1775,34 @@ public static void main(String argv[]) {
}
}
+ /**
+ * When RM fails to function regularly / start,
+ * we want RM to start with basic functionality which can be used to correct & start it
+ * Possible applicable scenarios
+ * 1. DB Corruption - In case of embedded DB, its required to bring up RM to access the DB
+ * In safe mode, basic DB ops like CRUD are made accessible through CLI via AdminService
+ *
+ * This has been tested with LevelDB as RMStateStore & YarnConfigurationStore
+ */
+ static ResourceManager startRMInSafeMode() {
+
+ Configuration conf = new YarnConfiguration();
+
+ ResourceManager resourceManager = new ResourceManager();
+ RMContextImpl rmContext = new RMContextImpl();
+ rmContext.setResourceManager(resourceManager);
+ resourceManager.rmContext = rmContext;
+
+ AdminService adminService = resourceManager.createAdminService();
+ adminService.init(conf);
+ adminService.start();
+
+ resourceManager.adminService = adminService;
+ resourceManager.addService(adminService);
+
+ return resourceManager;
+ }
+
/**
* Register the handlers for alwaysOn services
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/KVStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/KVStore.java
new file mode 100644
index 0000000000000..4ca070b32d7d0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/KVStore.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.IOException;
+
+/**
+ * Interface to RM Databases (State Store, Conf Store) which provide a KV access
+ * This interface can eventually be used by RMStateStore, YarnConfigurationStore which provide
+ * application specific logic on top of general KV store
+ *
+ * Implementations / APIs of this interface should not be tied to any application logic and
+ * should be reusable across KV use cases
+ *
+ * It is acceptable for this to be tied to the Hadoop ecosystem (FileSystem, Conf, etc) as long as it
+ * solves the purpose to be reusable across use cases without being tied to app logic
+ */
+public interface KVStore {
+
+ public void init() throws IOException;
+
+ public byte[] get(byte[] key) throws IOException;
+
+ public void set(byte[] key, byte[] value) throws IOException;
+
+ public void del(byte[] key) throws IOException;
+
+ public void close() throws IOException;
+
+ // TODO - Add iterator / range APIs with a start key / prefix and a limit / end key
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LevelDbStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LevelDbStore.java
new file mode 100644
index 0000000000000..f661756277841
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LevelDbStore.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.File;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.LeveldbConfigurationStore;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Options;
+
+/**
+ * Generic wrapper for LevelDB which abstracts common logic required to interact with levelDB
+ * TODO
+ * 1. Refactor LeveldbRMStateStore and LeveldbConfigurationStore to use this
+ * 2. Evaluate moving to https://github.com/dain/leveldb which has an iterator API with prefix key support
+ */
+public class LevelDbStore implements KVStore {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(LeveldbRMStateStore.class);
+
+ private DB db;
+ private final String dbPath;
+ private final Options options;
+
+ public LevelDbStore(String dbPath, Options options) {
+ this.dbPath = dbPath;
+ this.options = options;
+
+ // LevelDB database can only be accessed through the comparator that was used to create it
+ // Conf Store uses a custom comparator and thus the same comparator needs to be used to access it again
+ if (dbPath.contains(LeveldbConfigurationStore.DB_NAME)) {
+ this.options.comparator(LeveldbConfigurationStore.getDBComparator());
+ }
+ }
+
+ /**
+ * Initialises the level db database
+ * Constructs the database if it doesn't exist with the necessary permissions
+ */
+ @Override
+ public void init() throws IOException {
+ File dbfile = new File(dbPath);
+ try {
+ db = JniDBFactory.factory.open(dbfile, options);
+ } catch (NativeDB.DBException e) {
+ LOG.error("Unable to open database with error : " + e.getMessage());
+ throw e;
+ }
+ }
+
+ @Override
+ public byte[] get(byte[] key) throws IOException {
+ try {
+ return db.get(key);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void set(byte[] key, byte[] value) throws IOException {
+ try {
+ db.put(key, value);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void del(byte[] key) throws IOException {
+ try {
+ db.delete(key);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 026f9e476a34f..d155ff9505c7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -78,7 +78,7 @@ public class LeveldbRMStateStore extends RMStateStore {
LoggerFactory.getLogger(LeveldbRMStateStore.class);
private static final String SEPARATOR = "/";
- private static final String DB_NAME = "yarn-rm-state";
+ public static final String DB_NAME = "yarn-rm-state";
private static final String RM_DT_MASTER_KEY_KEY_PREFIX =
RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX;
private static final String RM_DT_TOKEN_KEY_PREFIX =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 94b854dceb43c..483f898cbd161 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -326,5 +326,4 @@ public void deleteStore() throws Exception {
@Override
public void removeApplication(ApplicationId removeAppId) throws Exception {
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 6aa37f399e41a..67f9da0bee5d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -59,7 +59,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
public static final Logger LOG =
LoggerFactory.getLogger(LeveldbConfigurationStore.class);
- private static final String DB_NAME = "yarn-conf-store";
+ public static final String DB_NAME = "yarn-conf-store";
private static final String LOG_KEY = "log";
private static final String VERSION_KEY = "version";
private static final String CONF_VERSION_NAME = "conf-version-store";
@@ -105,19 +105,8 @@ public void format() throws Exception {
fs.delete(getStorageDir(DB_NAME), true);
}
- private void initDatabase() throws Exception {
- Path confVersion = createStorageDir(CONF_VERSION_NAME);
- Options confOptions = new Options();
- confOptions.createIfMissing(false);
- File confVersionFile = new File(confVersion.toString());
-
- versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
- this::initVersionDb);
-
- Path storeRoot = createStorageDir(DB_NAME);
- Options options = new Options();
- options.createIfMissing(false);
- options.comparator(new DBComparator() {
+ public static DBComparator getDBComparator() {
+ return new DBComparator() {
@Override
public int compare(byte[] key1, byte[] key2) {
String key1Str = new String(key1, StandardCharsets.UTF_8);
@@ -148,7 +137,23 @@ public byte[] findShortestSeparator(byte[] start, byte[] limit) {
public byte[] findShortSuccessor(byte[] key) {
return key;
}
- });
+ };
+ }
+
+ private void initDatabase() throws Exception {
+ Path confVersion = createStorageDir(CONF_VERSION_NAME);
+ Options confOptions = new Options();
+ confOptions.createIfMissing(false);
+ File confVersionFile = new File(confVersion.toString());
+
+ versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
+ this::initVersionDb);
+
+ Path storeRoot = createStorageDir(DB_NAME);
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.comparator(getDBComparator());
+
LOG.info("Using conf database at {}", storeRoot);
File dbFile = new File(storeRoot.toString());
db = dbManager.initDatabase(dbFile, options, this::initDb);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index 457e9d47dd0b4..5297e68c7ab92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -18,10 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -33,17 +29,18 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.GroupMappingServiceProvider;
@@ -52,6 +49,9 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
@@ -66,8 +66,14 @@
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -82,30 +88,26 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.LevelDbStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX;
-import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.NODES;
-import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.PREFIX;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.LeveldbConfigurationStore;
+import org.iq80.leveldb.Options;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
-
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
public class TestRMAdminService {
@@ -1542,6 +1544,65 @@ private void checkBadConfiguration(Configuration conf) {
}
}
+ @Test
+ public void testDatabaseAccessInSafeMode() throws IOException {
+ ResourceManager resourceManager = ResourceManager.startRMInSafeMode();
+
+ // Create the required dependencies - leveldb, local filesystem
+ String dbName = LeveldbConfigurationStore.DB_NAME;
+ String dbPath = configuration.get(RM_SCHEDCONF_STORE_PATH) + "/" + dbName;
+ FileSystem fs = FileSystem.getLocal(configuration);
+ fs.mkdirs(new Path(dbPath), new FsPermission((short) 0700));
+
+ try {
+ // Initialise database (create if it doesn't exist)
+ Options options = new Options();
+ options.createIfMissing(true);
+
+ LevelDbStore store = new LevelDbStore(dbPath, options);
+ store.init();
+ store.close();
+
+ // Verify the entry doesn't exist initially
+ DatabaseAccessRequest getRequest = DatabaseAccessRequest.newInstance("get", dbName, "key", null);
+ DatabaseAccessResponse response = resourceManager.adminService.accessDatabase(getRequest);
+ assertEquals(0, response.getRecords().size());
+
+ // Verify key is inserted correctly
+ DatabaseAccessRequest request = DatabaseAccessRequest.newInstance("set", dbName, "key", "val");
+ resourceManager.adminService.accessDatabase(request);
+ response = resourceManager.adminService.accessDatabase(getRequest);
+ assertEquals(1, response.getRecords().size());
+ assertEquals("val", response.getRecords().get(0).getValue());
+
+ // Verify key is deleted correctly
+ request = DatabaseAccessRequest.newInstance("del", dbName, "key", null);
+ resourceManager.adminService.accessDatabase(request);
+ response = resourceManager.adminService.accessDatabase(getRequest);
+ assertEquals(0, response.getRecords().size());
+ } finally {
+ resourceManager.stop();
+
+ // Cleanup datastore
+ File file = new File(dbPath);
+ FileUtils.deleteDirectory(file);
+ }
+
+ }
+
+ @Test
+ public void testDatabaseAccessFailsInRegularMode() {
+ try {
+ rm = new MockRM(configuration);
+ rm.init(configuration);
+ rm.start();
+ rm.adminService.accessDatabase(null);
+ fail("Database access should not be possible when RM is started");
+ } catch(Exception ex) {
+ assertEquals(ex.getClass().getName(), IllegalStateException.class.getName());
+ }
+ }
+
@Test(timeout = 30000)
public void testAdminAddToClusterNodeLabelsWithDeprecatedAPIs()
throws Exception, YarnException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index b9c5500a7d20b..4142d5c6c110f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -18,14 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
@@ -33,6 +28,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -60,6 +56,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.*;
+import static org.junit.Assert.*;
+
public class TestResourceManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestResourceManager.class);
@@ -384,4 +383,21 @@ public void testUserProvidedUGIConf() throws Exception {
dummyResourceManager.stop();
}
}
+
+ /**
+ * Tests that RM object can be initialised in safe mode but RM service isn't started
+ * Also that Admin service is started
+ */
+ @Test
+ public void testRMInSafeMode() {
+ ResourceManager rm = ResourceManager.startRMInSafeMode();
+ try {
+ Assert.assertTrue(rm.isInState(Service.STATE.NOTINITED));
+ Assert.assertTrue(rm.adminService.isInState(Service.STATE.STARTED));
+ } finally {
+ rm.stop();
+ }
+ Assert.assertTrue(rm.isInState(Service.STATE.STOPPED));
+ Assert.assertTrue(rm.adminService.isInState(Service.STATE.STOPPED));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
index 122782aef47ad..6592aacba3906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -32,6 +32,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -59,8 +61,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.VisibleForTesting;
-
/**
* Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an
* implementation that simply forwards the client requests to the cluster
@@ -222,4 +222,9 @@ public NodesToAttributesMappingResponse mapAttributesToNodes(
throws YarnException, IOException {
return rmAdminProxy.mapAttributesToNodes(request);
}
+
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request) throws IOException, YarnException {
+ return rmAdminProxy.accessDatabase(request);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
index c3cac82e38cb3..c53cf9388e228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -642,6 +644,11 @@ public NodesToAttributesMappingResponse mapAttributesToNodes(
throw new NotImplementedException();
}
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request) {
+ throw new NotImplementedException();
+ }
+
@Override
public String[] getGroupsForUser(String user) throws IOException {
return new String[0];
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
index 8024cdb82f126..a416a332edaa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Map;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -39,6 +40,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -383,4 +386,9 @@ public NodesToAttributesMappingResponse mapAttributesToNodes(
RequestInterceptorChainWrapper pipeline = getInterceptorChain();
return pipeline.getRootInterceptor().mapAttributesToNodes(request);
}
+
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request) {
+ throw new NotImplementedException();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
index e3be25009fc18..4485be86efa3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -142,6 +144,12 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
return getNextInterceptor().refreshClusterMaxPriority(request);
}
+ @Override
+ public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request)
+ throws IOException {
+ return null;
+ }
+
@Override
public String[] getGroupsForUser(String user) throws IOException {
return getNextInterceptor().getGroupsForUser(user);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
index 3e549398fec8d..c5f536128d146 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md
@@ -206,6 +206,7 @@ Usage: `yarn resourcemanager [-format-state-store]`
| -format-state-store | Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running. |
| -remove-application-from-state-store \ | Remove the application from RMStateStore. This should be run only when the ResourceManager is not running. |
| -format-conf-store | Formats the YarnConfigurationStore. This will clear the persisted scheduler configuration under YarnConfigurationStore. This should be run only when the ResourceManager is not running. |
+| -safe-mode | Starts RM in safe Mode with only Admin Service. This enables certain commands from RMAdminCLI like accessing the underlying RM Datastore. |
| -convert-fs-configuration [-y|yarnsiteconfig] [-f|fsconfig] [-r|rulesconfig] [-o|output-directory] [-p|print] [-c|cluster-resource] | WARNING: This feature is experimental and not intended for production use! Development is still in progress so the converter should not be considered complete!
Converts the specified Fair Scheduler configuration to Capacity Scheduler configuration. Requires two mandatory input files. First, the yarn-site.xml with the following format: [-y|yarnsiteconfig [\]. Secondly, the fair-scheduler.xml with the following format: [-f|fsconfig [\]. This config is not mandatory if there is a reference in yarn-site.xml to the fair-scheduler.xml with the property 'yarn.scheduler.fair.allocation.file'. If both are defined, the -f option has precedence. The output directory of the config files should be specified as well, with: \[-o|output-directory\ \]. An optional rules config file could be also specified with the following format: [-r|rulesconfig \]. The rule config file's format is a property file. There's an additional \[-p|print\] parameter, which is optional. If defined, the configuration will be emitted to the console instead. In its normal operation, the output files (yarn-site.xml and capacity-scheduler.xml) of this command is generated to the specified output directory. The cluster resource parameter (\[-c|cluster-resource\] \\]) needs to be specified if any queue has a maxResources setting with value as percentage. The format of the resource string is the same as in fair-scheduler.xml.) ] |
Start the ResourceManager