forked from FundingCircle/jackdaw
-
Notifications
You must be signed in to change notification settings - Fork 1
/
common.clj
83 lines (64 loc) · 1.92 KB
/
common.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
(in-ns 'jackdaw.data)
(import '[org.apache.kafka.common
PartitionInfo
Node TopicPartition TopicPartitionInfo])
(set! *warn-on-reflection* true)
;;; Node
(defn->data Node->data
""
[^Node node]
{:host (.host node)
:port (.port node)
:id (.id node)
:rack (.rack node)})
;;; PartitionInfo
(defn->data PartitionInfo->data
[^PartitionInfo pi]
{:topic-name (.topic pi)
:isr (mapv datafy (.inSyncReplicas pi))
:leader (datafy (.leader pi))
:replicas (mapv datafy (.replicas pi))
:partition (.partition pi)
:offline-replicas (mapv datafy (.offlineReplicas pi))})
;;; TopicPartitionInfo
(defn->data TopicPartitionInfo->data
""
[^TopicPartitionInfo tpi]
{:isr (mapv datafy (.isr tpi))
:leader (datafy (.leader tpi))
:partition (.partition tpi)
:replicas (mapv datafy (.replicas tpi))})
;;; Topic partition tuples
(defn ^TopicPartition ->TopicPartition
"Given unrolled ctor-style arguments, create a Kafka `TopicPartition`."
[{:keys [:topic-name]} partition]
(TopicPartition. topic-name (int partition)))
(defn map->TopicPartition
"Given a `::topic-parititon`, build an equivalent `TopicPartition`.
Inverts `(datafy ^TopicPartition tp)`."
[{:keys [topic-name
partition]
:as m}]
(->TopicPartition m partition))
(defn->data TopicPartition->data [^TopicPartition tp]
{:topic-name (.topic tp)
:partition (.partition tp)})
(defn as-TopicPartition
""
^TopicPartition [o]
(cond (instance? TopicPartition o)
o
(map? o)
(if (= TopicPartition (:clojure.datafy/class (meta o)))
(:clojure.datafy/obj (meta o))
(map->TopicPartition o))
:else
(throw (ex-info "Unable to build TopicPartition"
{:o o
:class (class o)}))))
(comment
(->TopicPartition {:topic-name "foo"} 1)
(TopicPartition->data *1)
(map->TopicPartition *1)
;; On 1.10+
(datafy *1))