Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
merge with REEF-2024
Browse files Browse the repository at this point in the history
  • Loading branch information
Tyson Condie committed Jun 20, 2018
1 parent 16d18ab commit 22e952d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
13 changes: 13 additions & 0 deletions lang/common/proto/bridge/DriverClientProtocol.proto
Expand Up @@ -133,6 +133,19 @@ message EvaluatorDescriptorInfo {

// name of the runtime
string runtime_name = 3;

// node descriptor info
message NodeDescriptorInfo {
string id = 1;

string ip_address = 5;
string host_name = 6;
int32 port = 7;

// rack info
string rack_name = 10;
}
NodeDescriptorInfo node_descriptor_info = 5;
}

// Information related to an evaluator.
Expand Down
Expand Up @@ -34,14 +34,16 @@
import org.apache.reef.bridge.driver.common.grpc.ObserverCleanup;
import org.apache.reef.bridge.proto.*;
import org.apache.reef.bridge.proto.Void;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.driver.catalog.RackDescriptor;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.exception.EvaluatorException;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorBuilderFactory;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.util.Optional;
Expand All @@ -53,6 +55,7 @@

import javax.inject.Inject;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -79,6 +82,8 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl

private final TcpPortProvider tcpPortProvider;

private final EvaluatorDescriptorBuilderFactory evaluatorDescriptorBuilderFactory;

private final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher;

private final Map<String, AllocatedEvaluatorBridge> evaluatorBridgeMap = new HashMap<>();
Expand All @@ -89,11 +94,13 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl

@Inject
private DriverClientService(
final EvaluatorDescriptorBuilderFactory evaluatorDescriptorBuilderFactory,
final ExceptionCodec exceptionCodec,
final DriverServiceClient driverServiceClient,
final TcpPortProvider tcpPortProvider,
final InjectionFuture<Clock> clock,
final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher) {
this.evaluatorDescriptorBuilderFactory = evaluatorDescriptorBuilderFactory;
this.exceptionCodec = exceptionCodec;
this.driverServiceClient = driverServiceClient;
this.tcpPortProvider = tcpPortProvider;
Expand Down Expand Up @@ -567,12 +574,46 @@ private ActiveContextBridge addContextIfMissing(final ContextInfo contextInfo) {
}

private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) {
return new EvaluatorDescriptorImpl(
null,
info.getMemory(),
info.getCores(),
new JVMClientProcess(),
info.getRuntimeName());
final NodeDescriptor nodeDescriptor = new NodeDescriptor() {
@Override
public InetSocketAddress getInetSocketAddress() {
return InetSocketAddress.createUnresolved(
info.getNodeDescriptorInfo().getIpAddress(),
info.getNodeDescriptorInfo().getPort());
}

@Override
public RackDescriptor getRackDescriptor() {
return new RackDescriptor() {
@Override
public List<NodeDescriptor> getNodes() {
return Lists.newArrayList();
}

@Override
public String getName() {
return info.getNodeDescriptorInfo().getRackName();
}
};
}

@Override
public String getName() {
return info.getNodeDescriptorInfo().getHostName();
}

@Override
public String getId() {
return info.getNodeDescriptorInfo().getId();
}
};
return this.evaluatorDescriptorBuilderFactory.newBuilder()
.setNodeDescriptor(nodeDescriptor)
.setMemory(info.getMemory())
.setNumberOfCores(info.getCores())
.setEvaluatorProcess(new JVMClientProcess())
.setRuntimeName(info.getRuntimeName())
.build();
}

private ActiveContextBridge toActiveContext(final ContextInfo contextInfo) {
Expand Down
Expand Up @@ -77,10 +77,23 @@ public static ExceptionInfo createExceptionInfo(final ExceptionCodec exceptionCo
*/
public static EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(
final EvaluatorDescriptor descriptor) {
return descriptor == null ? null : EvaluatorDescriptorInfo.newBuilder()
if (descriptor == null) {
return null;
}
EvaluatorDescriptorInfo.NodeDescriptorInfo nodeDescriptorInfo = descriptor.getNodeDescriptor() == null ? null :
EvaluatorDescriptorInfo.NodeDescriptorInfo.newBuilder()
.setHostName(descriptor.getNodeDescriptor().getName())
.setId(descriptor.getNodeDescriptor().getId())
.setIpAddress(descriptor.getNodeDescriptor().getInetSocketAddress().getAddress().toString())
.setPort(descriptor.getNodeDescriptor().getInetSocketAddress().getPort())
.setRackName(descriptor.getNodeDescriptor().getRackDescriptor() == null ?
"" : descriptor.getNodeDescriptor().getRackDescriptor().getName())
.build();
return EvaluatorDescriptorInfo.newBuilder()
.setCores(descriptor.getNumberOfCores())
.setMemory(descriptor.getMemory())
.setRuntimeName(descriptor.getRuntimeName())
.setNodeDescriptorInfo(nodeDescriptorInfo)
.build();
}

Expand Down

0 comments on commit 22e952d

Please sign in to comment.