From 92c742b970e031418e5e91688441ad3a3875ca7c Mon Sep 17 00:00:00 2001 From: Vadim Platonov Date: Tue, 13 Sep 2016 23:28:41 +0200 Subject: [PATCH] Add deferred/alt --- src/manifold/deferred.clj | 46 +++++++++++++++++++++++++++++++++ test/manifold/deferred_test.clj | 20 ++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/src/manifold/deferred.clj b/src/manifold/deferred.clj index 8455a8be..26924a23 100644 --- a/src/manifold/deferred.clj +++ b/src/manifold/deferred.clj @@ -1084,6 +1084,52 @@ (map #(or (->deferred % nil) %)) (apply zip'))) +;; same technique as clojure.core.async/random-array +(defn- random-array [n] + (let [a (int-array n)] + (clojure.core/loop [i 1] + (if (= i n) + a + (let [j (rand-int (inc i))] + (aset a i (aget a j)) + (aset a j i) + (recur (inc i))))))) + +(defn alt' + "Like `alt`, but only unwraps Manifold deferreds." + [& vals] + (let [d (deferred) + cnt (count vals) + ^ints idxs (random-array cnt)] + (clojure.core/loop [i 0] + (when (< i cnt) + (let [i' (aget idxs i) + x (nth vals i')] + (if (deferred? x) + (success-error-unrealized x + val (success! d val) + err (error! d err) + (do (on-realized (chain' x) + #(success! d %) + #(error! d %)) + (recur (inc i)))) + (success! d x))))) + d)) + +(defn alt + "Takes a list of values, some of which may be deferrable, and returns a + deferred that will yield the value which was realized first. + + @(alt 1 2) => 1 + @(alt (future (Thread/sleep 1) 1) + (future (Thread/sleep 1) 2)) => 1 or 2 depending on the thread scheduling + + Values appearing earlier in the input are preferred." + [& vals] + (->> vals + (map #(or (->deferred % nil) %)) + (apply alt'))) + (defn timeout! "Takes a deferred, and sets a timeout on it, such that it will be realized as `timeout-value` (or a TimeoutException if none is specified) if it is not realized in `interval` ms. Returns diff --git a/test/manifold/deferred_test.clj b/test/manifold/deferred_test.clj index 6d3f061a..1dcc5c7c 100644 --- a/test/manifold/deferred_test.clj +++ b/test/manifold/deferred_test.clj @@ -243,6 +243,26 @@ (d/error! d (Exception.)) (is (= ::delivered (deref target-d 0 ::not-delivered))))) +(deftest test-alt + (is (#{1 2 3} @(d/alt 1 2 3))) + (is (= 2 @(d/alt (d/future (Thread/sleep 10) 1) 2))) + + (is (= 2 @(d/alt (d/future (Thread/sleep 10) (throw (Exception. "boom"))) 2))) + + (is (thrown-with-msg? Exception #"boom" + @(d/alt (d/future (throw (Exception. "boom"))) (d/future (Thread/sleep 10))))) + + (testing "uniformly distributed" + (let [results (atom {}) + ;; within 10% + n 1e4, r 10, eps (* n 0.1) + f #(/ (% n eps) r)] + (dotimes [_ n] + @(d/chain (apply d/alt (range r)) + #(swap! results update % (fnil inc 0)))) + (doseq [[i times] @results] + (is (<= (f -) times (f +))))))) + ;;; (deftest ^:benchmark benchmark-chain