From 3e08eb8119e8776683033a0c7b3d7e1f35c4800c Mon Sep 17 00:00:00 2001 From: John Sirois Date: Mon, 8 Feb 2016 13:04:36 -0700 Subject: [PATCH 1/3] Fully parameterize java async client code. The parametrization brings the existing actual parametrization with client call implementation objects to the fore and so this change also fixes that parametrization to be a simple parametrization over the return type as is done in the server-side AsyncProcessor code. NB: This is a breaking change in both generated code and the client libs. --- compiler/cpp/src/generate/t_java_generator.cc | 22 +- .../thrift/async/AsyncMethodCallback.java | 34 +++- .../apache/thrift/async/TAsyncMethodCall.java | 23 ++- .../thrift/async/TestTAsyncClientManager.java | 192 +++++++++++------- .../thrift/protocol/ProtocolTestBase.java | 5 + .../apache/thrift/server/ServerTestBase.java | 188 ++++++++--------- test/DebugProtoTest.thrift | 2 + 7 files changed, 275 insertions(+), 191 deletions(-) diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc index 6a2b8889ad0..2613a667267 100644 --- a/compiler/cpp/src/generate/t_java_generator.cc +++ b/compiler/cpp/src/generate/t_java_generator.cc @@ -3029,7 +3029,8 @@ void t_java_generator::generate_service_async_client(t_service* tservice) { // TAsyncMethod object for this function call indent(f_service_) << "public static class " + funclassname - + " extends org.apache.thrift.async.TAsyncMethodCall {" << endl; + + " extends org.apache.thrift.async.TAsyncMethodCall<" + + type_name((*f_iter)->get_returntype(), true) + "> {" << endl; indent_up(); // Member variables @@ -3082,7 +3083,7 @@ void t_java_generator::generate_service_async_client(t_service* tservice) { indent(f_service_) << "}" << endl << endl; // Return method - indent(f_service_) << "public " + type_name(ret_type) + " getResult() throws "; + indent(f_service_) << "public " + type_name(ret_type, true) + " getResult() throws "; vector::const_iterator x_iter; for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { f_service_ << type_name((*x_iter)->get_type(), false, false) + ", "; @@ -3099,12 +3100,11 @@ void t_java_generator::generate_service_async_client(t_service* tservice) { "org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());" << endl << indent() << "org.apache.thrift.protocol.TProtocol prot = " "client.getProtocolFactory().getProtocol(memoryTransport);" << endl; - if (!(*f_iter)->is_oneway()) { - indent(f_service_); - if (!ret_type->is_void()) { - f_service_ << "return "; - } - f_service_ << "(new Client(prot)).recv" + sep + javaname + "();" << endl; + indent(f_service_); + if (ret_type->is_void()) { // NB: Includes oneways which always return void. + f_service_ << "return null;" << endl; + } else { + f_service_ << "return (new Client(prot)).recv" + sep + javaname + "();" << endl; } // Close function @@ -4224,7 +4224,8 @@ string t_java_generator::async_function_call_arglist(t_function* tfunc, } if (include_types) { - arglist += "org.apache.thrift.async.AsyncMethodCallback "; + arglist += "org.apache.thrift.async.AsyncMethodCallback<"; + arglist += type_name(tfunc->get_returntype(), true) + "> "; } arglist += "resultHandler"; @@ -4279,7 +4280,8 @@ string t_java_generator::async_argument_list(t_function* tfunct, result += ", "; } if (include_types) { - result += "org.apache.thrift.async.AsyncMethodCallback "; + result += "org.apache.thrift.async.AsyncMethodCallback<"; + result += type_name(tfunct->get_returntype(), true) + "> "; } result += "resultHandler"; return result; diff --git a/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java b/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java index 00004b7577c..4ebde074136 100644 --- a/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java +++ b/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java @@ -18,22 +18,34 @@ */ package org.apache.thrift.async; - +/** + * A handler interface asynchronous clients can implement to receive future + * notice of the results of an asynchronous method call. + * + * @param The return type of the asynchronously invoked method. + */ public interface AsyncMethodCallback { /** * This method will be called when the remote side has completed invoking - * your method call and the result is fully read. For oneway method calls, - * this method will be called as soon as we have completed writing out the - * request. - * @param response + * your method call and the result is fully read. For {@code oneway} method + * calls, this method will be called as soon as we have completed writing out + * the request. + * + * @param response The return value of the asynchronously invoked method; + * {@code null} for void methods which includes + * {@code oneway} methods. */ - public void onComplete(T response); + void onComplete(T response); /** - * This method will be called when there is an unexpected clientside - * exception. This does not include application-defined exceptions that - * appear in the IDL, but rather things like IOExceptions. - * @param exception + * This method will be called when there is either an unexpected client-side + * exception like an IOException or else when the remote method raises an + * exception, either declared in the IDL or due to an unexpected server-side + * error. + * + * @param exception The exception encountered processing the the asynchronous + * method call, may be a local exception or an unmarshalled + * remote exception. */ - public void onError(Exception exception); + void onError(Exception exception); } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index 5c679b62b4a..a36e961662b 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -33,11 +33,15 @@ import org.apache.thrift.transport.TTransportException; /** - * Encapsulates an async method call + * Encapsulates an async method call. + *

* Need to generate: - * - private void write_args(TProtocol protocol) - * - public T getResult() throws , , ... - * @param + *

    + *
  • protected abstract void write_args(TProtocol protocol)
  • + *
  • protected abstract T getResult() throws , , ...
  • + *
+ * + * @param The return type of the encapsulated method call. */ public abstract class TAsyncMethodCall { @@ -113,6 +117,8 @@ public long getTimeoutTimestamp() { protected abstract void write_args(TProtocol protocol) throws TException; + protected abstract T getResult() throws Exception; + /** * Initialize buffers. * @throws TException if buffer initialization fails @@ -225,8 +231,13 @@ private void cleanUpAndFireCallback(SelectionKey key) { key.interestOps(0); // this ensures that the TAsyncMethod instance doesn't hang around key.attach(null); - client.onComplete(); - callback.onComplete((T)this); + try { + T result = this.getResult(); + client.onComplete(); + callback.onComplete(result); + } catch (Exception e) { + onError(e); + } } private void doReadingResponseSize() throws IOException { diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index 12d0eaf3332..c483cf24ee5 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -22,11 +22,13 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -39,12 +41,9 @@ import org.apache.thrift.transport.TNonblockingSocket; import thrift.test.CompactProtoTestStruct; +import thrift.test.ExceptionWithAMap; import thrift.test.Srv; import thrift.test.Srv.Iface; -import thrift.test.Srv.AsyncClient.Janky_call; -import thrift.test.Srv.AsyncClient.onewayMethod_call; -import thrift.test.Srv.AsyncClient.primitiveMethod_call; -import thrift.test.Srv.AsyncClient.voidMethod_call; public class TestTAsyncClientManager extends TestCase { @@ -83,53 +82,113 @@ public void testBasicCallWithTimeout() throws Exception { basicCall(client); } - public void testTimeoutCall() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - Srv.AsyncClient client = getClient(); - client.setTimeout(100); - client.primitiveMethod(new AsyncMethodCallback() { - @Override - public void onError(Exception exception) { - try { - if (!(exception instanceof TimeoutException)) { - StringWriter sink = new StringWriter(); - exception.printStackTrace(new PrintWriter(sink, true)); - fail("expected TimeoutException but got " + sink.toString()); - } - } finally { + private static abstract class ErrorCallTest { + final void runTest() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference error = new AtomicReference(); + C client = executeErroringCall(new AsyncMethodCallback() { + @Override + public void onComplete(R response) { latch.countDown(); } - } - @Override - public void onComplete(primitiveMethod_call response) { - try { - fail("Should not have finished timed out call."); - } finally { + @Override + public void onError(Exception exception) { + error.set(exception); latch.countDown(); } + }); + latch.await(2, TimeUnit.SECONDS); + assertTrue(client.hasError()); + Exception exception = error.get(); + assertNotNull(exception); + assertSame(exception, client.getError()); + validateError(client, exception); + } + + /** + * Executes a call that is expected to raise an exception. + * + * @param callback The testing callback that should be installed. + * @return The client the call was made against. + * @throws Exception if there was a problem setting up the client or making the call. + */ + abstract C executeErroringCall(AsyncMethodCallback callback) throws Exception; + + /** + * Further validates the properties of the error raised in the remote call and the state of the + * client after that call. + * + * @param client The client returned from {@link #executeErroringCall(AsyncMethodCallback)}. + * @param error The exception raised by the remote call. + */ + abstract void validateError(C client, Exception error); + } + + public void testUnexpectedRemoteExceptionCall() throws Exception { + new ErrorCallTest() { + @Override + Srv.AsyncClient executeErroringCall(AsyncMethodCallback callback) throws Exception { + Srv.AsyncClient client = getClient(); + client.declaredExceptionMethod(false, callback); + return client; } - }); - latch.await(2, TimeUnit.SECONDS); - assertTrue(client.hasError()); - assertTrue(client.getError() instanceof TimeoutException); + + @Override + void validateError(Srv.AsyncClient client, Exception error) { + assertFalse(client.hasTimeout()); + assertTrue(error instanceof TException); + } + }.runTest(); + } + + public void testDeclaredRemoteExceptionCall() throws Exception { + new ErrorCallTest() { + @Override + Srv.AsyncClient executeErroringCall(AsyncMethodCallback callback) throws Exception { + Srv.AsyncClient client = getClient(); + client.declaredExceptionMethod(true, callback); + return client; + } + + @Override + void validateError(Srv.AsyncClient client, Exception error) { + assertFalse(client.hasTimeout()); + assertEquals(ExceptionWithAMap.class, error.getClass()); + ExceptionWithAMap exceptionWithAMap = (ExceptionWithAMap) error; + assertEquals("blah", exceptionWithAMap.getBlah()); + assertEquals(new HashMap(), exceptionWithAMap.getMap_field()); + } + }.runTest(); + } + + public void testTimeoutCall() throws Exception { + new ErrorCallTest() { + @Override + Srv.AsyncClient executeErroringCall(AsyncMethodCallback callback) throws Exception { + Srv.AsyncClient client = getClient(); + client.setTimeout(100); + client.primitiveMethod(callback); + return client; + } + + @Override + void validateError(Srv.AsyncClient client, Exception error) { + assertTrue(client.hasTimeout()); + assertTrue(error instanceof TimeoutException); + } + }.runTest(); } public void testVoidCall() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); Srv.AsyncClient client = getClient(); - client.voidMethod(new FailureLessCallback() { + client.voidMethod(new FailureLessCallback() { @Override - public void onComplete(voidMethod_call response) { - try { - response.getResult(); - returned.set(true); - } catch (TException e) { - fail(e); - } finally { - latch.countDown(); - } + public void onComplete(Void response) { + returned.set(true); + latch.countDown(); } }); latch.await(1, TimeUnit.SECONDS); @@ -140,17 +199,11 @@ public void testOnewayCall() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); Srv.AsyncClient client = getClient(); - client.onewayMethod(new FailureLessCallback() { + client.onewayMethod(new FailureLessCallback() { @Override - public void onComplete(onewayMethod_call response) { - try { - response.getResult(); - returned.set(true); - } catch (TException e) { - fail(e); - } finally { - latch.countDown(); - } + public void onComplete(Void response) { + returned.set(true); + latch.countDown(); } }); latch.await(1, TimeUnit.SECONDS); @@ -188,17 +241,12 @@ private Srv.AsyncClient getClient() throws IOException { private void basicCall(Srv.AsyncClient client) throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); - client.Janky(1, new FailureLessCallback() { + client.Janky(1, new FailureLessCallback() { @Override - public void onComplete(Janky_call response) { - try { - assertEquals(3, response.getResult()); - returned.set(true); - } catch (TException e) { - fail(e); - } finally { - latch.countDown(); - } + public void onComplete(Integer response) { + assertEquals(3, response.intValue()); + returned.set(true); + latch.countDown(); } @Override @@ -250,9 +298,18 @@ public void voidMethod() throws TException { @Override public void onewayMethod() throws TException { } + + @Override + public boolean declaredExceptionMethod(boolean shouldThrowDeclared) throws TException { + if (shouldThrowDeclared) { + throw new ExceptionWithAMap("blah", new HashMap()); + } else { + throw new TException("Unexpected!"); + } + } } - private static abstract class FailureLessCallback implements AsyncMethodCallback { + private static abstract class FailureLessCallback implements AsyncMethodCallback { @Override public void onError(Exception exception) { fail(exception); @@ -287,18 +344,13 @@ public void run() { // connect an async client final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); - client_.Janky(1, new AsyncMethodCallback() { + client_.Janky(1, new AsyncMethodCallback() { @Override - public void onComplete(Janky_call response) { - try { - assertEquals(3, response.getResult()); - returned.set(true); - latch.countDown(); - } catch (TException e) { - latch.countDown(); - fail(e); - } + public void onComplete(Integer result) { + assertEquals(3, result.intValue()); + returned.set(true); + latch.countDown(); } @Override @@ -323,4 +375,4 @@ public void onError(Exception exception) { } } } -} \ No newline at end of file +} diff --git a/lib/java/test/org/apache/thrift/protocol/ProtocolTestBase.java b/lib/java/test/org/apache/thrift/protocol/ProtocolTestBase.java index 2ac02113fbe..0386d839371 100644 --- a/lib/java/test/org/apache/thrift/protocol/ProtocolTestBase.java +++ b/lib/java/test/org/apache/thrift/protocol/ProtocolTestBase.java @@ -327,6 +327,11 @@ public void methodWithDefaultArgs(int something) throws TException { @Override public void onewayMethod() throws TException { } + + @Override + public boolean declaredExceptionMethod(boolean shouldThrow) throws TException { + return shouldThrow; + } }; Srv.Processor testProcessor = new Srv.Processor(handler); diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java index bc1fb96315e..5dd483e3d6c 100755 --- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java +++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java @@ -563,124 +563,124 @@ private void testException(ThriftTest.Client testClient) throws TException, Xcep } - public static class AsyncTestHandler implements ThriftTest.AsyncIface { + public static class AsyncTestHandler implements ThriftTest.AsyncIface { - TestHandler handler = new TestHandler(); + TestHandler handler = new TestHandler(); - @Override - public void testVoid(AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(null); - } + @Override + public void testVoid(AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(null); + } - @Override - public void testString(String thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testString(thing)); - } + @Override + public void testString(String thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testString(thing)); + } - @Override - public void testBool(boolean thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testBool(thing)); - } + @Override + public void testBool(boolean thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testBool(thing)); + } - @Override - public void testByte(byte thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testByte(thing)); - } + @Override + public void testByte(byte thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testByte(thing)); + } - @Override - public void testI32(int thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testI32(thing)); - } + @Override + public void testI32(int thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testI32(thing)); + } - @Override - public void testI64(long thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testI64(thing)); - } + @Override + public void testI64(long thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testI64(thing)); + } - @Override - public void testDouble(double thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testDouble(thing)); - } + @Override + public void testDouble(double thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testDouble(thing)); + } - @Override - public void testBinary(ByteBuffer thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testBinary(thing)); - } + @Override + public void testBinary(ByteBuffer thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testBinary(thing)); + } - @Override - public void testStruct(Xtruct thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testStruct(thing)); - } + @Override + public void testStruct(Xtruct thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testStruct(thing)); + } - @Override - public void testNest(Xtruct2 thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testNest(thing)); - } + @Override + public void testNest(Xtruct2 thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testNest(thing)); + } - @Override - public void testMap(Map thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testMap(thing)); - } + @Override + public void testMap(Map thing, AsyncMethodCallback> resultHandler) throws TException { + resultHandler.onComplete(handler.testMap(thing)); + } - @Override - public void testStringMap(Map thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testStringMap(thing)); - } + @Override + public void testStringMap(Map thing, AsyncMethodCallback> resultHandler) throws TException { + resultHandler.onComplete(handler.testStringMap(thing)); + } - @Override - public void testSet(Set thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testSet(thing)); - } + @Override + public void testSet(Set thing, AsyncMethodCallback> resultHandler) throws TException { + resultHandler.onComplete(handler.testSet(thing)); + } - @Override - public void testList(List thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testList(thing)); - } + @Override + public void testList(List thing, AsyncMethodCallback> resultHandler) throws TException { + resultHandler.onComplete(handler.testList(thing)); + } - @Override - public void testEnum(Numberz thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testEnum(thing)); - } + @Override + public void testEnum(Numberz thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testEnum(thing)); + } - @Override - public void testTypedef(long thing, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testTypedef(thing)); - } + @Override + public void testTypedef(long thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testTypedef(thing)); + } - @Override - public void testMapMap(int hello, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testMapMap(hello)); - } + @Override + public void testMapMap(int hello, AsyncMethodCallback>> resultHandler) throws TException { + resultHandler.onComplete(handler.testMapMap(hello)); + } - @Override - public void testInsanity(Insanity argument, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testInsanity(argument)); - } + @Override + public void testInsanity(Insanity argument, AsyncMethodCallback>> resultHandler) throws TException { + resultHandler.onComplete(handler.testInsanity(argument)); + } - @Override - public void testMulti(byte arg0, int arg1, long arg2, Map arg3, Numberz arg4, long arg5, AsyncMethodCallback resultHandler) throws TException { - resultHandler.onComplete(handler.testMulti(arg0,arg1,arg2,arg3,arg4,arg5)); - } + @Override + public void testMulti(byte arg0, int arg1, long arg2, Map arg3, Numberz arg4, long arg5, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testMulti(arg0,arg1,arg2,arg3,arg4,arg5)); + } - @Override - public void testException(String arg, AsyncMethodCallback resultHandler) throws TException { - try { - // handler.testException(); - } catch (Exception e) { + @Override + public void testException(String arg, AsyncMethodCallback resultHandler) throws TException { + try { + // handler.testException(); + } catch (Exception e) { - } - } + } + } - @Override - public void testMultiException(String arg0, String arg1, AsyncMethodCallback resultHandler) throws TException { - //To change body of implemented methods use File | Settings | File Templates. - } + @Override + public void testMultiException(String arg0, String arg1, AsyncMethodCallback resultHandler) throws TException { + //To change body of implemented methods use File | Settings | File Templates. + } - @Override - public void testOneway(int secondsToSleep, AsyncMethodCallback resultHandler) throws TException { - handler.testOneway(secondsToSleep); - resultHandler.onComplete(null); - } + @Override + public void testOneway(int secondsToSleep, AsyncMethodCallback resultHandler) throws TException { + handler.testOneway(secondsToSleep); + resultHandler.onComplete(null); } + } } diff --git a/test/DebugProtoTest.thrift b/test/DebugProtoTest.thrift index 9726d004d76..df0fb30868d 100644 --- a/test/DebugProtoTest.thrift +++ b/test/DebugProtoTest.thrift @@ -248,6 +248,8 @@ service Srv { void methodWithDefaultArgs(1: i32 something = MYCONST); oneway void onewayMethod(); + + bool declaredExceptionMethod(1: bool shouldThrow) throws (1: ExceptionWithAMap xwamap); } service Inherited extends Srv { From a97c5fbe1525e4af5610d82087b719466e75df04 Mon Sep 17 00:00:00 2001 From: Nobuaki Sukegawa Date: Sat, 27 Feb 2016 16:03:21 +0900 Subject: [PATCH 2/3] Follow-up THRIFT-3112: key.cancel when error --- lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index a36e961662b..2df9700013d 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -236,6 +236,7 @@ private void cleanUpAndFireCallback(SelectionKey key) { client.onComplete(); callback.onComplete(result); } catch (Exception e) { + key.cancel(); onError(e); } } From 2c5df874e8459b445b7e689a35e98e83709780d8 Mon Sep 17 00:00:00 2001 From: Nobuaki Sukegawa Date: Sat, 27 Feb 2016 16:04:30 +0900 Subject: [PATCH 3/3] THRIFT-3680 Java async processor fails to notify errors to clients --- compiler/cpp/src/generate/t_java_generator.cc | 86 +++++++++++-------- lib/java/build.xml | 0 .../apache/thrift/AsyncProcessFunction.java | 6 +- .../apache/thrift/TBaseAsyncProcessor.java | 10 ++- .../thrift/TNonblockingMultiFetchClient.java | 0 .../thrift/TNonblockingMultiFetchStats.java | 0 .../src/org/apache/thrift/TServiceClient.java | 4 +- .../apache/thrift/async/TAsyncMethodCall.java | 2 +- .../thrift/server/TThreadPoolServer.java | 0 .../transport/TSSLTransportFactory.java | 0 .../apache/thrift/server/ServerTestBase.java | 47 ++++++---- 11 files changed, 96 insertions(+), 59 deletions(-) mode change 100755 => 100644 lib/java/build.xml mode change 100755 => 100644 lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java mode change 100755 => 100644 lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java mode change 100755 => 100644 lib/java/src/org/apache/thrift/server/TThreadPoolServer.java mode change 100755 => 100644 lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java mode change 100755 => 100644 lib/java/test/org/apache/thrift/server/ServerTestBase.java diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc index 2613a667267..0fa51cd5d2d 100644 --- a/compiler/cpp/src/generate/t_java_generator.cc +++ b/compiler/cpp/src/generate/t_java_generator.cc @@ -438,8 +438,6 @@ string t_java_generator::java_type_imports() { + "import org.apache.thrift.protocol.TProtocolException;\n" + "import org.apache.thrift.EncodingUtils;\n" + option - + "import org.apache.thrift.TException;\n" - + "import org.apache.thrift.async.AsyncMethodCallback;\n" + "import org.apache.thrift.server.AbstractNonblockingServer.*;\n" + "import java.util.List;\n" + "import java.util.ArrayList;\n" + "import java.util.Map;\n" + "import java.util.HashMap;\n" + "import java.util.EnumMap;\n" + "import java.util.Set;\n" @@ -1366,7 +1364,7 @@ void t_java_generator::generate_java_struct_definition(ofstream& out, << tstruct->get_name() << " "; if (is_exception) { - out << "extends TException "; + out << "extends org.apache.thrift.TException "; } out << "implements org.apache.thrift.TBase<" << tstruct->get_name() << ", " << tstruct->get_name() << "._Fields>, java.io.Serializable, Cloneable, Comparable<" << tstruct->get_name() << ">"; @@ -3303,11 +3301,12 @@ void t_java_generator::generate_process_async_function(t_service* tservice, t_fu indent(f_service_) << " return new " << argsname << "();" << endl; indent(f_service_) << "}" << endl << endl; - indent(f_service_) << "public AsyncMethodCallback<" << resulttype + indent(f_service_) << "public org.apache.thrift.async.AsyncMethodCallback<" << resulttype << "> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {" << endl; indent_up(); indent(f_service_) << "final org.apache.thrift.AsyncProcessFunction fcall = this;" << endl; - indent(f_service_) << "return new AsyncMethodCallback<" << resulttype << ">() { " << endl; + indent(f_service_) << "return new org.apache.thrift.async.AsyncMethodCallback<" << resulttype + << ">() { " << endl; indent_up(); indent(f_service_) << "public void onComplete(" << resulttype << " o) {" << endl; @@ -3341,48 +3340,65 @@ void t_java_generator::generate_process_async_function(t_service* tservice, t_fu indent(f_service_) << "public void onError(Exception e) {" << endl; indent_up(); - if (!tfunction->is_oneway()) { + if (tfunction->is_oneway()) { + f_service_ << indent() << "LOGGER.error(\"Exception inside oneway handler\", e);" << endl; + } else { indent(f_service_) << "byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;" << endl; - indent(f_service_) << "org.apache.thrift.TBase msg;" << endl; + indent(f_service_) << "org.apache.thrift.TSerializable msg;" << endl; indent(f_service_) << resultname << " result = new " << resultname << "();" << endl; t_struct* xs = tfunction->get_xceptions(); const std::vector& xceptions = xs->get_members(); + vector::const_iterator x_iter; if (xceptions.size() > 0) { for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { - if (x_iter != xceptions.begin()) - indent(f_service_) << "else "; - indent(f_service_) << "if (e instanceof " << type_name((*x_iter)->get_type(), false, false) - << ") {" << endl; - indent(f_service_) << indent() << "result." << (*x_iter)->get_name() << " = (" - << type_name((*x_iter)->get_type(), false, false) << ") e;" << endl; - indent(f_service_) << indent() << "result.set" << get_cap_name((*x_iter)->get_name()) - << get_cap_name("isSet") << "(true);" << endl; - indent(f_service_) << indent() << "msg = result;" << endl; - - indent(f_service_) << "}" << endl; + if (x_iter == xceptions.begin()) + f_service_ << indent(); + string type = type_name((*x_iter)->get_type(), false, false); + string name = (*x_iter)->get_name(); + f_service_ << "if (e instanceof " << type << ") {" << endl; + indent_up(); + f_service_ << indent() << "result." << name << " = (" << type << ") e;" << endl + << indent() << "result.set" << get_cap_name(name) << get_cap_name("isSet") + << "(true);" << endl + << indent() << "msg = result;" << endl; + indent_down(); + indent(f_service_) << "} else "; } - indent(f_service_) << " else " << endl; + } else { + indent(f_service_); } - - indent(f_service_) << "{" << endl; + f_service_ << "if (e instanceof org.apache.thrift.transport.TTransportException) {" << endl; indent_up(); - indent(f_service_) << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl; - indent(f_service_) << "msg = (org.apache.thrift.TBase)new " - "org.apache.thrift.TApplicationException(org.apache.thrift." - "TApplicationException.INTERNAL_ERROR, e.getMessage());" << endl; + f_service_ << indent() << "LOGGER.error(\"TTransportException inside handler\", e);" << endl + << indent() << "fb.close();" << endl + << indent() << "return;" << endl; indent_down(); - indent(f_service_) << "}" << endl; - - indent(f_service_) << "try {" << endl; - indent(f_service_) << " fcall.sendResponse(fb,msg,msgType,seqid);" << endl; - indent(f_service_) << " return;" << endl; - indent(f_service_) << "} catch (Exception ex) {" << endl; - indent(f_service_) << " LOGGER.error(\"Exception writing to internal frame buffer\", ex);" + indent(f_service_) << "} else if (e instanceof org.apache.thrift.TApplicationException) {" << endl; - indent(f_service_) << "}" << endl; - indent(f_service_) << "fb.close();" << endl; + indent_up(); + f_service_ << indent() << "LOGGER.error(\"TApplicationException inside handler\", e);" << endl + << indent() << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl + << indent() << "msg = (org.apache.thrift.TApplicationException)e;" << endl; + indent_down(); + indent(f_service_) << "} else {" << endl; + indent_up(); + f_service_ << indent() << "LOGGER.error(\"Exception inside handler\", e);" << endl + << indent() << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl + << indent() << "msg = new " + "org.apache.thrift.TApplicationException(org.apache.thrift." + "TApplicationException.INTERNAL_ERROR, e.getMessage());" + << endl; + indent_down(); + f_service_ << indent() << "}" << endl + << indent() << "try {" << endl + << indent() << " fcall.sendResponse(fb,msg,msgType,seqid);" << endl + << indent() << "} catch (Exception ex) {" << endl + << indent() << " LOGGER.error(\"Exception writing to internal frame buffer\", ex);" + << endl + << indent() << " fb.close();" << endl + << indent() << "}" << endl; } indent_down(); indent(f_service_) << "}" << endl; @@ -3397,7 +3413,7 @@ void t_java_generator::generate_process_async_function(t_service* tservice, t_fu indent(f_service_) << "public void start(I iface, " << argsname << " args, org.apache.thrift.async.AsyncMethodCallback<" << resulttype - << "> resultHandler) throws TException {" << endl; + << "> resultHandler) throws org.apache.thrift.TException {" << endl; indent_up(); // Generate the function call diff --git a/lib/java/build.xml b/lib/java/build.xml old mode 100755 new mode 100644 diff --git a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java index 799e02d58ac..550ebd532b7 100644 --- a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java +++ b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java @@ -24,7 +24,7 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.AbstractNonblockingServer; -public abstract class AsyncProcessFunction { +public abstract class AsyncProcessFunction { final String methodName; public AsyncProcessFunction(String methodName) { @@ -37,13 +37,13 @@ public AsyncProcessFunction(String methodName) { public abstract T getEmptyArgsInstance(); - public abstract AsyncMethodCallback getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid); + public abstract AsyncMethodCallback getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid); public String getMethodName() { return methodName; } - public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TBase result, final byte type, final int seqid) throws TException { + public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TSerializable result, final byte type, final int seqid) throws TException { TProtocol oprot = fb.getOutputProtocol(); oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid)); diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java index ed6c323451c..d0257dbbf51 100644 --- a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java +++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java @@ -19,6 +19,7 @@ package org.apache.thrift; import org.apache.thrift.protocol.*; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.server.AbstractNonblockingServer.*; import org.slf4j.Logger; @@ -63,7 +64,7 @@ public boolean process(final AsyncFrameBuffer fb) throws TException { } //Get Args - TBase args = (TBase)fn.getEmptyArgsInstance(); + TBase args = fn.getEmptyArgsInstance(); try { args.read(in); @@ -81,7 +82,12 @@ public boolean process(final AsyncFrameBuffer fb) throws TException { //start off processing function - fn.start(iface, args,fn.getResultHandler(fb,msg.seqid)); + AsyncMethodCallback resultHandler = fn.getResultHandler(fb,msg.seqid); + try { + fn.start(iface, args, resultHandler); + } catch (Exception e) { + resultHandler.onError(e); + } return true; } diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java old mode 100755 new mode 100644 diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java old mode 100755 new mode 100644 diff --git a/lib/java/src/org/apache/thrift/TServiceClient.java b/lib/java/src/org/apache/thrift/TServiceClient.java index 31153eccfe4..6619b9ce68d 100644 --- a/lib/java/src/org/apache/thrift/TServiceClient.java +++ b/lib/java/src/org/apache/thrift/TServiceClient.java @@ -81,8 +81,10 @@ protected void receiveBase(TBase result, String methodName) throws TExcepti iprot_.readMessageEnd(); throw x; } + System.out.format("Received %d%n", msg.seqid); if (msg.seqid != seqid_) { - throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response"); + throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, + String.format("%s failed: out of sequence response: expected %d but got %d", methodName, seqid_, msg.seqid)); } result.read(iprot_); iprot_.readMessageEnd(); diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index 2df9700013d..99ca4dc4c6b 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -173,7 +173,7 @@ protected ByteBuffer getFrameBuffer() { * select interests without worrying about concurrency. * @param key */ - protected void transition(SelectionKey key) { + void transition(SelectionKey key) { // Ensure key is valid if (!key.isValid()) { key.cancel(); diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java old mode 100755 new mode 100644 diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java old mode 100755 new mode 100644 diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java old mode 100755 new mode 100644 index 5dd483e3d6c..f4c1f2702ae --- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java +++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java @@ -223,12 +223,12 @@ public Xtruct testMulti(byte arg0, int arg1, long arg2, Map arg3, public void testException(String arg) throws Xception, TException { System.out.print("testException("+arg+")\n"); - if (arg.equals("Xception")) { + if ("Xception".equals(arg)) { Xception x = new Xception(); x.errorCode = 1001; x.message = arg; throw x; - } else if (arg.equals("TException")) { + } else if ("TException".equals(arg)) { throw new TException(arg); } else { Xtruct result = new Xtruct(); @@ -411,8 +411,10 @@ public void testIt() throws Exception { testTypedef(testClient); testNestedMap(testClient); testInsanity(testClient); - testOneway(testClient); testException(testClient); + testOneway(testClient); + // FIXME: a call after oneway does not work for async client + // testI32(testClient); transport.close(); stopServer(); @@ -548,18 +550,19 @@ public void testTransportFactory() throws Exception { } private void testException(ThriftTest.Client testClient) throws TException, Xception { - //@TODO testException - //testClient.testException("no Exception"); - /*try { - testClient.testException("Xception"); + try { + testClient.testException("Xception"); + assert false; } catch(Xception e) { - assertEquals(e.message, "Xception"); - }*/ - /*try { - testClient.testException("ApplicationException"); + assertEquals(e.message, "Xception"); + assertEquals(e.errorCode, 1001); + } + try { + testClient.testException("TException"); + assert false; } catch(TException e) { - assertEquals(e.message, "ApplicationException"); - }*/ + } + testClient.testException("no Exception"); } @@ -664,11 +667,21 @@ public void testMulti(byte arg0, int arg1, long arg2, Map arg3, N @Override public void testException(String arg, AsyncMethodCallback resultHandler) throws TException { - try { - // handler.testException(); - } catch (Exception e) { - + System.out.print("testException("+arg+")\n"); + if ("Xception".equals(arg)) { + Xception x = new Xception(); + x.errorCode = 1001; + x.message = arg; + // throw and onError yield the same result. + // resultHandler.onError(x); + // return; + throw x; + } else if ("TException".equals(arg)) { + // throw new TException(arg); + resultHandler.onError(new TException(arg)); + return; } + resultHandler.onComplete(null); } @Override