diff --git a/src/clj/backtype/storm/cluster.clj b/src/clj/backtype/storm/cluster.clj
index 7231b15d6..c649fa92e 100644
--- a/src/clj/backtype/storm/cluster.clj
+++ b/src/clj/backtype/storm/cluster.clj
@@ -131,12 +131,14 @@
 (def SUPERVISORS-ROOT "supervisors")
 (def WORKERBEATS-ROOT "workerbeats")
 (def ERRORS-ROOT "errors")
+(def CUSTOMERCONTEXT-ROOT "customercontext")
 
 (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
 (def STORMS-SUBTREE (str "/" STORMS-ROOT))
 (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
 (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
 (def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
+(def CUSTOMERCONTEXT-SUBTREE (str "/" CUSTOMERCONTEXT-ROOT))
 
 (defn supervisor-path [id]
   (str SUPERVISORS-SUBTREE "/" id))
@@ -144,6 +146,9 @@
 (defn assignment-path [id]
   (str ASSIGNMENTS-SUBTREE "/" id))
 
+(defn customercontext-path [id]
+  (str CUSTOMERCONTEXT-SUBTREE "/" id))
+
 (defn storm-path [id]
   (str STORMS-SUBTREE "/" id))
 
@@ -329,6 +334,7 @@
 
       (remove-storm! [this storm-id]
         (delete-node cluster-state (assignment-path storm-id))
+        (delete-node cluster-state (customercontext-path storm-id))
         (remove-storm-base! this storm-id))
 
       (report-error [this storm-id component-id error]
diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj
index eb060eb82..45362af66 100644
--- a/src/clj/backtype/storm/daemon/worker.clj
+++ b/src/clj/backtype/storm/daemon/worker.clj
@@ -3,6 +3,7 @@
   (:use [backtype.storm bootstrap])
   (:require [backtype.storm.daemon [executor :as executor]])
   (:import [java.util.concurrent Executors])
+  (:import [backtype.storm.task CustomerContext])
   (:gen-class))
 
 (bootstrap)
@@ -430,4 +431,5 @@
 (defn -main [storm-id assignment-id port-str worker-id]
   (let [conf (read-storm-config)]
     (validate-distributed-mode! conf)
-    (mk-worker conf nil (java.net.URLDecoder/decode storm-id) assignment-id (Integer/parseInt port-str) worker-id)))
+    (mk-worker conf nil (java.net.URLDecoder/decode storm-id) assignment-id (Integer/parseInt port-str) worker-id)
+    (CustomerContext/start conf)))
diff --git a/src/clj/backtype/storm/ui/core.clj b/src/clj/backtype/storm/ui/core.clj
--- a/src/clj/backtype/storm/ui/core.clj
+++ b/src/clj/backtype/storm/ui/core.clj
@@ -13,10 +13,13 @@
             Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
             KillOptions])
   (:import [java.io File])
+  (:import [backtype.storm.utils Utils])
+  (:import [backtype.storm.task CustomerContext])
   (:require [compojure.route :as route]
             [compojure.handler :as handler]
             [ring.util.response :as resp]
             [backtype.storm [thrift :as thrift]])
+  (:require [backtype.storm [zookeeper :as zk]])
   (:gen-class))
 
 (def ^:dynamic *STORM-CONF* (read-storm-config))
@@ -558,7 +561,7 @@
        (nil-to-zero (:failed stats))])
     )))
 
-(defn spout-executor-table [topology-id executors window include-sys?]
+(defn spout-executor-table [topology-id component executors window include-sys?]
   (sorted-table
    ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred"
     "Complete latency (ms)" "Acked" "Failed"]
@@ -570,7 +573,8 @@
                            aggregate-spout-streams
                            swap-map-order
                            (get window)))]]
-       [(pretty-executor-info (.get_executor_info e))
+       [(let [executor-id (pretty-executor-info (.get_executor_info e))]
+          (link-to (url-format "/topology/%s/component/%s/executor/%s" topology-id component executor-id) executor-id))
         (pretty-uptime-sec (.get_uptime_secs e))
         (.get_host e)
         (.get_port e)
@@ -595,7 +599,7 @@
      [[:h2 "Output stats" window-hint]]
      (spout-output-summary-table stream-summary window)
      [[:h2 "Executors" window-hint]]
-     (spout-executor-table (.get_id topology-info) executors window include-sys?)
+     (spout-executor-table (.get_id topology-info) component executors window include-sys?)
      ;; task id, task uptime, stream aggregated stats, last error
      )))
 
@@ -633,7 +637,7 @@
         ])
     )))
 
-(defn bolt-executor-table [topology-id executors window include-sys?]
+(defn bolt-executor-table [topology-id component executors window include-sys?]
   (sorted-table
    ["Id" "Uptime" "Host" "Port" "Emitted" "Transferred" "Capacity (last 10m)"
     "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
@@ -645,7 +649,8 @@
                            (aggregate-bolt-streams)
                            swap-map-order
                            (get window)))]]
-       [(pretty-executor-info (.get_executor_info e))
+       [(let [executor-id (pretty-executor-info (.get_executor_info e))]
+          (link-to (url-format "/topology/%s/component/%s/executor/%s" topology-id component executor-id) executor-id))
         (pretty-uptime-sec (.get_uptime_secs e))
         (.get_host e)
         (.get_port e)
@@ -699,7 +704,7 @@
      (bolt-output-summary-table stream-summary window)
 
      [[:h2 "Executors"]]
-     (bolt-executor-table (.get_id topology-info) executors window include-sys?)
+     (bolt-executor-table (.get_id topology-info) component executors window include-sys?)
      )))
 
 (defn errors-table [errors-list]
@@ -736,6 +741,30 @@
          (errors-table (get (.get_errors summ) component))]
         ))))
 
+;; Renders, for every task of the executor, a table with the data the task saved
+;; through CustomerContext. executor is expected to look like "[3-4]" (first and
+;; last task id); window and include-sys? are accepted but not used.
+(defn executor-page [topology-id component executor window include-sys?]
+  (let [conf *STORM-CONF*
+        [_ start-taskid end-taskid] (re-matches #"\[(\d+)-(\d+)\]" executor)
+        _ (when-not start-taskid
+            (throw (IllegalArgumentException. (str "Malformed executor id: " executor))))
+        taskids (range (Integer/parseInt start-taskid) (inc (Integer/parseInt end-taskid)))
+        ;; one client for the whole page instead of one connection per task
+        zk-client (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
+    (try
+      ;; doall: read everything from ZooKeeper before the finally closes the client
+      (doall
+       (for [taskid taskids]
+         (let [path (str (conf STORM-ZOOKEEPER-ROOT) "/" CustomerContext/CUSTOMERCONTEXT_ROOT "/" topology-id "/" taskid)
+               task-context-data (zk/get-data zk-client path false)
+               task-context-map (when task-context-data (Utils/deserialize task-context-data))]
+           (concat
+            [[:h2 (link-to (url-format "/topology/%s/component/%s" topology-id component) (str "TaskID:" taskid))]]
+            (configuration-table task-context-map)))))
+      (finally
+        (.close zk-client)))))
+
 (defn get-include-sys? [cookies]
   (let [sys? (get cookies "sys")
         sys? (if (or (nil? sys?) (= "false" (:value sys?))) false true)]
@@ -755,6 +784,11 @@
          (-> (component-page id component (:window m) include-sys?)
              (concat [(mk-system-toggle-button include-sys?)])
              ui-template)))
+  (GET "/topology/:id/component/:component/executor/:executor" [:as {cookies :cookies} id component executor & m]
+       (let [include-sys? (get-include-sys? cookies)]
+         (-> (executor-page id component executor (:window m) include-sys?)
+             (concat [(mk-system-toggle-button include-sys?)])
+             ui-template)))
   (POST "/topology/:id/activate" [id]
     (with-nimbus nimbus
       (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
diff --git a/src/jvm/backtype/storm/nimbus/NimbusCloudStorage.java b/src/jvm/backtype/storm/nimbus/NimbusCloudStorage.java
new file mode 100644
--- /dev/null
+++ b/src/jvm/backtype/storm/nimbus/NimbusCloudStorage.java
@@ -0,0 +1,269 @@
+package backtype.storm.nimbus;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+import com.netflix.curator.framework.CuratorFramework;
+
+/**
+ * NimbusCloudStorage is an implementation of INimbusStorage that addresses the single-nimbus-node problem.
+ * With NimbusCloudStorage you can start nimbus processes on as many machines of the cluster as you want,
+ * as long as each machine runs at most one nimbus process.
+ *
+ * usage: add [nimbus.storage: "backtype.storm.nimbus.NimbusCloudStorage"] to storm/conf/storm.yaml
+ *
+ * internal keynotes:
+ * init() starts one background sync thread per JVM when the "storm nimbus" command is executed.
+ * The thread repeatedly compares the storm-ids under the zk assignments node with the local dir
+ * "nimbus/stormdist" and downloads the code of every storm that is missing locally.
+ * The loop interval in seconds (default 2) can be customized by adding
+ * [nimbus.storage.cloud.loop.secs: "10"] to storm/conf/storm.yaml
+ *
+ * @author yveschina
+ *
+ */
+public class NimbusCloudStorage implements INimbusStorage {
+
+    private static final Logger logger = Logger.getLogger(NimbusCloudStorage.class);
+
+    private int nimbus_cloud_storage_loop_secs = 2;
+    private String storm_nimbus_dir;
+    @SuppressWarnings("rawtypes")
+    private Map conf;
+
+    // One sync thread per JVM; its creation is guarded by the class lock in initStorageSyncThread().
+    private static volatile Thread storageSyncThread = null;
+    private static final String storm_assignments_folder = "assignments";
+    private static final String storm_stormdist_folder = "stormdist";
+    private static final String storm_nimbus_tmp_folder = "tmp";
+    private static final String[] storm_code_files = { "stormjar.jar", "stormcode.ser", "stormconf.ser" };
+    public static final String STORM_NIMBUS_CLOUD_STORAGE_LOOP_SECS = "nimbus.storage.cloud.loop.secs";
+
+    /** Remembers the configuration, creates the local nimbus dir and starts the sync thread. */
+    @Override
+    public void init(@SuppressWarnings("rawtypes") Map conf) {
+        this.conf = conf;
+        setNimbusLocalDir();
+        setLoopSecs();
+        try {
+            FileUtils.forceMkdir(new File(storm_nimbus_dir));
+        } catch (IOException e) {
+            logger.error("cannot create nimbus dir " + storm_nimbus_dir, e);
+        }
+        initStorageSyncThread();
+    }
+
+    /** Resolves the nimbus dir: "nimbus.local.dir" when it is set, Config.STORM_LOCAL_DIR otherwise. */
+    private void setNimbusLocalDir() {
+        String storm_local_dir = (String) conf.get(Config.STORM_LOCAL_DIR);
+        String nimbus_local_dir = (String) conf.get("nimbus.local.dir");
+        String local_dir = StringUtils.isNotBlank(nimbus_local_dir) ? nimbus_local_dir : storm_local_dir;
+        storm_nimbus_dir = local_dir + "/nimbus";
+    }
+
+    /** Reads the loop interval; a missing, non-numeric or non-positive value keeps the default. */
+    private void setLoopSecs() {
+        Object configured = conf.get(STORM_NIMBUS_CLOUD_STORAGE_LOOP_SECS);
+        if (configured == null) {
+            return;
+        }
+        try {
+            int secs = Integer.parseInt(configured.toString().trim());
+            if (secs > 0) {
+                nimbus_cloud_storage_loop_secs = secs;
+                return;
+            }
+        } catch (NumberFormatException e) {
+            // fall through to the warning below
+        }
+        logger.warn("ignoring invalid " + STORM_NIMBUS_CLOUD_STORAGE_LOOP_SECS + ": " + configured);
+    }
+
+    /** Starts the sync thread unless one was already started in this JVM. */
+    private void initStorageSyncThread() {
+        synchronized (NimbusCloudStorage.class) {
+            if (storageSyncThread != null) {
+                return;
+            }
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    runStorageSync();
+                }
+            }, "nimbus-cloud-storage-sync");
+            // Daemon: this helper must never be the thread that keeps the nimbus JVM alive.
+            thread.setDaemon(true);
+            thread.start();
+            storageSyncThread = thread;
+        }
+    }
+
+    /** Body of the sync thread: polls zk until interrupted and downloads the code of missing storms. */
+    @SuppressWarnings("unchecked")
+    private void runStorageSync() {
+        logger.info("storage sync thread start running");
+        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
+        CuratorFramework curatorFramework = Utils.newCurator(conf, servers, port);
+        curatorFramework.start();
+        try {
+            // zk path of the assignments node, e.g. /storm/assignments
+            String assignments_dir = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/" + storm_assignments_folder;
+            mkdirs(storm_stormdist_folder);
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    // long arithmetic: a large configured interval must not overflow int
+                    Thread.sleep(nimbus_cloud_storage_loop_secs * 1000L);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                for (String storm : findAbsentStorms(curatorFramework, assignments_dir)) {
+                    downloadStormCode(curatorFramework, assignments_dir, storm);
+                }
+            }
+        } finally {
+            curatorFramework.close();
+        }
+    }
+
+    /** Returns the storm-ids that have an assignment in zk but no dir under stormdist. */
+    private List<String> findAbsentStorms(CuratorFramework curatorFramework, String assignments_dir) {
+        List<String> stormsAbsent = new ArrayList<String>();
+        List<String> zkStorms;
+        try {
+            zkStorms = curatorFramework.getChildren().forPath(assignments_dir);
+        } catch (Exception e) {
+            logger.warn("cannot list " + assignments_dir, e);
+            return stormsAbsent;
+        }
+        if (zkStorms == null || zkStorms.isEmpty()) {
+            return stormsAbsent;
+        }
+        List<String> localStorms = list(storm_stormdist_folder);
+        Set<String> localStormsSet = new HashSet<String>();
+        if (localStorms != null) {
+            localStormsSet.addAll(localStorms);
+        }
+        for (String zkStorm : zkStorms) {
+            if (!localStormsSet.contains(zkStorm)) {
+                stormsAbsent.add(zkStorm);
+            }
+        }
+        return stormsAbsent;
+    }
+
+    /**
+     * Downloads the code of one storm into a temporary dir and then moves that dir into stormdist,
+     * so that a partially downloaded storm never shows up there.
+     */
+    private void downloadStormCode(CuratorFramework curatorFramework, String assignments_dir, String storm) {
+        logger.info("start to download storm code!storm-id:" + storm);
+        File storm_dir_tmp = new File(storm_nimbus_dir + "/" + storm_nimbus_tmp_folder + "/" + UUID.randomUUID());
+        try {
+            byte[] data = curatorFramework.getData().forPath(assignments_dir + "/" + storm);
+            Object assignment = Utils.deserialize(data);
+            // the code dir on the master is read reflectively from the assignment's master_code_dir field
+            String master_code_dir = assignment.getClass().getDeclaredField("master_code_dir").get(assignment).toString();
+
+            FileUtils.forceMkdir(storm_dir_tmp);
+            for (String file : storm_code_files) {
+                Utils.downloadFromMaster(conf, master_code_dir + "/" + file, storm_dir_tmp.getPath() + "/" + file);
+            }
+
+            File storm_dir_stormdist = new File(storm_nimbus_dir + "/" + storm_stormdist_folder + "/" + storm);
+            FileUtils.moveDirectory(storm_dir_tmp, storm_dir_stormdist);
+            logger.info("storm code download complete!storm-id:" + storm);
+        } catch (Exception e) {
+            logger.error("storm code download failed!storm-id:" + storm, e);
+        } finally {
+            // nothing left to delete after a successful move; removes the leftovers of a failed download
+            FileUtils.deleteQuietly(storm_dir_tmp);
+        }
+    }
+
+    /** Opens a file under the nimbus dir for reading; returns null when it cannot be opened. */
+    @Override
+    public InputStream open(String path) {
+        try {
+            return new FileInputStream(getAbsolutePath(path));
+        } catch (FileNotFoundException e) {
+            logger.error("cannot open " + path, e);
+            return null;
+        }
+    }
+
+    /** Opens a file under the nimbus dir for writing; returns null when it cannot be created. */
+    @Override
+    public OutputStream create(String path) {
+        try {
+            return new FileOutputStream(getAbsolutePath(path));
+        } catch (FileNotFoundException e) {
+            logger.error("cannot create " + path, e);
+            return null;
+        }
+    }
+
+    /** Lists the entries of a dir under the nimbus dir; returns null when it cannot be listed. */
+    @Override
+    public List<String> list(String path) {
+        String[] files = new File(getAbsolutePath(path)).list();
+        return files == null ? null : Arrays.asList(files);
+    }
+
+    /** Deletes a file or dir under the nimbus dir if it exists. */
+    @Override
+    public void delete(String path) {
+        File file = new File(getAbsolutePath(path));
+        if (file.exists()) {
+            try {
+                FileUtils.forceDelete(file);
+            } catch (IOException e) {
+                logger.error("cannot delete " + path, e);
+            }
+        }
+    }
+
+    /** Creates a dir, including missing parents, under the nimbus dir. */
+    @Override
+    public void mkdirs(String path) {
+        try {
+            FileUtils.forceMkdir(new File(getAbsolutePath(path)));
+        } catch (IOException e) {
+            logger.error("cannot create dir " + path, e);
+        }
+    }
+
+    /** Always reports support for distributed use. */
+    @Override
+    public boolean isSupportDistributed() {
+        return true;
+    }
+
+    /** Maps a storage path, with or without a leading "/", to a path under the nimbus dir. */
+    private String getAbsolutePath(String path) {
+        if (path.startsWith("/")) {
+            return storm_nimbus_dir + path;
+        }
+        return storm_nimbus_dir + "/" + path;
+    }
+}
diff --git a/src/jvm/backtype/storm/task/CustomerContext.java b/src/jvm/backtype/storm/task/CustomerContext.java
new file mode 100644
--- /dev/null
+++ b/src/jvm/backtype/storm/task/CustomerContext.java
@@ -0,0 +1,151 @@
+package backtype.storm.task;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.utils.ZKPaths;
+
+/**
+ * CustomerContext is used by spouts/bolts to save customized data which can
+ * be shown in storm-ui, for per-task monitoring requirements.
+ *
+ * Tasks call put/get; one monitor thread per JVM, started by start(), writes
+ * the data of every task to ZooKeeper every two seconds.
+ *
+ * @author yveschina
+ *
+ */
+public class CustomerContext {
+
+    private static final Logger logger = Logger.getLogger(CustomerContext.class);
+    private static final long FLUSH_INTERVAL_MS = 2000L;
+    /**
+     * stormId -> taskId -> (key -> value). The innermost maps are
+     * Collections.synchronizedMap instances, because task threads write them
+     * while the monitor thread copies them.
+     */
+    private static final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Object>>> workerContextMap =
+            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Object>>>();
+    public static String CUSTOMERCONTEXT_ROOT = "customercontext";
+    private static Thread monitorThread = null;
+
+    /**
+     * Starts the monitor thread; every call after the first one does nothing.
+     *
+     * @param conf storm configuration, read for the ZooKeeper servers, port and root
+     */
+    public static void start(@SuppressWarnings("rawtypes") final Map conf) {
+        synchronized (CustomerContext.class) {
+            if (monitorThread != null) {
+                return;
+            }
+            monitorThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    runMonitor(conf);
+                }
+            }, "customer-context-monitor");
+            // Daemon: this helper must never be the thread that keeps the worker JVM alive.
+            monitorThread.setDaemon(true);
+            monitorThread.start();
+        }
+    }
+
+    /** Body of the monitor thread: flushes all contexts, sleeps, and repeats until interrupted. */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static void runMonitor(Map conf) {
+        logger.info("CustomerContext serialization thread start running");
+        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
+        CuratorFramework curatorFramework = Utils.newCurator(conf, servers, port);
+        curatorFramework.start();
+        try {
+            while (!Thread.currentThread().isInterrupted()) {
+                flush(curatorFramework, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT));
+                // Sleep on every pass, also when there was nothing to flush: skipping the
+                // sleep for an empty map would make this loop spin at full CPU.
+                try {
+                    Thread.sleep(FLUSH_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        } finally {
+            curatorFramework.close();
+        }
+    }
+
+    /** Writes a copy of every task's context to zkRoot/CUSTOMERCONTEXT_ROOT/stormId/taskId. */
+    private static void flush(CuratorFramework curatorFramework, String zkRoot) {
+        for (Entry<String, ConcurrentHashMap<Integer, Map<String, Object>>> workerContextEntry : workerContextMap.entrySet()) {
+            String stormId = workerContextEntry.getKey();
+            for (Entry<Integer, Map<String, Object>> stormContextEntry : workerContextEntry.getValue().entrySet()) {
+                String path = zkRoot + "/" + CUSTOMERCONTEXT_ROOT + "/" + stormId + "/" + stormContextEntry.getKey();
+                Map<String, Object> taskContextMap = stormContextEntry.getValue();
+                Map<String, Object> snapshot;
+                // a synchronizedMap has to be locked by hand while it is iterated (copied)
+                synchronized (taskContextMap) {
+                    snapshot = new HashMap<String, Object>(taskContextMap);
+                }
+                try {
+                    ZKPaths.mkdirs(curatorFramework.getZookeeperClient().getZooKeeper(), path);
+                    curatorFramework.setData().forPath(path, Utils.serialize(snapshot));
+                } catch (Exception e) {
+                    logger.error("cannot save customer context to " + path, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * save customer context k,v
+     *
+     * @param key key the value is saved under
+     * @param value value to save
+     * @param topologyContext context of the calling task; its storm id and task id select where the pair is saved
+     */
+    public static void put(String key, Object value, TopologyContext topologyContext) {
+        String stormId = topologyContext.getStormId();
+        ConcurrentHashMap<Integer, Map<String, Object>> stormContextMap = workerContextMap.get(stormId);
+        if (stormContextMap == null) {
+            // putIfAbsent: several tasks of one storm may get here at the same time
+            workerContextMap.putIfAbsent(stormId, new ConcurrentHashMap<Integer, Map<String, Object>>());
+            stormContextMap = workerContextMap.get(stormId);
+        }
+        Integer taskId = topologyContext.getThisTaskId();
+        Map<String, Object> taskContextMap = stormContextMap.get(taskId);
+        if (taskContextMap == null) {
+            stormContextMap.putIfAbsent(taskId, Collections.synchronizedMap(new HashMap<String, Object>()));
+            taskContextMap = stormContextMap.get(taskId);
+        }
+        taskContextMap.put(key, value);
+    }
+
+    /**
+     * get value by k saved by CustomerContext.put(String key, Object value,
+     * TopologyContext topologyContext)
+     *
+     * @param key key the value was saved under
+     * @param topologyContext context of the calling task
+     * @return the saved value, or null when this task has saved nothing under key
+     */
+    public static Object get(String key, TopologyContext topologyContext) {
+        ConcurrentHashMap<Integer, Map<String, Object>> stormContextMap = workerContextMap.get(topologyContext.getStormId());
+        if (stormContextMap == null) {
+            return null;
+        }
+        // null until this task's first put, so it has to be checked before it is used
+        Map<String, Object> taskContextMap = stormContextMap.get(topologyContext.getThisTaskId());
+        return taskContextMap == null ? null : taskContextMap.get(key);
+    }
+}