From 036ebb802f15a0535719be3320992602a2de4b1d Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 18 Mar 2015 15:19:33 +0900 Subject: [PATCH] TAJO-1409 Clients calling remote services returning BoolProto ignores false values --- .../QueryMasterManagerService.java | 10 ++-- .../tajo/worker/TajoWorkerManagerService.java | 5 +- .../org/apache/tajo/rpc/AsyncRpcServer.java | 31 +++++------- .../apache/tajo/rpc/BlockingRpcServer.java | 21 ++------ .../tajo/rpc/ChannelExceptionHandler.java | 48 +++++++++++++++++++ .../apache/tajo/rpc/InvocationFailure.java | 27 +++++++++++ .../apache/tajo/rpc/RemoteCallException.java | 12 ++++- 7 files changed, 109 insertions(+), 45 deletions(-) create mode 100644 tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelExceptionHandler.java create mode 100644 tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/InvocationFailure.java diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java index 85cc553d84..03ff9bc6d9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java @@ -32,6 +32,8 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; +import org.apache.tajo.rpc.InvocationFailure; +import org.apache.tajo.rpc.RemoteCallException; import org.apache.tajo.session.Session; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; @@ -168,7 +170,7 @@ public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatus done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + throw new InvocationFailure(e); } } @@ -193,7 +195,7 @@ public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErr done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + throw new InvocationFailure(e); } } @@ -209,7 +211,7 @@ public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionRepo done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + throw new InvocationFailure(e); } } @@ -256,7 +258,7 @@ public void executeQuery(RpcController controller, } catch (Exception e) { workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc(); LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + throw new InvocationFailure(e); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 4a097725fc..04d612faf5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -34,6 +34,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.AsyncRpcServer; +import org.apache.tajo.rpc.InvocationFailure; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.event.TaskRunnerStartEvent; @@ -126,7 +127,7 @@ public void startExecutionBlock(RpcController controller, done.run(TajoWorker.TRUE_PROTO); } catch (Throwable t) { LOG.error(t.getMessage(), t); - done.run(TajoWorker.FALSE_PROTO); + throw new InvocationFailure(t); } } @@ -141,7 +142,7 @@ public void stopExecutionBlock(RpcController controller, done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + throw new InvocationFailure(e); } } diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index 3b5a747686..d7d08de27d 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -57,7 +57,7 @@ public AsyncRpcServer(final Class protocol, } @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { + private class ServerHandler extends ChannelExceptionHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @@ -88,7 +88,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName), true); } Message paramProto = null; @@ -97,7 +97,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() .mergeFrom(request.getRequestMessage()).build(); } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); + throw new RemoteCallException(request.getId(), methodDescriptor, t, true); } } @@ -121,28 +121,19 @@ public void run(Message returnValue) { } }; - service.callMethod(methodDescriptor, controller, paramProto, callback); + try { + service.callMethod(methodDescriptor, controller, paramProto, callback); + } catch (RuntimeException t) { + if (t instanceof InvocationFailure) { + throw new RemoteCallException(request.getId(), methodDescriptor, t.getCause(), false); + } + throw t; + } } finally { ReferenceCountUtil.release(msg); } } } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception{ - if (cause instanceof RemoteCallException) { - RemoteCallException callException = (RemoteCallException) cause; - ctx.writeAndFlush(callException.getResponse()); - } else { - LOG.error(cause.getMessage()); - } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } - } - } } \ No newline at end of file diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index 0ce359fb98..9157baf9dc 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -62,7 +62,7 @@ public BlockingRpcServer(final Class protocol, } @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { + private class ServerHandler extends ChannelExceptionHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @@ -94,7 +94,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName), true); } Message paramProto = null; if (request.hasRequestMessage()) { @@ -103,7 +103,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) .mergeFrom(request.getRequestMessage()).build(); } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); + throw new RemoteCallException(request.getId(), methodDescriptor, t, true); } } Message returnValue; @@ -112,7 +112,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) try { returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); + throw new RemoteCallException(request.getId(), methodDescriptor, t, false); } RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); @@ -130,18 +130,5 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) } } } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (cause instanceof RemoteCallException) { - RemoteCallException callException = (RemoteCallException) cause; - ctx.writeAndFlush(callException.getResponse()); - } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } - } - } } \ No newline at end of file diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelExceptionHandler.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelExceptionHandler.java new file mode 100644 index 0000000000..e073874971 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelExceptionHandler.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class ChannelExceptionHandler extends ChannelInboundHandlerAdapter { + + private static final Log LOG = LogFactory.getLog(ChannelExceptionHandler.class); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + + boolean closeConnection = true; + if (cause instanceof RemoteCallException) { + RemoteCallException callException = (RemoteCallException) cause; + closeConnection = callException.shouldCloseConnection(); + ctx.writeAndFlush(callException.getResponse()); + cause = cause.getCause(); + } + if (cause != null) { + LOG.error(cause.toString(), cause); + } + + if (closeConnection && ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); + } + } +} diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/InvocationFailure.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/InvocationFailure.java new file mode 100644 index 0000000000..a0c58297a2 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/InvocationFailure.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +// tricky +// conveys underlying exception to ChannelHandler, which has information to make rpc protocol +public class InvocationFailure extends RuntimeException { + public InvocationFailure(Throwable t) { + super(t); + } +} diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java index 52ef31ab7b..2347732810 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java @@ -29,21 +29,29 @@ public class RemoteCallException extends RemoteException { private int seqId; private String originExceptionClass; + private transient boolean closeConnection; + public RemoteCallException(int seqId, MethodDescriptor methodDesc, - Throwable t) { + Throwable t, boolean closeConnection) { super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t); this.seqId = seqId; if (t != null) { originExceptionClass = t.getClass().getCanonicalName(); } + this.closeConnection = closeConnection; } - public RemoteCallException(int seqId, Throwable t) { + public RemoteCallException(int seqId, Throwable t, boolean closeConnection) { super(t); this.seqId = seqId; if (t != null) { originExceptionClass = t.getClass().getCanonicalName(); } + this.closeConnection = closeConnection; + } + + public boolean shouldCloseConnection() { + return closeConnection; } public RpcResponse getResponse() {