Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY */
public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT =
1024*1024;
/** Get the user's real IP */
public static final String IPC_SERVER_RPC_GET_REAL_CLIENT_IP_KEY =
"ipc.server.get.real.client.ip";
public static final boolean IPC_SERVER_RPC_GET_REAL_CLIENT_IP_DEFAULT =
false;
/** Number of threads in RPC server reading from the socket */
public static final String IPC_SERVER_RPC_READ_THREADS_KEY =
"ipc.server.read.threadpool.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
@InterfaceStability.Evolving
public final class CallerContext {
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
public static final String CLIENT_IP_STR = "realClientIp";
/** The caller context.
*
* It will be truncated if it exceeds the maximum allowed length in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ protected ResponseBuffer initialValue() {
private final int maxDataLength;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm

private boolean getRealIp;

volatile private boolean running = true; // true while server runs
private CallQueueManager<Call> callQueue;

Expand Down Expand Up @@ -792,6 +794,7 @@ public static class Call implements Schedulable,
// the priority level assigned by scheduler, 0 by default
private long clientStateId;
private boolean isCallCoordinated;
private String realClientIp = null; // the real client Ip

Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
Expand Down Expand Up @@ -858,6 +861,15 @@ public String toString() {
public Void run() throws Exception {
return null;
}

public void setRealClientIp(String ip) {
realClientIp = ip;
}

public String getRealClientIp() {
return realClientIp;
}

// should eventually be abstract but need to avoid breaking tests
public UserGroupInformation getRemoteUser() {
return null;
Expand Down Expand Up @@ -2696,19 +2708,31 @@ private void processRpcRequest(RpcRequestHeaderProto header,
}

CallerContext callerContext = null;
String realClientIp = null;
if (header.hasCallerContext()) {
callerContext =
new CallerContext.Builder(header.getCallerContext().getContext())
.setSignature(header.getCallerContext().getSignature()
.toByteArray())
.build();

if (getRealIp && callerContext.getContext().
startsWith(CallerContext.CLIENT_IP_STR)) {
realClientIp = callerContext.getContext().split(",")[0].
replace(CallerContext.CLIENT_IP_STR + ":", "");
}
}

RpcCall call = new RpcCall(this, header.getCallId(),
header.getRetryCount(), rpcRequest,
ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), span, callerContext);

if (realClientIp != null) {
if (NetUtils.isValidIPv4(realClientIp)) {
call.setRealClientIp(realClientIp);
}
}
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
call.markCallCoordinated(false);
Expand Down Expand Up @@ -3112,9 +3136,11 @@ protected Server(String bindAddress, int port,

this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
false);

this.getRealIp = conf.getBoolean(
CommonConfigurationKeys.IPC_SERVER_RPC_GET_REAL_CLIENT_IP_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_GET_REAL_CLIENT_IP_DEFAULT);
// configure supported authentications
this.enabledAuthMethods = getAuthMethods(secretManager, conf);
this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,35 @@ public static InetSocketAddress createSocketAddrForHost(String host, int port) {
}
return addr;
}


/**
* @return true if the given string is a Ipv4
* false otherwise
*/
public static boolean isValidIPv4(String ip) {
if (ip.length() < 7) return false;
if (ip.charAt(0) == '.') return false;
if (ip.charAt(ip.length() - 1) == '.') return false;
String[] tokens = ip.split("\\.");
if (tokens.length != 4) return false;
for (String token : tokens) {
if (!isValidIPv4Token(token)) return false;
}
return true;
}

public static boolean isValidIPv4Token(String token) {
if (token.startsWith("0") && token.length() > 1) return false;
try {
int parsedInt = Integer.parseInt(token);
if (parsedInt < 0 || parsedInt > 255) return false;
if (parsedInt == 0 && token.charAt(0) != '0') return false;
} catch (NumberFormatException nfe) {
return false;
}
return true;
}

/**
* Resolve the uri's hostname and add the default port if not in the uri
* @param uri to resolve
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,14 @@ private Object invokeMethod(
* It adds trace info "clientIp:ip" to caller context if it's absent.
*/
private void appendClientIpToCallerContextIfAbsent() {
String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
String clientIpInfo = CallerContext.CLIENT_IP_STR + ":" + Server.getRemoteAddress();
final CallerContext ctx = CallerContext.getCurrent();
if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(clientIpInfo)
new CallerContext.Builder(clientIpInfo, contextFieldSeparator)
.append(origContext)
.setSignature(origSignature)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,15 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg)
}

private static String getClientMachine() {
String clientMachine = Server.getRemoteAddress();
String clientMachine;
Server.Call call = Server.getCurCall().get();
if (call != null) {
clientMachine = call.getRealClientIp();
if (clientMachine != null) {
return clientMachine;
}
}
clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
}
Expand Down