Skip to content

Commit

Permalink
HBASE-13397 Purge duplicate rpc request thread local
Browse files Browse the repository at this point in the history
  • Loading branch information
saintstack committed Apr 3, 2015
1 parent f561ef7 commit 6c22333
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
Expand All @@ -35,28 +34,26 @@
/**
* The request processing logic, which is usually executed in thread pools provided by an
* {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
* {@link RpcServer.Call}
* RpcServer.Call
*/
@InterfaceAudience.Private
public class CallRunner {
private Call call;
private RpcServerInterface rpcServer;
private MonitoredRPCHandler status;
private UserProvider userProvider;

/**
* On construction, adds the size of this call to the running count of outstanding call sizes.
* Presumption is that we are put on a queue while we wait on an executor to run us. During this
* time we occupy heap.
*/
// The constructor is shutdown so only RpcServer in this class can make one of these.
CallRunner(final RpcServerInterface rpcServer, final Call call, UserProvider userProvider) {
CallRunner(final RpcServerInterface rpcServer, final Call call) {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
this.rpcServer.addCallSize(call.getSize());
this.status = getStatus();
this.userProvider = userProvider;
}

public Call getCall() {
Expand All @@ -70,7 +67,6 @@ private void cleanup() {
this.call = null;
this.rpcServer = null;
this.status = null;
this.userProvider = null;
}

public void run() {
Expand Down Expand Up @@ -101,8 +97,6 @@ public void run() {
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
}
RequestContext.set(userProvider.create(call.connection.user), RpcServer.getRemoteIp(),
call.connection.service);
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
call.timestamp, this.status);
Expand All @@ -117,11 +111,8 @@ public void run() {
if (traceScope != null) {
traceScope.close();
}
// Must always clear the request context to avoid leaking
// credentials between requests.
RequestContext.clear();
RpcServer.CurCall.set(null);
}
RpcServer.CurCall.set(null);
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.ipc;

import java.net.InetAddress;

import org.apache.hadoop.hbase.security.User;


public interface RpcCallContext extends Delayable {
/**
Expand All @@ -36,4 +40,21 @@ public interface RpcCallContext extends Delayable {
* @return True if the client supports cellblocks, else return all content in pb
*/
boolean isClientCellBlockSupport();

/**
* Returns the user credentials associated with the current RPC request or
* <code>null</code> if no credentials were provided.
* @return A User
*/
User getRequestUser();

/**
* @return Current request's user name or null if none ongoing.
*/
String getRequestUserName();

/**
* @return Address of remote client if a request is ongoing, else null
*/
InetAddress getRemoteAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
Expand Down Expand Up @@ -299,9 +300,12 @@ class Call implements RpcCallContext {
protected TraceInfo tinfo;
private ByteBuffer cellBlock = null;

private User user;
private InetAddress remoteAddress;

Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
long size, TraceInfo tinfo) {
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
this.id = id;
this.service = service;
this.md = md;
Expand All @@ -316,6 +320,8 @@ class Call implements RpcCallContext {
this.isError = false;
this.size = size;
this.tinfo = tinfo;
this.user = connection.user == null? null: userProvider.create(connection.user);
this.remoteAddress = remoteAddress;
}

/**
Expand Down Expand Up @@ -522,6 +528,22 @@ public synchronized void sendResponseIfReady() throws IOException {
public UserGroupInformation getRemoteUser() {
return connection.user;
}

@Override
public User getRequestUser() {
return user;
}

@Override
public String getRequestUserName() {
User user = getRequestUser();
return user == null? null: user.getShortName();
}

@Override
public InetAddress getRemoteAddress() {
return remoteAddress;
}
}

/** Listens on the socket. Creates jobs for the handler threads*/
Expand Down Expand Up @@ -1196,13 +1218,14 @@ public class Connection {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall =
new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null);
new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null,
null);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall =
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);

public UserGroupInformation attemptingUser = null; // user name before auth

Expand Down Expand Up @@ -1590,7 +1613,7 @@ private int doBadPreambleHandling(final String msg) throws IOException {

private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
Expand Down Expand Up @@ -1749,7 +1772,7 @@ protected void processRequest(byte[] buf) throws IOException, InterruptedExcepti
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null);
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
"Call queue is full on " + getListenerAddress() +
Expand Down Expand Up @@ -1794,7 +1817,7 @@ protected void processRequest(byte[] buf) throws IOException, InterruptedExcepti

final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null);
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
Expand All @@ -1806,9 +1829,8 @@ protected void processRequest(byte[] buf) throws IOException, InterruptedExcepti
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize,
traceInfo);
scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
totalRequestSize, traceInfo, RpcServer.getRemoteIp());
scheduler.dispatch(new CallRunner(RpcServer.this, call));
}

private boolean authorizeConnection() throws IOException {
Expand Down Expand Up @@ -2345,6 +2367,33 @@ public static RpcCallContext getCurrentCall() {
return CurCall.get();
}

/**
* Returns the user credentials associated with the current RPC request or
* <code>null</code> if no credentials were provided.
* @return A User
*/
public static User getRequestUser() {
RpcCallContext ctx = getCurrentCall();
return ctx == null? null: ctx.getRequestUser();
}

/**
* Returns the username for any user associated with the current RPC
* request or <code>null</code> if no user is set.
*/
public static String getRequestUserName() {
User user = getRequestUser();
return user == null? null: user.getShortName();
}

/**
* @return Address of remote client if a request is ongoing, else null
*/
public static InetAddress getRemoteAddress() {
RpcCallContext ctx = getCurrentCall();
return ctx == null? null: ctx.getRemoteAddress();
}

/**
* @param serviceName Some arbitrary string that represents a 'service'.
* @param services Available service instances
Expand Down
Loading

0 comments on commit 6c22333

Please sign in to comment.