Skip to content

Commit

Permalink
auto serialize/deserialize
Browse files Browse the repository at this point in the history
  • Loading branch information
hiredman committed Oct 4, 2010
1 parent 29197b5 commit 0716e81
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 46 deletions.
51 changes: 26 additions & 25 deletions src/conduit/hornetq.clj
@@ -1,6 +1,5 @@
;;TODO: procs can yield multiple values, how to deal with that with
;;reply queues?
;;TODO: is there some better way to "run procs" on a queue?
;;TODO: auto serialize
(ns conduit.hornetq
(:use [conduit.core])
Expand Down Expand Up @@ -60,18 +59,30 @@
(doto (.createMessage session true)
(-> .getBodyBuffer (.writeBytes bytes))))

(defn serialize [object]
(with-open [baos (ByteArrayOutputStream.)
oos (ObjectOutputStream. baos)]
(.writeObject oos object)
(.toByteArray baos)))

(defn deserialize [bytes]
(with-open [bais (ByteArrayInputStream. bytes)
ois (ObjectInputStream. bais)]
(.readObject ois)))


(defn- hornetq-pub-no-reply [queue id]
(fn hornetq-no-reply [bytes]
(fn hornetq-no-reply [value]
(let [producer (producer *session* queue)
msg (bytes->msg *session* bytes)]
msg (bytes->msg *session* (serialize value))]
(.putStringProperty msg "id" id)
(.send producer msg)
[[] hornetq-no-reply])))

(defn- hornetq-sg-fn [queue id]
(fn hornetq-reply [bytes]
(fn hornetq-reply [value]
(let [producer (producer *session* queue)
msg (bytes->msg *session* bytes)
msg (bytes->msg *session* (serialize value))
reply-queue (str queue ".reply." (UUID/randomUUID))
consumer (consumer *session* queue)]
(create-tmp-queue *session* reply-queue)
Expand All @@ -81,44 +92,34 @@
(fn []
(let [msg (.receive consumer)]
(ack *session* msg)
[msg hornetq-reply])))))
[[(deserialize (msg->bytes msg))] hornetq-reply])))))

(defn- reply-fn [f]
(partial (fn hornet-reply-fn [f [bytes reply-queue]]
(partial (fn hornet-reply-fn [f [value reply-queue]]
(let [producer (.createProducer *session* reply-queue)
[[new-bytes] new-f] (f bytes)
new-msg (bytes->msg *session* new-bytes)]
[[new-value] new-f] (f value)
new-msg (bytes->msg *session* (serialize new-value))]
(.send producer new-msg)
(.commit *session*)
[[] (partial hornet-reply-fn new-f)]))
f))

(defn- hornetq-pub-reply [queue id]
(fn hornetq-reply [bytes]
(fn hornetq-reply [value]
(create-queue *session* queue)
(let [producer (producer *session* queue)
msg (bytes->msg *session* bytes)
msg (bytes->msg *session* (serialize value))
reply-queue (str queue ".reply." (UUID/randomUUID))]
(create-tmp-queue *session* reply-queue)
(.putStringProperty msg "id" id)
(.putStringProperty msg "replyTo" reply-queue)
(.send producer msg)
(.commit *session*)
(let [msg (.receive (consumer *session* reply-queue))
bytes (msg->bytes msg)]
value (deserialize (msg->bytes msg))]
(ack *session* msg)
[[bytes] hornetq-reply]))))
[[value] hornetq-reply]))))

(def-arr serialize [object]
(with-open [baos (ByteArrayOutputStream.)
oos (ObjectOutputStream. baos)]
(.writeObject oos object)
(.toByteArray baos)))

(def-arr deserialize [bytes]
(with-open [bais (ByteArrayInputStream. bytes)
ois (ObjectInputStream. bais)]
(.readObject ois)))

(defn a-hornetq
"turn a proc into a hornetq proc that listens on a queue"
Expand Down Expand Up @@ -149,8 +150,8 @@
[[(receive session queue)] next-msg-inner]))
(handle-msg [fun [id [msg-object reply-queue]]]
(try
(let [msg-bytes (msg->bytes msg-object)
[_ new-fun] (fun [id [msg-bytes reply-queue]])]
(let [msg-value (deserialize (msg->bytes msg-object))
[_ new-fun] (fun [id [msg-value reply-queue]])]
(ack session msg-object)
[[] (partial handle-msg new-fun)])
(catch Exception e
Expand Down
27 changes: 6 additions & 21 deletions test/conduit_hornetq/test/core.clj
Expand Up @@ -21,18 +21,14 @@
(def upcase (a-hornetq
upcase-queue
"upcase"
(a-comp deserialize
(a-arr #(.toUpperCase %))
serialize)))
(a-arr #(.toUpperCase %))))

(def evaler-queue (str "some.q." (UUID/randomUUID)))

(def evaler (a-hornetq
evaler-queue
"eval"
(a-comp deserialize
(a-arr eval)
serialize)))
(a-arr eval)))

(use-fixtures :once
(fn [f]
Expand All @@ -57,32 +53,21 @@
(deftest test-a-hornetq
(let [[result] ((test-conduit-fn hornet-proc) (.getBytes "foo bar"))]
(is (= "foo bar" (String. result "utf8"))))
(let [[result] ((test-conduit-fn
(a-comp serialize
hornet-proc
deserialize)) "foo bar")]
(is (= "foo bar" result)))
(let [result (conduit-map
(a-comp serialize
upcase
deserialize
(a-comp upcase
pass-through)
["foo" "bar"])]
(is (= ["FOO" "BAR"] result)))
(let [[result] (conduit-map
(a-comp serialize
evaler
deserialize
(a-comp evaler
pass-through)
['(+ 1 2)])]
(is (= 3 result)))
(let [result (conduit-map
(a-comp (a-par pass-through
serialize)
(a-select
(a-comp (a-select
:sexp evaler
:string upcase)
deserialize)
pass-through)
[[:sexp '(+ 1 2)]
[:string "foo"]])]
(is (= [3 "FOO"] result))))

0 comments on commit 0716e81

Please sign in to comment.