From 4b988621224760130405b7a7c783fdc803c75953 Mon Sep 17 00:00:00 2001 From: Zach Tellman Date: Mon, 30 Mar 2015 17:59:11 -0700 Subject: [PATCH] make error handling in streams more consistent, per @mithrandi --- src/manifold/stream.clj | 58 ++++++++++++++++----------------- src/manifold/stream/default.clj | 17 ++++++---- src/manifold/stream/graph.clj | 6 +--- test/manifold/stream_test.clj | 31 ++++++++++++++++++ 4 files changed, 71 insertions(+), 41 deletions(-) diff --git a/src/manifold/stream.clj b/src/manifold/stream.clj index 0229d6b8..9897f940 100644 --- a/src/manifold/stream.clj +++ b/src/manifold/stream.clj @@ -425,24 +425,18 @@ (downstream [_] (when downstream [downstream])) IEventSink - (put [_ x _] + (put [this x _] (try (let [rsp (f x)] (if (nil? constant-response) rsp constant-response)) (catch Throwable e - (log/error e "error in consume") - (d/error-deferred e)))) - (put [_ x _ _ _] - (try - (let [rsp (f x)] - (if (nil? constant-response) - rsp - constant-response)) - (catch Throwable e - (log/error e "error in consume") - (d/error-deferred e)))) + (log/error e "error in stream handler") + (.close this) + (d/success-deferred false)))) + (put [this x default-val _ _] + (.put this x default-val)) (isClosed [_] (if downstream (.isClosed downstream) @@ -459,7 +453,8 @@ (defn connect-via "Feeds all messages from `src` into `callback`, with the understanding that they will - eventually be propagated into `dst` in some form." + eventually be propagated into `dst` in some form. The return value of `f` should be + a deferred yielding either `true` or `false`." ([src callback dst] (connect-via src callback dst nil)) ([src callback dst options] @@ -467,7 +462,7 @@ (connect src (Callback. callback dst nil) - (merge options {:dst' dst}))))) + options)))) (defn- connect-via-proxy ([src proxy dst] @@ -560,8 +555,12 @@ (let [s' (stream)] (connect-via s (fn [msg] - (d/chain msg - #(put! s' %))) + (-> msg + (d/chain #(put! s' %)) + (d/catch (fn [e] + (log/error e "deferred realized as error, closing stream") + (close! s') + false)))) s' {:description {:op "realize-each"}}) (source-only s'))) @@ -595,19 +594,18 @@ (source-only dst))))) -(let [response (d/success-deferred true)] - (defn filter - "Equivalent to Clojure's `filter`, but for streams instead of sequences." - [pred s] - (let [s' (stream)] - (connect-via s - (fn [msg] - (if (pred msg) - (put! s' msg) - response)) - s' - {:description {:op "filter"}}) - (source-only s')))) +(defn filter + "Equivalent to Clojure's `filter`, but for streams instead of sequences." + [pred s] + (let [s' (stream)] + (connect-via s + (fn [msg] + (if (pred msg) + (put! s' msg) + (d/success-deferred true))) + s' + {:description {:op "filter"}}) + (source-only s'))) (defn reductions "Equivalent to Clojure's `reductions`, but for streams instead of sequences." @@ -803,7 +801,7 @@ @val val))) (put [_ x blocking? timeout timeout-val] - ;; TODO: this doesn't really time out, because that would + ;; TODO: this doesn't realy time out, because that would ;; require consume-side filtering of messages (buf+ (metric x)) (.put ^IEventSink buf x blocking? timeout timeout-val) diff --git a/src/manifold/stream/default.clj b/src/manifold/stream/default.clj index 589e86d6..815a041d 100644 --- a/src/manifold/stream/default.clj +++ b/src/manifold/stream/default.clj @@ -57,7 +57,12 @@ (when-not permanent? (utils/with-lock lock (when-not (s/closed? this) - (add!) + + (try + (add!) + (catch Throwable e + (log/error e "error in stream transformer"))) + (loop [] (when-let [^Consumer c (.poll consumers)] (try @@ -65,7 +70,9 @@ (catch Throwable e (log/error e "error in callback"))) (recur))) + (.markClosed this) + (when (s/drained? this) (.markDrained this)))))) @@ -85,7 +92,8 @@ (add! this msg)) (catch Throwable e (.close this) - (d/error-deferred e)))) + (log/error e "error in stream transformer") + (d/success-deferred false)))) close? (reduced? result) @@ -109,10 +117,7 @@ (instance? Production result) (let [^Production p result] - (try - (d/success! (.deferred p) (.message p) (.token p)) - (catch Throwable e - (log/error e "error in callback"))) + (d/success! (.deferred p) (.message p) (.token p)) (if blocking? true (d/success-deferred true executor))) diff --git a/src/manifold/stream/graph.clj b/src/manifold/stream/graph.clj index 3b47fd44..a64f4454 100644 --- a/src/manifold/stream/graph.clj +++ b/src/manifold/stream/graph.clj @@ -25,7 +25,6 @@ ^boolean upstream? ^boolean downstream? ^IEventSink sink - ^IEventSink sink' ^String description]) (deftype AsyncPut @@ -41,7 +40,7 @@ .iterator iterator-seq (map (fn [^Downstream d] - [(.description d) (or (.sink' d) (.sink d))])))))) + [(.description d) (.sink d)])))))) (defn- async-send [^Downstream d msg dsts] @@ -87,7 +86,6 @@ (defn- handle-async-error [^AsyncPut x err source] (some-> ^Downstream (.dst x) .sink s/close!) - (some-> ^Downstream (.dst x) .sink' s/close!) (log/error err "error in message propagation") (let [^CopyOnWriteArrayList l (.dsts x)] (.remove l (.dst x)) @@ -274,7 +272,6 @@ ^IEventSink dst {:keys [upstream? downstream? - dst' timeout description] :or {timeout -1 @@ -287,7 +284,6 @@ (boolean (and upstream? (instance? IEventSink src))) downstream? dst - dst' description) k (.weakHandle ^IEventStream src ref-queue)] (if-let [dsts (.get graph k)] diff --git a/test/manifold/stream_test.clj b/test/manifold/stream_test.clj index b8d79b03..39befeec 100644 --- a/test/manifold/stream_test.clj +++ b/test/manifold/stream_test.clj @@ -1,5 +1,6 @@ (ns manifold.stream-test (:require + [clojure.tools.logging :as log] [clojure.core.async :as async] [clojure.test :refer :all] [manifold.test-utils :refer :all] @@ -236,6 +237,36 @@ s/stream->seq))) #_(is (= 1 @cnt)))) +(deftest test-error-handling + + (binding [log/*logger-factory* clojure.tools.logging.impl/disabled-logger-factory] + + (let [s (s/stream) + s' (s/map #(/ 1 %) s)] + (is (not (s/closed? s))) + (is (not (s/drained? s'))) + (is (= false @(s/put-all! s [0 1]))) + (is (s/closed? s)) + (is (s/drained? s'))) + + (let [s (s/stream) + s' (s/map #(d/future (/ 1 %)) s)] + (is (not (s/closed? s))) + (is (not (s/drained? s'))) + (is (= true @(s/put! s 0))) + (is (not (s/closed? s))) + (is (not (s/drained? s')))) + + (let [s (s/stream) + s' (->> s + (s/map #(d/future (/ 1 %))) + s/realize-each)] + (is (not (s/closed? s))) + (is (not (s/drained? s'))) + (s/put-all! s (range 10)) + (is (nil? @(s/take! s'))) + (is (s/drained? s'))))) + ;;; (defn blocking-queue-benchmark [^BlockingQueue q]