Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -121,6 +122,51 @@ public String toString() {
return str;
}

/**
* Try to obtain the value corresponding to the key by parsing the content.
* @param content the full content to be parsed.
* @param key trying to obtain the value of the key.
* @return the value corresponding to the key.
*/
@VisibleForTesting
public static String parseSpecialValue(String content, String key) {
int posn = content.indexOf(key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small javadoc with an example

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some javadoc later.

if (posn != -1) {
posn += key.length();
int end = content.indexOf(",", posn);
return end == -1 ? content.substring(posn) : content.substring(posn, end);
}
return null;
}

/**
* Get client ip content from caller context.
* @param context The context here is obtained from outside.
* @return Filter the value carried by 'clientIp:' from the context.
* If not, return null.
*/
public static String getRealClientIp(String context) {
if (context != null && !context.equals("")) {

This comment was marked as resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string here is obtained from outside.

String ipKey = CLIENT_IP_STR + Builder.KEY_VALUE_SEPARATOR;
return parseSpecialValue(context, ipKey);
}
return null;
}

/**
* Get client port content from caller context.
* @param context The context here is obtained from outside.
* @return Filter the value carried by 'clientPort:' from the context.
* If not, return null.
*/
public static String getRealClientPort(String context) {
if (context != null && !context.equals("")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!context.equals("") -> context.isContextValid()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string here is obtained from outside.

String portKey = CLIENT_PORT_STR + Builder.KEY_VALUE_SEPARATOR;
return parseSpecialValue(context, portKey);
}
return null;
}

/** The caller context builder. */
public static final class Builder {
public static final String KEY_VALUE_SEPARATOR = ":";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.ipc;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
Expand Down Expand Up @@ -1110,6 +1112,10 @@ public void setDeferredResponse(Writable response) {

public void setDeferredError(Throwable t) {
}

public CallerContext getCallerContext() {
return callerContext;
}
}

/** A RPC extended call queued for handling. */
Expand Down Expand Up @@ -1351,6 +1357,16 @@ private class ResponseParams {

@Override
public String toString() {
boolean isCallerContextEnabled = conf.getBoolean(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a good idea to get the value from conf every time. And HADOOP_CALLER_CONTEXT_ENABLED_KEY is used to control whether output callerContext into audit log. We plan use this key here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the essence is to obtain the real client ip, and it is not a problem to use HADOOP_CALLER_CONTEXT_ENABLED_KEY.

HADOOP_CALLER_CONTEXT_ENABLED_KEY,
HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT);
CallerContext context = getCallerContext();
if (isCallerContextEnabled && context != null && context.isContextValid()) {
String cc = context.getContext();
return super.toString() + " " + rpcRequest + " from " + connection +
", client=" + CallerContext.getRealClientIp(cc) + ":" +
CallerContext.getRealClientPort(cc);
}
return super.toString() + " " + rpcRequest + " from " + connection;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.junit.Assert;
import org.junit.Test;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.junit.Assert.assertEquals;

public class TestCallerContext {
@Test
Expand Down Expand Up @@ -81,4 +83,32 @@ public void testNewBuilder() {
CallerContext.Builder builder = new CallerContext.Builder(null, conf);
builder.build();
}

@Test
public void testParseSpecialValue() {
String mockContent = "mockContent,clientIp:1.1.1.1,clientPort:40820";
String clientIp = CallerContext.parseSpecialValue(mockContent, "clientIp:");
assertEquals("1.1.1.1", clientIp);
String clientPort = CallerContext.parseSpecialValue(mockContent, "clientPort:");
assertEquals("40820", clientPort);
}

@Test
public void testRealClient() {
Configuration conf = new Configuration();
String contextFieldSeparator = conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
CallerContext.Builder builder =
new CallerContext.Builder("", contextFieldSeparator).append("key", "value");
String context = builder.getContext();
Assert.assertNull(CallerContext.getRealClientIp(context));
Assert.assertNull(CallerContext.getRealClientPort(context));

// Test client ip and client port.
builder.append(CallerContext.CLIENT_IP_STR, "1.1.1.1")
.append(CallerContext.CLIENT_PORT_STR, "8191");
context = builder.getContext();
Assert.assertTrue("1.1.1.1".equals(CallerContext.getRealClientIp(context)));
Assert.assertTrue("8191".equals(CallerContext.getRealClientPort(context)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
Expand Down Expand Up @@ -127,6 +128,7 @@
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -200,13 +202,17 @@ public int compare(
private String routerFile;
/** File in the Namenode. */
private String nnFile;
/** Configuration in the Namenode. */
private static Configuration namenodeConf;

static {
namenodeConf = new Configuration();
namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
true);
}

@BeforeClass
public static void globalSetUp() throws Exception {
Configuration namenodeConf = new Configuration();
namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
true);
// It's very easy to become overloaded for some specific dn in this small
// cluster, which will cause the EC file block allocation failure. To avoid
// this issue, we disable considerLoad option.
Expand Down Expand Up @@ -2070,6 +2076,49 @@ public void testSetBalancerBandwidth() throws Exception {
}, 100, 60 * 1000);
}

@Test
public void testRecordRealClientEnable() {
GenericTestUtils.LogCapturer logger =
GenericTestUtils.LogCapturer.captureLogs(Server.LOG);
CallerContext.setCurrent(
new CallerContext.Builder(
"clientContext,clientIp:2.2.2.2,clientPort:2345").build());
try {
String filePath = "/test/f2.log";
routerProtocol.getBlockLocations(filePath, 0, 100);
} catch (Exception e) {
// do nothing
}
assertTrue(logger.getOutput().contains("client=2.2.2.2:2345"));
}

@Test
public void testRecordRealClientDisable() throws Exception {
GenericTestUtils.LogCapturer logger =
GenericTestUtils.LogCapturer.captureLogs(Server.LOG);
// set hadoop.caller.context.enabled = false
namenodeConf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, false);
// restart cluster
tearDown();
globalSetUp();
testSetup();
CallerContext.setCurrent(
new CallerContext.Builder(
"clientContext,clientIp:1.1.1.1,clientPort:1234").build());
try {
String filePath = "/test/f1.log";
routerProtocol.getBlockLocations(filePath, 0, 100);
} catch (Exception e) {
// do nothing
}
assertFalse(logger.getOutput().contains("client="));

// reset hadoop.caller.context.enabled = true
namenodeConf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
tearDown();
globalSetUp();
}

@Test
public void testAddClientIpPortToCallerContext() throws IOException {
GenericTestUtils.LogCapturer auditLog =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,7 @@ public static String getClientMachine(final String[] ipProxyUsers) {
if (cc != null) {
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
// return "1.2.3.4" as the client machine.
String key = CallerContext.CLIENT_IP_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
return parseSpecialValue(cc, key);
return CallerContext.getRealClientIp(cc);
}

String clientMachine = Server.getRemoteAddress();
Expand Down