Skip to content

Commit

Permalink
STORM-3026: Upgrade ZK instance for security
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Evans committed Apr 13, 2018
1 parent 58f7aef commit 22a9620
Show file tree
Hide file tree
Showing 23 changed files with 568 additions and 140 deletions.
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ storm.messaging.transport: "org.apache.storm.messaging.netty.Context"
storm.nimbus.retry.times: 5
storm.nimbus.retry.interval.millis: 2000
storm.nimbus.retry.intervalceiling.millis: 60000
storm.nimbus.zookeeper.acls.check: true
storm.nimbus.zookeeper.acls.fixup: true
storm.auth.simple-white-list.users: []
storm.auth.simple-acl.users: []
storm.auth.simple-acl.users.commands: []
Expand Down
61 changes: 41 additions & 20 deletions storm-core/src/clj/org/apache/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,22 @@
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.daemon [common :as common]]))

(defn mk-topo-only-acls
[topo-conf]
(defn mk-topo-acls
[topo-conf type]
(let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
(when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
[(first ZooDefs$Ids/CREATOR_ALL_ACL)
(ACL. ZooDefs$Perms/READ (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))])))

(ACL. type (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))])))

(defn mk-topo-read-write-acls
[topo-conf]
(mk-topo-acls topo-conf ZooDefs$Perms/ALL))

(defn mk-topo-read-only-acls
[topo-conf]
[topo-conf]
(mk-topo-acls topo-conf ZooDefs$Perms/READ))

(defnk mk-distributed-cluster-state
[conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
(let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
Expand Down Expand Up @@ -68,26 +77,27 @@
(executor-beats [this storm-id executor->node+port])
(supervisors [this callback])
(supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
(setup-heartbeats! [this storm-id])
(setup-heartbeats! [this storm-id topo-conf])
(teardown-heartbeats! [this storm-id])
(teardown-topology-errors! [this storm-id])
(heartbeat-storms [this])
(error-topologies [this])
(backpressure-topologies [this])
(set-topology-log-config! [this storm-id log-config])
(set-topology-log-config! [this storm-id log-config topo-conf])
(topology-log-config [this storm-id cb])
(worker-heartbeat! [this storm-id node port info])
(remove-worker-heartbeat! [this storm-id node port])
(supervisor-heartbeat! [this supervisor-id info])
(worker-backpressure! [this storm-id node port info])
(topology-backpressure [this storm-id callback])
(setup-backpressure! [this storm-id])
(setup-backpressure! [this storm-id topo-conf])
(remove-backpressure! [this storm-id])
(remove-worker-backpressure! [this storm-id node port])
(activate-storm! [this storm-id storm-base])
(activate-storm! [this storm-id storm-base topo-conf])
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
(set-assignment! [this storm-id info])
(setup-errors! [this storm-id topo-conf])
(set-assignment! [this storm-id info topo-conf])
;; sets up information related to key consisting of nimbus
;; host:port and version info of the blob
(setup-blobstore! [this key nimbusInfo versionInfo])
Expand Down Expand Up @@ -416,8 +426,9 @@
(maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig))

(set-topology-log-config!
[this storm-id log-config]
(.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))
[this storm-id log-config topo-conf]
(.mkdirs cluster-state LOGCONFIG-SUBTREE acls)
(.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) (mk-topo-read-only-acls topo-conf)))

(set-worker-profile-request
[this storm-id profile-request]
Expand Down Expand Up @@ -472,8 +483,9 @@
(.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))

(setup-heartbeats!
[this storm-id]
(.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
[this storm-id topo-conf]
(.mkdirs cluster-state WORKERBEATS-SUBTREE acls)
(.mkdirs cluster-state (workerbeat-storm-root storm-id) (mk-topo-read-write-acls topo-conf)))

(teardown-heartbeats!
[this storm-id]
Expand Down Expand Up @@ -506,8 +518,9 @@
(> (count children) 0)))

(setup-backpressure!
[this storm-id]
(.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
[this storm-id topo-conf]
(.mkdirs cluster-state BACKPRESSURE-SUBTREE acls)
(.mkdirs cluster-state (backpressure-storm-root storm-id) (mk-topo-read-write-acls topo-conf)))

(remove-backpressure!
[this storm-id]
Expand All @@ -533,9 +546,10 @@
(.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))

(activate-storm!
[this storm-id storm-base]
[this storm-id storm-base topo-conf]
(let [thrift-storm-base (thriftify-storm-base storm-base)]
(.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))
(.mkdirs cluster-state STORMS-SUBTREE acls)
(.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) (mk-topo-read-only-acls topo-conf))))

(update-storm!
[this storm-id new-elems]
Expand All @@ -562,9 +576,10 @@
(.delete_node cluster-state (storm-path storm-id)))

(set-assignment!
[this storm-id info]
[this storm-id info topo-conf]
(.mkdirs cluster-state ASSIGNMENTS-SUBTREE acls)
(let [thrift-assignment (thriftify-assignment info)]
(.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
(.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) (mk-topo-read-only-acls topo-conf))))

(remove-blobstore-key!
[this blob-key]
Expand All @@ -585,9 +600,10 @@

(set-credentials!
[this storm-id creds topo-conf]
(let [topo-acls (mk-topo-only-acls topo-conf)
(let [topo-acls (mk-topo-read-only-acls topo-conf)
path (credentials-path storm-id)
thriftified-creds (thriftify-credentials creds)]
(.mkdirs cluster-state CREDENTIALS-SUBTREE acls)
(.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))

(credentials
Expand All @@ -596,6 +612,11 @@
(swap! credentials-callback assoc storm-id callback))
(clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))

(setup-errors!
[this storm-id topo-conf]
(.mkdirs cluster-state ERRORS-SUBTREE acls)
(.mkdirs cluster-state (error-storm-root storm-id) (mk-topo-read-write-acls topo-conf)))

(report-error
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
(ZKStateStorage. conf, auth-conf, acls, context))

(defn -mkState [this conf auth-conf acls context]
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) acls :auth-conf auth-conf)]
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
(.close zk))
(let [callbacks (atom {})
active (atom true)
zk-writer (zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
acls
:auth-conf auth-conf
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
Expand All @@ -50,6 +51,7 @@
(zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
acls
:auth-conf auth-conf
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
Expand Down
24 changes: 11 additions & 13 deletions storm-core/src/clj/org/apache/storm/command/shell_submission.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,18 @@
;; limitations under the License.
(ns org.apache.storm.command.shell-submission
(:import [org.apache.storm StormSubmitter])
(:use [org.apache.storm thrift util config log zookeeper])
(:use [org.apache.storm util config log])
(:require [clojure.string :as str])
(:import [org.apache.storm.utils ConfigUtils NimbusClient])
(:gen-class))


(defn -main [^String tmpjarpath & args]
(let [conf (read-storm-config)
; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok
zk-leader-elector (zk-leader-elector conf nil)
leader-nimbus (.getLeader zk-leader-elector)
host (.getHost leader-nimbus)
port (.getPort leader-nimbus)
no-op (.close zk-leader-elector)
jarpath (StormSubmitter/submitJar conf tmpjarpath)
args (concat args [host port jarpath])]
(exec-command! (str/join " " args))
))
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
(with-open [client (NimbusClient/getConfiguredClient conf)]
(let [c (.getClient client)
ns (.getLeader c)
host (.get_host ns)
port (.get_port ns)
jarpath (StormSubmitter/submitJar conf tmpjarpath)
args (concat args [host port jarpath])]
(exec-command! (str/join " " args))))))
41 changes: 27 additions & 14 deletions storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
[stats :as stats]])
(:require [org.apache.storm.ui.core :as ui])
(:require [clojure.set :as set])
(:import [org.apache.storm.zookeeper AclEnforcement])
(:import [org.apache.storm.daemon.common StormBase Assignment])
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm config])
Expand Down Expand Up @@ -133,6 +134,16 @@
scheduler
))

(def NIMBUS-ZK-ACLS ZooDefs$Ids/CREATOR_ALL_ACL)

(defn mk-zk-client [conf]
(let [zk-servers (conf STORM-ZOOKEEPER-SERVERS)
zk-port (conf STORM-ZOOKEEPER-PORT)
zk-root (conf STORM-ZOOKEEPER-ROOT)]
(if (and zk-servers zk-port)
(mk-client conf zk-servers zk-port (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil) :root zk-root
:auth-conf conf))))

(defmulti blob-sync cluster-mode)

(defnk is-leader [nimbus :throw-exception true]
Expand All @@ -142,10 +153,6 @@
(let [leader-address (.getLeader leader-elector)]
(throw (RuntimeException. (str "not a leader, current leader is " leader-address))))))))

(def NIMBUS-ZK-ACLS
[(first ZooDefs$Ids/CREATOR_ALL_ACL)
(ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])

(defn mk-blob-cache-map
"Constructs a TimeCacheMap instance with a blob store timeout whose
expiration callback invokes cancel on the value held by an expired entry when
Expand Down Expand Up @@ -213,7 +220,7 @@
(exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
:leader-elector (zk-leader-elector conf blob-store)
:leader-elector (zk-leader-elector conf blob-store (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil))
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
Expand Down Expand Up @@ -454,7 +461,7 @@

(defn- get-version-for-key [key nimbus-host-port-info conf]
(let [version (KeySequenceNumber. key nimbus-host-port-info)]
(.getKeySequenceNumber version conf)))
(.getKeySequenceNumber version conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil))))

(defn get-key-seq-from-blob-store [blob-store]
(let [key-iter (.listKeys blob-store)]
Expand Down Expand Up @@ -1002,7 +1009,7 @@
td (.get tds tid)
assignment (if (and (not (:owner assignment)) (not (nil? td)))
(let [new-assignment (fixup-assignment assignment td)]
(.set-assignment! storm-cluster-state tid new-assignment)
(.set-assignment! storm-cluster-state tid new-assignment (.getConf td))
new-assignment)
assignment)]
{tid assignment}))))]
Expand Down Expand Up @@ -1060,7 +1067,7 @@
(log-debug "Assignment for " topology-id " hasn't changed")
(do
(log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
(.set-assignment! storm-cluster-state topology-id assignment)
(.set-assignment! storm-cluster-state topology-id assignment (.getConf topology-details))
)))
(->> new-assignments
(map (fn [[topology-id assignment]]
Expand Down Expand Up @@ -1098,7 +1105,8 @@
nil
nil
{}
principal))
principal)
storm-conf)
(notify-topology-action-listener nimbus storm-name "activate")))

(defn storm-active? [storm-cluster-state storm-name]
Expand Down Expand Up @@ -1744,9 +1752,10 @@
(log-message "uploadedJar " uploadedJarLocation)
(setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
(wait-for-desired-code-replication nimbus total-storm-conf storm-id)
(.setup-heartbeats! storm-cluster-state storm-id)
(.setup-heartbeats! storm-cluster-state storm-id total-storm-conf)
(.setup-errors! storm-cluster-state storm-id total-storm-conf)
(if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
(.setup-backpressure! storm-cluster-state storm-id))
(.setup-backpressure! storm-cluster-state storm-id total-storm-conf))
(notify-topology-action-listener nimbus storm-name "submitTopology")
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
Expand Down Expand Up @@ -1892,7 +1901,7 @@
(.containsKey named-loggers logger-name))
(.remove named-loggers logger-name))))))
(log-message "Setting log config for " storm-name ":" merged-log-config)
(.set-topology-log-config! storm-cluster-state id merged-log-config)))
(.set-topology-log-config! storm-cluster-state id merged-log-config topology-conf)))

(uploadNewCredentials [this storm-name credentials]
(mark! nimbus:num-uploadNewCredentials-calls)
Expand Down Expand Up @@ -2565,8 +2574,12 @@
(defn -launch [nimbus]
(let [conf (merge
(read-storm-config)
(read-yaml-config "storm-cluster-auth.yaml" false))]
(launch-server! conf nimbus)))
(read-yaml-config "storm-cluster-auth.yaml" false))
fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-FIXUP)
check-acl (or fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-CHECK))]
(when check-acl
(AclEnforcement/verifyAcls conf fixup-acl))
(launch-server! conf nimbus)))

(defn standalone-nimbus []
(reify INimbus
Expand Down
8 changes: 4 additions & 4 deletions storm-core/src/clj/org/apache/storm/zookeeper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@
(log-message "Zookeeper state update: " state type path))

(defnk mk-client
[conf servers port
[conf servers port default-acl
:root ""
:watcher default-watcher
:auth-conf nil]
(let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
(let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)) default-acl)]
(.. fk
(getCuratorListenable)
(addListener
Expand Down Expand Up @@ -252,9 +252,9 @@

(defn zk-leader-elector
"Zookeeper Implementation of ILeaderElector."
[conf blob-store]
[conf blob-store default-acl]
(let [servers (conf STORM-ZOOKEEPER-SERVERS)
zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)
zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) default-acl :auth-conf conf)
leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
id (.toHostPortString (NimbusInfo/fromConf conf))
leader-latch (atom (LeaderLatch. zk leader-lock-path id))
Expand Down
15 changes: 15 additions & 0 deletions storm-core/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ public class Config extends HashMap<String, Object> {
//DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS
private static final long serialVersionUID = -1550278723792864455L;

/**
* In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not
* don't start nimbus.
*/
@isBoolean
public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK = "storm.nimbus.zookeeper.acls.check";

/**
* In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not do
* your best to fix them before nimbus starts, if it cannot fix them nimbus will not start.
* This overrides any value set for storm.nimbus.zookeeper.acls.check.
*/
@isBoolean
public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP = "storm.nimbus.zookeeper.acls.fixup";

/**
* This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for
* the user Nimbus and Supervisors use to authenticate with ZK.
Expand Down
Loading

0 comments on commit 22a9620

Please sign in to comment.