/
util.clj
97 lines (82 loc) · 3.65 KB
/
util.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
(ns ketu.async.util
(:require [clojure.core.async :as async]
[clojure.set])
(:import (org.apache.kafka.common.serialization Deserializer Serializer)))
(defn blocking-drain!
  "Repeatedly performs a blocking take (<!!) on ch, discarding every value,
  until a take yields nil. Returns nil."
  [ch]
  (while (some? (async/<!! ch))))
(defn blocking-drain-all!
  "Combines chs into a single channel with async/merge and drains it
  with blocking-drain!."
  [chs]
  (-> chs
      async/merge
      blocking-drain!))
(defn go-drain!
  "Starts a go block that keeps taking (<!) from ch, discarding every value,
  until a take yields nil. Returns the channel of that go block."
  [ch]
  (async/go
    (loop []
      ;; when-some only continues on a non-nil take, same as (when (some? ...)).
      (when-some [_ (async/<! ch)]
        (recur)))))
(defn go-drain-all!
  "Combines chs into a single channel with async/merge and drains it
  with go-drain!. Returns whatever go-drain! returns."
  [chs]
  (-> chs
      async/merge
      go-drain!))
(defn- preset-deserializer-class
  "Looks up the deserializer class name for a preset keyword
  (:string or :byte-array). Returns nil for anything else."
  [value-type]
  (get {:string     "org.apache.kafka.common.serialization.StringDeserializer"
        :byte-array "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
       value-type))
(defn- type-deserializer-class
  "Resolves a key/value type option to a value suitable for a
  *.deserializer config entry, or nil when no entry should be set.

  - Deserializer instance: nil (the object is passed as a constructor
    param instead of going through the config).
  - Keyword, symbol or string naming a preset (string, byte-array):
    the preset's deserializer class name.
  - Any other non-blank string: returned unchanged, treated as a class name.
  - Keyword or symbol that is not a preset, or a blank string: nil.
  - Any other non-nil value (e.g. a Class): returned unchanged.
  - nil: nil."
  [type]
  (cond
    ;; A deserializer object is passed as constructor param instead of *.deserializer config.
    (instance? Deserializer type)
    nil

    ;; Try one of the presets. A string that is not a preset name used to be
    ;; dropped here (it never reached the last branch); it is now passed
    ;; through as a class name.
    (or (string? type) (keyword? type) (symbol? type))
    (or (preset-deserializer-class (keyword type))
        (when (and (string? type) (seq type))
          type))

    ;; Class
    (some? type)
    type))
(defn- set-deserializers
  "Fills in key.deserializer and value.deserializer in internal-config.
  An entry that already has a truthy value in internal-config is left alone.
  Otherwise the matching :ketu.source/key-type or :ketu.source/value-type
  option is translated with type-deserializer-class, and the entry is added
  only when that translation is non-nil (it is nil for a deserializer object)."
  [internal-config opts]
  (letfn [(resolve-class [config-key opt-key]
            ;; Only consult opts when the config does not already carry a value.
            (when-not (get internal-config config-key)
              (type-deserializer-class (get opts opt-key))))]
    (let [key-class   (resolve-class "key.deserializer" :ketu.source/key-type)
          value-class (resolve-class "value.deserializer" :ketu.source/value-type)]
      (cond-> internal-config
        key-class   (assoc "key.deserializer" key-class)
        value-class (assoc "value.deserializer" value-class)))))
(defn- preset-serializer-class
  "Looks up the serializer class name for a preset keyword
  (:string or :byte-array). Returns nil for anything else."
  [value-type]
  (get {:string     "org.apache.kafka.common.serialization.StringSerializer"
        :byte-array "org.apache.kafka.common.serialization.ByteArraySerializer"}
       value-type))
(defn- type-serializer-class
  "Resolves a key/value type option to a value suitable for a
  *.serializer config entry, or nil when no entry should be set.

  - Serializer instance: nil (the object is passed as a constructor
    param instead of going through the config).
  - Keyword, symbol or string naming a preset (string, byte-array):
    the preset's serializer class name.
  - Any other non-blank string: returned unchanged, treated as a class name.
  - Keyword or symbol that is not a preset, or a blank string: nil.
  - Any other non-nil value (e.g. a Class): returned unchanged.
  - nil: nil."
  [type]
  (cond
    ;; A serializer object is passed as constructor param instead of *.serializer config.
    (instance? Serializer type)
    nil

    ;; Try one of the presets. A string that is not a preset name used to be
    ;; dropped here (it never reached the last branch); it is now passed
    ;; through as a class name.
    (or (string? type) (keyword? type) (symbol? type))
    (or (preset-serializer-class (keyword type))
        (when (and (string? type) (seq type))
          type))

    ;; Class
    (some? type)
    type))
(defn- set-serializers
  "Fills in key.serializer and value.serializer in internal-config.
  An entry that already has a truthy value in internal-config is left alone.
  Otherwise the matching :ketu.sink/key-type or :ketu.sink/value-type
  option is translated with type-serializer-class, and the entry is added
  only when that translation is non-nil (it is nil for a serializer object)."
  [internal-config opts]
  (letfn [(resolve-class [config-key opt-key]
            ;; Only consult opts when the config does not already carry a value.
            (when-not (get internal-config config-key)
              (type-serializer-class (get opts opt-key))))]
    (let [key-class   (resolve-class "key.serializer" :ketu.sink/key-type)
          value-class (resolve-class "value.serializer" :ketu.sink/value-type)]
      (cond-> internal-config
        key-class   (assoc "key.serializer" key-class)
        value-class (assoc "value.serializer" value-class)))))
(defn set-ketu-to-apache-opts
  "Builds the internal java api config from the top-level opts.

  The opts listed in the translation table below are renamed to their
  java api config names, deserializer and serializer entries derived from
  opts are added, and internal-config is merged in last so that its
  original values win over anything derived from opts."
  [internal-config opts]
  (let [opt->config {:ketu/brokers                           "bootstrap.servers"
                     :ketu.source/group-id                   "group.id"
                     :ketu.apache.consumer/auto-offset-reset "auto.offset.reset"
                     :ketu.apache.producer/compression-type  "compression.type"}
        ;; Keep only the opts we know how to translate, under their config names.
        translated  (clojure.set/rename-keys (select-keys opts (keys opt->config))
                                             opt->config)
        with-serdes (set-serializers (set-deserializers translated opts) opts)]
    (merge with-serdes internal-config)))