Skip to content

Commit

Permalink
YARN-8104. Add API to fetch node to attribute mapping. Contributed by…
Browse files Browse the repository at this point in the history
… Bibin A Chundatt.
  • Loading branch information
naga-apache authored and sunilgovind committed Sep 12, 2018
1 parent 0a01b13 commit 5dc7d6e
Show file tree
Hide file tree
Showing 28 changed files with 749 additions and 13 deletions.
Expand Up @@ -551,4 +551,10 @@ public Map<NodeAttribute, Set<String>> getAttributesToNodes(
Set<NodeAttribute> attributes) throws YarnException, IOException {
return client.getAttributesToNodes(attributes);
}

@Override
public Map<String, Set<NodeAttribute>> getNodeToAttributes(
Set<String> hostNames) throws YarnException, IOException {
return client.getNodeToAttributes(hostNames);
}
}
Expand Up @@ -104,6 +104,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
Expand Down Expand Up @@ -538,6 +540,12 @@ public GetClusterNodeAttributesResponse getClusterNodeAttributes(
throws YarnException, IOException {
return null;
}

@Override
public GetNodesToAttributesResponse getNodesToAttributes(
GetNodesToAttributesRequest request) throws YarnException, IOException {
return null;
}
}

class HistoryService extends AMService implements HSClientProtocol {
Expand Down
Expand Up @@ -43,6 +43,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
Expand Down Expand Up @@ -679,4 +681,20 @@ GetAttributesToNodesResponse getAttributesToNodes(
GetClusterNodeAttributesResponse getClusterNodeAttributes(
GetClusterNodeAttributesRequest request)
throws YarnException, IOException;

/**
* <p>
* The interface used by client to get node to attributes mappings.
* in existing cluster.
* </p>
*
* @param request request to get nodes to attributes mapping.
* @return nodes to attributes mappings.
* @throws YarnException if any error happens inside YARN.
* @throws IOException
*/
@Public
@Unstable
GetNodesToAttributesResponse getNodesToAttributes(
GetNodesToAttributesRequest request) throws YarnException, IOException;
}
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.util.Records;

import java.util.Set;

/**
* <p>
* The request from clients to get nodes to attributes mapping
* in the cluster from the <code>ResourceManager</code>.
* </p>
*
* @see ApplicationClientProtocol#getNodesToAttributes
* (GetNodesToAttributesRequest)
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class GetNodesToAttributesRequest {

public static GetNodesToAttributesRequest newInstance(Set<String> hostNames) {
GetNodesToAttributesRequest request =
Records.newRecord(GetNodesToAttributesRequest.class);
request.setHostNames(hostNames);
return request;
}

/**
* Set hostnames for which mapping is required.
*
* @param hostnames
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract void setHostNames(Set<String> hostnames);

/**
* Get hostnames for which mapping is required.
*
* @return Set<String> of hostnames.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract Set<String> getHostNames();
}
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.util.Records;

import java.util.Map;
import java.util.Set;

/**
* <p>
* The response sent by the <code>ResourceManager</code> to a client requesting
* nodes to attributes mapping.
* </p>
*
* @see ApplicationClientProtocol#getNodesToAttributes
* (GetNodesToAttributesRequest)
*/
@Public
@Evolving
public abstract class GetNodesToAttributesResponse {

public static GetNodesToAttributesResponse newInstance(
Map<String, Set<NodeAttribute>> map) {
GetNodesToAttributesResponse response =
Records.newRecord(GetNodesToAttributesResponse.class);
response.setNodeToAttributes(map);
return response;
}

@Public
@Evolving
public abstract void setNodeToAttributes(Map<String, Set<NodeAttribute>> map);

/**
* Get hostnames to NodeAttributes mapping.
*
* @return Map<String, Set<NodeAttribute>> host to attributes.
*/
@Public
@Evolving
public abstract Map<String, Set<NodeAttribute>> getNodeToAttributes();
}
Expand Up @@ -66,4 +66,5 @@ service ApplicationClientProtocolService {
rpc getResourceTypeInfo(GetAllResourceTypeInfoRequestProto) returns (GetAllResourceTypeInfoResponseProto);
rpc getClusterNodeAttributes (GetClusterNodeAttributesRequestProto) returns (GetClusterNodeAttributesResponseProto);
rpc getAttributesToNodes (GetAttributesToNodesRequestProto) returns (GetAttributesToNodesResponseProto);
rpc getNodesToAttributes (GetNodesToAttributesRequestProto) returns (GetNodesToAttributesResponseProto);
}
Expand Up @@ -144,11 +144,6 @@ message NodesToAttributesMappingRequestProto {
optional bool failOnUnknownNodes = 3;
}

message NodeToAttributesProto {
optional string node = 1;
repeated NodeAttributeProto nodeAttributes = 2;
}

message NodesToAttributesMappingResponseProto {
}
//////////////////////////////////////////////////////////////////
Expand Down
Expand Up @@ -388,6 +388,11 @@ message AttributeToNodesProto {
repeated string hostnames = 2;
}

message NodeToAttributesProto {
optional string node = 1;
repeated NodeAttributeProto nodeAttributes = 2;
}

enum ContainerTypeProto {
APPLICATION_MASTER = 1;
TASK = 2;
Expand Down
Expand Up @@ -272,7 +272,15 @@ message GetAttributesToNodesRequestProto {
}

message GetAttributesToNodesResponseProto {
repeated AttributeToNodesProto attributeToNodes = 1;
repeated AttributeToNodesProto attributesToNodes = 1;
}

message GetNodesToAttributesRequestProto {
repeated string hostnames = 1;
}

message GetNodesToAttributesResponseProto {
repeated NodeToAttributesProto nodesToAttributes = 1;
}

message UpdateApplicationPriorityRequestProto {
Expand Down
Expand Up @@ -934,4 +934,23 @@ public abstract Set<NodeAttribute> getClusterAttributes()
@Unstable
public abstract Map<NodeAttribute, Set<String>> getAttributesToNodes(
Set<NodeAttribute> attributes) throws YarnException, IOException;

/**
* <p>
* The interface used by client to get all node to attribute mapping in
* existing cluster.
* </p>
*
* @param hostNames HostNames for which host to attributes mapping has to
* be retrived.If empty or null is set then will return
* all nodes to attributes mapping in cluster.
* @return Node to attribute mappings
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public abstract Map<String, Set<NodeAttribute>> getNodeToAttributes(
Set<String> hostNames) throws YarnException, IOException;

}
Expand Up @@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
Expand Down Expand Up @@ -994,4 +995,12 @@ public Map<NodeAttribute, Set<String>> getAttributesToNodes(
GetAttributesToNodesRequest.newInstance(attributes);
return rmClient.getAttributesToNodes(request).getAttributesToNodes();
}

@Override
public Map<String, Set<NodeAttribute>> getNodeToAttributes(
Set<String> hostNames) throws YarnException, IOException {
GetNodesToAttributesRequest request =
GetNodesToAttributesRequest.newInstance(hostNames);
return rmClient.getNodesToAttributes(request).getNodeToAttributes();
}
}
Expand Up @@ -65,6 +65,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
Expand Down Expand Up @@ -133,6 +135,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToAttributesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToAttributesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBImpl;
Expand Down Expand Up @@ -710,4 +714,18 @@ public GetClusterNodeAttributesResponse getClusterNodeAttributes(
return null;
}
}

@Override
public GetNodesToAttributesResponse getNodesToAttributes(
GetNodesToAttributesRequest request) throws YarnException, IOException {
YarnServiceProtos.GetNodesToAttributesRequestProto requestProto =
((GetNodesToAttributesRequestPBImpl) request).getProto();
try {
return new GetNodesToAttributesResponsePBImpl(
proxy.getNodesToAttributes(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
Expand Down Expand Up @@ -98,6 +99,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToAttributesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToAttributesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBImpl;
Expand Down Expand Up @@ -193,6 +196,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetResourceProfileResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodeAttributesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAttributesToNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToAttributesResponseProto;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
Expand Down Expand Up @@ -738,4 +742,21 @@ public GetAttributesToNodesResponseProto getAttributesToNodes(
throw new ServiceException(ie);
}
}

@Override
public GetNodesToAttributesResponseProto getNodesToAttributes(
RpcController controller,
YarnServiceProtos.GetNodesToAttributesRequestProto proto)
throws ServiceException {
GetNodesToAttributesRequestPBImpl req =
new GetNodesToAttributesRequestPBImpl(proto);
try {
GetNodesToAttributesResponse resp = real.getNodesToAttributes(req);
return ((GetNodesToAttributesResponsePBImpl) resp).getProto();
} catch (YarnException ye) {
throw new ServiceException(ye);
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
}
Expand Up @@ -66,7 +66,7 @@ private void initAttributesToNodes() {
}
YarnServiceProtos.GetAttributesToNodesResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<AttributeToNodesProto> list = p.getAttributeToNodesList();
List<AttributeToNodesProto> list = p.getAttributesToNodesList();
this.attributesToNodes = new HashMap<>();

for (AttributeToNodesProto c : list) {
Expand All @@ -87,7 +87,7 @@ private void maybeInitBuilder() {

private void addAttributesToNodesToProto() {
maybeInitBuilder();
builder.clearAttributeToNodes();
builder.clearAttributesToNodes();
if (attributesToNodes == null) {
return;
}
Expand Down Expand Up @@ -119,7 +119,7 @@ public boolean hasNext() {
return iter.hasNext();
}
};
builder.addAllAttributeToNodes(iterable);
builder.addAllAttributesToNodes(iterable);
}

private NodeAttributePBImpl convertFromProtoFormat(NodeAttributeProto p) {
Expand Down

0 comments on commit 5dc7d6e

Please sign in to comment.