From 46f38a80dfec09feaf4a7ac6e67217304779b304 Mon Sep 17 00:00:00 2001 From: Zach Tellman Date: Wed, 11 Jun 2014 18:45:33 -0700 Subject: [PATCH] add let-flow and loop --- .gitignore | 1 + README.md | 67 +------- docs/deferred.md | 157 +++++++++++++++++++ project.clj | 3 +- src/manifold/deferred.clj | 270 ++++++++++++++++++++++---------- src/manifold/stream.clj | 43 +++-- test/manifold/deferred_test.clj | 9 +- test/manifold/stream_test.clj | 2 +- 8 files changed, 384 insertions(+), 168 deletions(-) create mode 100644 docs/deferred.md diff --git a/.gitignore b/.gitignore index dccb4031..696cb9ed 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ pom.xml.asc *DS_Store /doc push +/doc diff --git a/README.md b/README.md index e3987c1b..3cff4b2d 100644 --- a/README.md +++ b/README.md @@ -56,72 +56,7 @@ true true ``` -Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. This is made easier by `manifold.deferred/chain`, which chains together callbacks, left to right: - -```clj -> (def d (d/deferred)) -#'d - -> (d/chain d inc inc inc #(println "x + 3 =" %)) -# - -> (d/success! d 0) -< x + 3 = 3 > -true -``` - -`chain` returns a deferred representing the return value of the right-most callback. If any of the functions returns a deferred or a value that can be coerced into a deferred, the chain will be paused until the deferred yields a value. - -Values that can be coerced into a deferred include Clojure futures, and core.async channels: - -```clj -> (def d (d/deferred)) -#'d - -> (d/chain d - #(future (inc %)) - #(println "the future returned" %)) -# - -> (d/success! d 0) -< the future returned 1 > -true - ``` - -If any stage in `chain` throws an exception or returns a deferred that yields an error, all subsequent stages are skipped, and the deferred returned by `chain` yields that same error. To handle these cases, you can use `manifold.deferred/catch`: - -```clj -> (def d (d/deferred)) -#p - -> (-> d - (d/chain dec #(/ 1 %)) - (d/catch Exception #(println "whoops, that didn't work:" %))) -# - -> (d/success! d 1) -< whoops, that didn't work: # > -true -``` - -Using the `->` threading operator, `chain` and `catch` can be easily and arbitrarily composed. - -To combine multiple deferrable values into a single deferred that yields all their results, we can use `manifold.deferred/zip`: - -```clj -> @(d/zip (future 1) (future 2) (future 3)) -(1 2 3) -``` - -Finally, we can use `manifold.deferred/timeout` to get a version of a deferred that will yield a special value if none is provided within the specified time: - -```clj -> @(d/timeout - (future (Thread/sleep 1000) :foo) - 100 - :bar) -:bar -``` +Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. Manifold provides a number of operators for composing over deferred values, which can be read about [here](/docs/deferred.md). ### streams diff --git a/docs/deferred.md b/docs/deferred.md new file mode 100644 index 00000000..f4d538b2 --- /dev/null +++ b/docs/deferred.md @@ -0,0 +1,157 @@ +### deferreds + +A deferred in Manifold is similar to a Clojure promise: + +```clj +> (require '[manifold.deferred :as d]) +nil + +> (def d (d/deferred)) +#'d + +> (d/success! d :foo) +true + +> @d +:foo +``` + +However, similar to Clojure's futures, deferreds in Manifold can also represent errors. Crucially, they also allow for callbacks to be registered, rather than simply blocking on dereferencing. + +```clj +> (def d (d/deferred)) +#'d + +> (d/error! d (Exception. "boom")) +true + +> @d +Exception: boom +``` + +```clj +> (def d (d/deferred)) +#'d + +> (d/on-realized d + (fn [x] (println "success!" x)) + (fn [x] (println "error!" x))) +true + +> (d/success! d :foo) +< success! :foo > +true +``` + +### composing with deferreds + +Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. This is made easier by `manifold.deferred/chain`, which chains together callbacks, left to right: + +```clj +> (def d (d/deferred)) +#'d + +> (d/chain d inc inc inc #(println "x + 3 =" %)) +# + +> (d/success! d 0) +< x + 3 = 3 > +true +``` + +`chain` returns a deferred representing the return value of the right-most callback. If any of the functions returns a deferred or a value that can be coerced into a deferred, the chain will be paused until the deferred yields a value. + +Values that can be coerced into a deferred include Clojure futures, and core.async channels: + +```clj +> (def d (d/deferred)) +#'d + +> (d/chain d + #(future (inc %)) + #(println "the future returned" %)) +# + +> (d/success! d 0) +< the future returned 1 > +true + ``` + +If any stage in `chain` throws an exception or returns a deferred that yields an error, all subsequent stages are skipped, and the deferred returned by `chain` yields that same error. To handle these cases, you can use `manifold.deferred/catch`: + +```clj +> (def d (d/deferred)) +#p + +> (-> d + (d/chain dec #(/ 1 %)) + (d/catch Exception #(println "whoops, that didn't work:" %))) +# + +> (d/success! d 1) +< whoops, that didn't work: # > +true +``` + +Using the `->` threading operator, `chain` and `catch` can be easily and arbitrarily composed. + +To combine multiple deferrable values into a single deferred that yields all their results, we can use `manifold.deferred/zip`: + +```clj +> @(d/zip (future 1) (future 2) (future 3)) +(1 2 3) +``` + +Finally, we can use `manifold.deferred/timeout` to get a version of a deferred that will yield a special value if none is provided within the specified time: + +```clj +> @(d/timeout + (future (Thread/sleep 1000) :foo) + 100 + :bar) +:bar +``` + +### let-flow + +Let's say that we have two services which provide us numbers, and want to get their sum. By using `zip` and `chain` together, this is relatively straightforward: + +```clj +(defn deferred-sum [] + (let [a (call-service-a) + b (call-service-b)] + (chain (zip a b) + (fn [[a b]] + (+ a b))))) +``` + +However, this isn't a very direct expression of what we're doing. For more complex relationships between deferred values, our code will become even more difficult to understand. In these cases, it's often best to use `let-flow`. + +```clj +(defn deferred-sum [a b] + (let-flow [a (call-service-a) + b (call-service-b)] + (+ a b))) +``` + +In `let-flow`, we can treat deferred values as if they're realized. This is only true of values declared within or closed over by `let-flow`, however. So we can do this: + +```clj +(let [a (future 1)] + (let-flow [b (future (+ a 1)) + c (+ b 1)] + (+ d 1))) +``` + +but not this: + +```clj +(let-flow [a (future 1) + b (let [c (future 1)] + (+ a c))] + (+ b 1)) +``` + +In this example, `c` is declared within a normal `let` binding, and as such we can't treat it as if it were realized. + +It can be helpful to think of `let-flow` as similar to Prismatic's [Graph](https://github.com/prismatic/plumbing#graph-the-functional-swiss-army-knife) library, except that the dependencies between values are inferred from the code, rather than explicitly specified. Used sparingly, this can be an extremely powerful way to choreograph concurrent operations. diff --git a/project.clj b/project.clj index 77ecb04a..981c0515 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,8 @@ :description "a compatibility layer for event-driven abstractions" :license {:name "MIT License" :url "http://opensource.org/licenses/MIT"} - :dependencies [[org.clojure/tools.logging "0.2.6"]] + :dependencies [[org.clojure/tools.logging "0.2.6"] + [riddley "0.1.7"]] :profiles {:dev {:dependencies [[codox-md "0.2.0" :exclusions [org.clojure/clojure]] [org.clojure/clojure "1.5.1"] [criterium "0.4.3"] diff --git a/src/manifold/deferred.clj b/src/manifold/deferred.clj index 61e319b9..abf1ae59 100644 --- a/src/manifold/deferred.clj +++ b/src/manifold/deferred.clj @@ -1,8 +1,9 @@ (ns manifold.deferred - (:refer-clojure :exclude [realized?]) + (:refer-clojure :exclude [realized? loop]) (:require [manifold.utils :as utils] - [manifold.time :as time]) + [manifold.time :as time] + [clojure.set :as set]) (:import [java.util LinkedList] @@ -45,7 +46,13 @@ [x] `(instance? IDeferred ~x)) -(def deferrable? (utils/fast-satisfies #'Deferrable)) +(let [f (utils/fast-satisfies #'Deferrable)] + (defn deferrable? [x] + (or + (instance? IDeferred x) + (instance? Future x) + (instance? IPending x) + (f x)))) ;; TODO: do some sort of periodic sampling so multiple futures can share a thread (defn- register-future-callbacks [x on-success on-error] @@ -93,60 +100,23 @@ (onRealized [_ on-success on-error] (register-future-callbacks x on-success on-error)))) - IDeref - (if (instance? IPending x) - - (reify - IDeref - (deref [_] - (.deref ^IDeref x)) - IBlockingDeref - (deref [_ time timeout-value] - (.deref ^IBlockingDeref x time timeout-value)) - IPending - (isRealized [_] - (.isRealized ^IPending x)) - IDeferred - (realized [_] - (.isRealized ^IPending x)) - (onRealized [_ on-success on-error] - (register-future-callbacks x on-success on-error) - nil)) - - ;; we don't know when we're pending, but make sure we're not - ;; pending once it's dereferenced - (let [pending? (AtomicBoolean. true)] - (reify - IDeref - (deref [_] - (try - (.deref ^IDeref x) - (finally - (.set pending? false)))) - IBlockingDeref - (deref [_ time timeout-value] - (try - (let [v (.deref ^IBlockingDeref x time timeout-value)] - (when-not (identical? timeout-value v) - (.set pending? false)) - v) - (catch Throwable e - (.set pending? false) - (throw e)))) - IPending - (isRealized [_] - (not (.get pending?))) - IDeferred - (realized [_] - (not (.get pending?))) - (onRealized [f on-success on-error] - (utils/wait-for - (try - (on-success (.deref ^IDeref x)) - (catch Throwable e - (on-error e)) - (finally - (.set pending? false)))))))) + IPending + (reify + IDeref + (deref [_] + (.deref ^IDeref x)) + IBlockingDeref + (deref [_ time timeout-value] + (.deref ^IBlockingDeref x time timeout-value)) + IPending + (isRealized [_] + (.isRealized ^IPending x)) + IDeferred + (realized [_] + (.isRealized ^IPending x)) + (onRealized [_ on-success on-error] + (register-future-callbacks x on-success on-error) + nil)) (when (deferrable? x) (to-deferred x)))) @@ -219,7 +189,7 @@ (set! ~'state ~(if success? ::success ::error)) true) (try - (loop [] + (clojure.core/loop [] (if (.isEmpty ~'listeners) nil (do @@ -437,24 +407,25 @@ [error] (ErrorDeferred. error nil false)) +(declare chain) + (defn- unwrap [x] - (if (deferrable? x) - (let [d (->deferred x)] - (if (realized? d) - (let [d' (try - @d - (catch Throwable _ - ::error))] - (cond - (identical? ::error d') - d - - (deferrable? d') - (recur d') - - :else - d')) - d)) + (if-let [d (->deferred x)] + (if (realized? d) + (let [d' (try + @d + (catch Throwable _ + ::error))] + (cond + (identical? ::error d') + d + + (deferrable? d') + (recur d') + + :else + d')) + d) x)) (defn connect @@ -462,15 +433,18 @@ [a b] (assert (instance? IDeferred b) "sink `b` must be a Manifold deferred") (let [a (unwrap a)] - (if (not (instance? IDeferred a)) - (success! b a) + (if (instance? IDeferred a) (if (realized? b) false (do (on-realized a - #(success! b %) + #(let [a' (unwrap %)] + (if (instance? IDeferred a') + (connect a' b) + (success! b a'))) #(error! b %)) - true))))) + true)) + (success! b a)))) (defmacro defer "Equivalent to Clojure's `future`, but returns a Manifold deferred." @@ -578,20 +552,20 @@ ^objects ary (object-array cnt) counter (AtomicInteger. cnt) d (deferred)] - (loop [idx 0, s deferred-or-values] + (clojure.core/loop [idx 0, s deferred-or-values] (if (empty? s) ;; no further results, decrement the counter one last time ;; and return the result if everything else has been realized (if (zero? (.get counter)) - (success-deferred (seq ary)) + (success-deferred (or (seq ary) (list))) d) (let [x (first s)] (if-let [x' (->deferred x)] - (on-realized x' + (on-realized (chain x') (fn [val] (aset ary idx val) (when (zero? (.decrementAndGet counter)) @@ -624,6 +598,64 @@ (time/in interval #(success! d' timeout-value)) d'))) +(deftype Recur [s] + clojure.lang.IDeref + (deref [_] s)) + +(defn recur [& args] + (Recur. args)) + +(defmacro loop + "A version of Clojure's loop which allows for asynchronous loops, via `manifold.deferred/recur`. + `loop` will always return a deferred value, even if the body is synchronous. + + (loop [i 1e6] + (chain (future i) + #(if (zero? %) + % + (recur (dec %)))))" + [bindings & body] + (let [vars (->> bindings (partition 2) (map first)) + vals (->> bindings (partition 2) (map second)) + x-sym (gensym "x")] + `(let [result# (deferred)] + ((fn this# [result# ~@vars] + (clojure.core/loop + [~@(interleave vars vars)] + (let [~x-sym (try + ~@body + (catch Throwable e# + (error! result# e#) + nil)) + d# (->deferred ~x-sym)] + (if (nil? d#) + (if (instance? Recur ~x-sym) + (~'recur + ~@(map + (fn [n] `(nth @~x-sym ~n)) + (range (count vars)))) + (success! result# ~x-sym)) + (if (realized? d#) + (let [~x-sym @d#] + (if (instance? Recur ~x-sym) + (~'recur + ~@(map + (fn [n] `(nth @~x-sym ~n)) + (range (count vars)))) + (success! result# ~x-sym))) + (on-realized (chain d#) + (fn [x#] + (if (instance? Recur x#) + (apply this# result# @x#) + (success! result# x#))) + (fn [err#] + (error! result# err#)))))))) + result# + ~@vals) + result#))) + +;;; + (utils/when-core-async (extend-protocol Deferrable @@ -637,3 +669,77 @@ (success! d x)) (success! d nil))) d)))) + +;;; + +(defn- back-references [form] + (let [syms (atom #{})] + ((resolve 'riddley.walk/walk-exprs) + symbol? + (fn [s] + (when (some-> ((resolve 'riddley.compiler/locals)) (find s) key meta ::flow-var) + (swap! syms conj s))) + form) + @syms)) + +(defmacro let-flow + "A version of `let` where deferred values that are let-bound or closed over can be treated + as if they are realized values. The body will only be executed once all of the let-bound + values, even ones only used for side effects, have been computed. + + Returns a deferred value, representing the value returned by the body. + + (let-flow [x (future 1)] + (+ x 1)) + + (let-flow [x (future 1) + y (future (+ x 1))] + (+ y 1)) + + (let [x (future 1)] + (let-flow [y (future (+ x 1))] + (+ y 1)))" + [bindings & body] + (require 'riddley.walk) + (let [locals (keys ((resolve 'riddley.compiler/locals))) + vars (->> bindings (partition 2) (map first)) + vars' (->> vars (concat locals) (map #(vary-meta % assoc ::flow-var true))) + gensyms (repeatedly (count vars') gensym) + vals' (->> bindings (partition 2) (map second) (concat locals)) + gensym->deps (zipmap + gensyms + (->> (count vars') + range + (map + (fn [n] + `(let [~@(interleave (take n vars') (repeat nil)) + ~(nth vars' n) ~(nth vals' n)]))) + (map back-references))) + var->gensym (zipmap vars' gensyms)] + `(let [~@(interleave + gensyms + (map + (fn [n var val gensym] + (let [var->gensym (zipmap (take n vars') gensyms) + deps (gensym->deps gensym)] + (if (empty? deps) + val + `(chain (zip ~@(map var->gensym deps)) + (fn [[~@deps]] + ~val))))) + (range) + vars' + vals' + gensyms))] + ~(let [dep? (set/union + (back-references `(let [~@(interleave + vars' + (repeat nil))] + ~@body)) + (apply set/union (vals gensym->deps))) + vars' (filter dep? vars') + gensyms' (map var->gensym vars')] + `(chain (zip ~@gensyms') + (fn [[~@gensyms']] + (let [~@(interleave vars' gensyms')] + ~@body))))))) diff --git a/src/manifold/stream.clj b/src/manifold/stream.clj index c9dc747c..36ad62e0 100644 --- a/src/manifold/stream.clj +++ b/src/manifold/stream.clj @@ -47,9 +47,17 @@ (onDrained [callback]) (connector [sink])) -(def sinkable? (utils/fast-satisfies #'Sinkable)) - -(def sourceable? (utils/fast-satisfies #'Sourceable)) +(let [f (utils/fast-satisfies #'Sinkable)] + (defn sinkable? [x] + (or + (instance? IEventSink x) + (f x)))) + +(let [f (utils/fast-satisfies #'Sourceable)] + (defn sourceable? [x] + (or + (instance? IEventSource x) + (f x)))) (defn ->sink "Converts, is possible, the object to a Manifold stream, or `nil` if not possible." @@ -206,8 +214,8 @@ description] :or {upstream? false downstream? true}}]]} - ([src dst] - (connect src dst nil)) + ([source sink] + (connect source sink nil)) ([^IEventSource source ^IEventSink sink options] @@ -456,18 +464,19 @@ (doseq [[a b] (core/map list srcs intermediates)] (connect-via a #(put! b %) b)) - ((fn this [] - (d/chain - (->> intermediates - (core/map #(take! % ::drained)) - (apply d/zip)) - (fn [msgs] - (if (some-drained? msgs) - (do (close! dst) false) - (put! dst msgs))) - (fn [result] - (when result - (utils/without-overflow (this))))))) + (d/loop [] + (d/chain + (->> intermediates + (core/map #(take! % ::drained)) + (apply d/zip)) + (fn [msgs] + (if (some-drained? msgs) + (do (close! dst) false) + (put! dst msgs))) + (fn [result] + (when result + (d/recur))))) + dst)))) (let [response (d/success-deferred true)] diff --git a/test/manifold/deferred_test.clj b/test/manifold/deferred_test.clj index d16f279e..dcb91d73 100644 --- a/test/manifold/deferred_test.clj +++ b/test/manifold/deferred_test.clj @@ -4,7 +4,7 @@ (:require [clojure.test :refer :all] [manifold.test-utils :refer :all] - [manifold.deferred :refer :all])) + [manifold.deferred :refer :all :exclude [loop]])) (defmacro defer' [& body] `(defer @@ -34,6 +34,13 @@ #(do (deliver p %) expected-return-value)) p))) +(deftest test-let-flow + (is (= 5 + @(let [z (future 1)] + (let-flow [x (future (future z)) + y (future (+ z x))] + (future (+ x x y z))))))) + (deftest test-deferred ;; success! (let [d (deferred)] diff --git a/test/manifold/stream_test.clj b/test/manifold/stream_test.clj index ae388c69..dd49d892 100644 --- a/test/manifold/stream_test.clj +++ b/test/manifold/stream_test.clj @@ -26,7 +26,7 @@ (future (doseq [x vs] (s/put! sink x))) - (is (= vs (s/stream->lazy-seq source 1))))) + (is (= vs (s/stream->lazy-seq source 10))))) (defn splice-into-stream [gen] #(let [x (gen)