Skip to content

Commit

Permalink
Fix the no-test problem.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mitchell Thomas committed Oct 24, 2015
1 parent 784357a commit 23d8a95
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
6 changes: 4 additions & 2 deletions project.clj
Expand Up @@ -7,8 +7,10 @@
[org.flatland/protobuf "0.8.1"]
[org.clojure/tools.logging "0.3.1"]
[org.slf4j/slf4j-log4j12 "1.7.12"]]
:plugins [[lein-protobuf "0.4.3"]]
:plugins [[lein-protobuf "0.4.3"]
[cider/cider-nrepl "0.10.0-SNAPSHOT"]]
:main ^:skip-aot message-bus.core
:target-path "target/%s"
:profiles {:uberjar {:aot :all}}
:profiles {:uberjar {:aot :all}
:dev {:dependencies [[org.apache.activemq/activemq-broker "5.12.1"]]}}
)
27 changes: 13 additions & 14 deletions src/message_bus/core.clj
Expand Up @@ -38,21 +38,24 @@
connection (.createConnection factory username password)
_ (.start connection)
session (.createSession connection false Session/AUTO_ACKNOWLEDGE)]
{:connection connection
:session session}))
(atom {:connection connection
:session session
:channels []})))

(defn stop-activemq-session!
[{:keys [session connection]}]
(do (.close session)
(.close connection)
true))

[sess]
(let [{:keys [session connection channels]} @sess]
(dorun (map close! channels))
(.close session)
(.close connection)
true))
(defn start-consumer!
"Returns a chan subscribed to the provided topic using the provided
session. The chan will close when the session has been terminated."
[session topic-name]
(let [ch (chan)
s (:session session)
_ (swap! session #(assoc % :channels (conj (% :channels) ch)))
s (:session @session)
ml (reify MessageListener
(onMessage [this mess]
(cond
Expand Down Expand Up @@ -92,7 +95,8 @@
the session has closed."
[session topic-name]
(let [cha (chan)
ses (:session session)
_ (swap! session #(assoc % :channels (conj (% :channels) cha)))
ses (:session @session)
top (ActiveMQTopic. topic-name)
pub (.createPublisher ses top)
_ (go
Expand Down Expand Up @@ -153,9 +157,6 @@
(comment
(def ses (start-activemq-session! default-url :username "fredly" :password "foo"))
(start-activemq-session! default-url)
(def c (message-bus-consumer ses "pine"))
(def p (message-bus-publisher ses "pine"))

(def consumer (go-loop []
(when-let [m (<! c)]
(log/info "this: " m)
Expand All @@ -164,8 +165,6 @@
(>!! p "hey!")
(>!! p [{:this (rand) :that (rand)} [1 2 3] {:ding true :dong false}])

(def pcon (message-bus-consumer ses "person"))
(def ppub (message-bus-publisher ses "person"))
(def person-consumer (go-loop []
(when-let [person-bytes (<! pcon)]
(log/info "m: " person-bytes)
Expand Down
23 changes: 19 additions & 4 deletions test/message_bus/core_test.clj
@@ -1,7 +1,22 @@
(ns message-bus.core-test
(:require [clojure.test :refer :all]
[message-bus.core :refer :all]))
[clojure.core.async :refer [put! close! <!!]]
[message-bus.core :refer :all]
[message-bus.messages :refer :all]))

(deftest a-test
(testing "FIXME, I fail."
(is (= 0 1))))
(deftest pubsub-test
(testing "Publishing and Consuming using a local broker"
(let [sess (start-activemq-session! "vm://localhost?broker.persistent=false")
dest "pubsub-test"
consu (start-consumer! sess dest)
publi (start-publisher! sess dest)
person {:id 42 :name "Herbie" :email "hhancock@jazz-foo.com"
:likes ["keys" "funk" "deep dish pizza"]}
person_proto (person-builder person)]
(is (put! publi person_proto))
(is (= (person-builder person_proto) (person-builder (<!! consu))))
(is (put! publi person))
(is (= person (<!! consu)))
(is (stop-activemq-session! sess))
(is (false? (put! publi person)))
(is (nil? (<!! consu))))))

0 comments on commit 23d8a95

Please sign in to comment.