From 4eb81d3044c0f663d580cdbd3b611d5e3b1b4ac5 Mon Sep 17 00:00:00 2001 From: Tadayoshi Sato Date: Tue, 25 Oct 2016 18:45:42 +0900 Subject: [PATCH] [CXF-7109] ClientCallback may be invoked twice when Async HTTP Transport is used --- .../org/apache/cxf/endpoint/ClientImpl.java | 14 ++++++++++--- .../interceptor/ClientOutFaultObserver.java | 3 ++- .../cxf/message/AbstractWrappedMessage.java | 3 +++ .../org/apache/cxf/message/ExchangeImpl.java | 4 ++++ .../org/apache/cxf/message/StringMap.java | 7 +++++++ .../org/apache/cxf/message/StringMapImpl.java | 4 ++++ .../asyncclient/AsyncHTTPConduitTest.java | 21 +++++++++++++++++++ .../cxf/transport/http/HTTPConduit.java | 11 +++++----- 8 files changed, 58 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java index 428f79454b1..848d3bcabe7 100644 --- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java +++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java @@ -791,8 +791,11 @@ public void onMessage(Message message) { if (resCtx != null) { responseContext.put(Thread.currentThread(), resCtx); } - callback.handleException(resCtx, error); - + // remove callback so that it won't be invoked twice + callback = message.getExchange().remove(ClientCallback.class); + if (callback != null) { + callback.handleException(resCtx, error); + } } } else { chain.doIntercept(message); @@ -801,8 +804,13 @@ public void onMessage(Message message) { } callback = message.getExchange().get(ClientCallback.class); + if (callback == null || isPartialResponse(message)) { + return; + } - if (callback != null && !isPartialResponse(message)) { + // remove callback so that it won't be invoked twice + callback = message.getExchange().remove(ClientCallback.class); + if (callback != null) { message.getExchange().setInMessage(message); Map resCtx = CastUtils.cast((Map)message .getExchange() diff --git a/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java b/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java index 0c879c52af4..f139da90a2e 100644 --- a/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java +++ b/core/src/main/java/org/apache/cxf/interceptor/ClientOutFaultObserver.java @@ -50,7 +50,8 @@ public void onMessage(Message m) { return; } Exception ex = m.getContent(Exception.class); - ClientCallback callback = m.getExchange().get(ClientCallback.class); + // remove callback so that it won't be invoked twice + ClientCallback callback = m.getExchange().remove(ClientCallback.class); if (callback != null) { Map resCtx = CastUtils.cast((Map) m.getExchange().getOutMessage().get( diff --git a/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java b/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java index 496b97bc677..094e4322da0 100644 --- a/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java +++ b/core/src/main/java/org/apache/cxf/message/AbstractWrappedMessage.java @@ -157,6 +157,9 @@ public T get(Class key) { public void put(Class key, T value) { message.put(key, value); } + public T remove(Class key) { + return message.remove(key); + } public Object getContextualProperty(String key) { return message.getContextualProperty(key); diff --git a/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java b/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java index d2abd7a85ec..4798827da8d 100644 --- a/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java +++ b/core/src/main/java/org/apache/cxf/message/ExchangeImpl.java @@ -159,6 +159,10 @@ public Object put(String key, Object value) { return super.put(key, value); } + public T remove(Class key) { + return key.cast(super.remove(key.getName())); + } + private void setMessageContextProperty(Message m, String key, Object value) { if (m == null) { return; diff --git a/core/src/main/java/org/apache/cxf/message/StringMap.java b/core/src/main/java/org/apache/cxf/message/StringMap.java index 4d802b817f7..87115fe01ee 100644 --- a/core/src/main/java/org/apache/cxf/message/StringMap.java +++ b/core/src/main/java/org/apache/cxf/message/StringMap.java @@ -38,4 +38,11 @@ public interface StringMap extends Map { * @param value the value */ void put(Class key, T value); + + /** + * Convenience method for removing typed objects from the map. + * equivalent to: (T)remove(key.getName()); + * @param key the key + */ + T remove(Class key); } diff --git a/core/src/main/java/org/apache/cxf/message/StringMapImpl.java b/core/src/main/java/org/apache/cxf/message/StringMapImpl.java index c1428e455c2..f3acdc126b0 100644 --- a/core/src/main/java/org/apache/cxf/message/StringMapImpl.java +++ b/core/src/main/java/org/apache/cxf/message/StringMapImpl.java @@ -46,4 +46,8 @@ public T get(Class key) { public void put(Class key, T value) { put(key.getName(), value); } + + public T remove(Class key) { + return key.cast(remove(key.getName())); + } } diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java index 3341e3eceda..d4a25f6cce4 100644 --- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java +++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.xml.ws.AsyncHandler; import javax.xml.ws.Endpoint; @@ -197,6 +198,26 @@ public void handleResponse(Response res) { } }).get(); } + + @Test + public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { + // This test is especially targeted for RHEL 6.8 + updateAddressPort(g, PORT_INV); + int repeat = 100; + final AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < repeat; i++) { + try { + g.greetMeAsync(request, new AsyncHandler() { + public void handleResponse(Response res) { + count.incrementAndGet(); + } + }).get(); + } catch (Exception e) { + } + } + Thread.sleep(1000); + assertEquals("Callback should be invoked only once per request", repeat, count.intValue()); + } @Test @Ignore("peformance test") diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java index adebe09fd92..e5aa543a026 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java @@ -1626,12 +1626,13 @@ protected void handleResponseInternal() throws IOException { if (isOneway(exchange) && responseCode > 300) { throw new HTTPException(responseCode, getResponseMessage(), url.toURL()); } - ClientCallback cc = exchange.get(ClientCallback.class); - if (null != cc) { - //REVISIT move the decoupled destination property name into api - Endpoint ep = exchange.getEndpoint(); - if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo(). + //REVISIT move the decoupled destination property name into api + Endpoint ep = exchange.getEndpoint(); + if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo(). getProperty("org.apache.cxf.ws.addressing.MAPAggregator.decoupledDestination")) { + // remove callback so that it won't be invoked twice + ClientCallback cc = exchange.remove(ClientCallback.class); + if (null != cc) { cc.handleResponse(null, null); } }