Skip to content
Browse files

initial statespout commit

  • Loading branch information...
1 parent 9d91adb commit b0a1b12c99dac3e92a74e3032f9f62c89b577517 @nathanmarz nathanmarz committed Sep 17, 2011
View
1 TODO
@@ -5,6 +5,7 @@ Use cases:
#################
+
* Repackage jzmq and zmq as a leiningen "native dep"
- this might be good, since the native dep can package builds for all different systems/os's?
View
4 conf/defaults.yaml
@@ -59,6 +59,6 @@ topology.message.timeout.secs: 30
topology.skip.missing.serializations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
-topology.state.synchronization.timeout.secs: 60
+topology.state.sync.timeout.secs: 60
+topology.state.sync.max.tries: 3
topology.stats.sample.rate: 0.05
-
View
7 src/clj/backtype/storm/bootstrap.clj
@@ -8,12 +8,15 @@
TimeCacheMap$ExpiredCallback BufferFileInputStream]))
(import (quote [backtype.storm.serialization TupleSerializer TupleDeserializer]))
(import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
- (import (quote [backtype.storm.tuple Tuple Fields MessageId]))
+ (import (quote [backtype.storm.tuple Tuple Fields MessageId TupleImpl]))
(import (quote [backtype.storm.task IBolt IOutputCollector
OutputCollector OutputCollectorImpl IInternalOutputCollector
TopologyContext ShellBolt
- CoordinatedBolt CoordinatedBolt$SourceArgs KeyedFairBolt]))
+ CoordinatedBolt CoordinatedBolt$SourceArgs KeyedFairBolt
+ ComponentType]))
(import (quote [backtype.storm.daemon Shutdownable]))
+ (import (quote [backtype.storm.state IStateSpout ISubscribedState ISynchronizeOutputCollector IStateSpoutOutputCollector
+ StateSpoutOutputCollector SynchronizeOutputCollector]))
(use (quote [backtype.storm config util log clojure]))
(use (quote [clojure.contrib.seq :only [find-first]]))
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
View
11 src/clj/backtype/storm/daemon/common.clj
@@ -1,13 +1,24 @@
(ns backtype.storm.daemon.common
(:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
+ (:import [backtype.storm Constants])
)
(def ACKER-COMPONENT-ID -1)
+
(def ACKER-INIT-STREAM-ID -1)
(def ACKER-ACK-STREAM-ID -2)
(def ACKER-FAIL-STREAM-ID -3)
+(def SYNC-REQUEST-STREAM-ID -4)
+
+(def SYNC-SYNC-SUBSTREAM -1)
+(def SYNC-RESYNC-SUBSTREAM -2)
+(def SYNC-SYNC-FINISH-SUBSTREAM -3)
+(def SYNC-ADD-SUBSTREAM -4)
+(def SYNC-UPDATE-SUBSTREAM -5)
+(def SYNC-REMOVE-SUBSTREAM -6)
+(def FAILURE-SUBSTREAM Constants/FAILURE_SUBSTREAM)
(defn system-component? [id]
(< id 0))
View
3 src/clj/backtype/storm/daemon/nimbus.clj
@@ -370,6 +370,9 @@
(InvalidTopologyException.
"All component ids must be positive")))
;; TODO: validate that every declared stream is positive
+ ;; TODO: validate that everything subscribes to a valid, declared stream
+ ;; TODO: check that failure stream subscriptions are to a spout stream
+ ;; TODO: check that field groupings are valid
))
(defn file-cache-map [conf]
View
235 src/clj/backtype/storm/daemon/task.clj
@@ -111,38 +111,95 @@
obj
)))
-
(defn outbound-components
- "Returns map of stream id to component id to grouper"
+ "Returns map of stream id to component id to grouper, including all implicit streams"
[topology-context]
(let [output-groupings (clojurify-structure (.getThisTargets topology-context))
- acker-task-amt (count (.getComponentTasks topology-context ACKER-COMPONENT-ID))]
+ acker-task-amt (count (.getComponentTasks topology-context ACKER-COMPONENT-ID))
+ state-spout-sources (filter #(.isStateSpout topology-context %)
+ (.getThisSourceComponents topology-context))
+ ]
(merge
+
+ ;; streams to send acknowledgement information to acker components
{
- ACKER-INIT-STREAM-ID
+ [ACKER-INIT-STREAM-ID]
{ACKER-COMPONENT-ID (mk-fields-grouper (Fields. ["id" "spout-task"])
(Fields. ["id"])
acker-task-amt)}
- ACKER-ACK-STREAM-ID
+ [ACKER-ACK-STREAM-ID]
{ACKER-COMPONENT-ID (mk-fields-grouper (Fields. ["id" "ack-val"])
(Fields. ["id"])
acker-task-amt)}
- ACKER-FAIL-STREAM-ID
+ [ACKER-FAIL-STREAM-ID]
{ACKER-COMPONENT-ID (mk-fields-grouper (Fields. ["id"]) ;; TODO: add failure msg here later...
(Fields. ["id"])
acker-task-amt)}
+ }
+
+ ;; stream for tasks to request synchronizations from state spouts
+ {
+ [SYNC-REQUEST-STREAM-ID]
+ (into {}
+ (for [cid state-spout-sources]
+ [cid :direct]
+ ))
}
- (into {}
- (for [[stream-id component->grouping] output-groupings
- :let [out-fields (.getThisOutputFields topology-context stream-id)]]
- [stream-id
- (into {}
- (for [[component tgrouping] component->grouping]
- [component (mk-grouper out-fields
- tgrouping
- (count (.getComponentTasks topology-context component))
- )]
- ))])))
+
+ ;; declared streams
+ (into {}
+ (for [[stream-id component->grouping] output-groupings]
+ (apply
+ merge-with merge
+ (for [[component tgrouping] component->grouping]
+ (let [out-fields (.getThisOutputFields topology-context stream-id)
+ num-out-tasks (count (.getComponentTasks topology-context component))]
+ (if (.isThisStateSpout topology-context)
+ {
+ ;; sync id is metadata
+ [stream-id SYNC-SYNC-SUBSTREAM]
+ {component (mk-grouper out-fields
+ tgrouping
+ num-out-tasks
+ )}
+
+ ;; sync id is metadata
+ [stream-id SYNC-RESYNC-SUBSTREAM]
+ {component :direct}
+
+ [stream-id SYNC-SYNC-FINISH-SUBSTREAM]
+ {component :direct}
+
+ ;; hash is metadata
+ [stream-id SYNC-ADD-SUBSTREAM]
+ {component (mk-grouper out-fields
+ tgrouping
+ num-out-tasks
+ )}
+
+ ;; hash is metadata
+ [stream-id SYNC-REMOVE-SUBSTREAM]
+ {component :direct}
+
+ }
+
+
+ {[stream-id]
+ {component (mk-grouper out-fields
+ tgrouping
+ num-out-tasks
+ )}
+
+ ;; failure information is added as metadata
+ ;; TODO: probably shouldn't declare this for bolts
+ [stream-id FAILURE-SUBSTREAM]
+ {component (mk-grouper out-fields
+ tgrouping
+ num-out-tasks
+ )}
+ }
+ )))))
+ ))
))
@@ -163,10 +220,10 @@
^List generated-ids send-fn]
(let [ack-val (bit-xor-vals generated-ids)]
(doseq [[anchor id] (.. input-tuple getMessageId getAnchorsToIds)]
- (send-fn (Tuple. topology-context
- [anchor (bit-xor ack-val id)]
- (.getThisTaskId topology-context)
- ACKER-ACK-STREAM-ID))
+ (send-fn (TupleImpl. topology-context
+ [anchor (bit-xor ack-val id)]
+ (.getThisTaskId topology-context)
+ [ACKER-ACK-STREAM-ID]))
)))
(defn mk-task [conf storm-conf topology-context storm-id zmq-context cluster-state storm-active-atom transfer-fn]
@@ -200,7 +257,9 @@
stream->component->grouper (outbound-components topology-context)
component->tasks (reverse-map task-info)
;; important it binds to virtual port before function returns
- ^ZMQ$Socket puller (-> zmq-context (mq/socket mq/pull) (mqvp/virtual-bind task-id))
+ ^ZMQ$Socket puller (-> zmq-context
+ (mq/socket mq/pull)
+ (mqvp/virtual-bind task-id))
;; TODO: consider DRYing things up and moving stats / tuple -> multiple components code here
task-transfer-fn (fn [task ^Tuple tuple]
@@ -210,11 +269,11 @@
emit-sampler (mk-stats-sampler storm-conf)
send-fn (fn this
- ([^Integer out-task-id ^Tuple tuple]
+ ([^Integer out-task-id ^TupleImpl tuple]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Emitting direct: " out-task-id "; " task-readable-name " " tuple))
(let [target-component (.getComponentId topology-context out-task-id)
- component->grouping (stream->component->grouper (.getSourceStreamId tuple))
+ component->grouping (stream->component->grouper (.getFullStreamId tuple))
grouping (get component->grouping target-component)
out-task-id (if (or
;; This makes streams to/from system
@@ -233,10 +292,10 @@
)
[out-task-id]
))
- ([^Tuple tuple]
+ ([^TupleImpl tuple]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Emitting: " task-readable-name " " tuple))
- (let [stream (.getSourceStreamId tuple)
+ (let [stream (.getFullStreamId tuple)
;; TODO: this doesn't seem to be very fast
;; and seems to be the current bottleneck
out-tasks (mapcat
@@ -339,11 +398,11 @@
tuple-id (if message-id
(MessageId/makeRootId gen-id)
(MessageId/makeUnanchored))
- tuple (Tuple. topology-context
- values
- task-id
- out-stream-id
- tuple-id)
+ tuple (TupleImpl. topology-context
+ values
+ task-id
+ [out-stream-id]
+ tuple-id)
out-tasks (if out-task-id
(send-fn out-task-id tuple)
(send-fn tuple))]
@@ -353,10 +412,10 @@
(.put pending gen-id [message-id
tuple
(if (sampler) (System/currentTimeMillis))])
- (send-fn (Tuple. topology-context
- [gen-id task-id]
- task-id
- ACKER-INIT-STREAM-ID))
+ (send-fn (TupleImpl. topology-context
+ [gen-id task-id]
+ task-id
+ [ACKER-INIT-STREAM-ID]))
))
out-tasks
))
@@ -438,10 +497,10 @@
)))
(^void fail [this ^Tuple input-tuple]
(doseq [anchor (.. input-tuple getMessageId getAnchors)]
- (send-fn (Tuple. topology-context
- [anchor]
- task-id
- ACKER-FAIL-STREAM-ID))
+ (send-fn (TupleImpl. topology-context
+ [anchor]
+ task-id
+ [ACKER-FAIL-STREAM-ID]))
)
(let [delta (tuple-time-delta! tuple-start-times input-tuple)]
(when delta
@@ -459,9 +518,8 @@
(OutputCollectorImpl. topology-context output-collector))
;; TODO: can get any SubscribedState objects out of the context now
[(fn []
- ;; synchronization needs to be done with a key provided by this bolt, otherwise:
- ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
+ ;; can make whether or not to wait a configuration parameter
;; buffer other tuples until fully synchronized, then process all of those tuples
;; then go into normal loop
;; spill to disk?
@@ -475,7 +533,8 @@
(let [tuple (.deserialize deserializer ser)]
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
- ;; TODO: need to version tuples somehow
+ ;; if receive incremental update during sync with a state spout, go into sync mode for that spout
+ ;; once out of sync, need to drop messages from that state spout
(log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
(when (sampler)
(.put tuple-start-times tuple (System/currentTimeMillis)))
@@ -484,6 +543,98 @@
))))]
))
+(defstruct StateSpoutState :id->tuple :task->ids)
+
+(defn remove-state-tuple [state id]
+ )
+
+(defn add-state-tuple [tasks state id ^Tuple tuple]
+ )
+
+(defn mk-state-tuple [^List values]
+ ;; choose a unique id
+ )
+
+(defn do-synchronization [^IStateSpout state-spout storm-conf send-fn ^TopologyContext topology-context]
+ (loop [i 0]
+ (when (>= i (storm-conf TOPOLOGY-STATE-SYNC-MAX-TRIES))
+ (throw (RuntimeException. "Exceeded maximum number of attempts at syncing state spout")))
+ (let [
+ resync? (atom false)
+ output-collector (SynchronizeOutputCollector.
+ (reify ISynchronizeOutputCollector
+ (^void add [this ^int stream ^List values]
+ (let [tuple (TupleImpl. topology-context
+ values
+ (.getThisTaskId topology-context)
+ [stream SYNC-SYNC-SUBSTREAM]
+ (MessageId/makeRootId (MessageId/generateId)))]
+ ;; TODO: finish
+ ;; TODO: update sync map
+ ;; (remember id -> tuple values)
+ ))
+ (^void resynchronize [this]
+ (reset! resync? true))
+ ))]
+ (.synchronize state-spout output-collector)
+ (if @resync?
+ (recur (inc i))
+ (do
+ ;; TODO: select a unique id for the syncing
+ ;; send all the tuples to the sync stream
+ ;; send count to the comm stream
+ ;; update internal mapping of tasks to tuples
+ ))
+ )))
+
+(defmethod mk-executors IStateSpout [^IStateSpout state-spout storm-conf ^ZMQ$Socket puller send-fn storm-active-atom ^TopologyContext topology-context task-stats]
+ (let [deserializer (TupleDeserializer. storm-conf topology-context)
+ event-queue (ConcurrentLinkedQueue.)
+ resync? (atom false)
+ output-collector (StateSpoutOutputCollector.
+ (reify IStateSpoutOutputCollector
+ (^void add [this ^int stream ^List tuple]
+ ;; TODO: finish
+ ;; if have seen id before, reuse the same tuple id, otherwise generate root id
+ ;; TODO: update internal state. send tuple with hash and update map for tasks it was sent to. include the hash
+ )
+ (^void remove [this ^int stream ^List tuple]
+ ;; TODO: finish
+ ;; if have seen id before, reuse the same tuple id, otherwise generate root id
+ ;; TODO: update internal state. send tuple with hash and update map for tasks it was sent to. include the hash
+ )
+ (^void resynchronize [this]
+ (reset! resync? true))
+ ))
+ ]
+ [(fn []
+ (let [^bytes ser (mq/recv puller)]
+ (when-not (empty? ser) ; skip empty messages (used during shutdown)
+ (let [tuple (.deserialize deserializer ser)
+ task-id (.getSourceTask tuple)]
+ (.add event-queue (fn []
+ ;; TODO: synchronize to the task
+ ;; TODO: read internal state and re-emit it to the task
+ ))
+ ))))
+ (fn []
+ (loop []
+ (let [event (.poll event-queue)]
+ (when event
+ (event)
+ (recur)
+ )))
+ ;; TODO: this needs to be non-blocking -- alternative design? lock around the state itself?
+ (.nextTuple state-spout output-collector)
+ (loop []
+ (when @resync?
+ (reset! resync? false)
+ (.synchronize state-spout output-collector)
+ (recur)
+ ))
+ )
+ ]
+ ))
(defmethod close-component ISpout [spout]
(.close spout))
View
2 src/clj/backtype/storm/testing.clj
@@ -403,7 +403,7 @@
`(with-var-roots [task/outbound-components (let [old# task/outbound-components]
(fn [& args#]
(merge (apply old# args#)
- {TrackerAggregator/TRACK_STREAM
+ {[TrackerAggregator/TRACK_STREAM]
{TRACKER-BOLT-ID (fn [& args#] 0)}}
)))
task/mk-acker-bolt (let [old# task/mk-acker-bolt]
View
5 src/jvm/backtype/storm/Config.java
@@ -262,13 +262,12 @@
* typically used in testing to limit the number of threads spawned in local mode.
*/
public static String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
-
-
/**
* The maximum amount of time a component gives a source of state to synchronize before it requests
* synchronization again.
*/
- public static String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
+ public static String TOPOLOGY_STATE_SYNC_TIMEOUT_SECS="topology.state.sync.timeout.secs";
+ public static String TOPOLOGY_STATE_SYNC_MAX_TRIES="topology.state.sync.max.tries";
/**
* The percentage of tuples to sample to produce stats for a task.
View
1 src/jvm/backtype/storm/Constants.java
@@ -3,4 +3,5 @@
public class Constants {
public static final int COORDINATED_STREAM_ID = 100;
+ public static final int FAILURE_SUBSTREAM = -1;
}
View
107 src/jvm/backtype/storm/generated/GlobalStreamId.java
@@ -31,14 +31,21 @@
private static final TField COMPONENT_ID_FIELD_DESC = new TField("componentId", TType.I32, (short)1);
private static final TField STREAM_ID_FIELD_DESC = new TField("streamId", TType.I32, (short)2);
+ private static final TField STREAM_TYPE_FIELD_DESC = new TField("streamType", TType.I32, (short)3);
private int componentId;
private int streamId;
+ private StreamType streamType;
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements TFieldIdEnum {
COMPONENT_ID((short)1, "componentId"),
- STREAM_ID((short)2, "streamId");
+ STREAM_ID((short)2, "streamId"),
+ /**
+ *
+ * @see StreamType
+ */
+ STREAM_TYPE((short)3, "streamType");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -57,6 +64,8 @@ public static _Fields findByThriftId(int fieldId) {
return COMPONENT_ID;
case 2: // STREAM_ID
return STREAM_ID;
+ case 3: // STREAM_TYPE
+ return STREAM_TYPE;
default:
return null;
}
@@ -108,6 +117,8 @@ public String getFieldName() {
new FieldValueMetaData(TType.I32)));
tmpMap.put(_Fields.STREAM_ID, new FieldMetaData("streamId", TFieldRequirementType.REQUIRED,
new FieldValueMetaData(TType.I32)));
+ tmpMap.put(_Fields.STREAM_TYPE, new FieldMetaData("streamType", TFieldRequirementType.OPTIONAL,
+ new EnumMetaData(TType.ENUM, StreamType.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
FieldMetaData.addStructMetaDataMap(GlobalStreamId.class, metaDataMap);
}
@@ -134,6 +145,9 @@ public GlobalStreamId(GlobalStreamId other) {
__isset_bit_vector.or(other.__isset_bit_vector);
this.componentId = other.componentId;
this.streamId = other.streamId;
+ if (other.is_set_streamType()) {
+ this.streamType = other.streamType;
+ }
}
public GlobalStreamId deepCopy() {
@@ -189,6 +203,37 @@ public void set_streamId_isSet(boolean value) {
__isset_bit_vector.set(__STREAMID_ISSET_ID, value);
}
+ /**
+ *
+ * @see StreamType
+ */
+ public StreamType get_streamType() {
+ return this.streamType;
+ }
+
+ /**
+ *
+ * @see StreamType
+ */
+ public void set_streamType(StreamType streamType) {
+ this.streamType = streamType;
+ }
+
+ public void unset_streamType() {
+ this.streamType = null;
+ }
+
+ /** Returns true if field streamType is set (has been assigned a value) and false otherwise */
+ public boolean is_set_streamType() {
+ return this.streamType != null;
+ }
+
+ public void set_streamType_isSet(boolean value) {
+ if (!value) {
+ this.streamType = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case COMPONENT_ID:
@@ -207,6 +252,14 @@ public void setFieldValue(_Fields field, Object value) {
}
break;
+ case STREAM_TYPE:
+ if (value == null) {
+ unset_streamType();
+ } else {
+ set_streamType((StreamType)value);
+ }
+ break;
+
}
}
@@ -222,6 +275,9 @@ public Object getFieldValue(_Fields field) {
case STREAM_ID:
return new Integer(get_streamId());
+ case STREAM_TYPE:
+ return get_streamType();
+
}
throw new IllegalStateException();
}
@@ -237,6 +293,8 @@ public boolean isSet(_Fields field) {
return is_set_componentId();
case STREAM_ID:
return is_set_streamId();
+ case STREAM_TYPE:
+ return is_set_streamType();
}
throw new IllegalStateException();
}
@@ -276,6 +334,15 @@ public boolean equals(GlobalStreamId that) {
return false;
}
+ boolean this_present_streamType = true && this.is_set_streamType();
+ boolean that_present_streamType = true && that.is_set_streamType();
+ if (this_present_streamType || that_present_streamType) {
+ if (!(this_present_streamType && that_present_streamType))
+ return false;
+ if (!this.streamType.equals(that.streamType))
+ return false;
+ }
+
return true;
}
@@ -293,6 +360,11 @@ public int hashCode() {
if (present_streamId)
builder.append(streamId);
+ boolean present_streamType = true && (is_set_streamType());
+ builder.append(present_streamType);
+ if (present_streamType)
+ builder.append(streamType.getValue());
+
return builder.toHashCode();
}
@@ -322,6 +394,15 @@ public int compareTo(GlobalStreamId other) {
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_streamType()).compareTo(typedOther.is_set_streamType());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_streamType()) { lastComparison = TBaseHelper.compareTo(this.streamType, typedOther.streamType);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -351,6 +432,13 @@ public void read(TProtocol iprot) throws TException {
TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 3: // STREAM_TYPE
+ if (field.type == TType.I32) {
+ this.streamType = StreamType.findByValue(iprot.readI32());
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
TProtocolUtil.skip(iprot, field.type);
}
@@ -370,6 +458,13 @@ public void write(TProtocol oprot) throws TException {
oprot.writeFieldBegin(STREAM_ID_FIELD_DESC);
oprot.writeI32(this.streamId);
oprot.writeFieldEnd();
+ if (this.streamType != null) {
+ if (is_set_streamType()) {
+ oprot.writeFieldBegin(STREAM_TYPE_FIELD_DESC);
+ oprot.writeI32(this.streamType.getValue());
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -386,6 +481,16 @@ public String toString() {
sb.append("streamId:");
sb.append(this.streamId);
first = false;
+ if (is_set_streamType()) {
+ if (!first) sb.append(", ");
+ sb.append("streamType:");
+ if (this.streamType == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.streamType);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
View
44 src/jvm/backtype/storm/generated/StreamType.java
@@ -0,0 +1,44 @@
+/**
+ * Autogenerated by Thrift
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum StreamType implements TEnum {
+ NORMAL(1),
+ FAILURE(2);
+
+ private final int value;
+
+ private StreamType(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static StreamType findByValue(int value) {
+ switch (value) {
+ case 1:
+ return NORMAL;
+ case 2:
+ return FAILURE;
+ default:
+ return null;
+ }
+ }
+}
View
31 src/jvm/backtype/storm/serialization/TupleDeserializer.java
@@ -2,17 +2,19 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
import backtype.storm.utils.WritableUtils;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TupleDeserializer {
- Map<Integer, Map<Integer, ValuesDeserializer>> _fieldSerializers = new HashMap<Integer, Map<Integer, ValuesDeserializer>>();
+ Map<Integer, Map<List<Integer>, ValuesDeserializer>> _fieldSerializers = new HashMap<Integer, Map<List<Integer>, ValuesDeserializer>>();
+ Map<Integer, Map<List<Integer>, ValuesDeserializer>> _metaSerializers = new HashMap<Integer, Map<List<Integer>, ValuesDeserializer>>();
Map _conf;
TopologyContext _context;
@@ -21,28 +23,35 @@ public TupleDeserializer(Map conf, TopologyContext context) {
_context = context;
}
- public Tuple deserialize(byte[] ser) throws IOException {
+ public TupleImpl deserialize(byte[] ser) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(ser);
DataInputStream in = new DataInputStream(bin);
int taskId = WritableUtils.readVInt(in);
- int streamId = WritableUtils.readVInt(in);
+
+ int numStreams = WritableUtils.readVInt(in);
+ List<Integer> fullStreamId = new ArrayList<Integer>(numStreams);
+ for(int i=0; i<numStreams; i++) {
+ fullStreamId.add(WritableUtils.readVInt(in));
+ }
MessageId id = MessageId.deserialize(in);
int componentId = _context.getComponentId(taskId);
- ValuesDeserializer streamSerializers = getValuesDeserializer(_fieldSerializers, componentId, streamId);
+ ValuesDeserializer streamSerializers = getValuesDeserializer(_fieldSerializers, componentId, fullStreamId);
List<Object> values = streamSerializers.deserializeFrom(in);
- return new Tuple(_context, values, taskId, streamId, id);
+ ValuesDeserializer metaSerializers = getValuesDeserializer(_metaSerializers, componentId, fullStreamId);
+ List<Object> metadata = metaSerializers.deserializeFrom(in);
+ return new TupleImpl(_context, values, taskId, fullStreamId, metadata, id);
}
- private ValuesDeserializer getValuesDeserializer(Map<Integer, Map<Integer, ValuesDeserializer>> deserializers, int componentId, int streamId) {
- Map<Integer, ValuesDeserializer> streamToSerializers = deserializers.get(componentId);
+ private ValuesDeserializer getValuesDeserializer(Map<Integer, Map<List<Integer>, ValuesDeserializer>> deserializers, int componentId, List<Integer> fullStreamId) {
+ Map<List<Integer>, ValuesDeserializer> streamToSerializers = deserializers.get(componentId);
if(streamToSerializers==null) {
- streamToSerializers = new HashMap<Integer, ValuesDeserializer>();
+ streamToSerializers = new HashMap<List<Integer>, ValuesDeserializer>();
deserializers.put(componentId, streamToSerializers);
}
- ValuesDeserializer streamSerializers = streamToSerializers.get(streamId);
+ ValuesDeserializer streamSerializers = streamToSerializers.get(fullStreamId);
if(streamSerializers==null) {
streamSerializers = new ValuesDeserializer(_conf);
- streamToSerializers.put(streamId, streamSerializers);
+ streamToSerializers.put(fullStreamId, streamSerializers);
}
return streamSerializers;
}
View
33 src/jvm/backtype/storm/serialization/TupleSerializer.java
@@ -1,19 +1,21 @@
package backtype.storm.serialization;
-import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
import backtype.storm.utils.CRC32OutputStream;
import backtype.storm.utils.WritableUtils;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class TupleSerializer {
ByteArrayOutputStream _outputter;
DataOutputStream _dataOutputter;
- Map<Integer, Map<Integer, ValuesSerializer>> _fieldSerializers = new HashMap<Integer, Map<Integer, ValuesSerializer>>();
+ Map<Integer, Map<List<Integer>, ValuesSerializer>> _fieldSerializers = new HashMap<Integer, Map<List<Integer>, ValuesSerializer>>();
+ Map<Integer, Map<List<Integer>, ValuesSerializer>> _metaSerializers = new HashMap<Integer, Map<List<Integer>, ValuesSerializer>>();
Map _conf;
public TupleSerializer(Map conf) {
@@ -22,17 +24,26 @@ public TupleSerializer(Map conf) {
_conf = conf;
}
- public byte[] serialize(Tuple tuple) throws IOException {
+ public byte[] serialize(TupleImpl tuple) throws IOException {
_outputter.reset();
WritableUtils.writeVInt(_dataOutputter, tuple.getSourceTask());
- WritableUtils.writeVInt(_dataOutputter, tuple.getSourceStreamId());
+
+ List<Integer> fullStreamId = tuple.getFullStreamId();
+ WritableUtils.writeVInt(_dataOutputter, fullStreamId.size());
+ for(Integer id: fullStreamId) {
+ WritableUtils.writeVInt(_dataOutputter, id);
+ }
tuple.getMessageId().serialize(_dataOutputter);
+
ValuesSerializer streamSerializers = getValuesSerializer(_fieldSerializers, tuple);
- streamSerializers.serializeInto(tuple.getValues(), _dataOutputter);
+ streamSerializers.serializeInto(tuple.getTuple(), _dataOutputter);
+ ValuesSerializer metaSerializers = getValuesSerializer(_metaSerializers, tuple);
+ metaSerializers.serializeInto(tuple.getMetadata(), _dataOutputter);
+
return _outputter.toByteArray();
}
- public long crc32(Tuple tuple) {
+ public long crc32(TupleImpl tuple) {
CRC32OutputStream hasher = new CRC32OutputStream();
try {
getValuesSerializer(_fieldSerializers, tuple).serializeInto(tuple.getValues(), new DataOutputStream(hasher));
@@ -42,16 +53,16 @@ public long crc32(Tuple tuple) {
return hasher.getValue();
}
- private ValuesSerializer getValuesSerializer(Map<Integer, Map<Integer, ValuesSerializer>> serializers, Tuple tuple) {
- Map<Integer, ValuesSerializer> streamToSerializers = serializers.get(tuple.getSourceComponent());
+ private ValuesSerializer getValuesSerializer(Map<Integer, Map<List<Integer>, ValuesSerializer>> serializers, TupleImpl tuple) {
+ Map<List<Integer>, ValuesSerializer> streamToSerializers = serializers.get(tuple.getSourceComponent());
if(streamToSerializers==null) {
- streamToSerializers = new HashMap<Integer, ValuesSerializer>();
+ streamToSerializers = new HashMap<List<Integer>, ValuesSerializer>();
serializers.put(tuple.getSourceComponent(), streamToSerializers);
}
- ValuesSerializer streamSerializers = streamToSerializers.get(tuple.getSourceStreamId());
+ ValuesSerializer streamSerializers = streamToSerializers.get(tuple.getFullStreamId());
if(streamSerializers==null) {
streamSerializers = new ValuesSerializer(_conf);
- streamToSerializers.put(tuple.getSourceStreamId(), streamSerializers);
+ streamToSerializers.put(tuple.getFullStreamId(), streamSerializers);
}
return streamSerializers;
}
View
1 src/jvm/backtype/storm/spout/ISpoutOutputCollector.java
@@ -1,7 +1,6 @@
package backtype.storm.spout;
import java.util.List;
-import backtype.storm.tuple.Tuple;
public interface ISpoutOutputCollector {
/**
View
4 src/jvm/backtype/storm/state/IStateSpoutOutputCollector.java
@@ -1,5 +1,7 @@
package backtype.storm.state;
+import java.util.List;
+
public interface IStateSpoutOutputCollector extends ISynchronizeOutputCollector {
- void remove(int streamId, Object id);
+ void remove(int streamId, List<Object> tuple);
}
View
4 src/jvm/backtype/storm/state/ISubscribedState.java
@@ -3,6 +3,6 @@
import backtype.storm.tuple.Tuple;
public interface ISubscribedState {
- void set(Object id, Tuple tuple);
- void remove(Object id);
+ void add(Tuple tuple);
+ void remove(Tuple tuple);
}
View
5 src/jvm/backtype/storm/state/ISynchronizeOutputCollector.java
@@ -3,5 +3,8 @@
import java.util.List;
public interface ISynchronizeOutputCollector {
- void add(int streamId, Object id, List<Object> tuple);
+ void add(int streamId, List<Object> tuple);
+ //this will return immediately and do synchronization once the spout function exits
+ //(to avoid blowing up the stack) -- no reason for user to do anything more after calling this anyway
+ void resynchronize();
}
View
20 src/jvm/backtype/storm/state/StateSpoutOutputCollector.java
@@ -1,11 +1,25 @@
package backtype.storm.state;
+import backtype.storm.utils.Utils;
+import java.util.List;
-public class StateSpoutOutputCollector extends SynchronizeOutputCollector implements IStateSpoutOutputCollector {
+public class StateSpoutOutputCollector extends SynchronizeOutputCollector implements IStateSpoutOutputCollector {
+ private IStateSpoutOutputCollector _delegate;
+
+ public StateSpoutOutputCollector(IStateSpoutOutputCollector delegate) {
+ super(delegate);
+ _delegate = delegate;
+ }
+
+
+ public void remove(List<Object> tuple) {
+ remove(Utils.DEFAULT_STREAM_ID, tuple);
+ }
+
@Override
- public void remove(int streamId, Object id) {
- throw new UnsupportedOperationException("Not supported yet.");
+ public void remove(int streamId, List<Object> tuple) {
+ _delegate.remove(streamId, tuple);
}
}
View
21 src/jvm/backtype/storm/state/SynchronizeOutputCollector.java
@@ -1,13 +1,28 @@
package backtype.storm.state;
+import backtype.storm.utils.Utils;
import java.util.List;
public class SynchronizeOutputCollector implements ISynchronizeOutputCollector {
-
+ private ISynchronizeOutputCollector _delegate;
+
+ public SynchronizeOutputCollector(ISynchronizeOutputCollector delegate) {
+ _delegate = delegate;
+ }
+
+
+ public void add(List<Object> tuple) {
+ add(Utils.DEFAULT_STREAM_ID, tuple);
+ }
+
@Override
- public void add(int streamId, Object id, List<Object> tuple) {
- throw new UnsupportedOperationException("Not supported yet.");
+ public void add(int streamId, List<Object> tuple) {
+ _delegate.add(streamId, tuple);
}
+ @Override
+ public void resynchronize() {
+ _delegate.resynchronize();
+ }
}
View
8 src/jvm/backtype/storm/task/ComponentType.java
@@ -0,0 +1,8 @@
+package backtype.storm.task;
+
+
+public enum ComponentType {
+ BOLT,
+ SPOUT,
+ STATE_SPOUT;
+}
View
4 src/jvm/backtype/storm/task/OutputCollector.java
@@ -1,6 +1,10 @@
package backtype.storm.task;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import backtype.storm.utils.Utils;
import java.util.Arrays;
import java.util.List;
View
5 src/jvm/backtype/storm/task/OutputCollectorImpl.java
@@ -2,6 +2,7 @@
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -51,7 +52,9 @@ private Tuple anchorTuple(List<Tuple> anchors, int streamId, List<Object> tuple)
}
}
}
- return new Tuple(_context, tuple, _context.getThisTaskId(), streamId, MessageId.makeId(anchorsToIds));
+ List<Integer> fullStreamId = new ArrayList<Integer>(1);
+ fullStreamId.add(streamId);
+ return new TupleImpl(_context, tuple, _context.getThisTaskId(), fullStreamId, null, MessageId.makeId(anchorsToIds));
}
public void ack(Tuple input) {
View
53 src/jvm/backtype/storm/task/TopologyContext.java
@@ -13,6 +13,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -108,6 +109,43 @@ public TopologyContext(StormTopology topology, Map<Integer, Integer> taskToCompo
throw new NotImplementedException();
}
+ public ComponentType getComponentType(int componentId) {
+ if(_topology.get_bolts().containsKey(componentId)) {
+ return ComponentType.BOLT;
+ }
+ if(_topology.get_spouts().containsKey(componentId)) {
+ return ComponentType.SPOUT;
+ }
+ if(_topology.get_state_spouts().containsKey(componentId)) {
+ return ComponentType.STATE_SPOUT;
+ }
+ throw new IllegalArgumentException("Invalid component id: " + componentId);
+ }
+
+ public boolean isThisSpout() {
+ return isSpout(getThisComponentId());
+ }
+
+ public boolean isSpout(int componentId) {
+ return getComponentType(componentId) == ComponentType.SPOUT;
+ }
+
+ public boolean isThisBolt() {
+ return isBolt(getThisComponentId());
+ }
+
+ public boolean isBolt(int componentId) {
+ return getComponentType(componentId) == ComponentType.BOLT;
+ }
+
+ public boolean isThisStateSpout() {
+ return isStateSpout(getThisComponentId());
+ }
+
+ public boolean isStateSpout(int componentId) {
+ return getComponentType(componentId) == ComponentType.STATE_SPOUT;
+ }
+
/**
* Gets the unique id assigned to this topology. The id is the storm name with a
* unique nonce appended to it.
@@ -218,6 +256,21 @@ public Fields getComponentOutputFields(int componentId, int streamId) {
public Map<GlobalStreamId, Grouping> getThisSources() {
return getSources(getThisComponentId());
}
+
+ public Set<Integer> getThisSourceComponents() {
+ return getSourceComponents(getThisComponentId());
+ }
+
+ public Set<Integer> getSourceComponents(int componentId) {
+ Map<GlobalStreamId, Grouping> sources = getSources(componentId);
+ Set<Integer> ret = new HashSet<Integer>();
+ if(sources!=null) {
+ for(GlobalStreamId id: sources.keySet()) {
+ ret.add(id.get_componentId());
+ }
+ }
+ return ret;
+ }
/**
* Gets the declared inputs to the specified component.
View
162 src/jvm/backtype/storm/tuple/Tuple.java
@@ -5,140 +5,30 @@
import java.util.List;
import java.util.Map;
-public class Tuple {
- private List<Object> values;
- private int taskId;
- private int streamId;
- private TopologyContext context;
- private MessageId id;
-
- //needs to get taskId explicitly b/c could be in a different task than where it was created
- public Tuple(TopologyContext context, List<Object> values, int taskId, int streamId, MessageId id) {
- this.values = values;
- this.taskId = taskId;
- this.streamId = streamId;
- this.id = id;
- this.context = context;
- //TODO: should find a way to include this information here
- //TODO: should only leave out the connection info?
- //TODO: have separate methods for "user" and "system" topology?
- if(streamId>=0) {
- int componentId = context.getComponentId(taskId);
- if(componentId>=0) {
- Fields schema = context.getComponentOutputFields(componentId, streamId);
- if(values.size()!=schema.size()) {
- throw new IllegalArgumentException(
- "Tuple created with wrong number of fields. " +
- "Expected " + schema.size() + " fields but got " +
- values.size() + " fields");
- }
- }
- }
- }
-
- public Tuple(TopologyContext context, List<Object> values, int taskId, int streamId) {
- this(context, values, taskId, streamId, MessageId.makeUnanchored());
- }
-
- public Tuple copyWithNewId(long id) {
- Map<Long, Long> newIds = new HashMap<Long, Long>();
- for(Long anchor: this.id.getAnchorsToIds().keySet()) {
- newIds.put(anchor, id);
- }
- return new Tuple(this.context, this.values, this.taskId, this.streamId, MessageId.makeId(newIds));
- }
-
- public int size() {
- return values.size();
- }
-
- public Object getValue(int i) {
- return values.get(i);
- }
-
- public String getString(int i) {
- return (String) values.get(i);
- }
-
- public Integer getInteger(int i) {
- return (Integer) values.get(i);
- }
-
- public Long getLong(int i) {
- return (Long) values.get(i);
- }
-
- public Boolean getBoolean(int i) {
- return (Boolean) values.get(i);
- }
-
- public Short getShort(int i) {
- return (Short) values.get(i);
- }
-
- public Byte getByte(int i) {
- return (Byte) values.get(i);
- }
-
- public Double getDouble(int i) {
- return (Double) values.get(i);
- }
-
- public Float getFloat(int i) {
- return (Float) values.get(i);
- }
-
- public byte[] getBinary(int i) {
- return (byte[]) values.get(i);
- }
-
- @Deprecated
- public List<Object> getTuple() {
- return values;
- }
-
- public List<Object> getValues() {
- return values;
- }
-
- public Fields getFields() {
- return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
- }
-
- public List<Object> select(Fields selector) {
- return getFields().select(selector, values);
- }
-
- public int getSourceComponent() {
- return context.getComponentId(taskId);
- }
-
- public int getSourceTask() {
- return taskId;
- }
-
- public int getSourceStreamId() {
- return streamId;
- }
-
- public MessageId getMessageId() {
- return id;
- }
-
- @Override
- public String toString() {
- return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString();
- }
-
- @Override
- public boolean equals(Object other) {
- // for OutputCollector
- return this == other;
- }
-
- @Override
- public int hashCode() {
- // for OutputCollector
- return System.identityHashCode(this);
- }
+public interface Tuple {
+ List<Object> getTuple();
+ Object getValue(int i);
+ int size();
+ List<Object> getValues();
+ String getString(int i);
+ Integer getInteger(int i);
+ Long getLong(int i);
+ Boolean getBoolean(int i);
+ Short getShort(int i);
+ Byte getByte(int i);
+ Double getDouble(int i);
+ Float getFloat(int i);
+ byte[] getBinary(int i);
+ List<Object> getMetadata();
+ Object getMetadataValue(int i);
+ int metadataSize();
+ MessageId getMessageId();
+ List<Object> select(Fields selector);
+ Fields getFields();
+ int getSourceComponent();
+ int getSourceTask();
+    //NOTE(review): returns the first element of the full stream id (an int, so it can never be null);
+    //the original "returning null is the default stream id" wording appears to be a leftover — confirm intent
+ int getSourceStreamId();
+ Tuple copyWithNewId(long id);
+ boolean isFromFailureStream();
}
View
184 src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -0,0 +1,184 @@
+package backtype.storm.tuple;
+
+import backtype.storm.Constants;
+import backtype.storm.task.TopologyContext;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TupleImpl implements Tuple {
+ private List<Object> values;
+ private List<Object> metadata;
+ private int taskId;
+ private List<Integer> streamId;
+ private TopologyContext context;
+ private MessageId id;
+
+
+ //needs to get taskId explicitly b/c could be in a different task than where it was created
+ public TupleImpl(TopologyContext context, List<Object> values, int taskId, List<Integer> streamId, List<Object> metadata, MessageId id) {
+ if(streamId.size()==0) {
+ throw new IllegalArgumentException("Cannot have an empty stream id");
+ }
+ this.values = values;
+ this.taskId = taskId;
+ this.streamId = streamId;
+ this.id = id;
+ this.context = context;
+
+ int majorStream = streamId.get(0);
+ if(majorStream>=0) {
+ int componentId = context.getComponentId(taskId);
+ if(componentId>=0) {
+ Fields schema = context.getComponentOutputFields(componentId, majorStream);
+ if(values.size()!=schema.size()) {
+ throw new IllegalArgumentException(
+ "Tuple created with wrong number of fields. " +
+ "Expected " + schema.size() + " fields but got " +
+ values.size() + " fields");
+ }
+ }
+ }
+// this doesn't work because of implicit streams that don't exist in context (ackers)
+// int expectedSize = context.getComponentOutputFields(getSourceComponent(), streamId).size();
+// if(expectedSize!=values.size()) {
+// throw new RuntimeException("Created tuple with invalid number of fields: " + values.toString());
+// }
+ }
+
+ public TupleImpl(TopologyContext context, List<Object> values, int taskId, List<Integer> streamId, MessageId id) {
+ this(context, values, taskId, streamId, null, id);
+ }
+
+ public TupleImpl(TopologyContext context, List<Object> values, int taskId, List<Integer> streamId) {
+ this(context, values, taskId, streamId, MessageId.makeUnanchored());
+ }
+
+ public TupleImpl(TopologyContext context, List<Object> values, int taskId, List<Integer> streamId, List<Object> metadata) {
+ this(context, values, taskId, streamId, metadata, MessageId.makeUnanchored());
+ }
+
+ public List<Object> getMetadata() {
+ return metadata;
+ }
+
+ public Object getMetadataValue(int i) {
+ return metadata.get(i);
+ }
+
+ public int metadataSize() {
+ return metadata.size();
+ }
+
+ public Tuple copyWithNewId(long id) {
+ Map<Long, Long> newIds = new HashMap<Long, Long>();
+ for(Long anchor: this.id.getAnchorsToIds().keySet()) {
+ newIds.put(anchor, id);
+ }
+ return new TupleImpl(this.context, this.values, this.taskId, this.streamId, metadata, MessageId.makeId(newIds));
+ }
+
+ public int size() {
+ return values.size();
+ }
+
+ public Object getValue(int i) {
+ return values.get(i);
+ }
+
+ @Deprecated
+ public List<Object> getTuple() {
+ return values;
+ }
+
+ public List<Object> getValues() {
+ return values;
+ }
+
+ public String getString(int i) {
+ return (String) values.get(i);
+ }
+
+ public Integer getInteger(int i) {
+ return (Integer) values.get(i);
+ }
+
+ public Long getLong(int i) {
+ return (Long) values.get(i);
+ }
+
+ public Boolean getBoolean(int i) {
+ return (Boolean) values.get(i);
+ }
+
+ public Short getShort(int i) {
+ return (Short) values.get(i);
+ }
+
+ public Byte getByte(int i) {
+ return (Byte) values.get(i);
+ }
+
+ public Double getDouble(int i) {
+ return (Double) values.get(i);
+ }
+
+ public Float getFloat(int i) {
+ return (Float) values.get(i);
+ }
+
+ public byte[] getBinary(int i) {
+ return (byte[]) values.get(i);
+ }
+
+ public Fields getFields() {
+ return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
+ }
+
+ public List<Object> select(Fields selector) {
+ return getFields().select(selector, values);
+ }
+
+ public int getSourceComponent() {
+ return context.getComponentId(taskId);
+ }
+
+ public int getSourceTask() {
+ return taskId;
+ }
+
+ public int getSourceStreamId() {
+ return streamId.get(0);
+ }
+
+ public MessageId getMessageId() {
+ return id;
+ }
+
+ public boolean isFromFailureStream() {
+ return streamId.size()==2 &&
+ context.isSpout(streamId.get(0)) &&
+ streamId.get(1).equals(Constants.FAILURE_SUBSTREAM);
+ }
+
+ public List<Integer> getFullStreamId() {
+ return streamId;
+ }
+
+ @Override
+ public int hashCode() {
+ // for OutputCollector
+ return System.identityHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ // for OutputCollector
+ return this == other;
+ }
+
+ @Override
+ public String toString() {
+ return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString();
+ }
+}
View
28 src/py/storm/ttypes.py
@@ -16,6 +16,20 @@
fastbinary = None
+class StreamType:
+ NORMAL = 1
+ FAILURE = 2
+
+ _VALUES_TO_NAMES = {
+ 1: "NORMAL",
+ 2: "FAILURE",
+ }
+
+ _NAMES_TO_VALUES = {
+ "NORMAL": 1,
+ "FAILURE": 2,
+ }
+
class NullStruct:
thrift_spec = (
@@ -682,17 +696,20 @@ class GlobalStreamId:
Attributes:
- componentId
- streamId
+ - streamType
"""
thrift_spec = (
None, # 0
(1, TType.I32, 'componentId', None, None, ), # 1
(2, TType.I32, 'streamId', None, None, ), # 2
+ (3, TType.I32, 'streamType', None, None, ), # 3
)
- def __init__(self, componentId=None, streamId=None,):
+ def __init__(self, componentId=None, streamId=None, streamType=None,):
self.componentId = componentId
self.streamId = streamId
+ self.streamType = streamType
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -713,6 +730,11 @@ def read(self, iprot):
self.streamId = iprot.readI32();
else:
iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.streamType = iprot.readI32();
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -731,6 +753,10 @@ def write(self, oprot):
oprot.writeFieldBegin('streamId', TType.I32, 2)
oprot.writeI32(self.streamId)
oprot.writeFieldEnd()
+ if self.streamType != None:
+ oprot.writeFieldBegin('streamType', TType.I32, 3)
+ oprot.writeI32(self.streamType)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
View
9 src/storm.thrift
@@ -40,10 +40,15 @@ struct SpoutSpec {
3: required bool distributed;
}
+enum StreamType {
+ NORMAL = 1,
+ FAILURE = 2
+}
+
struct GlobalStreamId {
1: required i32 componentId;
2: required i32 streamId;
- #Going to need to add an enum for the stream type (NORMAL or FAILURE)
+ 3: optional StreamType streamType;
}
struct Bolt {
@@ -52,8 +57,6 @@ struct Bolt {
3: required ComponentCommon common;
}
-// not implemented yet
-// this will eventually be the basis for subscription implementation in storm
struct StateSpoutSpec {
1: required ComponentObject state_spout_object;
2: required ComponentCommon common;

0 comments on commit b0a1b12

Please sign in to comment.
Something went wrong with that request. Please try again.