Skip to content

Commit

Permalink
Merge pull request #170 from daveray/hystrix-clj-observe
Browse files Browse the repository at this point in the history
Add rx support to hystrix-clj.
  • Loading branch information
benjchristensen committed Aug 12, 2013
2 parents 4186b32 + eb9843f commit 8118301
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@
[com.netflix.hystrix.strategy.concurrency
HystrixRequestContext]))

(set! *warn-on-reflection* true)

(defmacro ^:private key-fn
"Make a function that creates keys of the given class given one of:
Expand Down Expand Up @@ -603,6 +605,64 @@
(let [^HystrixExecutable instance (apply instantiate definition args)]
(queued-command instance (.queue instance))))

(defn observe
"Asynchronously execute the command or collapser specified by the given normalized definition
with the given arguments. Returns an rx.Observable which can be subscribed to.
Note that this will eagerly begin execution of the command, even if there are no subscribers.
Use observe-later for lazy semantics.
If definition is already a HystrixExecutable and no args are given, observes it and returns
an Observable as described above. NEVER OBSERVE A HystrixExecutable MORE THAN ONCE.
See:
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#observe()
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCollapser.html#observe()
http://netflix.github.io/RxJava/javadoc/rx/Observable.html
"
[definition & args]
(let [^HystrixExecutable instance (apply instantiate definition args)]
(.observe instance)))

(defprotocol ^:private ObserveLater
"A protocol solely to eliminate reflection warnings because .toObservable
can be found on both HystrixCommand and HystrixCollapser, but not in their
common base class HystrixExecutable."
(^:private observe-later* [this])
(^:private observe-later-on* [this scheduler]))

(extend-protocol ObserveLater
HystrixCommand
(observe-later* [this] (.toObservable this))
(observe-later-on* [this scheduler] (.toObservable this scheduler))
HystrixCollapser
(observe-later* [this] (.toObservable this))
(observe-later-on* [this scheduler] (.toObservable this scheduler)))

(defn observe-later
"Same as #'com.netflix.hystrix.core/observe, but command execution does not begin until the
returned Observable is subscribed to.
See:
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#toObservable())
http://netflix.github.io/RxJava/javadoc/rx/Observable.html
"
[definition & args]
(observe-later* (apply instantiate definition args)))

(defn observe-later-on
"Same as #'com.netflix.hystrix.core/observe-later but an explicit scheduler can be provided
for the callback.
See:
com.netflix.hystrix.core/observe-later
com.netflix.hystrix.core/observe
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#toObservable(Scheduler)
http://netflix.github.io/RxJava/javadoc/rx/Observable.html
"
[definition scheduler & args]
(observe-later-on* (apply instantiate definition args) scheduler))

;################################################################################
; :command impl

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,55 @@
(is (= "hello-world" (.get qc) @qc))
(is (.isDone qc))))))

(defn ^:private wait-for-observable
[^rx.Observable o]
(rx.observables.BlockingObservable/single o))

(deftest test-observe
(let [base-def {:type :command
:group-key :my-group
:command-key :my-command
:run-fn + }]
(testing "observes a HystrixCommand"
(is (= 99
(-> (instantiate (normalize base-def) 11 88)
observe
wait-for-observable))))
(testing "throws if there are trailing args"
(is (thrown? IllegalArgumentException
(observe (instantiate (normalize base-def)) 10 23))))
(testing "instantiates and observes a command"
(let [o (observe (normalize base-def) 75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
(wait-for-observable o)))))))

(deftest test-observe-later
(let [base-def {:type :command
:group-key :my-group
:command-key :my-command
:run-fn + }]
(testing "observes a HystrixCommand"
(is (= 99
(-> (instantiate (normalize base-def) 11 88)
observe-later
wait-for-observable))))
(testing "throws if there are trailing args"
(is (thrown? IllegalArgumentException
(observe-later (instantiate (normalize base-def)) 10 23))))
(testing "instantiates and observes a command"
(let [o (observe-later (normalize base-def) 75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
(wait-for-observable o)))))
(testing "observes command with a Scheduler"
(let [o (observe-later-on (normalize base-def)
(rx.concurrency.Schedulers/newThread)
75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
(wait-for-observable o)))))))

(deftest test-this-command-binding
(let [base-def {:type :command
:group-key :test-this-command-binding-group
Expand Down Expand Up @@ -264,8 +313,12 @@
(testing "defines a functioning command"
(is (= 99 (my-fn-command 88 11)))
(is (= 100 (execute #'my-fn-command 89 11)))
(is (= 101 (deref (queue #'my-fn-command 89 12)))))))

(is (= 101 (deref (queue #'my-fn-command 89 12))))
(is (= 103 (wait-for-observable (observe #'my-fn-command 90 13))))
(is (= 105 (wait-for-observable (observe-later #'my-fn-command 91 14))))
(is (= 107 (wait-for-observable (observe-later-on #'my-fn-command
(rx.concurrency.Schedulers/newThread)
92 15)))))))

(defcollapser my-collapser
"a doc string"
Expand Down

0 comments on commit 8118301

Please sign in to comment.