From 3be366ff17183fae588e3d10c1d28a9f9d1542c9 Mon Sep 17 00:00:00 2001 From: Emil Wabra Date: Tue, 9 Feb 2016 13:43:12 +0100 Subject: [PATCH 1/5] Send activate and deactivate command to shellprocess --- .../jvm/backtype/storm/spout/ShellSpout.java | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index bfdfe67be1b..3d23cd7a0c4 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -97,30 +97,15 @@ public void close() { } public void nextTuple() { - if (_spoutMsg == null) { - _spoutMsg = new SpoutMsg(); - } - _spoutMsg.setCommand("next"); - _spoutMsg.setId(""); - querySubprocess(); + this.sendSimpleSyncCommand("next", ""); } public void ack(Object msgId) { - if (_spoutMsg == null) { - _spoutMsg = new SpoutMsg(); - } - _spoutMsg.setCommand("ack"); - _spoutMsg.setId(msgId); - querySubprocess(); + this.sendSimpleSyncCommand("ack", msgId); } public void fail(Object msgId) { - if (_spoutMsg == null) { - _spoutMsg = new SpoutMsg(); - } - _spoutMsg.setCommand("fail"); - _spoutMsg.setId(msgId); - querySubprocess(); + this.sendSimpleSyncCommand("fail", msgId); } private void handleMetrics(ShellMsg shellMsg) { @@ -232,10 +217,21 @@ public void activate() { // prevent timer to check heartbeat based on last thing before activate setHeartbeat(); heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); + this.sendSimpleSyncCommand("activate", ""); + } + + private void sendSimpleSyncCommand(String command, Object msgId) { + if (_spoutMsg == null) { + _spoutMsg = new SpoutMsg(); + } + _spoutMsg.setCommand(command); + _spoutMsg.setId(msgId); + querySubprocess(); } @Override public void deactivate() { + this.sendSimpleSyncCommand("deactivate", ""); heartBeatExecutorService.shutdownNow(); } From bd122576ca4f102fac896316015ad21d9a502f36 Mon Sep 17 00:00:00 2001 From: Emil Wabra Date: Thu, 11 Feb 2016 09:36:53 +0100 Subject: [PATCH 2/5] Rename method --- .../src/jvm/backtype/storm/spout/ShellSpout.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index 3d23cd7a0c4..e739a4f54aa 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -97,15 +97,15 @@ public void close() { } public void nextTuple() { - this.sendSimpleSyncCommand("next", ""); + this.sendSyncCommand("next", ""); } public void ack(Object msgId) { - this.sendSimpleSyncCommand("ack", msgId); + this.sendSyncCommand("ack", msgId); } public void fail(Object msgId) { - this.sendSimpleSyncCommand("fail", msgId); + this.sendSyncCommand("fail", msgId); } private void handleMetrics(ShellMsg shellMsg) { @@ -217,10 +217,10 @@ public void activate() { // prevent timer to check heartbeat based on last thing before activate setHeartbeat(); heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); - this.sendSimpleSyncCommand("activate", ""); + this.sendSyncCommand("activate", ""); } - private void sendSimpleSyncCommand(String command, Object msgId) { + private void sendSyncCommand(String command, Object msgId) { if (_spoutMsg == null) { _spoutMsg = new SpoutMsg(); } @@ -231,7 +231,7 @@ private void sendSimpleSyncCommand(String command, Object msgId) { @Override public void deactivate() { - this.sendSimpleSyncCommand("deactivate", ""); + this.sendSyncCommand("deactivate", ""); heartBeatExecutorService.shutdownNow(); } From 74c19e7be2e911ba1785204532193f5d639917f3 Mon Sep 17 00:00:00 2001 From: Emil Wabra Date: Thu, 11 Feb 2016 09:45:24 +0100 Subject: [PATCH 3/5] Merged from upstream --- .../org/apache/storm/spout/ShellSpout.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java index e24c76ca4bf..f8c305d7c17 100644 --- a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java @@ -97,28 +97,22 @@ public void close() { } public void nextTuple() { - if (_spoutMsg == null) { - _spoutMsg = new SpoutMsg(); - } - _spoutMsg.setCommand("next"); - _spoutMsg.setId(""); - querySubprocess(); + this.sendSyncCommand("next", ""); } public void ack(Object msgId) { - if (_spoutMsg == null) { - _spoutMsg = new SpoutMsg(); - } - _spoutMsg.setCommand("ack"); - _spoutMsg.setId(msgId); - querySubprocess(); + this.sendSyncCommand("ack", msgId); } public void fail(Object msgId) { + this.sendSyncCommand("fail", msgId); + } + + private void sendSyncCommand(String command, Object msgId) { if (_spoutMsg == null) { _spoutMsg = new SpoutMsg(); } - _spoutMsg.setCommand("fail"); + _spoutMsg.setCommand(command); _spoutMsg.setId(msgId); querySubprocess(); } @@ -232,10 +226,12 @@ public void activate() { // prevent timer to check heartbeat based on last thing before activate setHeartbeat(); heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); + this.sendSyncCommand("activate", ""); } @Override public void deactivate() { + this.sendSyncCommand("deactivate", ""); heartBeatExecutorService.shutdownNow(); } From 330b3b6c1cda4c9eec9f7be6c52568c3660fba17 Mon Sep 17 00:00:00 2001 From: Emil Wabra Date: Sun, 15 Jan 2017 20:48:32 +0100 Subject: [PATCH 4/5] Added Documentation for "activate" and "deactivate" multilang calls --- docs/Multilang-protocol.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/Multilang-protocol.md b/docs/Multilang-protocol.md index ea010f2d827..45ad2bc30ca 100644 --- a/docs/Multilang-protocol.md +++ b/docs/Multilang-protocol.md @@ -140,7 +140,7 @@ What happens next depends on the type of component: Shell spouts are synchronous. The rest happens in a while(true) loop: -* STDIN: Either a next, ack, or fail command. +* STDIN: Either a next, ack, activate, deactivate or fail command. "next" is the equivalent of ISpout's `nextTuple`. It looks like: @@ -154,6 +154,16 @@ Shell spouts are synchronous. The rest happens in a while(true) loop: {"command": "ack", "id": "1231231"} ``` +"activate" is the equivalent of ISpout's `activate`: +``` +{"command": "activate"} +``` + +"deactivate" is the equivalent of ISpout's `deactivate`: +``` +{"command": "deactivate"} +``` + "fail" looks like: ``` From d406193cd353542206bcbd55b8448b98a57d5402 Mon Sep 17 00:00:00 2001 From: Emil Wabra Date: Sun, 15 Jan 2017 21:09:18 +0100 Subject: [PATCH 5/5] Fixed merge issue --- storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java index 41b85184b0d..a5ec72bacd2 100644 --- a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java @@ -140,8 +140,8 @@ private void sendSyncCommand(String command, Object msgId) { if (_spoutMsg == null) { _spoutMsg = new SpoutMsg(); } - _spoutMsg.setCommand("next"); - _spoutMsg.setId(""); + _spoutMsg.setCommand(command); + _spoutMsg.setId(msgId); querySubprocess(); }