Skip to content

Commit

Permalink
HBASE-15793 Port over AsyncCall improvements
Browse files Browse the repository at this point in the history
Signed-off-by: stack <stack@apache.org>
  • Loading branch information
jurmous authored and saintstack committed May 7, 2016
1 parent ac31ceb commit fa033b6
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 93 deletions.
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
* Promise for responses
* @param <V> Value type
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface Future<V> extends io.netty.util.concurrent.Future<V> {

}
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import io.netty.util.concurrent.GenericFutureListener;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
* Specific interface for the Response future listener
* @param <V> Value type.
*/
@InterfaceAudience.Private
public interface ResponseFutureListener<V>
extends GenericFutureListener<Future<V>> {
}
Expand Up @@ -19,8 +19,7 @@

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.DefaultPromise;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -31,51 +30,72 @@
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;

import java.io.IOException;

/**
* Represents an Async Hbase call and its response.
*
* Responses are passed on to its given doneHandler and failures to the rpcController
*
* @param <T> Type of message returned
* @param <M> Message returned in communication to be converted
*/
@InterfaceAudience.Private
public class AsyncCall extends DefaultPromise<Message> {
public class AsyncCall<M extends Message, T> extends Promise<T> {
private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());

final int id;

private final AsyncRpcChannelImpl channel;

final Descriptors.MethodDescriptor method;
final Message param;
final PayloadCarryingRpcController controller;
final Message responseDefaultType;

private final MessageConverter<M,T> messageConverter;
final long startTime;
final long rpcTimeout;
private final IOExceptionConverter exceptionConverter;

// For only the request
private final CellScanner cellScanner;
private final int priority;

final MetricsConnection.CallStats callStats;

/**
* Constructor
*
* @param eventLoop for call
* @param channel which initiated call
* @param connectId connection id
* @param md the method descriptor
* @param param parameters to send to Server
* @param controller controller for response
* @param cellScanner cellScanner containing cells to send as request
* @param responseDefaultType the default response type
* @param messageConverter converts the messages to what is the expected output
* @param rpcTimeout timeout for this call in ms
* @param priority for this request
*/
public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
param, PayloadCarryingRpcController controller, Message responseDefaultType,
public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
MetricsConnection.CallStats callStats) {
super(eventLoop);
super(channel.getEventExecutor());
this.channel = channel;

this.id = connectId;

this.method = md;
this.param = param;
this.controller = controller;
this.responseDefaultType = responseDefaultType;

this.messageConverter = messageConverter;
this.exceptionConverter = exceptionConverter;

this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
this.rpcTimeout = rpcTimeout;

this.priority = priority;
this.cellScanner = cellScanner;

this.callStats = callStats;
}

Expand All @@ -101,17 +121,19 @@ public String toString() {
* @param value to set
* @param cellBlockScanner to set
*/
public void setSuccess(Message value, CellScanner cellBlockScanner) {
if (cellBlockScanner != null) {
controller.setCellScanner(cellBlockScanner);
}

public void setSuccess(M value, CellScanner cellBlockScanner) {
if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
}

this.setSuccess(value);
try {
this.setSuccess(
this.messageConverter.convert(value, cellBlockScanner)
);
} catch (IOException e) {
this.setFailed(e);
}
}

/**
Expand All @@ -127,6 +149,10 @@ public void setFailed(IOException exception) {
exception = ((RemoteException) exception).unwrapRemoteException();
}

if (this.exceptionConverter != null) {
exception = this.exceptionConverter.convert(exception);
}

this.setFailure(exception);
}

Expand All @@ -138,4 +164,27 @@ public void setFailed(IOException exception) {
public long getRpcTimeout() {
return rpcTimeout;
}


/**
* @return Priority for this call
*/
public int getPriority() {
return priority;
}

/**
* Get the cellScanner for this request.
* @return CellScanner
*/
public CellScanner cellScanner() {
return cellScanner;
}

@Override
public boolean cancel(boolean mayInterupt){
this.channel.removePendingCall(this.id);
return super.cancel(mayInterupt);
}

}
Expand Up @@ -21,11 +21,12 @@
import com.google.protobuf.Message;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;

import java.net.InetSocketAddress;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;

/**
Expand All @@ -37,13 +38,23 @@ public interface AsyncRpcChannel {
/**
* Calls method on channel
* @param method to call
* @param controller to run call with
* @param request to send
* @param cellScanner with cells to send
* @param responsePrototype to construct response with
* @param messageConverter for the messages to expected result
* @param exceptionConverter for converting exceptions
* @param rpcTimeout timeout for request
* @param priority for request
* @param callStats collects stats of the call
* @return Promise for the response Message
*/
Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype, MetricsConnection.CallStats callStats);

<R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats);


/**
* Get the EventLoop on which this channel operated
Expand Down
Expand Up @@ -19,8 +19,6 @@

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
Expand All @@ -32,7 +30,6 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.net.ConnectException;
Expand All @@ -51,8 +48,10 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
Expand Down Expand Up @@ -291,36 +290,25 @@ public void run(Timeout timeout) throws Exception {
/**
* Calls method on channel
* @param method to call
* @param controller to run call with
* @param request to send
* @param cellScanner with cells to send
* @param responsePrototype to construct response with
* @param rpcTimeout timeout for request
* @param priority for request
* @return Promise for the response Message
*/
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype, MetricsConnection.CallStats callStats) {
final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
method, request, controller, responsePrototype, callStats);
controller.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
// TODO: do not need to call AsyncCall.setFailed?
synchronized (pendingCalls) {
pendingCalls.remove(call.id);
}
}
});
// TODO: this should be handled by PayloadCarryingRpcController.
if (controller.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.cancel(true);
return call;
}

public <R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) {
final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
rpcTimeout, priority, callStats);
synchronized (pendingCalls) {
if (closed) {
Promise<Message> promise = channel.eventLoop().newPromise();
promise.setFailure(new ConnectException());
return promise;
call.setFailure(new ConnectException());
return call;
}
pendingCalls.put(call.id, call);
// Add timeout for cleanup if none is present
Expand Down Expand Up @@ -398,16 +386,16 @@ private void writeRequest(final AsyncCall call) {
.setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
}

ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
if (cellBlock != null) {
final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
.newBuilder();
cellBlockBuilder.setLength(cellBlock.limit());
requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
// Only pass priority if there one. Let zero be same as no priority.
if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
requestHeaderBuilder.setPriority(call.controller.getPriority());
if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
requestHeaderBuilder.setPriority(call.getPriority());
}

RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
Expand Down

0 comments on commit fa033b6

Please sign in to comment.