Skip to content

Commit

Permalink
Merge remote-tracking branch 'xuming/issue205'
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed May 9, 2012
2 parents 695f977 + 0f5740f commit 0c7cf73
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/clj/backtype/storm/daemon/common.clj
Expand Up @@ -4,6 +4,7 @@
InvalidTopologyException GlobalStreamId])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm Constants])
(:require [clojure.set :as set])
(:require [backtype.storm.daemon.acker :as acker])
(:require [backtype.storm.thrift :as thrift])
)
Expand Down Expand Up @@ -117,16 +118,33 @@
(throw (InvalidTopologyException. "May not declare inputs for a spout"))
)))

(defn validate-structure! [^StormTopology topology]
;; TODO: validate that all subscriptions are to valid component/streams
)

(defn all-components [^StormTopology topology]
(apply merge {}
(for [f thrift/STORM-TOPOLOGY-FIELDS]
(.getFieldValue topology f)
)))

(defn validate-structure! [^StormTopology topology]
;; validate all the component subscribe from component+stream which actually exists in the topology
;; and if it is a fields grouping, validate the corresponding field exists
(let [all-components (all-components topology)]
(doseq [[id comp] all-components
:let [inputs (.. comp get_common get_inputs)]]
(doseq [[global-stream-id grouping] inputs
:let [source-component-id (.get_componentId global-stream-id)
source-stream-id (.get_streamId global-stream-id)]]
(if-not (contains? all-components source-component-id)
(throw (InvalidTopologyException. (str "Component: [" id "] subscribe from non-exists component [" source-component-id "]")))
(let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)]
(if-not (contains? source-streams source-stream-id)
(throw (InvalidTopologyException. (str "Component: [" id "] subscribe from non-exists stream: [" source-stream-id "] of component [" source-component-id "]")))
(if (= :fields (thrift/grouping-type grouping))
(let [grouping-fields (set (.get_fields grouping))
source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set)
diff-fields (set/difference grouping-fields source-stream-fields)]
(when-not (empty? diff-fields)
(throw (InvalidTopologyException. (str "Component: [" id "] subscribe from stream: [" source-stream-id "] of component [" source-component-id "] with non-exists fields: " diff-fields)))))))))))))

(defn acker-inputs [^StormTopology topology]
(let [bolt-ids (.. topology get_bolts keySet)
spout-ids (.. topology get_spouts keySet)
Expand Down
41 changes: 41 additions & 0 deletions test/clj/backtype/storm/integration_test.clj
@@ -1,6 +1,7 @@
(ns backtype.storm.integration-test
(:use [clojure test])
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.generated InvalidTopologyException])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
Expand Down Expand Up @@ -99,6 +100,46 @@
(read-tuples results "4")))
))))

(defn mk-validate-topology-1 []
(thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))

(defn mk-invalidate-topology-1 []
(thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)}))

(defn mk-invalidate-topology-2 []
(thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)}))

(defn mk-invalidate-topology-3 []
(thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
{"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)}))

(defn try-complete-wc-topology [cluster topology]
(try (do
(complete-topology cluster
topology
:mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
:storm-conf {TOPOLOGY-WORKERS 2})
false)
(catch InvalidTopologyException e true)))

(deftest test-validate-topology-structure
(with-simulated-time-local-cluster [cluster :supervisors 4]
(let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1))
any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1))
any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2))
any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))]
(is (= any-error1? false))
(is (= any-error2? true))
(is (= any-error3? true))
(is (= any-error4? true)))))

(defbolt identity-bolt ["num"]
[tuple collector]
(emit-bolt! collector (.getValues tuple) :anchor tuple)
Expand Down

0 comments on commit 0c7cf73

Please sign in to comment.