From f833af240935ccd6ffa4df59d63e5ad264dfe308 Mon Sep 17 00:00:00 2001 From: liuzhaokun Date: Tue, 7 Nov 2017 17:09:00 +0800 Subject: [PATCH 1/2] [STORM-2773] If a drpcserver node in cluster is down,drpc cluster won't work if we don't modify the drpc.server configuration and restart the cluster --- .../storm/drpc/DRPCInvocationsClient.java | 26 ++++++++++++++----- .../jvm/org/apache/storm/drpc/DRPCSpout.java | 11 +++++--- .../org/apache/storm/drpc/ReturnResults.java | 2 +- .../storm/security/auth/ThriftClient.java | 12 +++++---- .../trident/drpc/ReturnResultsReducer.java | 2 +- .../org/apache/storm/utils/DRPCClient.java | 4 +-- .../org/apache/storm/utils/NimbusClient.java | 8 +++--- 7 files changed, 43 insertions(+), 22 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java index 6a5acfa847b..0b2651a03f6 100644 --- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java +++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java @@ -36,21 +36,35 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP private final AtomicReference client = new AtomicReference<>(); private String host; private int port; + private int connectRetry = 5; public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException { - super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null); + super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null, null, false); this.host = host; this.port = port; - client.set(new DistributedRPCInvocations.Client(_protocol)); + if (isConnected() != true) { + for (int i=0; i < connectRetry; i++) { + try { + this.reconnectClient(); + } catch (Exception e) { + LOG.warn("Can't connect to drpcServer "+host+",will attempt to retry."); + } + if (isConnected() == true) { + break; + } else if (i == connectRetry) { + LOG.warn("Can't connect to drpcServer "+host+" after "+connectRetry+" attempts,will ignore it."); + } + } + } } - + public String getHost() { return host; } - + public int getPort() { return port; - } + } public void reconnectClient() throws TException { if (client.get() == null) { @@ -91,7 +105,7 @@ public DRPCRequest fetchRequest(String func) throws TException, AuthorizationExc client.compareAndSet(c, null); throw e; } - } + } public void failRequest(String id) throws TException, AuthorizationException { DistributedRPCInvocations.Client c = client.get(); diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java index 8605c05634e..dca7b61b5de 100644 --- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java +++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java @@ -105,9 +105,14 @@ public Adder(String server, int port, Map conf) { @Override public Void call() throws Exception { - DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port); - synchronized (_clients) { - _clients.add(c); + try { + DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port); + + synchronized (_clients) { + _clients.add(c); + } + } catch (Exception e) { + LOG.warn("Can't connect to drpcserver "+server+" when init drpcspout,please check your cluster"); } return null; } diff --git a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java index 04ddf25511b..c86b38cf5b0 100644 --- a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java +++ b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java @@ -85,7 +85,7 @@ public void execute(Tuple input) { if(!_clients.containsKey(server)) { try { _clients.put(server, new DRPCInvocationsClient(_conf, host, port)); - } catch (TTransportException ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java index ff3227ff13a..2ad3ae85be5 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java @@ -39,14 +39,14 @@ public class ThriftClient implements AutoCloseable { protected boolean _retryForever = false; public ThriftClient(Map topoConf, ThriftConnectionType type, String host) { - this(topoConf, type, host, null, null, null); + this(topoConf, type, host, null, null, null, true); } public ThriftClient(Map topoConf, ThriftConnectionType type, String host, Integer port, Integer timeout){ - this(topoConf, type, host, port, timeout, null); + this(topoConf, type, host, port, timeout, null, true); } - public ThriftClient(Map topoConf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser) { + public ThriftClient(Map topoConf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser,Boolean toReconnect) { //create a socket with server if (host==null) { throw new IllegalArgumentException("host is not set"); @@ -70,8 +70,10 @@ public ThriftClient(Map topoConf, ThriftConnectionType type, Str _conf = topoConf; _type = type; _asUser = asUser; - if (!type.isFake()) { - reconnect(); + if (toReconnect == true) { + if (!type.isFake()) { + reconnect(); + } } } diff --git a/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java b/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java index 617a42fa661..3838196b342 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java +++ b/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java @@ -102,7 +102,7 @@ public void complete(ReturnResultsState state, TridentCollector collector) { if(!_clients.containsKey(server)) { try { _clients.put(server, new DRPCInvocationsClient(conf, host, port)); - } catch (TTransportException ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } diff --git a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java index ca5edc4f732..0cd9067b4aa 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java +++ b/storm-client/src/jvm/org/apache/storm/utils/DRPCClient.java @@ -106,7 +106,7 @@ public static DRPCClient getConfiguredClient(Map conf) throws TT private DRPCClient(DistributedRPC.Iface override) { super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, - "localhost", 1234, null, null); + "localhost", 1234, null, null, true); this.host = "localhost"; this.port = 1234; this.client = override; @@ -119,7 +119,7 @@ public DRPCClient(Map conf, String host, int port) throws TTrans public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException { super(conf, _localOverrideClient != null ? ThriftConnectionType.LOCAL_FAKE : ThriftConnectionType.DRPC, - host, port, timeout, null); + host, port, timeout, null, true); this.host = host; this.port = port; if (_localOverrideClient != null) { diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java index c6194002359..de05fc927d6 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java +++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java @@ -161,25 +161,25 @@ public NimbusClient(Map conf, String host, int port) throws TTra } public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException { - super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null); + super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null, true); _client = new Nimbus.Client(_protocol); _isLocal = false; } public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException { - super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser); + super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser, true); _client = new Nimbus.Client(_protocol); _isLocal = false; } public NimbusClient(Map conf, String host) throws TTransportException { - super(conf, ThriftConnectionType.NIMBUS, host, null, null, null); + super(conf, ThriftConnectionType.NIMBUS, host, null, null, null, true); _client = new Nimbus.Client(_protocol); _isLocal = false; } private NimbusClient(Nimbus.Iface client) { - super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, "localhost", null, null, null); + super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, "localhost", null, null, null, true); _client = client; _isLocal = true; } From 66489077d97feb32bac4c64b3b89b9697b1cebf9 Mon Sep 17 00:00:00 2001 From: liuzhaokun Date: Mon, 13 Nov 2017 09:55:11 +0800 Subject: [PATCH 2/2] [STORM-2773] If a drpcserver node in cluster is down,drpc cluster won't work if we don't modify the drpc.server configuration and restart the cluster --- .../storm/drpc/DRPCInvocationsClient.java | 6 ++-- .../jvm/org/apache/storm/drpc/DRPCSpout.java | 30 +++++++++---------- .../storm/security/auth/ThriftClient.java | 8 ++--- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java index 0b2651a03f6..fc2e6ed0184 100644 --- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java +++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java @@ -43,16 +43,16 @@ public DRPCInvocationsClient(Map conf, String host, int port) th this.host = host; this.port = port; if (isConnected() != true) { - for (int i=0; i < connectRetry; i++) { + for (int i = 0; i < connectRetry; i++) { try { this.reconnectClient(); } catch (Exception e) { - LOG.warn("Can't connect to drpcServer "+host+",will attempt to retry."); + LOG.warn("Can't connect to drpcServer " + host + ",will attempt to retry."); } if (isConnected() == true) { break; } else if (i == connectRetry) { - LOG.warn("Can't connect to drpcServer "+host+" after "+connectRetry+" attempts,will ignore it."); + LOG.warn("Can't connect to drpcServer " + host + " after " + connectRetry + " attempts,will ignore it."); } } } diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java index dca7b61b5de..42d2258090c 100644 --- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java +++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java @@ -55,31 +55,31 @@ public class DRPCSpout extends BaseRichSpout { static final long serialVersionUID = 2387848310969237877L; public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class); - + SpoutOutputCollector _collector; List _clients = new ArrayList<>(); transient LinkedList> _futures = null; transient ExecutorService _backround = null; final String _function; final String _local_drpc_id; - + private static class DRPCMessageId { String id; int index; - + public DRPCMessageId(String id, int index) { this.id = id; this.index = index; } } - - + + public DRPCSpout(String function) { _function = function; if (DRPCClient.isLocalOverride()) { _local_drpc_id = DRPCClient.getOverrideServiceId(); } else { - _local_drpc_id = null; + _local_drpc_id = null; } } @@ -112,7 +112,7 @@ public Void call() throws Exception { _clients.add(c); } } catch (Exception e) { - LOG.warn("Can't connect to drpcserver "+server+" when init drpcspout,please check your cluster"); + LOG.warn("Can't connect to drpcserver " + server + " when init drpcspout,please check your cluster"); } return null; } @@ -151,7 +151,7 @@ private void checkFutures() { } } } - + @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; @@ -167,19 +167,19 @@ public void open(Map conf, TopologyContext context, SpoutOutputC int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT)); List servers = (List) conf.get(Config.DRPC_SERVERS); if(servers == null || servers.isEmpty()) { - throw new RuntimeException("No DRPC servers configured for topology"); + throw new RuntimeException("No DRPC servers configured for topology"); } - + if (numTasks < servers.size()) { for (String s: servers) { _futures.add(_backround.submit(new Adder(s, port, conf))); } - } else { + } else { int i = index % servers.size(); _futures.add(_backround.submit(new Adder(servers.get(i), port, conf))); } } - + } @Override @@ -237,7 +237,7 @@ public void nextTuple() { returnInfo.put("id", req.get_request_id()); returnInfo.put("host", _local_drpc_id); returnInfo.put("port", 0); - _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), + _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0)); } } catch (AuthorizationException aze) { @@ -257,7 +257,7 @@ public void ack(Object msgId) { public void fail(Object msgId) { DRPCMessageId did = (DRPCMessageId) msgId; DistributedRPCInvocations.Iface client; - + if (_local_drpc_id == null) { client = _clients.get(did.index); } else { @@ -288,5 +288,5 @@ public void fail(Object msgId) { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("args", "return-info")); - } + } } diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java index 2ad3ae85be5..7fde6356234 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java @@ -46,7 +46,7 @@ public ThriftClient(Map topoConf, ThriftConnectionType type, Str this(topoConf, type, host, port, timeout, null, true); } - public ThriftClient(Map topoConf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser,Boolean toReconnect) { + public ThriftClient(Map topoConf, ThriftConnectionType type, String host, Integer port, Integer timeout, String asUser, Boolean toReconnect) { //create a socket with server if (host==null) { throw new IllegalArgumentException("host is not set"); @@ -62,7 +62,7 @@ public ThriftClient(Map topoConf, ThriftConnectionType type, Str if (port<=0 && !type.isFake()) { throw new IllegalArgumentException("invalid port: "+port); - } + } _host = host; _port = port; @@ -80,7 +80,7 @@ public ThriftClient(Map topoConf, ThriftConnectionType type, Str public synchronized TTransport transport() { return _transport; } - + public synchronized void reconnect() { close(); TSocket socket = null; @@ -99,7 +99,7 @@ public synchronized void reconnect() { //TODO get this from type instead of hardcoding to Nimbus. //establish client-server transport via plugin //do retries if the connect fails - TBackoffConnect connectionRetry + TBackoffConnect connectionRetry = new TBackoffConnect( ObjectReader.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_TIMES)), ObjectReader.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),