Skip to content

Commit

Permalink
enh: added the skeleton of a possible application under test
Browse files Browse the repository at this point in the history
  • Loading branch information
caligin committed Aug 20, 2017
1 parent 51373b9 commit 209f956
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 0 deletions.
11 changes: 11 additions & 0 deletions demo.consumer/.gitignore
@@ -0,0 +1,11 @@
/target
/classes
/checkouts
pom.xml
pom.xml.asc
*.jar
*.class
/.lein-*
/.nrepl-port
.hgignore
.hg/
6 changes: 6 additions & 0 deletions demo.consumer/project.clj
@@ -0,0 +1,6 @@
(defproject demo.consumer "0.1.0-SNAPSHOT"
:description "a demo rabbit consumer we want to test for distributed properties"
:main demo.consumer
:dependencies [[org.clojure/clojure "1.8.0"]
[com.novemberain/langohr "3.6.1"]
[com.novemberain/monger "3.1.0"]])
54 changes: 54 additions & 0 deletions demo.consumer/src/demo/consumer.clj
@@ -0,0 +1,54 @@
(ns demo.consumer
(:gen-class)
(:require [langohr.core :as rmq]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.exchange :as le]
[langohr.consumers :as lc]
[langohr.basic :as lb]
[monger.core :as mg]
[monger.collection :as mc])
(:import [com.mongodb MongoOptions ServerAddress]))


(defn next-state [current-state command]
((keyword command)
((keyword current-state)
{:init {:new :in-progress}
:in-progress {:update :in-progress :terminate :terminated}
:terminated {}
})))

(defn load-state [mongo collection id] (if-let [{:keys [state]} (mc/find-map-by-id mongo collection id)]
state
:init))

(defn update-state [mongo collection id state] (mc/update mongo collection {:_id id} {:state state} {:upsert true}))

(defn make-message-handler [mongo collection]
(fn [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
(let [[id, command] (clojure.string/split (String. payload "UTF-8") #":")]
(update-state mongo collection id (or (next-state (load-state mongo collection id) command) :b0rk)))))


(defn -main
"Consumer"
[& args]
(let [conn (rmq/connect)
ch (lch/open conn)
qname "democonsumer"
xchgname "democonsumer"
dbname "democonsumer"
collectionname "entities"
mconn (mg/connect)
mongo (mg/get-db mconn dbname)]
(println (format "Consumer Connected. Channel id: %d" (.getChannelNumber ch)))
(le/declare ch xchgname "topic" {:durable true})
(lq/declare ch qname {:exclusive false :auto-delete true})
(lq/bind ch qname xchgname {:routing-key "events.for.*"})
(lc/subscribe ch qname (make-message-handler mongo collectionname) {:auto-ack true})
(Thread/sleep 60000)
(println "Closing")
(rmq/close ch)
(rmq/close conn)
(mg/disconnect mconn)))

0 comments on commit 209f956

Please sign in to comment.