/
_interop.clj
111 lines (59 loc) · 1.66 KB
/
_interop.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
(ns dvlopt.kafka.-interop
;; Miscellaneous interop utilities.
;;
;; Not meant to be directly used by the user.
{:author "Adam Helinski"}
(:require [clojure.string :as string])
(:import java.util.Map
java.util.regex.Pattern
java.util.concurrent.Future
clojure.lang.Named
org.apache.kafka.streams.StreamsConfig))
;;;;;;;;;;
(defn future-proxy
  ;; Wraps the future `f*` into a new future delegating every operation to it.
  ;;
  ;; Status queries and cancellation are forwarded untouched. Whenever the value is retrieved
  ;; (with or without a timeout), `transform` is applied to whatever `f*` yields and the result
  ;; of that call is what the caller receives. `transform` is called on each retrieval.
  ^Future
  [^Future f* transform]
  (reify Future
    (isDone [_]
      (.isDone f*))
    (isCancelled [_]
      (.isCancelled f*))
    (cancel [_ may-interrupt?]
      (.cancel f* may-interrupt?))
    (get [_]
      (-> (.get f*)
          transform))
    (get [_ wait-time time-unit]
      (-> (.get f* wait-time time-unit)
          transform))))
(defn named?
  ;; Does `x` implement `clojure.lang.Named` ?
  [x]
  (instance? Named x))
(defn nodes-string
  ;; Builds the comma-separated "host:port" string describing Kafka nodes, as used by various
  ;; configurations.
  ;;
  ;; `nodes` is either a collection of [host port] pairs or an already prepared string, in which
  ;; case it is returned as is.
  [nodes]
  (if-not (string? nodes)
    (->> nodes
         (map (fn host-port [[host port]]
                (str host ":" port)))
         (string/join ","))
    nodes))
(defn regex?
  ;; Is `x` a compiled regular expression, i.e. an instance of `java.util.regex.Pattern` ?
  [x]
  (instance? Pattern x))
(defn resource-configuration
  ;; Returns `configuration` with the "bootstrap.servers" entry set to the string produced by
  ;; `nodes-string` for `nodes`. Meant for resources such as a consumer or a producer.
  ;;
  ;; Cf. https://kafka.apache.org/documentation/#configuration
  ^Map
  [configuration nodes]
  (let [servers (nodes-string nodes)]
    (assoc configuration "bootstrap.servers" servers)))