Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-2018: Supervisor V2 #1697

Merged
merged 11 commits into from
Oct 27, 2016
6 changes: 3 additions & 3 deletions bin/storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ def healthcheck(*args):
Run health checks on the local supervisor.
"""
exec_storm_class(
"org.apache.storm.command.healthcheck",
"org.apache.storm.command.HealthCheck",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
Expand All @@ -572,7 +572,7 @@ def kill_workers(*args):
to have admin rights on the node to be able to successfully kill all workers.
"""
exec_storm_class(
"org.apache.storm.command.kill_workers",
"org.apache.storm.command.KillWorkers",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
Expand Down Expand Up @@ -656,7 +656,7 @@ def pacemaker(klass="org.apache.storm.pacemaker.pacemaker"):
extrajars=cppaths,
jvmopts=jvmopts)

def supervisor(klass="org.apache.storm.daemon.supervisor"):
def supervisor(klass="org.apache.storm.daemon.supervisor.Supervisor"):
"""Syntax: [storm supervisor]

Launches the supervisor daemon. This command should be run
Expand Down
2 changes: 1 addition & 1 deletion log4j2/cluster.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<configuration monitorInterval="60">
<properties>
<property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} [%p] %msg%n</property>
<property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %t %c{1.} [%p] %msg%n</property>
</properties>
<appenders>
<RollingFile name="A1" immediateFlush="false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
(ns org.apache.storm.cluster-state.zookeeper-state-factory
(:import [org.apache.curator.framework.state ConnectionStateListener])
(:import [org.apache.zookeeper KeeperException$NoNodeException KeeperException$NodeExistsException]
[org.apache.storm.cluster ClusterState DaemonType])
[org.apache.storm.cluster ClusterState DaemonType ZKStateStorage])
(:import [org.apache.storm.utils StormConnectionStateConverter])
(:use [org.apache.storm cluster config log util])
(:require [org.apache.storm [zookeeper :as zk]])
(:gen-class
:implements [org.apache.storm.cluster.ClusterStateFactory]))

(defn -mkStore [this conf auth-conf, acls, context]
(ZKStateStorage. conf, auth-conf, acls, context))

(defn -mkState [this conf auth-conf acls context]
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
Expand Down
88 changes: 0 additions & 88 deletions storm-core/src/clj/org/apache/storm/command/healthcheck.clj

This file was deleted.

4 changes: 2 additions & 2 deletions storm-core/src/clj/org/apache/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@
(swap! interval-errors inc)

(when (<= @interval-errors max-per-interval)
(cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
(.report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
(hostname storm-conf)
(.getThisWorkerPort (:worker-context executor)) error)
(.getThisWorkerPort ^WorkerTopologyContext (:worker-context executor)) error)
))))

;; in its own function so that it can be mocked out by tracked topologies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.command.kill-workers
(:import [java.io File])
(ns org.apache.storm.daemon.local-supervisor
(:import [org.apache.storm.daemon.supervisor Supervisor]
[org.apache.storm.utils ConfigUtils])
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm util config])
(:require [org.apache.storm.daemon
[supervisor :as supervisor]])
(:gen-class))

(defn -main
"Construct the supervisor-data from scratch and kill the workers on this supervisor"
[& args]
(let [conf (read-storm-config)
conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath))
isupervisor (supervisor/standalone-supervisor)
supervisor-data (supervisor/supervisor-data conf nil isupervisor)
ids (supervisor/my-worker-ids conf)]
(doseq [id ids]
(supervisor/shutdown-worker supervisor-data id))))
(defserverfn mk-local-supervisor [conf shared-context isupervisor]
(if (not (ConfigUtils/isLocalMode conf))
(throw
(IllegalArgumentException. "Cannot start server in distrubuted mode!")))
(let [supervisor-server (Supervisor. conf shared-context isupervisor)]
(.launch supervisor-server)
supervisor-server))
7 changes: 4 additions & 3 deletions storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
(:import [java.nio ByteBuffer])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.daemon DirectoryCleaner])
(:import [org.apache.storm.daemon.supervisor SupervisorUtils])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
(:import [org.apache.storm.ui InvalidRequestException]
[org.apache.storm.security.auth AuthUtils])
(:require [org.apache.storm.daemon common [supervisor :as supervisor]])
(:require [org.apache.storm.daemon common])
(:require [compojure.route :as route]
[compojure.handler :as handler]
[ring.middleware.keyword-params]
Expand Down Expand Up @@ -157,10 +158,10 @@
(defn get-alive-ids
[conf now-secs]
(->>
(supervisor/read-worker-heartbeats conf)
(clojurify-structure (SupervisorUtils/readWorkerHeartbeats conf))
(remove
#(or (not (val %))
(supervisor/is-worker-hb-timed-out? now-secs
(SupervisorUtils/isWorkerHbTimedOut now-secs
(val %)
conf)))
keys
Expand Down