From 421aa27b87a707923cf374837c4556dd4a992e54 Mon Sep 17 00:00:00 2001 From: Itai Frenkel Date: Fri, 14 Aug 2015 13:41:13 +0300 Subject: [PATCH 1/2] Treat ack and fail as heartbeat --- storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 814b3fadfe1..4fd1ac0a8c0 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -336,8 +336,10 @@ public void run() { if (command.equals("sync")) { setHeartbeat(); } else if(command.equals("ack")) { + setHeartbeat(); handleAck(shellMsg.getId()); } else if (command.equals("fail")) { + setHeartbeat(); handleFail(shellMsg.getId()); } else if (command.equals("error")) { handleError(shellMsg.getMsg()); From 70f5689b15ab5c6092ed2310fdde4820a4f7bc0d Mon Sep 17 00:00:00 2001 From: Itai Frenkel Date: Sun, 16 Aug 2015 13:55:02 +0300 Subject: [PATCH 2/2] Avoid race condition between heartbeat and reader thread when bolt starts --- storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 4fd1ac0a8c0..814d0d18c53 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -136,9 +136,7 @@ public void prepare(Map stormConf, TopologyContext context, heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); - LOG.info("Start checking heartbeat..."); - setHeartbeat(); - } + } public void execute(Tuple input) { if (_exception != null) { @@ -313,7 +311,7 @@ public void run() { LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}", currentTimeMillis, lastHeartbeat, workerTimeoutMills); - if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { + if (lastHeartbeat > 0 && currentTimeMillis - lastHeartbeat > workerTimeoutMills) { bolt.die(new RuntimeException("subprocess heartbeat timeout")); } @@ -325,6 +323,9 @@ public void run() { private class BoltReaderRunnable implements Runnable { public void run() { + + LOG.info("Start checking heartbeat..."); + setHeartbeat(); while (_running) { try { ShellMsg shellMsg = _process.readShellMsg();