From 3f3e0042675d4fa5f1b07bdcc1840508ab2cf3ff Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 3 Feb 2015 14:22:04 -0800 Subject: [PATCH 1/2] STORM-581. Add rebalance params to Storm REST API. --- STORM-UI-REST-API.md | 42 +++++++++++++++++++ storm-core/pom.xml | 6 ++- storm-core/src/clj/backtype/storm/ui/core.clj | 38 +++++++++++------ 3 files changed, 73 insertions(+), 13 deletions(-) diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md index deee13413fd..f2ac8a5bfcc 100644 --- a/STORM-UI-REST-API.md +++ b/STORM-UI-REST-API.md @@ -612,6 +612,12 @@ Activates a topology. |----------|--------|-------------| |id |String (required)| Topology Id | +Sample Response: + +```json +{"topologyOperation":"activate","topologyId":"wordcount-1-1420308665","status":"success"} +``` + ### /api/v1/topology/:id/deactivate (POST) @@ -621,6 +627,12 @@ Deactivates a topology. |----------|--------|-------------| |id |String (required)| Topology Id | +Sample Response: + +```json +{"topologyOperation":"deactivate","topologyId":"wordcount-1-1420308665","status":"success"} +``` + ### /api/v1/topology/:id/rebalance/:wait-time (POST) @@ -630,6 +642,31 @@ Rebalances a topology. |----------|--------|-------------| |id |String (required)| Topology Id | |wait-time |String (required)| Wait time before rebalance happens | +|rebalanceOptions| Json (optional) | topology rebalance options | + + +Sample rebalancOptions json: + +```json +{"rebalanceOptions" : {"numWorkers" : 2, "executors" : {"spout" :4, "count" : 10}}, "callback" : "foo"} +``` + +Examples: + +```no-highlight +curl -i -b ~/cookiejar.txt -c ~/cookiejar.txt -X POST +-H 'x-csrf-token:nRXggIDItGA/rxjPETo9ok65DM3rpQqOLoNwWXZWbGuaZZjtms5/tU+h36uQCR34z50DtFybkwh1ZB5e' +-H "Content-Type: application/json" +-d '{"rebalanceOptions": {"numWorkers": 2, "executors": { "spout" : "5", "split": 7, "count": 5 }}, "callback":"foo"}' +http://localhost:8080/api/v1/topology/wordcount-1-1420308665/rebalance/0 +``` + +Sample Response: + +```json +{"topologyOperation":"rebalance","topologyId":"wordcount-1-1420308665","status":"success"} +``` + ### /api/v1/topology/:id/kill/:wait-time (POST) @@ -645,6 +682,11 @@ Caution: Small wait times (0-5 seconds) may increase the probability of triggeri [STORM-112](https://issues.apache.org/jira/browse/STORM-112), which may result in broker Supervisor daemons. +Sample Response: + +```json +{"topologyOperation":"kill","topologyId":"wordcount-1-1420308665","status":"success"} +``` ## API errors diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 9f3ed49803b..af3e4aa105c 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -70,7 +70,11 @@ ring-anti-forgery 1.0.0 - + + ring + ring-json + 0.3.1 + org.eclipse.jetty jetty-servlet diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index b36a7a957bb..f0d3f0b86f3 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -17,6 +17,7 @@ (ns backtype.storm.ui.core (:use compojure.core) (:use ring.middleware.reload) + (:use [ring.middleware.json :only [wrap-json-params]]) (:use [hiccup core page-helpers]) (:use [backtype.storm config util log]) (:use [backtype.storm.ui helpers]) @@ -853,7 +854,13 @@ (defn topology-config [topology-id] (with-nimbus nimbus - (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id)))) + (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id)))) + +(defn topology-op-response [topology-id op] + {"topologyOperation" op, + "topologyId" topology-id, + "status" "success" + }) (defn check-include-sys? [sys?] @@ -900,7 +907,7 @@ (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m)))) (GET "/api/v1/token" [ & m] (json-response (format "{\"antiForgeryToken\": \"%s\"}" *anti-forgery-token*) (:callback m) :serialize-fn identity)) - (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id] + (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m] (with-nimbus nimbus (let [tplg (->> (doto (GetInfoOptions.) @@ -910,8 +917,8 @@ (assert-authorized-user servlet-request "activate" (topology-config id)) (.activate nimbus name) (log-message "Activating topology '" name "'"))) - (resp/redirect (str "/api/v1/topology/" (url-encode id)))) - (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id] + (json-response (topology-op-response id "deactivate") (m "callback"))) + (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m] (with-nimbus nimbus (let [tplg (->> (doto (GetInfoOptions.) @@ -921,21 +928,27 @@ (assert-authorized-user servlet-request "deactivate" (topology-config id)) (.deactivate nimbus name) (log-message "Deactivating topology '" name "'"))) - (resp/redirect (str "/api/v1/topology/" (url-encode id)))) - (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time] + (json-response (topology-op-response id "deactivate") (m "callback"))) + (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m] (with-nimbus nimbus (let [tplg (->> (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/NONE)) (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id)) name (.get_name tplg) + rebalance-options (m "rebalanceOptions") options (RebalanceOptions.)] (assert-authorized-user servlet-request "rebalance" (topology-config id)) (.set_wait_secs options (Integer/parseInt wait-time)) + (if (and (not-nil? rebalance-options) (contains? rebalance-options "numWorkers")) + (.set_num_workers options (Integer/parseInt (.toString (rebalance-options "numWorkers"))))) + (if (and (not-nil? rebalance-options) (contains? rebalance-options "executors")) + (doseq [keyval (rebalance-options "executors")] + (.put_to_num_executors options (key keyval) (Integer/parseInt (.toString (val keyval)))))) (.rebalance nimbus name options) (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs"))) - (resp/redirect (str "/api/v1/topology/" (url-encode id)))) - (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time] + (json-response (topology-op-response id "rebalance") (m "callback"))) + (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m] (with-nimbus nimbus (let [tplg (->> (doto (GetInfoOptions.) @@ -947,7 +960,7 @@ (.set_wait_secs options (Integer/parseInt wait-time)) (.killTopologyWithOpts nimbus name options) (log-message "Killing topology '" name "' with wait time: " wait-time " secs"))) - (resp/redirect (str "/api/v1/topology/" (url-encode id)))) + (json-response (topology-op-response id "kill") (m "callback"))) (GET "/" [:as {cookies :cookies}] (resp/redirect "/index.html")) @@ -977,9 +990,10 @@ (def app (handler/site (-> main-routes - (wrap-reload '[backtype.storm.ui.core]) - (wrap-anti-forgery {:error-response csrf-error-response}) - catch-errors))) + (wrap-json-params) + (wrap-reload '[backtype.storm.ui.core]) + (wrap-anti-forgery {:error-response csrf-error-response}) + catch-errors))) (defn start-server! [] From b3597e1892ec30a095edc7a5a75b199faf6bced3 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 12 Feb 2015 09:51:27 -0800 Subject: [PATCH 2/2] STORM-581. Fixed type in doc. --- STORM-UI-REST-API.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/STORM-UI-REST-API.md b/STORM-UI-REST-API.md index f2ac8a5bfcc..fb66e31f7d7 100644 --- a/STORM-UI-REST-API.md +++ b/STORM-UI-REST-API.md @@ -645,7 +645,7 @@ Rebalances a topology. |rebalanceOptions| Json (optional) | topology rebalance options | -Sample rebalancOptions json: +Sample rebalanceOptions json: ```json {"rebalanceOptions" : {"numWorkers" : 2, "executors" : {"spout" :4, "count" : 10}}, "callback" : "foo"}