/
extras.clj
89 lines (83 loc) · 3.71 KB
/
extras.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
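(ns jackdaw.streams.extras
  "Spec-validating wrappers around `jackdaw.streams/to` and
  `jackdaw.streams/through`, plus a Kafka `StateRestoreListener` that
  logs state store restoration progress."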
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:require [clj-time.core :as inst]
[clojure.tools.logging :as log]
[jackdaw.streams :as js]
[clojure.spec.alpha :as s])
(:import org.apache.kafka.common.TopicPartition
org.apache.kafka.streams.processor.StateRestoreListener))
(defn map-validating!
  "Returns the result of `jackdaw.streams/map-values` over `builder` with
  a function that checks every value against `topic-spec`.

  A value that satisfies the spec is passed through unchanged. For a
  value that does not, the spec's explain-data is merged with the topic
  name (`:topic.metadata/name` of `topic`) and the `line`/`file` of the
  supplied location map, the result is logged at fatal level, and an
  `ex-info` carrying the same data is thrown."
  [builder topic topic-spec {:keys [line file]}]
  (letfn [(validate [record]
            (when-not (s/valid? topic-spec record)
              (let [message  "Failed to validate outbound record!"
                    context  {::topic (:topic.metadata/name topic)
                              ::line  line
                              ::file  file}
                    details  (merge context
                                    (s/explain-data topic-spec record))]
                (log/fatal message (pr-str details))
                (throw (ex-info message details))))
            ;; Only reached when the record is valid.
            record)]
    (js/map-values builder validate)))
(defn with-file
  "Returns `form-meta` with a `:file` entry: an existing truthy `:file`
  is kept, otherwise the current value of `*file*` is used."
  [form-meta]
  (let [known-file (get form-meta :file)]
    (assoc form-meta :file (or known-file *file*))))
(defn logging-state-restore-listener
  "Returns a new Kafka `StateRestoreListener` instance which logs when
  batches are restored, and how long it takes to restore all the
  batches for a given partition of a given state store.

  Start times are tracked per (store name, topic partition) pair, so
  restoring several partitions of the same store does not overwrite
  another partition's start time. The entry for a pair is removed when
  its restoration ends. If `onRestoreEnd` is called for a pair with no
  recorded start, the elapsed time is logged as \"unknown\" instead of
  throwing."
  ^StateRestoreListener []
  (let [restore-tracker (atom {})]
    (reify StateRestoreListener
      (^void onRestoreStart [_
                             ^TopicPartition topicPartition
                             ^String storeName
                             ^long startingOffset
                             ^long endingOffset]
       ;; Key on both the store and the partition: one store name is
       ;; shared by every partition being restored for it.
       (swap! restore-tracker assoc [storeName topicPartition] (inst/now))
       (log/warnf "Restoring state store (%s.%d) over offsets %s...%s"
                  (.topic topicPartition) (.partition topicPartition)
                  startingOffset endingOffset))
      (^void onBatchRestored [_
                              ^TopicPartition topicPartition
                              ^String storeName
                              ^long batchEndOffset
                              ^long numRestored]
       (log/warnf "Restored a batch from (%s.%d)"
                  (.topic topicPartition) (.partition topicPartition)))
      (^void onRestoreEnd [_
                           ^TopicPartition topicPartition
                           ^String storeName
                           ^long totalRestored]
       (let [tracker-key [storeName topicPartition]
             start-date  (get @restore-tracker tracker-key)
             ;; Guard against a missing start so a nil is never handed
             ;; to `inst/interval`.
             elapsed-sec (if start-date
                           (inst/in-seconds (inst/interval start-date
                                                           (inst/now)))
                           "unknown")]
         ;; Drop the finished entry so the tracker does not grow forever.
         (swap! restore-tracker dissoc tracker-key)
         (log/warnf "Finished restoring state store (%s.%d) elapsed %s"
                    (.topic topicPartition) (.partition topicPartition)
                    elapsed-sec))))))
(defmacro to!
  "Wraps `#'jackdaw.streams/to`, providing validation of records
  against the spec of the destination topic.

  Expands to code that evaluates `topic` once, passes `builder` through
  `map-validating!` with `topic-spec` and the call site's form metadata
  (with `:file` filled in by `with-file`), and hands the result to
  `jackdaw.streams/to`. In the four-argument arity `partition-fn` is
  passed to `jackdaw.streams/to` ahead of the topic."
  ([builder topic topic-spec]
   (let [location (with-file (meta &form))]
     `(let [topic# ~topic]
        (js/to (map-validating! ~builder topic# ~topic-spec ~location)
               topic#))))
  ([builder partition-fn topic topic-spec]
   (let [location (with-file (meta &form))]
     `(let [topic# ~topic]
        (js/to (map-validating! ~builder topic# ~topic-spec ~location)
               ~partition-fn
               topic#)))))
(defmacro through
  "Wraps `#'jackdaw.streams/through`, providing validation of records
  against the spec of the through topic.

  Expands to code that evaluates `topic` once, passes `builder` through
  `map-validating!` with `topic-spec` and the call site's form metadata
  (with `:file` filled in by `with-file`), and hands the result to
  `jackdaw.streams/through`. In the four-argument arity `partition-fn`
  is passed to `jackdaw.streams/through` ahead of the topic."
  ([builder topic topic-spec]
   (let [location (with-file (meta &form))]
     `(let [topic# ~topic]
        (js/through (map-validating! ~builder topic# ~topic-spec ~location)
                    topic#))))
  ([builder partition-fn topic topic-spec]
   (let [location (with-file (meta &form))]
     `(let [topic# ~topic]
        (js/through (map-validating! ~builder topic# ~topic-spec ~location)
                    ~partition-fn
                    topic#)))))