Skip to content

Commit ae86dc7

Browse files
committed
Add Clojure RPC tutorial code.
1 parent 72593f3 commit ae86dc7

File tree

3 files changed

+82
-1
lines changed

3 files changed

+82
-1
lines changed

clojure/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Code examples are executed via `lein run`:
4040

4141
[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-java.html)
4242

43-
TBD
43+
lein run -m rabbitmq.tutorials.rpc-server
44+
lein run -m rabbitmq.tutorials.rpc-client
4445

4546
To learn more, visit [Langohr documentation](http://clojurerabbitmq.info) site.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
;; note: this is example code and shouldn't be run in production as-is
2+
3+
(ns rabbitmq.tutorials.rpc-client
4+
(:require [langohr.core :as lc]
5+
[langohr.channel :as lch]
6+
[langohr.queue :as lq]
7+
[langohr.basic :as lb]
8+
[langohr.consumers :as lcons]))
9+
10+
(def ^{:const true} q "rpc_queue")
11+
12+
(defn correlation-id-equals?
13+
[correlation-id d]
14+
(= (.getCorrelationId (.getProperties d)) correlation-id))
15+
16+
(defrecord FibonacciClient [conn ch cbq consumer]
17+
clojure.lang.IFn
18+
(invoke [this n]
19+
(let [correlation-id (str (java.util.UUID/randomUUID))]
20+
(lb/publish ch "" q (str n) {:reply-to cbq
21+
:correlation-id correlation-id})
22+
(lb/consume ch cbq consumer)
23+
(-> (first (filter (partial correlation-id-equals? correlation-id)
24+
(lcons/deliveries-seq consumer)))
25+
.getBody
26+
(String. "UTF-8")
27+
(read-string))))
28+
java.io.Closeable
29+
(close [this]
30+
(.close conn)))
31+
32+
(defn make-fibonacci-rpc-client
33+
[]
34+
(let [conn (lc/connect)
35+
ch (lch/open conn)
36+
cbq (lq/declare ch "" {:auto-delete false :exclusive true})
37+
consumer (lcons/create-queueing ch {})]
38+
(->FibonacciClient conn ch (:queue cbq) consumer)))
39+
40+
(defn -main
41+
[& args]
42+
(with-open [fibonacci-rpc (make-fibonacci-rpc-client)]
43+
(println " [x] Requesting fib(30)")
44+
(let [response (fibonacci-rpc 30)]
45+
(println (format " [.] Got %s" response)))))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
(ns rabbitmq.tutorials.rpc-server
2+
(:require [langohr.core :as lc]
3+
[langohr.channel :as lch]
4+
[langohr.queue :as lq]
5+
[langohr.basic :as lb]
6+
[langohr.consumers :as lcons]))
7+
8+
(def ^{:const true} q "rpc_queue")
9+
10+
(defn fib
11+
[n]
12+
(if (zero? n)
13+
0
14+
(if (= n 1)
15+
1
16+
(+ (fib (- n 1))
17+
(fib (- n 2))))))
18+
19+
(defn handle-delivery
20+
"Handles message delivery"
21+
[ch {:keys [delivery-tag reply-to correlation-id]} payload]
22+
(let [n (read-string (String. payload "UTF-8"))]
23+
(println (format " [.] fib(%s)" n))
24+
(let [response (fib n)]
25+
(lb/publish ch "" reply-to (str response) {:correlation-id correlation-id})
26+
(lb/ack ch delivery-tag))))
27+
28+
(defn -main
29+
[& args]
30+
(with-open [conn (lc/connect)]
31+
(let [ch (lch/open conn)]
32+
(lq/declare ch q {:auto-delete false})
33+
(lb/qos ch 1)
34+
(println " [x] Awaiting RPC requests")
35+
(lcons/blocking-subscribe ch "rpc_queue" handle-delivery))))

0 commit comments

Comments
 (0)