/
consumer.clj
93 lines (79 loc) · 2.75 KB
/
consumer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
;; jackdaw.data.consumer
;;
;; jackdaw.data extensions for the consumer API types
(in-ns 'jackdaw.data)
(import '[org.apache.kafka.clients.consumer
ConsumerRecord OffsetAndTimestamp]
'org.apache.kafka.common.header.Headers)
(defn ^ConsumerRecord ->ConsumerRecord
  "Builds a Kafka `ConsumerRecord` from positional, constructor-style
  arguments. Handy when exercising the consumer API and its helpers in tests.

  The first argument is a map from which only `:topic-name` is read.
  `ts-type` may be given either as a keyword, which is converted with
  `->TimestampType`, or as a value that is passed to the constructor as-is.
  The deprecated checksum constructor slot is always filled with nil."
  [{:keys [:topic-name]} partition offset ts ts-type
   key-size value-size key value ^Headers headers]
  (let [^TimestampType timestamp-type (cond-> ts-type
                                        (keyword? ts-type) ->TimestampType)
        ;; Deprecated checksum
        checksum nil]
    (ConsumerRecord. topic-name
                     (int partition)
                     (long offset)
                     (long ts)
                     timestamp-type
                     checksum
                     (int key-size)
                     (int value-size)
                     key value
                     headers)))
(defn map->ConsumerRecord
  "Turns a `::consumer-record` map back into a Kafka `ConsumerRecord`.

  This is the inverse of `(datafy ^ConsumerRecord cr)`: the map itself is
  handed to `->ConsumerRecord` as the topic argument (which reads its
  `:topic-name`), and the remaining fields are passed positionally."
  [{:keys [:key
           :value
           :headers
           :partition
           :timestamp
           :timestamp-type
           :offset
           :serialized-key-size
           :serialized-value-size]
    :as m}]
  (let [ts-type (->TimestampType timestamp-type)]
    (->ConsumerRecord m
                      partition offset timestamp ts-type
                      serialized-key-size serialized-value-size
                      key value headers)))
;; Converts a Kafka `ConsumerRecord` into a plain Clojure map.
;;
;; Defined through the `defn->data` helper macro (declared outside this
;; section of the file). The resulting map uses the same keys that
;; `map->ConsumerRecord` destructures, plus `:topic-name`.
(defn->data ConsumerRecord->data [^ConsumerRecord r]
{:topic-name (.topic r)
:key (.key r)
:value (.value r)
:headers (.headers r)
:partition (.partition r)
:timestamp (.timestamp r)
;; The checksum is a deprecated field, so it is deliberately left out:
;; :checksum (.checksum r)
;; The timestamp type is itself converted to data rather than kept as an object.
:timestamp-type (TimestampType->data (.timestampType r))
:offset (.offset r)
:serialized-key-size (.serializedKeySize r)
:serialized-value-size (.serializedValueSize r)})
;; REPL walkthrough (never evaluated when the file is loaded): each form
;; operates on the result of the previous one via `*1`.
(comment
;; Build a record from positional arguments; headers are nil here.
(->ConsumerRecord {:topic-name "foo"} 1 100 1 :jackdaw.timestamp/create
5 10 "fooo" "barrrrrrrr" nil)
;; Turn the record into a map ...
(ConsumerRecord->data *1)
;; ... and turn that map back into a record.
(map->ConsumerRecord *1)
;; on 1.10+
(datafy *1))
;;; OffsetAndTimestamp tuples
(defn ^OffsetAndTimestamp ->OffsetAndTimestamp
  "Builds a Kafka `OffsetAndTimestamp` from a map holding `:offset` and
  `:timestamp`. The timestamp is coerced with `long` before construction."
  [{:keys [offset timestamp]}]
  (let [ts (long timestamp)]
    (OffsetAndTimestamp. offset ts)))
;; Converts a Kafka `OffsetAndTimestamp` into a map with `:offset` and
;; `:timestamp`, the same keys `->OffsetAndTimestamp` reads.
;; Defined through the `defn->data` helper macro (declared outside this
;; section of the file).
(defn->data OffsetAndTimestamp->data [^OffsetAndTimestamp ots]
{:offset (.offset ots)
:timestamp (.timestamp ots)})
(defn map->OffsetAndTimestamp
"Builds an `OffsetAndTimestamp` from a map with `:offset` and `:timestamp`
by delegating to `->OffsetAndTimestamp`. The destructured keys are not used
in the body; the whole map `m` is passed along."
[{:keys [offset timestamp] :as m}]
(->OffsetAndTimestamp m))
(defn as-OffsetAndTimestamp
  "Coerces `ot` to an `OffsetAndTimestamp`.

  - An `OffsetAndTimestamp` instance is returned unchanged.
  - A map is converted with `map->OffsetAndTimestamp`, unless its metadata
    still carries the object it was datafied from (`:clojure.datafy/obj`)
    and that object agrees with the map's `:offset` and `:timestamp`; in
    that case the original object is returned instead of a copy.
  - Any other input yields nil."
  [ot]
  (cond
    (instance? OffsetAndTimestamp ot)
    ot

    (map? ot)
    ;; `clojure.datafy/datafy` records the source class under
    ;; `:clojure.datafy/class` as a *symbol*, so comparing that value with
    ;; the Class object can never succeed. Inspect the recorded object itself.
    (let [obj (:clojure.datafy/obj (meta ot))]
      (if (and (instance? OffsetAndTimestamp obj)
               ;; Metadata survives `assoc`/`update`, so only reuse the
               ;; original object while it still matches the map's contents.
               (= (.offset ^OffsetAndTimestamp obj) (:offset ot))
               (= (.timestamp ^OffsetAndTimestamp obj) (:timestamp ot)))
        obj
        (map->OffsetAndTimestamp ot)))))