/
commands.clj
120 lines (97 loc) · 3.75 KB
/
commands.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
(ns jackdaw.test.commands
""
(:require
[clojure.spec.alpha :as s]
[jackdaw.test.commands.base :as base]
[jackdaw.test.commands.write :as write]
[jackdaw.test.commands.watch :as watch])
(:refer-clojure :exclude [do]))
(set! *warn-on-reflection* true)
(def base-commands base/command-map)
(def write-command write/command-map)
(def watch-command watch/command-map)
(def command-map
(merge base-commands
write-command
watch-command))
(defn command-handler
[machine cmd]
(let [[cmd & params] cmd
handler (get command-map cmd)]
(if handler
;; Happy
(let [result (handler machine params)]
(assoc {}
:result result
:cmd cmd
:params params))
;; else Sad
(throw (ex-info (format "Unknown command: %s" cmd)
{:cmd cmd
:error :unknown-command
:params params
:available-commands (keys command-map)})))))
(defn with-handler
[machine handler]
(assoc machine
:command-handler handler))
;; Test Command API
(s/def ::topic-id (s/or :keyword keyword?
:string string?))
(s/def ::test-message any?)
(s/def ::write-options map?)
(s/def ::watch-options map?)
(defn do
"Invoke the provided function, passing a snapshot of the test journal
Use this to perform side-effects without representing their result in the journal"
[do-fn]
`[:do ~do-fn])
(s/fdef do
:args ifn?
:ret vector?)
(defn do!
"Invoke the provided function, passing the journal `ref`
Use this to perform side-effects when you want to represent the result in the journal
(e.g. insert test-data into an external database AND into the journal with the expectation
that it will eventually appear in kafka via some external system like kafka-connect)"
[do-fn]
`[:do! ~do-fn])
(s/fdef do!
:args ifn?
:ret vector?)
(defn write!
"Write a message to the topic identified in the topic-metadata by `topic-id`
`:message` is typically a map to be serialized by the Serde configured in the topic-metadata
but it can be whatever the configued Serde is capable of serializing
`:options` is an optional map containing additional information describing how the test-message
should be created. The following properties are supported.
`:key` An explicit key to associate with the test message
`:key-fn` A function to derive the key from the test message
`:partition` The partition to which the test message should be written
`:partition-fn` A function to derive the partition to which the test message should be written"
([topic-id message]
`[:write! ~topic-id ~message])
([topic-id message options]
`[:write! ~topic-id ~message ~options]))
(s/fdef write!
:args (s/cat :topic-id ::topic-id
:message ::test-message
:options (s/? ::write-options))
:ret vector?)
(defn watch
"Watch the test-journal until the `watch-fn` predicate returns true
`:watch-fn` is a function that takes the journal and returns true once the journal
contains evidence of the test being complete
`:options` is an optional map containing additional information describing how the watch
function will be run. The following properties are supported.
`:info` Diagnostic information to be included in the response when a watch fails
to observe the expected data in the journal
`:timeout` The number of milliseconds to wait before giving up"
([watch-fn]
`[:watch ~watch-fn])
([watch-fn options]
`[:watch ~watch-fn ~options]))
(s/fdef watch
:args (s/cat :watch-fn ifn?
:options (s/? ::watch-options))
:ret vector?)