Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions STORM-UI-REST-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ Response fields:
|windowHint| String | window param value in "hh mm ss" format. Default value is "All Time"|
|executors| Integer |Number of executor tasks in the component|
|componentErrors| Array of Errors | List of component errors|
|componentErrors.time| Long | Timestamp when the exception occurred |
|componentErrors.errorTime| Long | Timestamp when the exception occurred (Prior to 0.11.0, this field was named 'time'.)|
|componentErrors.errorHost| String | host name for the error|
|componentErrors.errorPort| String | port for the error|
|componentErrors.error| String |Shows the error happened in a component|
Expand Down Expand Up @@ -483,7 +483,7 @@ Sample response:
"componentType": "spout",
"windowHint": "10m 0s",
"executors": 5,
"componentErrors":[{"time": 1406006074000,
"componentErrors":[{"errorTime": 1406006074000,
"errorHost": "10.11.1.70",
"errorPort": 6701,
"errorWorkerLogLink": "http://10.11.1.7:8000/log?file=worker-6701.log",
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
(doseq [[_ component] (all-components ret)
:let [common (.get_common component)]]
(.put_to_streams common EVENTLOGGER-STREAM-ID (thrift/output-fields (eventlogger-bolt-fields))))
(.put_to_bolts ret "__eventlogger" eventlogger-bolt)
(.put_to_bolts ret EVENTLOGGER-COMPONENT-ID eventlogger-bolt)
))

(defn add-metric-components! [storm-conf ^StormTopology topology]
Expand Down
156 changes: 134 additions & 22 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice
LogConfig LogLevel LogLevelAction])
ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction])
(:import [backtype.storm.daemon Shutdownable])
(:use [backtype.storm util config log timer zookeeper])
(:require [backtype.storm [cluster :as cluster] [stats :as stats] [converter :as converter]])
(:require [backtype.storm [cluster :as cluster]
[converter :as converter]
[stats :as stats]
[tuple :as tuple]])
(:require [clojure.set :as set])
(:import [backtype.storm.daemon.common StormBase Assignment])
(:use [backtype.storm.daemon common])
Expand Down Expand Up @@ -978,13 +981,6 @@
(.set_host (:host %))
(.set_port (:port %))))))

(defn- get-last-error
[storm-cluster-state storm-id component-id]
(if-let [e (.last-error storm-cluster-state storm-id component-id)]
(doto (ErrorInfo. (:error e) (:time-secs e))
(.set_host (:host e))
(.set_port (:port e)))))

(defn- thriftify-executor-id [[first-task-id last-task-id]]
(ExecutorInfo. (int first-task-id) (int last-task-id)))

Expand Down Expand Up @@ -1082,7 +1078,42 @@
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)
principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
get-common-topo-info
(fn [^String storm-id operation]
(let [storm-cluster-state (:storm-cluster-state nimbus)
topology-conf (try-read-storm-conf conf storm-id)
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus
storm-name
topology-conf
operation)
topology (try-read-storm-topology conf storm-id)
task->component (storm-task-info topology topology-conf)
base (.storm-base storm-cluster-state storm-id nil)
launch-time-secs (if base (:launch-time-secs base)
(throw
(NotAliveException. (str storm-id))))
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
storm-id))
all-components (set (vals task->component))]
{:storm-name storm-name
:storm-cluster-state storm-cluster-state
:all-components all-components
:launch-time-secs launch-time-secs
:assignment assignment
:beats beats
:topology topology
:task->component task->component
:base base}))
get-last-error (fn [storm-cluster-state storm-id component-id]
(if-let [e (.last-error storm-cluster-state
storm-id
component-id)]
(doto (ErrorInfo. (:error e) (:time-secs e))
(.set_host (:host e))
(.set_port (:port e)))))]
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)

;add to nimbuses
Expand Down Expand Up @@ -1453,16 +1484,14 @@
))

(^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
(let [storm-cluster-state (:storm-cluster-state nimbus)
topology-conf (try-read-storm-conf conf storm-id)
storm-name (topology-conf TOPOLOGY-NAME)
_ (check-authorization! nimbus storm-name topology-conf "getTopologyInfo")
task->component (storm-task-info (try-read-storm-topology conf storm-id) topology-conf)
base (.storm-base storm-cluster-state storm-id nil)
launch-time-secs (if base (:launch-time-secs base) (throw (NotAliveException. (str storm-id))))
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (map-val :heartbeat (get @(:heartbeats-cache nimbus) storm-id))
all-components (-> task->component reverse-map keys)
(let [{:keys [storm-name
storm-cluster-state
all-components
launch-time-secs
assignment
beats
task->component
base]} (get-common-topo-info storm-id "getTopologyInfo")
num-err-choice (or (.get_num_err_choice options)
NumErrorsChoice/ALL)
errors-fn (condp = num-err-choice
Expand Down Expand Up @@ -1509,11 +1538,94 @@
topo-info
))

(^TopologyInfo getTopologyInfo [this ^String storm-id]
(^TopologyInfo getTopologyInfo [this ^String topology-id]
(.getTopologyInfoWithOpts this
storm-id
topology-id
(doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))

(^TopologyPageInfo getTopologyPageInfo
[this ^String topo-id ^String window ^boolean include-sys?]
(let [info (get-common-topo-info topo-id "getTopologyPageInfo")

exec->node+port (:executor->node+port (:assignment info))
last-err-fn (partial get-last-error
(:storm-cluster-state info)
topo-id)
topo-page-info (stats/agg-topo-execs-stats topo-id
exec->node+port
(:task->component info)
(:beats info)
(:topology info)
window
include-sys?
last-err-fn)]
(when-let [owner (:owner (:base info))]
(.set_owner topo-page-info owner))
(when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
(.set_sched_status topo-page-info sched-status))
(doto topo-page-info
(.set_name (:storm-name info))
(.set_status (extract-status-str (:base info)))
(.set_uptime_secs (time-delta (:launch-time-secs info)))
(.set_topology_conf (to-json (try-read-storm-conf conf
topo-id)))
(.set_replication_count
(.getReplicationCount (:code-distributor nimbus) topo-id)))
(when-let [debug-options
(get-in info [:base :component->debug topo-id])]
(.set_debug_options
topo-page-info
(converter/thriftify-debugoptions debug-options)))
topo-page-info))

(^ComponentPageInfo getComponentPageInfo
[this
^String topo-id
^String component-id
^String window
^boolean include-sys?]
(let [info (get-common-topo-info topo-id "getComponentPageInfo")
{:keys [executor->node+port node->host]} (:assignment info)
executor->host+port (map-val (fn [[node port]]
[(node->host node) port])
executor->node+port)
comp-page-info (stats/agg-comp-execs-stats executor->host+port
(:task->component info)
(:beats info)
window
include-sys?
topo-id
(:topology info)
component-id)]
(doto comp-page-info
(.set_topology_name (:storm-name info))
(.set_errors (get-errors (:storm-cluster-state info)
topo-id
component-id))
(.set_topology_status (extract-status-str (:base info))))
(when-let [debug-options
(get-in info [:base :component->debug component-id])]
(.set_debug_options
comp-page-info
(converter/thriftify-debugoptions debug-options)))
;; Add the event logger details.
(let [component->tasks (reverse-map (:task->component info))
eventlogger-tasks (sort (get component->tasks
EVENTLOGGER-COMPONENT-ID))
;; Find the task the events from this component route to.
task-index (mod (tuple/list-hash-code [component-id])
(count eventlogger-tasks))
task-id (nth eventlogger-tasks task-index)
eventlogger-exec (first (filter (fn [[start stop]]
(between? task-id start stop))
(keys executor->host+port)))
[host port] (get executor->host+port eventlogger-exec)]
(if (and host port)
(doto comp-page-info
(.set_eventlog_host host)
(.set_eventlog_port port))))
comp-page-info))

Shutdownable
(shutdown [this]
(log-message "Shutting down master")
Expand Down
12 changes: 11 additions & 1 deletion storm-core/src/clj/backtype/storm/log.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
;; limitations under the License.

(ns backtype.storm.log
(:require [clojure.tools.logging :as log]))
(:require [clojure.tools.logging :as log])
(:use [clojure pprint])
(:import [java.io StringWriter]))

(defmacro log-message
[& args]
Expand Down Expand Up @@ -44,3 +46,11 @@
(defn log-stream
[& args]
(apply log/log-stream args))

(defmacro log-pprint
[& args]
`(let [^StringWriter writer# (StringWriter.)]
(doall
(for [object# [~@args]]
(pprint object# writer#)))
(log-message "\n" writer#)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a "\n" before logging the message?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was to clean up the output. Otherwise the data structure would not begin on its own line but instead begin after the timestamp, etc., and so that was not "pretty."

Loading