Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adding a ClockSpout to storm-contrib-core.

Comes as both an extensible abstract class, and a barebones implementation
that is good enough for most purposes.
  • Loading branch information...
commit 798c36f721c7b8b02537df00a403300da9cf04fe 1 parent 8b6b593
@apetresc apetresc authored
View
7 storm-contrib-core/pom.xml
@@ -12,6 +12,13 @@
<name>storm-contrib-core</name>
<description>Core dependencies and components for storm-contrib</description>
+ <dependencies>
+ <dependency>
+ <groupId>storm</groupId>
+ <artifactId>storm</artifactId>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
View
94 storm-contrib-core/src/main/java/storm/contrib/core/ClockSpout.java
@@ -0,0 +1,94 @@
+package storm.contrib.core;
+
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.utils.Utils;
+
+/**
+ * A generic template for a Spout that emits a tuple at a regular interval (in
+ * terms of real time).
+ *
+ * Due to the time-sensitive nature of this task, there is no point in making
+ * these ticks reliable.
+ *
+ * @author Adrian Petrescu <apetresc@gmail.com>
+ *
+ */
+public abstract class ClockSpout implements IRichSpout {
+ private SpoutOutputCollector collector;
+
+ protected final String streamId;
+ private int i;
+
+ /**
+ * @param streamId The stream on which to emit
+ */
+ protected ClockSpout(String streamId) {
+ this.streamId = streamId;
+ }
+
+ @Override
+ public void open(
+ @SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
+
+ this.collector = collector;
+ i = 0;
+ }
+
+ @Override
+ public void close() { }
+
+ @Override
+ public void nextTuple() {
+ collector.emit(streamId, getTupleForTick(i));
+ Utils.sleep(getDelayForTick(i));
+
+ i++;
+ }
+
+ /**
+ * Returns the data to be included in the {@code i}-th tick. Subclasses
+ * should ensure that they are also overriding
+ * {@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
+ * to match the structure being returned here.
+ *
+ * @param i the number of ticks that have already been emitted by this
+ * spout
+ * @return the data to be included in the {@code i}-th tick
+ */
+ public abstract List<Object> getTupleForTick(int i);
+
+ /**
+ * Returns the delay between the {@code i} and {@code i+1}-th ticks, in
+ * milliseconds.
+ *
+ * @param i the number of ticks that have already been emitted by this
+ * spout
+ * @return the delay between the {@code i} and {@code i+1}-th ticks
+ */
+ public abstract long getDelayForTick(int i);
+
+ /**
+ * Marked final to emphasize to subclasses that there is no point
+ * in implementing this (emitted tuples are not anchored).
+ */
+ @Override
+ public final void ack(Object msgId) { }
+
+ /**
+ * Marked final to emphasize to subclasses that there is no point
+ * in implementing this (emitted tuples are not anchored).
+ */
+ @Override
+ public final void fail(Object msgId) { }
+
+ @Override
+ public final boolean isDistributed() {
+ return false;
+ }
+
+}
View
46 storm-contrib-core/src/main/java/storm/contrib/core/SimpleClockSpout.java
@@ -0,0 +1,46 @@
+package storm.contrib.core;
+
+import java.util.List;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * A simple implementation of {@link ClockSpout} which emits a barebones tick
+ * tuple at a fixed rate.
+ *
+ * Every {@code delay} milliseconds, this spout will emit a tuple containing a
+ * single field, {@code tick}, specifying how many tuples have already been
+ * emitted.
+ *
+ * @author Adrian Petrescu <apetresc@gmail.com>
+ *
+ */
+public class SimpleClockSpout extends ClockSpout {
+ private final int delay;
+
+ /**
+ * @param streamId The stream on which to emit
+ * @param delay The fixed amount of time (in milliseconds) between ticks
+ */
+ public SimpleClockSpout(String streamId, int delay) {
+ super(streamId);
+ this.delay = delay;
+ }
+
+ @Override
+ public List<Object> getTupleForTick(int i) {
+ return Utils.tuple(i);
+ }
+
+ @Override
+ public long getDelayForTick(int i) {
+ return delay;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream(streamId, new Fields("tick"));
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.