Skip to content

Commit

Permalink
Implemented lightweight test as example implementation of cqrs-server
Browse files Browse the repository at this point in the history
  • Loading branch information
Deon Moolman committed Mar 7, 2015
1 parent 32725b6 commit 0870736
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 24 deletions.
11 changes: 5 additions & 6 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,26 @@
:dependencies
[[org.clojure/clojure "1.6.0"]
[prismatic/schema "0.3.8-SNAPSHOT"]
[danlentz/clj-uuid "0.0.7-SNAPSHOT"]
[danlentz/clj-uuid "0.1.2-SNAPSHOT"]

[zookeeper-clj "0.9.3"]
[org.slf4j/slf4j-api "1.7.7"]
[org.clojure/data.json "0.2.3"]
[org.apache.zookeeper/zookeeper "3.4.6"]
[com.mdrogalis/onyx "0.5.2"]
[com.mdrogalis/onyx-core-async "0.5.0"]
[com.mdrogalis/onyx "0.5.3"]
[com.mdrogalis/onyx-core-async "0.5.3"]
[commons-codec "1.7"]
[datomic-schema "1.2.2"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]
[criterium "0.4.3"]
[yuppiechef/onyx-dynamodb "0.5.0"]
[com.taoensso/faraday "1.5.0" :exclusions [org.clojure/clojure]]
[com.taoensso/faraday "1.5.0" :exclusions [org.clojure/clojure com.amazonaws/aws-java-sdk]]
[com.mdrogalis/onyx-datomic "0.5.2"]
[com.mdrogalis/onyx-kafka "0.5.0" :exclusions [org.slf4j/slf4j-simple]]
[org.hornetq/hornetq-commons "2.4.0.Final"]
[org.hornetq/hornetq-core-client "2.4.0.Final"]
[org.hornetq/hornetq-server "2.4.0.Final"]]

:profiles {:dev {:dependencies [[com.datomic/datomic-free "0.9.5130" :exclusions [org.slf4j/slf4j-nop org.slf4j/log4j-over-slf4j joda-time]]]}
:prod {:dependencies [[com.datomic/datomic-pro "0.9.5130" :exclusions [org.slf4j/slf4j-nop org.slf4j/log4j-over-slf4j joda-time]]]}}
:repl-options{:init-ns cqrs-server.core}
:repl-options { :init-ns cqrs-server.core :timeout 120000}
:main cqrs-server.core)
7 changes: 6 additions & 1 deletion src/cqrs_server/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
[onyx.api]

[cqrs-server.cqrs :as cqrs]
[cqrs-server.module :as module]))
[cqrs-server.module :as module])
(:gen-class))

;; Start local dynamodb - download from:
;; http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Tools.DynamoDBLocal.html
Expand Down Expand Up @@ -231,3 +232,7 @@
(k/with-resource [c (zk/consumer config)]
zk/shutdown
(println (take 10 (map fressian/read (map :value (zk/messages c queue)))))))


(defn main- [& args]
(start))
43 changes: 26 additions & 17 deletions src/cqrs_server/cqrs.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
(ns cqrs-server.cqrs
(:require
[cqrs-server.util :as util :refer [defdbfn]]
[clojure.core.async :as a]
[onyx.peer.task-lifecycle-extensions :as l-ext]
[datomic.api :as d]
[datomic-schema.schema :refer [schema fields]]
[taoensso.timbre :refer [info]]
[taoensso.timbre :as log]
[schema.core :as s]
[schema.coerce :as coerce]
[clj-uuid :as u]
Expand All @@ -21,8 +23,12 @@
(defmethod aggregate-event :default [_] [])

(defn aggregate-event* [e]
(let [r (aggregate-event e)]
[{:tx (.array (fressian/write [[:idempotent-tx (java.util.UUID/fromString (str (:id e))) r]])) :e e}]))
(log/info "Aggregating" e)
(let [r (aggregate-event e)
_ (log/info "transacting: " r)
t @(d/transact (d/connect @datomic-uri) [[:idempotent-tx (java.util.UUID/fromString (str (:id e))) r]])]
[{:eventid (:id e)
:basis-t (d/basis-t (:db-after t))}]))

(defdbfn idempotent-tx [db eid tx]
(if-not (datomic.api/entity db [:event/uuid eid])
Expand All @@ -46,8 +52,11 @@
(defmethod command-coerce :default [_] [])

(defn command-coerce* [c]
(command-coerce c)
[c])
(log/info "Coercing: " c)
(let [coerce (command-coerce c)]
(if (:error coerce)
(throw (RuntimeException. (str (:error coerce))))
[c])))


(defn install-command [[type schema]]
Expand All @@ -65,13 +74,15 @@
(defmethod process-command :default [_] [])

(defn process-command* [command]
(log/info "Processing Command: " command)
(process-command command))

(defn prepare-store [e]
(log/info "Preparing for storage: " e)
(assoc e :id (str (:id e)) :data (.array (fressian/write (:data e)))))

(defn error [msg]
(throw (RuntimeException. msg)))
(throw (RuntimeException. (str msg))))

(defn command [basis-t type msg]
{:basis-t basis-t
Expand All @@ -89,18 +100,18 @@
(defn events [command segment msgs]
(map (partial event command segment) (range) msgs))



(def command-workflow
[[:command/in-queue :command/coerce]
[:command/coerce :command/process]
[:command/process :event/out-queue]
[:event/in-queue :event/prepare-store]
[:event/prepare-store :event/store]
[:event/in-queue :event/aggregator]
[:event/aggregator :event/store-aggregate]])

[:event/aggregator :event/aggregate-out]])


(defn catalog [{:keys [command-queue in-event-queue out-event-queue event-store aggregate-store]}]
(defn catalog [{:keys [command-queue in-event-queue out-event-queue event-store aggregate-store aggregate-out]}]
[(assoc
command-queue
:onyx/name :command/in-queue
Expand All @@ -110,13 +121,13 @@
:onyx/type :function
:onyx/fn :cqrs-server.cqrs/command-coerce*
:onyx/consumption :concurrent
:onyx/batch-size 10}
:onyx/batch-size 1}

{:onyx/name :command/process
:onyx/type :function
:onyx/fn :cqrs-server.cqrs/process-command*
:onyx/consumption :concurrent
:onyx/batch-size 10}
:onyx/batch-size 1}

(assoc
out-event-queue
Expand All @@ -132,7 +143,7 @@
:onyx/type :function
:onyx/fn :cqrs-server.cqrs/prepare-store
:onyx/consumption :concurrent
:onyx/batch-size 10}
:onyx/batch-size 1}

(assoc
event-store
Expand All @@ -146,8 +157,6 @@
:onyx/batch-size 1}

(assoc
aggregate-store
:onyx/name :event/store-aggregate
aggregate-out
:onyx/name :event/aggregate-out
:onyx/type :output)])


133 changes: 133 additions & 0 deletions test/cqrs_server/async_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
(ns cqrs-server.async-test
(:require
[datomic.api :as d]
[datomic-schema.schema :as ds :refer [schema fields part]]
[schema.core :as s]
[cqrs-server.module :as module]
[cqrs-server.cqrs :as cqrs]
[onyx.peer.task-lifecycle-extensions :as l-ext]

[onyx.plugin.datomic]
[onyx.plugin.core-async]
[onyx.api]

[clojure.core.async :as a]
[clojure.test :refer :all]
[taoensso.timbre :as log]))

;; A simple async cqrs test, with in-memory datomic

(def onyxid (java.util.UUID/randomUUID))

(def env-config
{:hornetq/mode :vm
:hornetq/server? true
:hornetq.server/type :vm
:zookeeper/address "127.0.0.1:2185"
:zookeeper/server? true
:zookeeper.server/port 2185
:onyx/id onyxid
:onyx.peer/job-scheduler :onyx.job-scheduler/round-robin})

(def peer-config
{:hornetq/mode :vm
:zookeeper/address "127.0.0.1:2185"
:onyx/id onyxid
:onyx.peer/inbox-capacity 100
:onyx.peer/outbox-capacity 100
:onyx.peer/job-scheduler :onyx.job-scheduler/round-robin})

(def config
{:datomic-uri "datomic:mem://cqrsasync"
:command-stream (atom nil)
:event-stream (atom nil)
:event-store-stream (atom nil)
:aggregate-out-stream (atom nil)
:channels [:command-stream :event-stream :event-store-stream :aggregate-out-stream]
:env env-config
:peer peer-config
:onyxid onyxid})

(defn chan-stream [type]
{:onyx/ident (if (= type :outpu) :core.async/write-to-chan :core.async/read-from-chan)
:onyx/type type
:onyx/medium :core.async
:onyx/consumption :sequential
:onyx/batch-size 1
:onyx/batch-timeout 500
:onyx/max-peers 1})

(def catalog
(cqrs/catalog
{:command-queue (chan-stream :input)
:out-event-queue (chan-stream :output)
:in-event-queue (chan-stream :input)
:event-store (chan-stream :output)
:aggregate-out (chan-stream :output)}))

(defmethod l-ext/inject-lifecycle-resources :command/in-queue [_ _]
{:core-async/in-chan @(:command-stream config)})

(defmethod l-ext/inject-lifecycle-resources :event/out-queue [_ _]
{:core-async/out-chan @(:event-stream config)})

(defmethod l-ext/inject-lifecycle-resources :event/in-queue [_ _]
{:core-async/in-chan @(:event-stream config)})

(defmethod l-ext/inject-lifecycle-resources :event/store [_ _]
{:core-async/out-chan @(:event-store-stream config)})

(defmethod l-ext/inject-lifecycle-resources :event/aggregate-out [_ _]
{:core-async/out-chan @(:aggregate-out-stream config)})

(defn setup-env [db-schema]
(doseq [c (:channels config)]
(reset! (get config c) (a/chan 10)))
(let [env (onyx.api/start-env env-config)
peers (onyx.api/start-peers! 10 peer-config)
dturi (:datomic-uri config)]
(d/create-database dturi)
(d/transact (d/connect dturi) (ds/generate-schema d/tempid db-schema))
(reset! cqrs/datomic-uri dturi)
{:env env
:peers peers
:job (onyx.api/submit-job
peer-config
{:catalog catalog :workflow cqrs/command-workflow :task-scheduler :onyx.task-scheduler/round-robin})}))

(defn stop-env [env]
(onyx.api/kill-job peer-config (:job env))
(doseq [p (:peers env)] (onyx.api/shutdown-peer p))
(onyx.api/shutdown-env (:env env))
(d/delete-database (:datomic-uri config))
(doseq [c (:channels config)]
(a/close! @(get config c))
(reset! (get config c) nil))
true)

(defn command [type data]
(cqrs/command (d/basis-t (d/db (d/connect (:datomic-uri config)))) type data))

(defn send-command [type data]
(a/>!! @(:command-stream config) (command type data)))

(def db-schema
(concat
cqrs/db-schema
[(schema
base
(fields
[uuid :uuid :unique-identity]))]
module/db-schema))

(deftest run-test []
(let [env (setup-env db-schema)
event (delay (first (a/alts!! [@(:event-store-stream config) (a/timeout 1000)])))
aggregate (delay (first (a/alts!! [@(:aggregate-out-stream config) (a/timeout 1000)])))]
(try
(send-command :user/register {:name "Bob" :age 33})
(assert (= (:type @event) :user/registered))
(assert @aggregate)
(assert (= #{["Bob" 33]} (d/q '[:find ?n ?a :where [?e :user/name ?n] [?e :user/age ?a]] (d/as-of (d/db (d/connect (:datomic-uri config))) (:basis-t @aggregate)))))
(finally
(stop-env env)))))

0 comments on commit 0870736

Please sign in to comment.