diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index d050b32bc..65978a443 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -4,6 +4,7 @@ InvalidTopologyException GlobalStreamId]) (:import [backtype.storm.utils Utils]) (:import [backtype.storm Constants]) + (:require [clojure.set :as set]) (:require [backtype.storm.daemon.acker :as acker]) (:require [backtype.storm.thrift :as thrift]) ) @@ -117,16 +118,33 @@ (throw (InvalidTopologyException. "May not declare inputs for a spout")) ))) -(defn validate-structure! [^StormTopology topology] - ;; TODO: validate that all subscriptions are to valid component/streams - ) - (defn all-components [^StormTopology topology] (apply merge {} (for [f thrift/STORM-TOPOLOGY-FIELDS] (.getFieldValue topology f) ))) +(defn validate-structure! [^StormTopology topology] + ;; validate all the component subscribe from component+stream which actually exists in the topology + ;; and if it is a fields grouping, validate the corresponding field exists + (let [all-components (all-components topology)] + (doseq [[id comp] all-components + :let [inputs (.. comp get_common get_inputs)]] + (doseq [[global-stream-id grouping] inputs + :let [source-component-id (.get_componentId global-stream-id) + source-stream-id (.get_streamId global-stream-id)]] + (if-not (contains? all-components source-component-id) + (throw (InvalidTopologyException. (str "Component: [" id "] subscribe from non-exists component [" source-component-id "]"))) + (let [source-streams (-> all-components (get source-component-id) .get_common .get_streams)] + (if-not (contains? source-streams source-stream-id) + (throw (InvalidTopologyException. (str "Component: [" id "] subscribe from non-exists stream: [" source-stream-id "] of component [" source-component-id "]"))) + (if (= :fields (thrift/grouping-type grouping)) + (let [grouping-fields (set (.get_fields grouping)) + source-stream-fields (-> source-streams (get source-stream-id) .get_output_fields set) + diff-fields (set/difference grouping-fields source-stream-fields)] + (when-not (empty? diff-fields) + (throw (InvalidTopologyException. (str "Component: [" id "] subscribe from stream: [" source-stream-id "] of component [" source-component-id "] with non-exists fields: " diff-fields))))))))))))) + (defn acker-inputs [^StormTopology topology] (let [bolt-ids (.. topology get_bolts keySet) spout-ids (.. topology get_spouts keySet) diff --git a/test/clj/backtype/storm/integration_test.clj b/test/clj/backtype/storm/integration_test.clj index c0fe6953b..e282df699 100644 --- a/test/clj/backtype/storm/integration_test.clj +++ b/test/clj/backtype/storm/integration_test.clj @@ -1,6 +1,7 @@ (ns backtype.storm.integration-test (:use [clojure test]) (:import [backtype.storm.topology TopologyBuilder]) + (:import [backtype.storm.generated InvalidTopologyException]) (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt]) (:use [backtype.storm bootstrap testing]) (:use [backtype.storm.daemon common]) @@ -99,6 +100,46 @@ (read-tuples results "4"))) )))) +(defn mk-validate-topology-1 [] + (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} + {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)})) + +(defn mk-invalidate-topology-1 [] + (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} + {"2" (thrift/mk-bolt-spec {"3" ["word"]} (TestWordCounter.) :parallelism-hint 4)})) + +(defn mk-invalidate-topology-2 [] + (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} + {"2" (thrift/mk-bolt-spec {"1" ["non-exists-field"]} (TestWordCounter.) :parallelism-hint 4)})) + +(defn mk-invalidate-topology-3 [] + (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)} + {"2" (thrift/mk-bolt-spec {["1" "non-exists-stream"] ["word"]} (TestWordCounter.) :parallelism-hint 4)})) + +(defn try-complete-wc-topology [cluster topology] + (try (do + (complete-topology cluster + topology + :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]} + :storm-conf {TOPOLOGY-WORKERS 2}) + false) + (catch InvalidTopologyException e true))) + +(deftest test-validate-topology-structure + (with-simulated-time-local-cluster [cluster :supervisors 4] + (let [any-error1? (try-complete-wc-topology cluster (mk-validate-topology-1)) + any-error2? (try-complete-wc-topology cluster (mk-invalidate-topology-1)) + any-error3? (try-complete-wc-topology cluster (mk-invalidate-topology-2)) + any-error4? (try-complete-wc-topology cluster (mk-invalidate-topology-3))] + (is (= any-error1? false)) + (is (= any-error2? true)) + (is (= any-error3? true)) + (is (= any-error4? true))))) + (defbolt identity-bolt ["num"] [tuple collector] (emit-bolt! collector (.getValues tuple) :anchor tuple)