@@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@@ -62,7 +64,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr

@Private
@Idempotent
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException;

@Private
@@ -72,7 +74,7 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)

@Private
@Idempotent
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException;
@@ -94,16 +96,16 @@ public RefreshAdminAclsResponse refreshAdminAcls(
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request)
throws YarnException, IOException;

/**
* <p>The interface used by admin to update nodes' resources to the
* <code>ResourceManager</code> </p>.
*
* <p>The admin client is required to provide details such as a map from
* {@link NodeId} to {@link ResourceOption} required to update resources on
* a list of <code>RMNode</code> in <code>ResourceManager</code> etc.
* via the {@link UpdateNodeResourceRequest}.</p>
*
* @param request request to update resource for a node in cluster.
* @return (empty) response on accepting update.
* @throws YarnException exceptions from yarn servers.
@@ -124,17 +126,17 @@ public RefreshNodesResourcesResponse refreshNodesResources(
@Idempotent
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException;

@Private
@Idempotent
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException;

@Private
@Idempotent
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException;

@Private
@Idempotent
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
@@ -153,4 +155,14 @@ public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
NodesToAttributesMappingResponse mapAttributesToNodes(
NodesToAttributesMappingRequest request) throws YarnException,
IOException;

/**
* Access the RM database(s) from the admin CLI.
*
* @param request the database access request.
* @return the database access response.
* @throws YarnException exceptions from yarn servers.
* @throws IOException if an IO error occurred.
*/
@Private
@Idempotent
public DatabaseAccessResponse accessDatabase(DatabaseAccessRequest request)
throws YarnException, IOException;
}
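
The new accessDatabase call is meant to be driven from the admin CLI. As a rough illustration of the intended flow, the sketch below builds a request, sends it over the admin protocol, and prints the returned records. The wiring (obtaining the proxy via ClientRMProxy, the same mechanism RMAdminCLI uses) and the key string are illustrative assumptions, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;

public class DatabaseAccessClientSketch {

  public static void main(String[] args) throws IOException, YarnException {
    Configuration conf = new YarnConfiguration();

    // Obtain a proxy for the RM admin protocol; requires admin privileges.
    ResourceManagerAdministrationProtocol admin =
        ClientRMProxy.createRMProxy(conf,
            ResourceManagerAdministrationProtocol.class);

    // Read a single key from the RMStateStore; the value argument is unused
    // for a "get". "some.key" is a placeholder, not a real store key.
    DatabaseAccessRequest request = DatabaseAccessRequest.newInstance(
        "get", "RMStateStore", "some.key", null);
    DatabaseAccessResponse response = admin.accessDatabase(request);

    // The response is a list of key-value records.
    for (DBRecord record : response.getRecords()) {
      System.out.println(record.getKey() + " = " + record.getValue());
    }
  }
}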
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.yarn.util.Records;

/**
* DBRecord is set up as a key-value pair because RM data is stored as
* key-value pairs.
*/
public abstract class DBRecord {

public static DBRecord newInstance(String key, String value) {
DBRecord record = Records.newRecord(DBRecord.class);
record.setKey(key);
record.setValue(value);
return record;
}

public abstract void setKey(String key);

public abstract void setValue(String value);

public abstract String getKey();

public abstract String getValue();
}
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.yarn.util.Records;


/**
* The request used by admins to access the underlying RM database.
* It contains the operation (get / set / delete), the database to access
* (RMStateStore or YarnConfigurationStore), the key and the value.
*/
public abstract class DatabaseAccessRequest {

/**
* Creates a request to access the RM database.
*
* <p>A dataStore parameter can be added later if other data stores such as
* ZooKeeper or MySQL need to be accessed.</p>
*
* @param operation the operation to perform: get / set / delete
* @param database unique identifier for the database to query
* @param key the key to query
* @param value the value to set
* @return DatabaseAccessRequest
*/
public static DatabaseAccessRequest newInstance(String operation, String database, String key, String value) {
DatabaseAccessRequest request = Records.newRecord(DatabaseAccessRequest.class);
request.setOperation(operation);
request.setDatabase(database);
request.setKey(key);
request.setValue(value);
return request;
}

public abstract void setOperation(String operation);

public abstract void setDatabase(String database);

public abstract void setKey(String key);

public abstract void setValue(String value);

public abstract String getOperation();

public abstract String getDatabase();

public abstract String getKey();

public abstract String getValue();
}
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api.protocolrecords;

import java.util.List;
import org.apache.hadoop.yarn.util.Records;


/**
* The response to a {@link DatabaseAccessRequest}, containing the key-value
* records read from the RM database.
*/
public abstract class DatabaseAccessResponse {

public static DatabaseAccessResponse newInstance(List<DBRecord> records) {
DatabaseAccessResponse response = Records.newRecord(DatabaseAccessResponse.class);
response.setRecords(records);
return response;
}

public abstract void setRecords(List<DBRecord> records);

public abstract List<DBRecord> getRecords();
}
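
To make the operation semantics concrete, here is a hypothetical server-side handler showing how the three operations from the request Javadoc (get / set / delete) could be dispatched and how DBRecord and DatabaseAccessResponse fit together. The in-memory map and the class and method names are stand-ins, not part of this patch; the real implementation would delegate to the RMStateStore or YarnConfigurationStore.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.DBRecord;
import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DatabaseAccessResponse;

public class InMemoryDatabaseAccessHandler {

  // Stand-in for the real RM store: a flat key-value map, which is why the
  // response is modelled as a list of DBRecord key-value pairs.
  private final Map<String, String> store = new ConcurrentHashMap<>();

  public DatabaseAccessResponse handle(DatabaseAccessRequest request)
      throws YarnException {
    List<DBRecord> records = new ArrayList<>();
    switch (request.getOperation()) {
    case "get":
      String value = store.get(request.getKey());
      if (value != null) {
        records.add(DBRecord.newInstance(request.getKey(), value));
      }
      break;
    case "set":
      store.put(request.getKey(), request.getValue());
      break;
    case "delete":
      store.remove(request.getKey());
      break;
    default:
      throw new YarnException("Unsupported operation: "
          + request.getOperation());
    }
    return DatabaseAccessResponse.newInstance(records);
  }
}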
@@ -47,4 +47,5 @@ service ResourceManagerAdministrationProtocolService {
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto);
rpc accessDatabase(DatabaseAccessRequestProto) returns (DatabaseAccessResponseProto);
}
@@ -165,3 +165,19 @@ message ActiveRMInfoProto {
optional string clusterId = 1;
optional string rmId = 2;
}

message DatabaseAccessRequestProto {
required string operation = 1;
required string database = 2;
required string key = 3;
optional string value = 4;
}

message DatabaseAccessResponseProto {
repeated DBRecordProto records = 1;
}

message DBRecordProto {
required string key = 1;
required string value = 2;
}
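
At the wire level the new messages are plain string fields. The sketch below exercises the generated builders to highlight that value is optional on the request (a get or delete can leave it unset) while both DBRecordProto fields are required. The outer class name YarnServerResourceManagerServiceProtos is an assumption about where this proto file generates its Java classes.

// Assumed generated outer class; adjust the imports if the proto file
// generates a different Java outer class name.
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DBRecordProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DatabaseAccessRequestProto;

public class DatabaseAccessProtoSketch {

  public static void main(String[] args) {
    // "value" is optional on the request, so a read can leave it unset.
    DatabaseAccessRequestProto get = DatabaseAccessRequestProto.newBuilder()
        .setOperation("get")
        .setDatabase("RMStateStore")
        .setKey("some.key") // placeholder key
        .build();

    // Both fields of DBRecordProto are required, so a response record always
    // carries a key and a value.
    DBRecordProto record = DBRecordProto.newBuilder()
        .setKey("some.key")
        .setValue("some.value")
        .build();

    System.out.println(get.hasValue() + " / " + record.getValue());
  }
}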
@@ -29,10 +29,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -105,8 +105,8 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -147,11 +147,10 @@
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Private
@Unstable
public class YarnClientImpl extends YarnClient {
@@ -337,20 +336,20 @@ public YarnClientApplication createApplication()

int pollCount = 0;
long startTime = System.currentTimeMillis();
EnumSet<YarnApplicationState> waitingStates =
EnumSet.of(YarnApplicationState.NEW,
YarnApplicationState.NEW_SAVING,
YarnApplicationState.SUBMITTED);
EnumSet<YarnApplicationState> failToSubmitStates =
EnumSet.of(YarnApplicationState.FAILED,
YarnApplicationState.KILLED);
while (true) {
try {
ApplicationReport appReport = getApplicationReport(applicationId);
YarnApplicationState state = appReport.getYarnApplicationState();
if (!waitingStates.contains(state)) {
if(failToSubmitStates.contains(state)) {
throw new YarnException("Failed to submit " + applicationId +
" to YARN : " + appReport.getDiagnostics());
}
LOG.info("Submitted application {}", applicationId);