From 22e952d7a535331dc447e2c1f2cf241a538ee5bc Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 20 Jun 2018 16:45:21 -0700 Subject: [PATCH] merge with REEF-2024 --- .../proto/bridge/DriverClientProtocol.proto | 13 +++++ .../client/grpc/DriverClientService.java | 55 ++++++++++++++++--- .../bridge/driver/common/grpc/GRPCUtils.java | 15 ++++- 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto index 40b4c1d8cf..7411e8df7b 100644 --- a/lang/common/proto/bridge/DriverClientProtocol.proto +++ b/lang/common/proto/bridge/DriverClientProtocol.proto @@ -133,6 +133,19 @@ message EvaluatorDescriptorInfo { // name of the runtime string runtime_name = 3; + + // node descriptor info + message NodeDescriptorInfo { + string id = 1; + + string ip_address = 5; + string host_name = 6; + int32 port = 7; + + // rack info + string rack_name = 10; + } + NodeDescriptorInfo node_descriptor_info = 5; } // Information related to an evaluator. diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java index 9a3ef1be13..dd4013c3f3 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java @@ -34,6 +34,8 @@ import org.apache.reef.bridge.driver.common.grpc.ObserverCleanup; import org.apache.reef.bridge.proto.*; import org.apache.reef.bridge.proto.Void; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.catalog.RackDescriptor; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.context.FailedContext; import org.apache.reef.driver.evaluator.EvaluatorDescriptor; @@ -41,7 +43,7 @@ import org.apache.reef.driver.restart.DriverRestarted; import org.apache.reef.driver.task.FailedTask; import org.apache.reef.exception.EvaluatorException; -import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl; +import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorBuilderFactory; import org.apache.reef.runtime.common.utils.ExceptionCodec; import org.apache.reef.tang.InjectionFuture; import org.apache.reef.util.Optional; @@ -53,6 +55,7 @@ import javax.inject.Inject; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -79,6 +82,8 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl private final TcpPortProvider tcpPortProvider; + private final EvaluatorDescriptorBuilderFactory evaluatorDescriptorBuilderFactory; + private final InjectionFuture clientDriverDispatcher; private final Map evaluatorBridgeMap = new HashMap<>(); @@ -89,11 +94,13 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl @Inject private DriverClientService( + final EvaluatorDescriptorBuilderFactory evaluatorDescriptorBuilderFactory, final ExceptionCodec exceptionCodec, final DriverServiceClient driverServiceClient, final TcpPortProvider tcpPortProvider, final InjectionFuture clock, final InjectionFuture clientDriverDispatcher) { + this.evaluatorDescriptorBuilderFactory = evaluatorDescriptorBuilderFactory; this.exceptionCodec = exceptionCodec; this.driverServiceClient = driverServiceClient; this.tcpPortProvider = tcpPortProvider; @@ -567,12 +574,46 @@ private ActiveContextBridge addContextIfMissing(final ContextInfo contextInfo) { } private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) { - return new EvaluatorDescriptorImpl( - null, - info.getMemory(), - info.getCores(), - new JVMClientProcess(), - info.getRuntimeName()); + final NodeDescriptor nodeDescriptor = new NodeDescriptor() { + @Override + public InetSocketAddress getInetSocketAddress() { + return InetSocketAddress.createUnresolved( + info.getNodeDescriptorInfo().getIpAddress(), + info.getNodeDescriptorInfo().getPort()); + } + + @Override + public RackDescriptor getRackDescriptor() { + return new RackDescriptor() { + @Override + public List getNodes() { + return Lists.newArrayList(); + } + + @Override + public String getName() { + return info.getNodeDescriptorInfo().getRackName(); + } + }; + } + + @Override + public String getName() { + return info.getNodeDescriptorInfo().getHostName(); + } + + @Override + public String getId() { + return info.getNodeDescriptorInfo().getId(); + } + }; + return this.evaluatorDescriptorBuilderFactory.newBuilder() + .setNodeDescriptor(nodeDescriptor) + .setMemory(info.getMemory()) + .setNumberOfCores(info.getCores()) + .setEvaluatorProcess(new JVMClientProcess()) + .setRuntimeName(info.getRuntimeName()) + .build(); } private ActiveContextBridge toActiveContext(final ContextInfo contextInfo) { diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/common/grpc/GRPCUtils.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/common/grpc/GRPCUtils.java index 3ba917d9f7..4910f02ada 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/common/grpc/GRPCUtils.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/common/grpc/GRPCUtils.java @@ -77,10 +77,23 @@ public static ExceptionInfo createExceptionInfo(final ExceptionCodec exceptionCo */ public static EvaluatorDescriptorInfo toEvaluatorDescriptorInfo( final EvaluatorDescriptor descriptor) { - return descriptor == null ? null : EvaluatorDescriptorInfo.newBuilder() + if (descriptor == null) { + return null; + } + EvaluatorDescriptorInfo.NodeDescriptorInfo nodeDescriptorInfo = descriptor.getNodeDescriptor() == null ? null : + EvaluatorDescriptorInfo.NodeDescriptorInfo.newBuilder() + .setHostName(descriptor.getNodeDescriptor().getName()) + .setId(descriptor.getNodeDescriptor().getId()) + .setIpAddress(descriptor.getNodeDescriptor().getInetSocketAddress().getAddress().toString()) + .setPort(descriptor.getNodeDescriptor().getInetSocketAddress().getPort()) + .setRackName(descriptor.getNodeDescriptor().getRackDescriptor() == null ? + "" : descriptor.getNodeDescriptor().getRackDescriptor().getName()) + .build(); + return EvaluatorDescriptorInfo.newBuilder() .setCores(descriptor.getNumberOfCores()) .setMemory(descriptor.getMemory()) .setRuntimeName(descriptor.getRuntimeName()) + .setNodeDescriptorInfo(nodeDescriptorInfo) .build(); }