From 5508615c73d200dacbcf548c1647425deaf48e06 Mon Sep 17 00:00:00 2001 From: Ivan Dzikovsky Date: Thu, 12 Jan 2023 00:42:09 +0200 Subject: [PATCH] Provide a way to disable the fix for LIVY-697 --- .../org/apache/livy/rsc/ContextLauncher.java | 17 +++++++++++++---- .../main/java/org/apache/livy/rsc/RSCConf.java | 1 + 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index c59136d55..ed7098358 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -92,7 +92,7 @@ private ContextLauncher(RSCClientFactory factory, RSCConf conf) throws IOExcepti this.conf = conf; this.factory = factory; - final RegistrationHandler handler = new RegistrationHandler(); + final RegistrationHandler handler = new RegistrationHandler(conf); try { factory.getServer().registerClient(clientId, secret, handler); String replMode = conf.get("repl"); @@ -319,6 +319,12 @@ private static File writeConfToFile(RSCConf conf) throws IOException { private class RegistrationHandler extends BaseProtocol implements RpcServer.ClientCallback { + private final RSCConf conf; + + private RegistrationHandler(RSCConf conf) { + this.conf = conf; + } + volatile RemoteDriverAddress driverAddress; private Rpc client; @@ -341,9 +347,12 @@ void dispose() { } private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { - InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); - String ip = insocket.getAddress().getHostAddress(); - ContextInfo info = new ContextInfo(ip, msg.port, clientId, secret); + String host = msg.host; + if(conf.getBoolean(RPC_CLIENT_GET_DRIVER_IP_FROM_CONNECTION)) { + InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); + host = insocket.getAddress().getHostAddress(); + } + ContextInfo info = new ContextInfo(host, msg.port, clientId, secret); if (promise.trySuccess(info)) { timeout.cancel(true); LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(), diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index 4c45956d7..9d5845547 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -63,6 +63,7 @@ public static enum Entry implements ConfEntry { RPC_SERVER_ADDRESS("rpc.server.address", null), RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90s"), RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10s"), + RPC_CLIENT_GET_DRIVER_IP_FROM_CONNECTION("client.get-driver-ip-from-connection", true), RPC_CHANNEL_LOG_LEVEL("channel.log.level", null), RPC_MAX_MESSAGE_SIZE("rpc.max.size", 50 * 1024 * 1024), RPC_MAX_THREADS("rpc.threads", 8),