diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index 8c4494901..4ccdd03d3 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -88,7 +88,7 @@ private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOExcepti this.conf = conf; this.factory = factory; - final RegistrationHandler handler = new RegistrationHandler(); + final RegistrationHandler handler = new RegistrationHandler(conf); try { factory.getServer().registerClient(clientId, secret, handler); // In some scenarios the user may need to configure this endpoint setting explicitly. @@ -303,6 +303,12 @@ private static File writeConfToFile(RSCConf conf) throws IOException { private class RegistrationHandler extends BaseProtocol implements RpcServer.ClientCallback { + private final RSCConf conf; + + private RegistrationHandler(RSCConf conf) { + this.conf = conf; + } + volatile RemoteDriverAddress driverAddress; private Rpc client; @@ -327,9 +333,12 @@ void dispose() { //Note. Your compiler or IDE may identify this method as unused //tests fail without it public void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { - InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String ip = insocket.getAddress().getHostAddress(); - ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret); + String host = msg.host; + if(conf.getBoolean(RPC_CLIENT_GET_DRIVER_IP_FROM_CONNECTION)) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + host = insocket.getAddress().getHostAddress(); + } + ContextInfo info = new ContextInfo(host, msg.port, clientId, secret); if (promise.trySuccess(info)) { timeout.cancel(true); LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(), diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 933948fa3..e83402596 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -63,6 +63,7 @@ public enum Entry implements ConfEntry { RPC_SERVER_ADDRESS("rpc.server.address", null), RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"), RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"), + RPC_CLIENT_GET_DRIVER_IP_FROM_CONNECTION("client.get-driver-ip-from-connection", true), RPC_CHANNEL_LOG_LEVEL("channel.log.level", null), RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024), RPC_MAX_THREADS("rpc.threads", 8),