Skip to content

Commit

Permalink
Robustness fixes
Browse files Browse the repository at this point in the history
+ Input channels can never fill up
+ More extensive try catch
+ Personalize error reporting from within probe
  • Loading branch information
eslick committed Jan 5, 2015
1 parent 3d8cbf3 commit 69400de
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/probe/core.clj
Expand Up @@ -7,7 +7,7 @@
[clojure.string :as str]
[clojure.tools.logging :as clog]
[clojure.core.memoize :as memo]
[clojure.core.async :refer [chan >! <! <!! >!! go go-loop] :as async]
[clojure.core.async :refer [chan sliding-buffer >! <! <!! >!! go go-loop] :as async]
[probe.wrap :as wrap]
[probe.sink :as sink]))

Expand All @@ -19,15 +19,16 @@
;; ## Exception handling
;;

(defonce error-channel (chan))
(defonce error-channel (chan (sliding-buffer 10)))

(defonce error-router
(go-loop []
(let [{:keys [state exception]} (<! error-channel)]
(clog/error "Probe error detected" state exception))))



(let [{:keys [msg state exception]} (<! error-channel)]
(try
(clog/error (or msg "Probe error detected") state exception)
(catch java.lang.Throwable t
(clog/error "Probe error detected" exception "unable to report state content" t)))
(recur))))

;;
;; ## Sinks
Expand All @@ -41,7 +42,7 @@
(try
(f state)
(catch java.lang.Throwable e
(>! error-channel {:state state :exception e})))
(>! error-channel {:msg "Sink processing exception" :state state :exception e})))
(recur))))

(defn sink-names []
Expand All @@ -62,6 +63,7 @@
(map (fn [sub] (unsubscribe (:name sub) (:sink sub)))
(sink-subscriptions name)))
(async/close! in)
(async/close! out)
(assert (nil? (<!! out)))
(swap! sinks dissoc name))))
([name]
Expand Down Expand Up @@ -255,7 +257,7 @@
;; ## Router
;;

(defonce input (chan))
(defonce input (chan (sliding-buffer 100)))

(defn write-state
"External API to submit probe state to the fabric"
Expand All @@ -272,19 +274,25 @@
#((:policy-fn (get-sink (first %))) state (second %))
(group-by :sink subs))))

(defonce router-handler
(def router-handler
(go-loop []
; (clog/trace "Waiting for probe state")
(let [state (<! input)]
(clog/trace "Routing probe state: " state)
(when-let [tags (and (map? state) (:tags state))]
(when (coll? tags)
(doseq [sub (apply-sink-policy tags state)]
(try
(clog/trace "Writing channel for: " [(:name (:sub sub))
(:sink (:sub sub))])
(>! (:channel (:sub sub)) (:new-state sub))
(catch java.lang.Throwable t
(>! error-channel {:state ~state :exception t}))))))
; (clog/trace "Routing probe state: " state)
(try
(when-let [tags (and (map? state) (:tags state))]
; (clog/trace "Routing probe state: " tags)
(when (coll? tags)
(doseq [sub (apply-sink-policy tags state)]
(do ; (clog/trace "Writing channel for: " [(:name (:sub sub))
; (:sink (:sub sub))])
(try (>! (:channel (:sub sub)) (:new-state sub))
(catch java.lang.Throwable t
; (clog/trace "Caught an error")
(>! error-channel {:state (str state) :exception t})))))))
(catch java.lang.Throwable t
(>! error-channel {:msg "Bad Probe data received, caused exception" :exception t))))
; (clog/trace "Waiting to recur")
(recur))))

;; ==================================
Expand Down

0 comments on commit 69400de

Please sign in to comment.