diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 814b3fadfe1..814d0d18c53 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -136,9 +136,7 @@ public void prepare(Map stormConf, TopologyContext context, heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); - LOG.info("Start checking heartbeat..."); - setHeartbeat(); - } + } public void execute(Tuple input) { if (_exception != null) { @@ -313,7 +311,7 @@ public void run() { LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}", currentTimeMillis, lastHeartbeat, workerTimeoutMills); - if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { + if (lastHeartbeat > 0 && currentTimeMillis - lastHeartbeat > workerTimeoutMills) { bolt.die(new RuntimeException("subprocess heartbeat timeout")); } @@ -325,6 +323,9 @@ public void run() { private class BoltReaderRunnable implements Runnable { public void run() { + + LOG.info("Start checking heartbeat..."); + setHeartbeat(); while (_running) { try { ShellMsg shellMsg = _process.readShellMsg(); @@ -336,8 +337,10 @@ public void run() { if (command.equals("sync")) { setHeartbeat(); } else if(command.equals("ack")) { + setHeartbeat(); handleAck(shellMsg.getId()); } else if (command.equals("fail")) { + setHeartbeat(); handleFail(shellMsg.getId()); } else if (command.equals("error")) { handleError(shellMsg.getMsg());