-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.clj
89 lines (79 loc) · 2.26 KB
/
kafka.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
(ns sisyphus.kafka
(:require
[cheshire.core :as json]
[cheshire.factory :as factory]
[kinsky.client :as kafka]))
;; Timeout argument passed to every `kafka/poll!` call made by `consume`.
;; NOTE(review): presumably interpreted by kinsky as milliseconds, which would
;; make each poll block until records arrive -- confirm against kinsky's docs.
(def poll-interval Long/MAX_VALUE)
;; Cheshire JSON factory built with the :allow-non-numeric-numbers option on.
;; `consume` binds it to factory/*json-factory* for the duration of its polling
;; loop -- presumably so payloads containing NaN/Infinity tokens can be
;; decoded (confirm against cheshire's factory documentation).
(def non-numeric-factory
  (factory/make-json-factory {:allow-non-numeric-numbers true}))
(defn server-for
  "Returns the \"host:port\" address string built from the `:host` and
  `:port` entries of `config`. A missing entry contributes an empty string."
  [config]
  (let [{:keys [host port]} config]
    (str host ":" port)))
(defn producer-config
  "Returns the producer settings map for `config`: a single
  `:bootstrap.servers` entry holding the address from `server-for`."
  [config]
  (let [servers (server-for config)]
    {:bootstrap.servers servers}))
(defn boot-producer
  "Creates a kinsky producer from `producer-config`, handing it kinsky's
  keyword serializer and JSON serializer (in that order)."
  [config]
  (let [settings (producer-config config)
        keyword-serde (kafka/keyword-serializer)
        json-serde (kafka/json-serializer)]
    (kafka/producer settings keyword-serde json-serde)))
(defn send!
  "Sends `message` as the `:value` of a record addressed to `topic` through
  `producer`. Returns whatever `kafka/send!` returns."
  [producer topic message]
  (let [record {:topic topic
                :value message}]
    (kafka/send! producer record)))
(defn consumer-config
  "Returns the consumer settings map for `config`.

  The broker address comes from `server-for`; the group id is the
  `:group-id` entry of `config`, falling back to \"flow\". Auto-commit is
  switched on with a \"1000\" interval and the offset reset policy is
  \"latest\"."
  [config]
  (let [group (:group-id config "flow")]
    {:bootstrap.servers (server-for config)
     :enable.auto.commit "true"
     :auto.commit.interval.ms "1000"
     :group.id group
     :auto.offset.reset "latest"}))
(defn handle-message
  "Dispatches the messages contained in one entry of a poll result.

  `record` is a `[key value]` pair. Only the pair whose key is `:by-topic`
  is processed; its value is expected to map each topic to a sequence of
  messages. Every message is printed and then `handle` is called with the
  topic and the message's `:value`.

  Exceptions are printed rather than propagated. A failure inside `handle`
  is contained to the message that caused it, so the remaining messages of
  the batch are still delivered. Always returns nil."
  [handle record]
  (let [report (fn [^Exception e]
                 (println (.getMessage e))
                 (.printStackTrace e))]
    ;; The outer try covers a malformed `record`; the inner one isolates
    ;; individual handler failures.
    (try
      (when (= (first record) :by-topic)
        (doseq [[topic messages] (last record)
                message messages]
          (println topic ":" message)
          (try
            (handle topic (:value message))
            (catch Exception e
              (report e)))))
      (catch Exception e
        (report e)))))
(defn consume
  "Polls `consumer` in a loop and passes each entry of every poll result to
  `handle`.

  The loop runs with cheshire's `factory/*json-factory*` bound to
  `non-numeric-factory`. Every poll is guarded: a poll that throws has its
  message printed and is treated as an empty result, so a single failed poll
  does not end consumption. The loop stops, returning nil, once the current
  thread has been interrupted.

  NOTE(review): a wakeup-style shutdown of the consumer surfaces as an
  exception from poll and is therefore logged and retried like any other
  failure; interrupt the thread (e.g. `future-cancel`) to stop the loop."
  [consumer handle]
  (binding [factory/*json-factory* non-numeric-factory]
    (loop []
      (let [records (try
                      (kafka/poll! consumer poll-interval)
                      (catch Exception e
                        (println (.getMessage e))
                        nil))]
        ;; doseq over nil or an empty result does nothing, so no emptiness
        ;; check is needed.
        (doseq [record records]
          (handle record))
        ;; Stop once interrupted; otherwise a cancelled thread could spin
        ;; here, failing and logging the same poll over and over.
        (when-not (.isInterrupted (Thread/currentThread))
          (recur))))))
(defn boot-consumer
  "Creates a kinsky consumer from `consumer-config`, subscribes it to the
  topics listed under `:subscribe` in `config`, and starts `consume` on a
  background future.

  Each entry of a poll result is passed to `handle-message` together with
  `handle`. Returns a map holding the consumer under `:consumer` and the
  polling future under `:future`."
  [config handle]
  (let [consumer (kafka/consumer
                  (consumer-config config)
                  (kafka/keyword-deserializer)
                  (kafka/json-deserializer))
        topics (vec (:subscribe config))]
    ;; Subscribe once with the complete list: a Kafka consumer's subscription
    ;; is replaced, not extended, by each subscribe call, so subscribing
    ;; topic by topic would leave only the last topic active. A lone entry
    ;; is passed through as-is so whatever form it has keeps working, and an
    ;; empty list leaves the consumer unsubscribed.
    (when (seq topics)
      (kafka/subscribe! consumer (if (next topics)
                                   topics
                                   (first topics))))
    {:consumer consumer
     :future (future
               (consume
                consumer
                (partial handle-message handle)))}))
(defn boot-kafka
  "Boots a producer and then a consumer for `config`.

  Returns the map produced by `boot-consumer` extended with the original
  `config` under `:config` and the producer under `:producer`."
  [config handle]
  (let [producer (boot-producer config)
        consumer-state (boot-consumer config handle)]
    (assoc consumer-state
           :config config
           :producer producer)))