From 8b9173081045b31aff4064af3a44ce502c658c18 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Tue, 3 Apr 2018 14:52:36 -0500 Subject: [PATCH] [STORM-3017] Refactor pacemaker client exception handling --- .../storm/cluster/PaceMakerStateStorage.java | 2 +- .../storm/pacemaker/PacemakerClient.java | 39 +++++++++++-------- .../storm/pacemaker/PacemakerClientPool.java | 27 +++++++++++-- 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java index bc2cb7456a0..06f16af8fbc 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java @@ -124,7 +124,7 @@ public void set_worker_hb(String path, byte[] data, List acls) { } LOG.debug("Successful set_worker_hb"); break; - } catch (HBExecutionException e) { + } catch (HBExecutionException|PacemakerConnectionException e) { if (retry <= 0) { throw new RuntimeException(e); } diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java index e3890463563..8b46f552c9b 100644 --- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java +++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java @@ -56,6 +56,7 @@ public class PacemakerClient implements ISaslClient { private HBMessage messages[]; private LinkedBlockingQueue availableMessageSlots; private ThriftNettyClientCodec.AuthMethod authMethod; + private static final int maxRetries = 10; private static Timer timer = new Timer(true); @@ -169,7 +170,7 @@ public String secretKey() { return secret; } - public HBMessage send(HBMessage m) throws InterruptedException { + public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException { LOG.debug("Sending message: {}", m.toString()); int next = availableMessageSlots.take(); @@ -177,7 +178,8 @@ public HBMessage send(HBMessage m) throws InterruptedException { m.set_message_id(next); messages[next] = m; LOG.debug("Put message in slot: {}", Integer.toString(next)); - do { + int retry = maxRetries; + while (true) { try { waitUntilReady(); Channel channel = channelRef.get(); @@ -185,23 +187,26 @@ public HBMessage send(HBMessage m) throws InterruptedException { channel.write(m); m.wait(1000); } - } catch (PacemakerConnectionException exp) { - LOG.error("error attempting to write to a channel {}", exp); + if (messages[next] != m && messages[next] != null) { + // messages[next] == null can happen if we lost the connection and subsequently reconnected or timed out. + HBMessage ret = messages[next]; + messages[next] = null; + LOG.debug("Got Response: {}", ret); + return ret; + } + } catch (PacemakerConnectionException e) { + if (retry <= 0) { + throw e; + } + LOG.error("error attempting to write to a channel {}.", e.getMessage()); } - } while (messages[next] == m); - } - - HBMessage ret = messages[next]; - if (ret == null) { - // This can happen if we lost the connection and subsequently reconnected or timed out. - LOG.warn("Got null response. This can happen if we lost the connection and subsequently reconnected or timed out. " - + "Resending message..."); - ret = send(m); + if (retry <= 0) { + throw new PacemakerConnectionException("couldn't get response after " + maxRetries + " attempts."); + } + retry--; + LOG.error("Not getting response or getting null response. Making {} more attempts.", retry); + } } - messages[next] = null; - LOG.debug("Got Response: {}", ret); - return ret; - } private void waitUntilReady() throws PacemakerConnectionException, InterruptedException { diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java index bd5b137d1fb..539e0a8bf8a 100644 --- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java +++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientPool.java @@ -54,17 +54,25 @@ public PacemakerClientPool(Map config) { } } - public HBMessage send(HBMessage m) throws InterruptedException { + public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException { + try { return getWriteClient().send(m); + } catch (PacemakerConnectionException e) { + rotateClients(); + throw e; + } } public List sendAll(HBMessage m) throws PacemakerConnectionException, InterruptedException { List responses = new ArrayList(); LOG.debug("Using servers: {}", servers); for(String s : servers) { - HBMessage response = getClientForServer(s).send(m); - responses.add(response); - + try { + HBMessage response = getClientForServer(s).send(m); + responses.add(response); + } catch (PacemakerConnectionException e) { + LOG.warn("Failed to connect to the pacemaker server {}", s); + } } if(responses.size() == 0) { throw new PacemakerConnectionException("Failed to connect to any Pacemaker."); @@ -79,6 +87,17 @@ public void close() { } } + private void rotateClients() { + PacemakerClient c = getWriteClient(); + String server = servers.peek(); + // Servers should be rotated **BEFORE** the old client is removed from clientForServer + // or a race with getWriteClient() could cause it to be put back in the map. + servers.add(servers.remove()); + clientForServer.remove(server); + c.shutdown(); + c.close(); + } + private PacemakerClient getWriteClient() { return getClientForServer(servers.peek()); }