From 0716e81074a8a2b443b9ab831ee76226abff038b Mon Sep 17 00:00:00 2001 From: "Kevin Downey (hiredman)" Date: Mon, 4 Oct 2010 03:48:08 -0700 Subject: [PATCH] auto serialize/deserialize --- src/conduit/hornetq.clj | 51 +++++++++++++++--------------- test/conduit_hornetq/test/core.clj | 27 ++++------------ 2 files changed, 32 insertions(+), 46 deletions(-) diff --git a/src/conduit/hornetq.clj b/src/conduit/hornetq.clj index 0713783..0d4309b 100644 --- a/src/conduit/hornetq.clj +++ b/src/conduit/hornetq.clj @@ -1,6 +1,5 @@ ;;TODO: procs can yield multiple values, how to deal with that with ;;reply queues? -;;TODO: is there some better way to "run procs" on a queue? ;;TODO: auto serialize (ns conduit.hornetq (:use [conduit.core]) @@ -60,18 +59,30 @@ (doto (.createMessage session true) (-> .getBodyBuffer (.writeBytes bytes)))) +(defn serialize [object] + (with-open [baos (ByteArrayOutputStream.) + oos (ObjectOutputStream. baos)] + (.writeObject oos object) + (.toByteArray baos))) + +(defn deserialize [bytes] + (with-open [bais (ByteArrayInputStream. bytes) + ois (ObjectInputStream. bais)] + (.readObject ois))) + + (defn- hornetq-pub-no-reply [queue id] - (fn hornetq-no-reply [bytes] + (fn hornetq-no-reply [value] (let [producer (producer *session* queue) - msg (bytes->msg *session* bytes)] + msg (bytes->msg *session* (serialize value))] (.putStringProperty msg "id" id) (.send producer msg) [[] hornetq-no-reply]))) (defn- hornetq-sg-fn [queue id] - (fn hornetq-reply [bytes] + (fn hornetq-reply [value] (let [producer (producer *session* queue) - msg (bytes->msg *session* bytes) + msg (bytes->msg *session* (serialize value)) reply-queue (str queue ".reply." (UUID/randomUUID)) consumer (consumer *session* queue)] (create-tmp-queue *session* reply-queue) @@ -81,23 +92,23 @@ (fn [] (let [msg (.receive consumer)] (ack *session* msg) - [msg hornetq-reply]))))) + [[(deserialize (msg->bytes msg))] hornetq-reply]))))) (defn- reply-fn [f] - (partial (fn hornet-reply-fn [f [bytes reply-queue]] + (partial (fn hornet-reply-fn [f [value reply-queue]] (let [producer (.createProducer *session* reply-queue) - [[new-bytes] new-f] (f bytes) - new-msg (bytes->msg *session* new-bytes)] + [[new-value] new-f] (f value) + new-msg (bytes->msg *session* (serialize new-value))] (.send producer new-msg) (.commit *session*) [[] (partial hornet-reply-fn new-f)])) f)) (defn- hornetq-pub-reply [queue id] - (fn hornetq-reply [bytes] + (fn hornetq-reply [value] (create-queue *session* queue) (let [producer (producer *session* queue) - msg (bytes->msg *session* bytes) + msg (bytes->msg *session* (serialize value)) reply-queue (str queue ".reply." (UUID/randomUUID))] (create-tmp-queue *session* reply-queue) (.putStringProperty msg "id" id) @@ -105,20 +116,10 @@ (.send producer msg) (.commit *session*) (let [msg (.receive (consumer *session* reply-queue)) - bytes (msg->bytes msg)] + value (deserialize (msg->bytes msg))] (ack *session* msg) - [[bytes] hornetq-reply])))) + [[value] hornetq-reply])))) -(def-arr serialize [object] - (with-open [baos (ByteArrayOutputStream.) - oos (ObjectOutputStream. baos)] - (.writeObject oos object) - (.toByteArray baos))) - -(def-arr deserialize [bytes] - (with-open [bais (ByteArrayInputStream. bytes) - ois (ObjectInputStream. bais)] - (.readObject ois))) (defn a-hornetq "turn a proc into a hornetq proc that listens on a queue" @@ -149,8 +150,8 @@ [[(receive session queue)] next-msg-inner])) (handle-msg [fun [id [msg-object reply-queue]]] (try - (let [msg-bytes (msg->bytes msg-object) - [_ new-fun] (fun [id [msg-bytes reply-queue]])] + (let [msg-value (deserialize (msg->bytes msg-object)) + [_ new-fun] (fun [id [msg-value reply-queue]])] (ack session msg-object) [[] (partial handle-msg new-fun)]) (catch Exception e diff --git a/test/conduit_hornetq/test/core.clj b/test/conduit_hornetq/test/core.clj index fd37081..70f7194 100644 --- a/test/conduit_hornetq/test/core.clj +++ b/test/conduit_hornetq/test/core.clj @@ -21,18 +21,14 @@ (def upcase (a-hornetq upcase-queue "upcase" - (a-comp deserialize - (a-arr #(.toUpperCase %)) - serialize))) + (a-arr #(.toUpperCase %)))) (def evaler-queue (str "some.q." (UUID/randomUUID))) (def evaler (a-hornetq evaler-queue "eval" - (a-comp deserialize - (a-arr eval) - serialize))) + (a-arr eval))) (use-fixtures :once (fn [f] @@ -57,32 +53,21 @@ (deftest test-a-hornetq (let [[result] ((test-conduit-fn hornet-proc) (.getBytes "foo bar"))] (is (= "foo bar" (String. result "utf8")))) - (let [[result] ((test-conduit-fn - (a-comp serialize - hornet-proc - deserialize)) "foo bar")] - (is (= "foo bar" result))) (let [result (conduit-map - (a-comp serialize - upcase - deserialize + (a-comp upcase pass-through) ["foo" "bar"])] (is (= ["FOO" "BAR"] result))) (let [[result] (conduit-map - (a-comp serialize - evaler - deserialize + (a-comp evaler pass-through) ['(+ 1 2)])] (is (= 3 result))) (let [result (conduit-map - (a-comp (a-par pass-through - serialize) - (a-select + (a-comp (a-select :sexp evaler :string upcase) - deserialize) + pass-through) [[:sexp '(+ 1 2)] [:string "foo"]])] (is (= [3 "FOO"] result))))