Skip to content

Commit

Permalink
HADOOP-17680. Allow ProtobufRpcEngine to be extensible (#2905) Contri…
Browse files Browse the repository at this point in the history
…buted by Hector Chaverri.

(cherry picked from commit f40e3eb)
  • Loading branch information
hchaverri authored and shvachko committed May 7, 2021
1 parent badb6b3 commit 39bf9e2
Showing 1 changed file with 24 additions and 6 deletions.
Expand Up @@ -115,7 +115,7 @@ public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
factory)), false);
}

private static class Invoker implements RpcInvocationHandler {
protected static class Invoker implements RpcInvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
Expand All @@ -126,7 +126,7 @@ private static class Invoker implements RpcInvocationHandler {
private AtomicBoolean fallbackToSimpleAuth;
private AlignmentContext alignmentContext;

private Invoker(Class<?> protocol, InetSocketAddress addr,
protected Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
Expand All @@ -141,7 +141,7 @@ private Invoker(Class<?> protocol, InetSocketAddress addr,
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
private Invoker(Class<?> protocol, Client.ConnectionId connId,
protected Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
Expand Down Expand Up @@ -218,8 +218,6 @@ public Message invoke(Object proxy, final Method method, Object[] args)
traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
}

RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);

if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
remoteId + ": " + method.getName() +
Expand All @@ -231,7 +229,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
final RpcWritable.Buffer val;
try {
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
constructRpcRequest(method, theRequest), remoteId,
fallbackToSimpleAuth, alignmentContext);

} catch (Throwable e) {
Expand Down Expand Up @@ -276,6 +274,11 @@ public boolean isDone() {
}
}

protected Writable constructRpcRequest(Method method, Message theRequest) {
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
return new RpcProtobufRequest(rpcRequestHeader, theRequest);
}

private Message getReturnMessage(final Method method,
final RpcWritable.Buffer buf) throws ServiceException {
Message prototype = null;
Expand Down Expand Up @@ -325,6 +328,14 @@ private Message getReturnProtoType(Method method) throws Exception {
public ConnectionId getConnectionId() {
return remoteId;
}

protected long getClientProtocolVersion() {
return clientProtocolVersion;
}

protected String getProtocolName() {
return protocolName;
}
}

@VisibleForTesting
Expand Down Expand Up @@ -503,6 +514,13 @@ public Writable call(RPC.Server server, String connectionProtocolName,
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
return call(server, connectionProtocolName, request, receiveTime,
methodName, declaringClassProtoName, clientVersion);
}

protected Writable call(RPC.Server server, String connectionProtocolName,
RpcWritable.Buffer request, long receiveTime, String methodName,
String declaringClassProtoName, long clientVersion) throws Exception {
if (server.verbose)
LOG.info("Call: connectionProtocolName=" + connectionProtocolName +
", method=" + methodName);
Expand Down

0 comments on commit 39bf9e2

Please sign in to comment.