diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java b/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java index 150e946b690..3d6fe289f8f 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java @@ -38,12 +38,7 @@ */ public abstract class Proxy { - public static final InvocationHandler RETURN_NULL_INVOKER = new InvocationHandler() { - @Override - public Object invoke(Object proxy, Method method, Object[] args) { - return null; - } - }; + public static final InvocationHandler RETURN_NULL_INVOKER = (proxy, method, args) -> null; public static final InvocationHandler THROW_UNSUPPORTED_INVOKER = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) { @@ -107,11 +102,7 @@ public static Proxy getProxy(ClassLoader cl, Class... ics) { // get cache by class loader. Map cache; synchronized (ProxyCacheMap) { - cache = ProxyCacheMap.get(cl); - if (cache == null) { - cache = new HashMap(); - ProxyCacheMap.put(cl, cache); - } + cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>()); } Proxy proxy = null; @@ -144,8 +135,8 @@ public static Proxy getProxy(ClassLoader cl, Class... ics) { try { ccp = ClassGenerator.newInstance(cl); - Set worked = new HashSet(); - List methods = new ArrayList(); + Set worked = new HashSet<>(); + List methods = new ArrayList<>(); for (int i = 0; i < ics.length; i++) { if (!Modifier.isPublic(ics[i].getModifiers())) { @@ -175,7 +166,7 @@ public static Proxy getProxy(ClassLoader cl, Class... ics) { for (int j = 0; j < pts.length; j++) { code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); } - code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); + code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);"); if (!Void.TYPE.equals(rt)) { code.append(" return ").append(asArgument(rt, "ret")).append(";"); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java index 50c7bb10a2b..4b25eb02b52 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.common.utils.NamedThreadFactory; @@ -51,7 +52,6 @@ /** * AbstractRegistry. (SPI, Prototype, ThreadSafe) - * */ public abstract class AbstractRegistry implements Registry { @@ -61,16 +61,16 @@ public abstract class AbstractRegistry implements Registry { private static final String URL_SPLIT = "\\s+"; // Log output protected final Logger logger = LoggerFactory.getLogger(getClass()); - // Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers + // Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers private final Properties properties = new Properties(); // File cache timing writing private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); // Is it synchronized to save the file private final boolean syncSaveFile; private final AtomicLong lastCacheChanged = new AtomicLong(); - private final Set registered = new ConcurrentHashSet(); - private final ConcurrentMap> subscribed = new ConcurrentHashMap>(); - private final ConcurrentMap>> notified = new ConcurrentHashMap>>(); + private final Set registered = new ConcurrentHashSet<>(); + private final ConcurrentMap> subscribed = new ConcurrentHashMap<>(); + private final ConcurrentMap>> notified = new ConcurrentHashMap<>(); private URL registryUrl; // Local disk cache file private File file; @@ -90,13 +90,15 @@ public AbstractRegistry(URL url) { } } this.file = file; + // When starting the subscription center, + // we need to read the local cache file for future Registry fault tolerance processing. loadProperties(); notify(url.getBackupUrls()); } protected static List filterEmpty(URL url, List urls) { - if (urls == null || urls.isEmpty()) { - List result = new ArrayList(1); + if (CollectionUtils.isEmpty(urls)) { + List result = new ArrayList<>(1); result.add(url.setProtocol(Constants.EMPTY_PROTOCOL)); return result; } @@ -221,7 +223,7 @@ public List getCacheUrls(URL url) { && (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_') && value != null && value.length() > 0) { String[] arr = value.trim().split(URL_SPLIT); - List urls = new ArrayList(); + List urls = new ArrayList<>(); for (String u : arr) { urls.add(URL.valueOf(u)); } @@ -233,7 +235,7 @@ public List getCacheUrls(URL url) { @Override public List lookup(URL url) { - List result = new ArrayList(); + List result = new ArrayList<>(); Map> notifiedUrls = getNotified().get(url); if (notifiedUrls != null && notifiedUrls.size() > 0) { for (List urls : notifiedUrls.values()) { @@ -244,11 +246,11 @@ public List lookup(URL url) { } } } else { - final AtomicReference> reference = new AtomicReference>(); + final AtomicReference> reference = new AtomicReference<>(); NotifyListener listener = reference::set; subscribe(url, listener); // Subscribe logic guarantees the first notify to return List urls = reference.get(); - if (urls != null && !urls.isEmpty()) { + if (CollectionUtils.isNotEmpty(urls)) { for (URL u : urls) { if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) { result.add(u); @@ -292,10 +294,7 @@ public void subscribe(URL url, NotifyListener listener) { if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } - - Set listeners = subscribed.computeIfAbsent(url, k -> { - return new ConcurrentHashSet<>(); - }); + Set listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>()); listeners.add(listener); } @@ -318,7 +317,7 @@ public void unsubscribe(URL url, NotifyListener listener) { protected void recover() throws Exception { // register - Set recoverRegistered = new HashSet(getRegistered()); + Set recoverRegistered = new HashSet<>(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); @@ -328,7 +327,7 @@ protected void recover() throws Exception { } } // subscribe - Map> recoverSubscribed = new HashMap>(getSubscribed()); + Map> recoverSubscribed = new HashMap<>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); @@ -343,7 +342,7 @@ protected void recover() throws Exception { } protected void notify(List urls) { - if (urls == null || urls.isEmpty()) { + if (CollectionUtils.isEmpty(urls)) { return; } @@ -367,6 +366,13 @@ protected void notify(List urls) { } } + /** + * Notify changes from the Provider side. + * + * @param url consumer side url + * @param listener listener + * @param urls provider latest urls + */ protected void notify(URL url, NotifyListener listener, List urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); @@ -374,7 +380,7 @@ protected void notify(URL url, NotifyListener listener, List urls) { if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } - if ((urls == null || urls.isEmpty()) + if ((CollectionUtils.isEmpty(urls)) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; @@ -382,28 +388,27 @@ protected void notify(URL url, NotifyListener listener, List urls) { if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } - Map> result = new HashMap>(); + // keep every provider's category. + Map> result = new HashMap<>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); - List categoryList = result.computeIfAbsent(category, k -> { - return new ArrayList<>(); - }); + List categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); categoryList.add(u); } } if (result.size() == 0) { return; } - Map> categoryNotified = notified.computeIfAbsent(url, k -> { - return new ConcurrentHashMap<>(); - }); + Map> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry> entry : result.entrySet()) { String category = entry.getKey(); List categoryList = entry.getValue(); categoryNotified.put(category, categoryList); - saveProperties(url); listener.notify(categoryList); + // We will update our cache file after each notification. + // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL. + saveProperties(url); } } @@ -442,9 +447,9 @@ public void destroy() { if (logger.isInfoEnabled()) { logger.info("Destroy registry:" + getUrl()); } - Set destroyRegistered = new HashSet(getRegistered()); + Set destroyRegistered = new HashSet<>(getRegistered()); if (!destroyRegistered.isEmpty()) { - for (URL url : new HashSet(getRegistered())) { + for (URL url : new HashSet<>(getRegistered())) { if (url.getParameter(Constants.DYNAMIC_KEY, true)) { try { unregister(url); @@ -457,7 +462,7 @@ public void destroy() { } } } - Map> destroySubscribed = new HashMap>(getSubscribed()); + Map> destroySubscribed = new HashMap<>(getSubscribed()); if (!destroySubscribed.isEmpty()) { for (Map.Entry> entry : destroySubscribed.entrySet()) { URL url = entry.getKey(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java index c7a4bb9bcba..cbf1527231f 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.timer.HashedWheelTimer; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.retry.FailedNotifiedTask; @@ -291,7 +292,7 @@ public void subscribe(URL url, NotifyListener listener) { Throwable t = e; List urls = getCacheUrls(url); - if (urls != null && !urls.isEmpty()) { + if (CollectionUtils.isNotEmpty(urls)) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java index 06835dac6e5..86d871fe69a 100644 --- a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java +++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java @@ -397,7 +397,7 @@ public void unsubscribe(URL url, NotifyListener listener) { @Override public List lookup(URL url) { - List urls = new ArrayList(); + List urls = new ArrayList<>(); Map> notifiedUrls = getNotified().get(url); if (notifiedUrls != null && notifiedUrls.size() > 0) { for (List values : notifiedUrls.values()) { diff --git a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java index c0c7c22ff22..0a8798442c6 100644 --- a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java +++ b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.StringUtils; @@ -36,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -155,14 +157,11 @@ public RedisRegistry(URL url) { this.root = group; this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT); - this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - deferExpired(); // Extend the expiration time - } catch (Throwable t) { // Defensive fault tolerance - logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t); - } + this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> { + try { + deferExpired(); // Extend the expiration time + } catch (Throwable t) { // Defensive fault tolerance + logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t); } }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS); } @@ -171,9 +170,8 @@ private void deferExpired() { for (Map.Entry entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { - Jedis jedis = jedisPool.getResource(); - try { - for (URL url : new HashSet(getRegistered())) { + try (Jedis jedis = jedisPool.getResource()) { + for (URL url : new HashSet<>(getRegistered())) { if (url.getParameter(Constants.DYNAMIC_KEY, true)) { String key = toCategoryPath(url); if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) { @@ -187,8 +185,6 @@ private void deferExpired() { if (!replicate) { break;//  If the server side has synchronized data, just write a single machine } - } finally { - jedis.close(); } } catch (Throwable t) { logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t); @@ -230,13 +226,10 @@ private void clean(Jedis jedis) { public boolean isAvailable() { for (JedisPool jedisPool : jedisPools.values()) { try { - Jedis jedis = jedisPool.getResource(); - try { + try (Jedis jedis = jedisPool.getResource()) { if (jedis.isConnected()) { return true; // At least one single machine is available. } - } finally { - jedis.close(); } } catch (Throwable t) { } @@ -280,16 +273,13 @@ public void doRegister(URL url) { for (Map.Entry entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { - Jedis jedis = jedisPool.getResource(); - try { + try (Jedis jedis = jedisPool.getResource()) { jedis.hset(key, value, expire); jedis.publish(key, Constants.REGISTER); success = true; if (!replicate) { break; //  If the server side has synchronized data, just write a single machine } - } finally { - jedis.close(); } } catch (Throwable t) { exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); @@ -313,16 +303,13 @@ public void doUnregister(URL url) { for (Map.Entry entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { - Jedis jedis = jedisPool.getResource(); - try { + try (Jedis jedis = jedisPool.getResource()) { jedis.hdel(key, value); jedis.publish(key, Constants.UNREGISTER); success = true; if (!replicate) { break; //  If the server side has synchronized data, just write a single machine } - } finally { - jedis.close(); } } catch (Throwable t) { exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); @@ -354,8 +341,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) { for (Map.Entry entry : jedisPools.entrySet()) { JedisPool jedisPool = entry.getValue(); try { - Jedis jedis = jedisPool.getResource(); - try { + try (Jedis jedis = jedisPool.getResource()) { if (service.endsWith(Constants.ANY_VALUE)) { admin = true; Set keys = jedis.keys(service); @@ -363,24 +349,18 @@ public void doSubscribe(final URL url, final NotifyListener listener) { Map> serviceKeys = new HashMap<>(); for (String key : keys) { String serviceKey = toServicePath(key); - Set sk = serviceKeys.get(serviceKey); - if (sk == null) { - sk = new HashSet<>(); - serviceKeys.put(serviceKey, sk); - } + Set sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>()); sk.add(key); } for (Set sk : serviceKeys.values()) { - doNotify(jedis, sk, url, Arrays.asList(listener)); + doNotify(jedis, sk, url, Collections.singletonList(listener)); } } } else { - doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener)); + doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener)); } success = true; break; // Just read one server's data - } finally { - jedis.close(); } } catch (Throwable t) { // Try the next server exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t); @@ -401,7 +381,7 @@ public void doUnsubscribe(URL url, NotifyListener listener) { private void doNotify(Jedis jedis, String key) { for (Map.Entry> entry : new HashMap<>(getSubscribed()).entrySet()) { - doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<>(entry.getValue())); + doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue())); } } @@ -449,7 +429,7 @@ private void doNotify(Jedis jedis, Collection keys, URL url, Collection< logger.info("redis notify: " + key + " = " + urls); } } - if (result == null || result.isEmpty()) { + if (CollectionUtils.isEmpty(result)) { return; } for (NotifyListener listener : listeners) { diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java index 37f1f21a16f..cdac09f2eb5 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.registry.NotifyListener; @@ -264,7 +265,7 @@ private String toUrlPath(URL url) { private List toUrlsWithoutEmpty(URL consumer, List providers) { List urls = new ArrayList<>(); - if (providers != null && !providers.isEmpty()) { + if (CollectionUtils.isNotEmpty(providers)) { for (String provider : providers) { provider = URL.decode(provider); if (provider.contains(Constants.PROTOCOL_SEPARATOR)) { diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java index 2d8981170f3..265d6ef6676 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java @@ -16,16 +16,15 @@ */ package org.apache.dubbo.remoting.transport.netty4; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.exchange.Response; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; - /** * NettyClientHandler */ @@ -47,7 +46,6 @@ public NettyClientHandler(URL url, ChannelHandler handler) { this.handler = handler; } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); @@ -78,27 +76,33 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } - @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { super.write(ctx, msg, promise); - NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); - try { - // if error happens from write, mock a BAD_REQUEST response so that invoker can return immediately without - // waiting until timeout. FIXME: not sure if this is the right approach, but exceptionCaught doesn't work - // as expected. - if (promise.cause() != null && msg instanceof Request) { - Request request = (Request) msg; - Response response = new Response(request.getId(), request.getVersion()); - response.setStatus(Response.BAD_REQUEST); - response.setErrorMessage(StringUtils.toString(promise.cause())); - handler.received(channel, response); - } else { - handler.sent(channel, msg); + final NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + final boolean isRequest = msg instanceof Request; + + // We add listeners to make sure our out bound event is correct. + // If our out bound event has an error (in most cases the encoder fails), + // we need to have the request return directly instead of blocking the invoke process. + promise.addListener(future -> { + try { + if (future.isSuccess()) { + // if our future is success, mark the future to sent. + handler.sent(channel, msg); + return; + } + + Throwable t = future.cause(); + if (t != null && isRequest) { + Request request = (Request) msg; + Response response = buildErrorResponse(request, t); + handler.received(channel, response); + } + } finally { + NettyChannel.removeChannelIfDisconnected(ctx.channel()); } - } finally { - NettyChannel.removeChannelIfDisconnected(ctx.channel()); - } + }); } @Override @@ -111,4 +115,18 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } + + /** + * build a bad request's response + * + * @param request the request + * @param t the throwable. In most cases, serialization fails. + * @return the response + */ + private static Response buildErrorResponse(Request request, Throwable t) { + Response response = new Response(request.getId(), request.getVersion()); + response.setStatus(Response.BAD_REQUEST); + response.setErrorMessage(StringUtils.toString(t)); + return response; + } } \ No newline at end of file