From f679dac1733e5c20fabfda34634d0470466eb539 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Tue, 15 Mar 2016 16:43:58 -0500 Subject: [PATCH 1/2] [STORM-1631] - Storm CGroup bug when launching workers as the user that submitted the topology --- .../org/apache/storm/daemon/supervisor.clj | 24 ++++++++++++++----- .../container/ResourceIsolationInterface.java | 8 +++++++ .../storm/container/cgroup/CgroupManager.java | 16 ++++++++----- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index fd8f6c94c71..6207137c379 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -241,14 +241,16 @@ (defn generate-supervisor-id [] (Utils/uuid)) -(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil :directory nil] +(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil :directory nil :launch-in-container? false :supervisor nil :worker-id nil] (let [_ (when (clojure.string/blank? user) (throw (java.lang.IllegalArgumentException. "User cannot be blank when calling worker-launcher."))) wl-initial (conf SUPERVISOR-WORKER-LAUNCHER) storm-home (System/getProperty "storm.home") wl (if wl-initial wl-initial (str storm-home "/bin/worker-launcher")) - command (concat [wl user] args)] + command (if launch-in-container? + (concat (.getLaunchCommandPrefix (:resource-isolation-manager supervisor) worker-id) [wl user] args) + (concat [wl user] args))] (log-message "Running as user:" user " command:" (pr-str command)) (Utils/launchProcess command environment @@ -1250,14 +1252,14 @@ command (->> command (map str) (filter (complement empty?))) - command (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) + command_final (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) (do (.reserveResourcesForWorker (:resource-isolation-manager supervisor) worker-id {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))}) (.getLaunchCommand (:resource-isolation-manager supervisor) worker-id (java.util.ArrayList. (java.util.Arrays/asList (to-array command))))) command)] - (log-message "Launching worker with command: " (Utils/shellCmd command)) + (log-message "Launching worker with command: " (Utils/shellCmd command_final)) (write-log-metadata! storm-conf user worker-id storm-id port conf) (ConfigUtils/setWorkerUserWSE conf worker-id user) (create-artifacts-link conf storm-id port worker-id) @@ -1270,8 +1272,18 @@ (remove-dead-worker worker-id) (create-blobstore-links conf storm-id worker-id) (if run-worker-as-user - (worker-launcher conf user ["worker" worker-dir (Utils/writeScript worker-dir command topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)) - (Utils/launchProcess command + (worker-launcher conf + user + ["worker" + worker-dir + (Utils/writeScript worker-dir command topology-worker-environment)] + :log-prefix log-prefix + :exit-code-callback callback + :directory (File. worker-dir) + :launch-in-container? (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) true false) + :supervisor supervisor + :worker-id worker-id) + (Utils/launchProcess command_final topology-worker-environment log-prefix callback diff --git a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java index 2db9f1bb35e..c5cad02482b 100644 --- a/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java +++ b/storm-core/src/jvm/org/apache/storm/container/ResourceIsolationInterface.java @@ -48,4 +48,12 @@ public interface ResourceIsolationInterface { */ List getLaunchCommand(String workerId, List existingCommand); + /** + * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). this function can be used + * to get the launch command prefix + * @param workerId the of the worker + * @return the command line prefix for launching a worker with resource isolation + */ + List getLaunchCommandPrefix(String workerId); + } diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java index 875474a3090..80093b3aed9 100644 --- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java +++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupManager.java @@ -176,12 +176,17 @@ public void releaseResourcesForWorker(String workerId) { @Override public List getLaunchCommand(String workerId, List existingCommand) { + List newCommand = getLaunchCommandPrefix(workerId); + newCommand.addAll(existingCommand); + return newCommand; + } + @Override + public List getLaunchCommandPrefix(String workerId) { CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup); - if(!this.rootCgroup.getChildren().contains(workerGroup)) { - LOG.error("cgroup {} doesn't exist! Need to reserve resources for worker first!", workerGroup); - return existingCommand; + if (!this.rootCgroup.getChildren().contains(workerGroup)) { + throw new RuntimeException("cgroup " + workerGroup + " doesn't exist! Need to reserve resources for worker first!"); } StringBuilder sb = new StringBuilder(); @@ -189,9 +194,9 @@ public List getLaunchCommand(String workerId, List existingComma sb.append(this.conf.get(Config.STORM_CGROUP_CGEXEC_CMD)).append(" -g "); Iterator it = this.hierarchy.getSubSystems().iterator(); - while(it.hasNext()) { + while (it.hasNext()) { sb.append(it.next().toString()); - if(it.hasNext()) { + if (it.hasNext()) { sb.append(","); } else { sb.append(":"); @@ -200,7 +205,6 @@ public List getLaunchCommand(String workerId, List existingComma sb.append(workerGroup.getName()); List newCommand = new ArrayList(); newCommand.addAll(Arrays.asList(sb.toString().split(" "))); - newCommand.addAll(existingCommand); return newCommand; } From 3b4a4e3b8ea2fcaf0c1d58fb3abcec2966752369 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Tue, 15 Mar 2016 23:42:27 -0500 Subject: [PATCH 2/2] moving cgroups cleanup code to a better place so that retry will happen if failure occurs at cleanup --- storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 6207137c379..d15a9d7e6d7 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -282,6 +282,11 @@ (defn try-cleanup-worker [conf supervisor id] (try + ;; clean up for resource isolation if enabled + (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) + (.releaseResourcesForWorker (:resource-isolation-manager supervisor) id)) + ;; Always make sure to clean up everything else before worker directory + ;; is removed since that is what is going to trigger the retry for cleanup (if (.exists (File. (ConfigUtils/workerRoot conf id))) (do (if (conf SUPERVISOR-RUN-WORKER-AS-USER) @@ -295,8 +300,6 @@ (ConfigUtils/removeWorkerUserWSE conf id) (remove-dead-worker id) )) - (if (conf STORM-RESOURCE-ISOLATION-PLUGIN-ENABLE) - (.releaseResourcesForWorker (:resource-isolation-manager supervisor) id)) (catch IOException e (log-warn-error e "Failed to cleanup worker " id ". Will retry later")) (catch RuntimeException e