-
Notifications
You must be signed in to change notification settings - Fork 81
/
journal.clj
102 lines (90 loc) · 3.67 KB
/
journal.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
(ns jackdaw.test.journal
""
(:require
[clojure.tools.logging :as log]
[manifold.stream :as s]
[manifold.deferred :as d]))
;; Journal
;;
;; The journal represents the test output. It captures the output of all
;; messages arriving on the test-machine' `:consumer` channel.
(defn watch-for
"Returns the first true application of the journal to the specified condition `condition?`"
[machine condition? timeout info]
(let [journal (:journal machine)
p (promise)
id (java.util.UUID/randomUUID)
check-condition (fn [new]
(when-let [result (condition? new)]
(remove-watch journal id)
(deliver p {:result :found
:info result})))]
(add-watch journal id (fn [k r old new]
(check-condition new)))
;; don't rely on watcher to 'check-condition'
;; in case journal is already in a final, good state
(check-condition @journal)
(deref p timeout {:error :timeout :info info})))
(defn journal-read
"Append `record` into the `journal` under `journal-key`"
[journal journal-key v]
(let [result (update-in journal journal-key concat v)]
result))
(defn reverse-lookup
"Given a map of `topic-metadata`, find the name of the entry whose
`:topic-name` matches the supplied topic.
The name supplied in the key positions in the supplied map do not necessarily
match the topic name. This allows users to provide a 'logical' name that may stay
constant if the user wishes over version changes.
This function is used when populating the journal to ensure that the topic
identifiers given by the user are used when populating the journal (instead of
the real topic names)."
[topic-metadata topic]
(let [m (reduce-kv (fn [m k v]
(assoc m (:topic-name v) k))
{}
topic-metadata)]
(get m topic)))
(defn journal-result
[machine record]
"Journals the `record` in the appropriate place in the supplied test
machine's `:journal`"
(let [journal (:journal machine)]
(if-let [err (agent-error journal)]
(throw err)
(send journal journal-read [:topics (:topic record)] [record]))))
(defn journaller
"Returns an asynchronous process that reads all messages produced by
the supplied `machine`'s `:consumer` and records them in the journal"
[machine stop?]
(when-not (:journal machine)
(log/error machine "no journal available")
(throw (ex-info "no journal available: " {})))
(when-not (get-in machine [:consumer :messages])
(log/error machine "no message stream to journal")
(throw (ex-info "no message stream to journal:" machine)))
(let [{:keys [messages]} (:consumer machine)]
(d/loop [record (s/take! messages)]
(d/chain record
(fn [record]
(when-not @stop?
(when record
(journal-result machine record)
(d/recur (s/take! messages)))))))))
(defn with-journal
"Enriches the supplied `machine` with a journaller that will write to
the supplied `journal`."
[machine journal]
(let [machine' (assoc machine :journal journal)
stop? (atom false)
jloop (journaller machine' stop?)]
(assoc machine'
:journal journal
:jloop jloop
:exit-hooks (concat
(:exit-hooks machine)
[#(do
(log/debug "stop accepting journal messages")
(reset! stop? true)
(log/debug "wait for journaller to finish")
@jloop)]))))