diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 07776763e97a4..6d32696401143 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -65,6 +65,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY */ public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024*1024; + /** Get the user's real IP */ + public static final String IPC_SERVER_RPC_GET_REAL_CLIENT_IP_KEY = + "ipc.server.get.real.client.ip"; + public static final boolean IPC_SERVER_RPC_GET_REAL_CLIENT_IP_DEFAULT = + false; /** Number of threads in RPC server reading from the socket */ public static final String IPC_SERVER_RPC_READ_THREADS_KEY = "ipc.server.read.threadpool.size"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java index 378b83d13b0c7..abced2f472ec3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java @@ -43,6 +43,7 @@ @InterfaceStability.Evolving public final class CallerContext { public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8; + public static final String CLIENT_IP_STR = "realClientIp"; /** The caller context. * * It will be truncated if it exceeds the maximum allowed length in diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d37e4a1b24b6a..7640033677726 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -485,6 +485,8 @@ protected ResponseBuffer initialValue() { private final int maxDataLength; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private boolean getRealIp; + volatile private boolean running = true; // true while server runs private CallQueueManager callQueue; @@ -792,6 +794,7 @@ public static class Call implements Schedulable, // the priority level assigned by scheduler, 0 by default private long clientStateId; private boolean isCallCoordinated; + private String realClientIp = null; // the real client Ip Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -858,6 +861,15 @@ public String toString() { public Void run() throws Exception { return null; } + + public void setRealClientIp(String ip) { + realClientIp = ip; + } + + public String getRealClientIp() { + return realClientIp; + } + // should eventually be abstract but need to avoid breaking tests public UserGroupInformation getRemoteUser() { return null; @@ -2696,12 +2708,19 @@ private void processRpcRequest(RpcRequestHeaderProto header, } CallerContext callerContext = null; + String realClientIp = null; if (header.hasCallerContext()) { callerContext = new CallerContext.Builder(header.getCallerContext().getContext()) .setSignature(header.getCallerContext().getSignature() .toByteArray()) .build(); + + if (getRealIp && callerContext.getContext(). + startsWith(CallerContext.CLIENT_IP_STR)) { + realClientIp = callerContext.getContext().split(",")[0]. + replace(CallerContext.CLIENT_IP_STR + ":", ""); + } } RpcCall call = new RpcCall(this, header.getCallId(), @@ -2709,6 +2728,11 @@ private void processRpcRequest(RpcRequestHeaderProto header, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), span, callerContext); + if (realClientIp != null) { + if (NetUtils.isValidIPv4(realClientIp)) { + call.setRealClientIp(realClientIp); + } + } // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); call.markCallCoordinated(false); @@ -3112,9 +3136,11 @@ protected Server(String bindAddress, int port, this.secretManager = (SecretManager) secretManager; this.authorize = - conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, + conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); - + this.getRealIp = conf.getBoolean( + CommonConfigurationKeys.IPC_SERVER_RPC_GET_REAL_CLIENT_IP_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_GET_REAL_CLIENT_IP_DEFAULT); // configure supported authentications this.enabledAuthMethods = getAuthMethods(secretManager, conf); this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index 0f4dd9d897777..d2729ed5511eb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -300,7 +300,35 @@ public static InetSocketAddress createSocketAddrForHost(String host, int port) { } return addr; } - + + /** + * @return true if the given string is a Ipv4 + * false otherwise + */ + public static boolean isValidIPv4(String ip) { + if (ip.length() < 7) return false; + if (ip.charAt(0) == '.') return false; + if (ip.charAt(ip.length() - 1) == '.') return false; + String[] tokens = ip.split("\\."); + if (tokens.length != 4) return false; + for (String token : tokens) { + if (!isValidIPv4Token(token)) return false; + } + return true; + } + + public static boolean isValidIPv4Token(String token) { + if (token.startsWith("0") && token.length() > 1) return false; + try { + int parsedInt = Integer.parseInt(token); + if (parsedInt < 0 || parsedInt > 255) return false; + if (parsedInt == 0 && token.charAt(0) != '0') return false; + } catch (NumberFormatException nfe) { + return false; + } + return true; + } + /** * Resolve the uri's hostname and add the default port if not in the uri * @param uri to resolve diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 87f2ed761d4c6..f691eacef7630 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -548,14 +548,14 @@ private Object invokeMethod( * It adds trace info "clientIp:ip" to caller context if it's absent. */ private void appendClientIpToCallerContextIfAbsent() { - String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress(); + String clientIpInfo = CallerContext.CLIENT_IP_STR + ":" + Server.getRemoteAddress(); final CallerContext ctx = CallerContext.getCurrent(); if (isClientIpInfoAbsent(clientIpInfo, ctx)) { String origContext = ctx == null ? null : ctx.getContext(); byte[] origSignature = ctx == null ? null : ctx.getSignature(); CallerContext.setCurrent( - new CallerContext.Builder(origContext, contextFieldSeparator) - .append(clientIpInfo) + new CallerContext.Builder(clientIpInfo, contextFieldSeparator) + .append(origContext) .setSignature(origSignature) .build()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index aee4f68bdc7f2..d9324a2c4cf5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1888,7 +1888,15 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg) } private static String getClientMachine() { - String clientMachine = Server.getRemoteAddress(); + String clientMachine; + Server.Call call = Server.getCurCall().get(); + if (call != null) { + clientMachine = call.getRealClientIp(); + if (clientMachine != null) { + return clientMachine; + } + } + clientMachine = Server.getRemoteAddress(); if (clientMachine == null) { //not a RPC client clientMachine = ""; }