Skip to content

Commit

Permalink
YARN-5799. Fix Opportunistic Allocation to set the correct value of N…
Browse files Browse the repository at this point in the history
…ode Http Address. (asuresh)
  • Loading branch information
xslogic committed Oct 29, 2016
1 parent 1c8ab41 commit aa3cab1
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 82 deletions.
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;


import java.util.List; import java.util.List;
Expand Down Expand Up @@ -58,9 +57,10 @@ public static DistributedSchedulingAllocateResponse newInstance(


@Public @Public
@Unstable @Unstable
public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling); public abstract void setNodesForScheduling(
List<RemoteNode> nodesForScheduling);


@Public @Public
@Unstable @Unstable
public abstract List<NodeId> getNodesForScheduling(); public abstract List<RemoteNode> getNodesForScheduling();
} }
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;


Expand Down Expand Up @@ -99,10 +98,11 @@ public abstract void setRegisterResponse(


@Public @Public
@Unstable @Unstable
public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling); public abstract void setNodesForScheduling(
List<RemoteNode> nodesForScheduling);


@Public @Public
@Unstable @Unstable
public abstract List<NodeId> getNodesForScheduling(); public abstract List<RemoteNode> getNodesForScheduling();


} }
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api.protocolrecords;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;

/**
* This class is used to encapsulate the {@link NodeId} as well as the HTTP
* address that can be used to communicate with the Node.
*/
@Private
@Unstable
public abstract class RemoteNode implements Comparable<RemoteNode> {

/**
* Create new Instance.
* @param nodeId NodeId.
* @param httpAddress Http address.
* @return RemoteNode instance.
*/
@Private
@Unstable
public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
remoteNode.setNodeId(nodeId);
remoteNode.setHttpAddress(httpAddress);
return remoteNode;
}

/**
* Get {@link NodeId}.
* @return NodeId.
*/
@Private
@Unstable
public abstract NodeId getNodeId();

/**
* Set {@link NodeId}.
* @param nodeId NodeId.
*/
@Private
@Unstable
public abstract void setNodeId(NodeId nodeId);

/**
* Get HTTP address.
* @return Http Address.
*/
@Private
@Unstable
public abstract String getHttpAddress();

/**
* Set HTTP address.
* @param httpAddress HTTP address.
*/
@Private
@Unstable
public abstract void setHttpAddress(String httpAddress);

/**
* Use the underlying {@link NodeId} comparator.
* @param other RemoteNode.
* @return Comparison.
*/
@Override
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
}
}
Expand Up @@ -21,12 +21,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;


import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;



import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
Expand All @@ -45,7 +46,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
boolean viaProto = false; boolean viaProto = false;


private AllocateResponse allocateResponse; private AllocateResponse allocateResponse;
private List<NodeId> nodesForScheduling; private List<RemoteNode> nodesForScheduling;


public DistributedSchedulingAllocateResponsePBImpl() { public DistributedSchedulingAllocateResponsePBImpl() {
builder = YarnServerCommonServiceProtos. builder = YarnServerCommonServiceProtos.
Expand Down Expand Up @@ -86,8 +87,8 @@ private synchronized void mergeLocalToProto() {
private synchronized void mergeLocalToBuilder() { private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) { if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling(); builder.clearNodesForScheduling();
Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable( Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable =
this.nodesForScheduling); getNodeIdProtoIterable(this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable); builder.addAllNodesForScheduling(iterable);
} }
if (this.allocateResponse != null) { if (this.allocateResponse != null) {
Expand Down Expand Up @@ -123,7 +124,7 @@ public AllocateResponse getAllocateResponse() {
} }


@Override @Override
public void setNodesForScheduling(List<NodeId> nodesForScheduling) { public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) {
maybeInitBuilder(); maybeInitBuilder();
if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
if (this.nodesForScheduling != null) { if (this.nodesForScheduling != null) {
Expand All @@ -137,7 +138,7 @@ public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
} }


@Override @Override
public List<NodeId> getNodesForScheduling() { public List<RemoteNode> getNodesForScheduling() {
if (nodesForScheduling != null) { if (nodesForScheduling != null) {
return nodesForScheduling; return nodesForScheduling;
} }
Expand All @@ -149,33 +150,34 @@ private synchronized void initLocalNodesForSchedulingList() {
YarnServerCommonServiceProtos. YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProtoOrBuilder p = DistributedSchedulingAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder; viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList(); List<YarnServerCommonServiceProtos.RemoteNodeProto> list =
p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>(); nodesForScheduling = new ArrayList<>();
if (list != null) { if (list != null) {
for (YarnProtos.NodeIdProto t : list) { for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) {
nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); nodesForScheduling.add(new RemoteNodePBImpl(t));
} }
} }
} }


private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable( private synchronized Iterable<RemoteNodeProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) { final List<RemoteNode> nodeList) {
maybeInitBuilder(); maybeInitBuilder();
return new Iterable<YarnProtos.NodeIdProto>() { return new Iterable<RemoteNodeProto>() {
@Override @Override
public synchronized Iterator<YarnProtos.NodeIdProto> iterator() { public synchronized Iterator<RemoteNodeProto> iterator() {
return new Iterator<YarnProtos.NodeIdProto>() { return new Iterator<RemoteNodeProto>() {


Iterator<NodeId> iter = nodeList.iterator(); Iterator<RemoteNode> iter = nodeList.iterator();


@Override @Override
public boolean hasNext() { public boolean hasNext() {
return iter.hasNext(); return iter.hasNext();
} }


@Override @Override
public YarnProtos.NodeIdProto next() { public RemoteNodeProto next() {
return ProtoUtils.convertToProtoFormat(iter.next()); return ((RemoteNodePBImpl)iter.next()).getProto();
} }


@Override @Override
Expand All @@ -186,5 +188,4 @@ public void remove() {
} }
}; };
} }

} }
Expand Up @@ -23,13 +23,15 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;


import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;



import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
Expand All @@ -52,7 +54,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
private Resource maxContainerResource; private Resource maxContainerResource;
private Resource minContainerResource; private Resource minContainerResource;
private Resource incrContainerResource; private Resource incrContainerResource;
private List<NodeId> nodesForScheduling; private List<RemoteNode> nodesForScheduling;
private RegisterApplicationMasterResponse registerApplicationMasterResponse; private RegisterApplicationMasterResponse registerApplicationMasterResponse;


public RegisterDistributedSchedulingAMResponsePBImpl() { public RegisterDistributedSchedulingAMResponsePBImpl() {
Expand Down Expand Up @@ -95,8 +97,8 @@ private synchronized void mergeLocalToProto() {
private synchronized void mergeLocalToBuilder() { private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) { if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling(); builder.clearNodesForScheduling();
Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable( Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable =
this.nodesForScheduling); getNodeIdProtoIterable(this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable); builder.addAllNodesForScheduling(iterable);
} }
if (this.maxContainerResource != null) { if (this.maxContainerResource != null) {
Expand Down Expand Up @@ -261,7 +263,7 @@ public long getContainerIdStart() {
} }


@Override @Override
public void setNodesForScheduling(List<NodeId> nodesForScheduling) { public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) {
maybeInitBuilder(); maybeInitBuilder();
if (nodesForScheduling == null || nodesForScheduling.isEmpty()) { if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
if (this.nodesForScheduling != null) { if (this.nodesForScheduling != null) {
Expand All @@ -275,7 +277,7 @@ public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
} }


@Override @Override
public List<NodeId> getNodesForScheduling() { public List<RemoteNode> getNodesForScheduling() {
if (nodesForScheduling != null) { if (nodesForScheduling != null) {
return nodesForScheduling; return nodesForScheduling;
} }
Expand All @@ -287,33 +289,34 @@ private synchronized void initLocalNodesForSchedulingList() {
YarnServerCommonServiceProtos. YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p = RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder; viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList(); List<YarnServerCommonServiceProtos.RemoteNodeProto> list =
p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>(); nodesForScheduling = new ArrayList<>();
if (list != null) { if (list != null) {
for (YarnProtos.NodeIdProto t : list) { for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) {
nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t)); nodesForScheduling.add(new RemoteNodePBImpl(t));
} }
} }
} }


private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable( private synchronized Iterable<RemoteNodeProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) { final List<RemoteNode> nodeList) {
maybeInitBuilder(); maybeInitBuilder();
return new Iterable<YarnProtos.NodeIdProto>() { return new Iterable<RemoteNodeProto>() {
@Override @Override
public synchronized Iterator<YarnProtos.NodeIdProto> iterator() { public synchronized Iterator<RemoteNodeProto> iterator() {
return new Iterator<YarnProtos.NodeIdProto>() { return new Iterator<RemoteNodeProto>() {


Iterator<NodeId> iter = nodeList.iterator(); Iterator<RemoteNode> iter = nodeList.iterator();


@Override @Override
public boolean hasNext() { public boolean hasNext() {
return iter.hasNext(); return iter.hasNext();
} }


@Override @Override
public YarnProtos.NodeIdProto next() { public RemoteNodeProto next() {
return ProtoUtils.convertToProtoFormat(iter.next()); return ((RemoteNodePBImpl)iter.next()).getProto();
} }


@Override @Override
Expand Down

0 comments on commit aa3cab1

Please sign in to comment.