-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.clj
57 lines (47 loc) · 2.19 KB
/
consumer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
(ns gregor.details.consumer
(:require [gregor.details.protocols.consumer :refer [ConsumerProtocol] :as consumer]
[gregor.details.transform :as xform]
[gregor.details.deserializer :refer [->deserializer]])
(:import org.apache.kafka.clients.consumer.KafkaConsumer
java.util.concurrent.TimeUnit
org.apache.kafka.common.errors.WakeupException
java.util.Collection
java.util.regex.Pattern))
(defn reify-consumer-protocol
"Create a reified implementation of a consumer, which includes ConsumerProtocol and SharedProtocol functions"
[^KafkaConsumer consumer]
(reify
ConsumerProtocol
(close! [_ timeout]
(if-not (int? timeout)
(.close consumer)
(.close consumer (long timeout) TimeUnit/MILLISECONDS)))
;; Hopefully we can remove this try ... catch in the future. There is currently a synchronization bug
;; that arises if the user spams control events that for functions that can throw WakeupException. Since
;; the WakeupException is used to exit `poll!` to handle control events
(partitions-for [_ topic]
(mapv xform/partition-info->data (.partitionsFor consumer topic)))
(poll! [_ timeout]
(xform/consumer-records->data (.poll consumer timeout)))
(subscribe! [_ topics]
(let [t (xform/data->topics topics)]
(if (vector? t)
(.subscribe ^KafkaConsumer consumer ^Collection t)
(.subscribe ^KafkaConsumer consumer ^Pattern t))))
(unsubscribe! [_]
(.unsubscribe consumer))
(subscription [_]
(.subscription consumer))
(commit! [_]
#(.commitSync consumer))
(wakeup! [_]
(.wakeup consumer))))
(defn make-consumer
"Create a consumer from a configuration"
[{:keys [key-deserializer value-deserializer kafka-configuration topics]
:or {key-deserializer :edn value-deserializer :edn}}]
(let [driver (reify-consumer-protocol (KafkaConsumer. (xform/opts->props kafka-configuration)
(->deserializer key-deserializer)
(->deserializer value-deserializer)))]
(consumer/subscribe! driver topics)
driver))