Skip to content

Commit

Permalink
Merge 32adba6 into 444ddfe
Browse files Browse the repository at this point in the history
  • Loading branch information
isomnath committed Oct 5, 2021
2 parents 444ddfe + 32adba6 commit c35f83b
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 6 deletions.
79 changes: 75 additions & 4 deletions src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
[clonfig.core :as clonfig]
[mount.core :refer [defstate]]
[ziggurat.util.java-util :as util])
(:import (java.util Properties))
(:import (java.util Properties)
[org.apache.kafka.common.config SaslConfigs])
(:gen-class
:methods [^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]
Expand Down Expand Up @@ -79,6 +80,9 @@
(defn ziggurat-config []
(get config :ziggurat))

(defn ssl-config []
(get-in config [:ziggurat :ssl]))

(defn rabbitmq-config []
(get (ziggurat-config) :rabbit-mq))

Expand Down Expand Up @@ -142,7 +146,9 @@
:origin-topic
:poll-timeout-ms-config
:producer
:thread-count])
:thread-count
:enabled
:jaas])

(defn- to-list
[s]
Expand Down Expand Up @@ -173,9 +179,74 @@
(.setProperty p sk nv))))
p)

(def jaas-template
{"PLAIN" "org.apache.kafka.common.security.plain.PlainLoginModule"
"SCRAM-SHA-512" "org.apache.kafka.common.security.scram.ScramLoginModule"})

(defn create-jaas-properties
[user-name password mechanism]
(let [jaas-template (get jaas-template mechanism)]
(format "%s required username=\"%s\" password=\"%s\";" jaas-template user-name password)))

(defn- add-jaas-properties
[properties jaas-config]
(if (some? jaas-config)
(let [username (get jaas-config :username)
password (get jaas-config :password)
mechanism (get jaas-config :mechanism)]
(doto properties
(.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism))))
properties))

(defn build-ssl-properties
[properties set-property-fn ssl-config-map]
"Builds SSL properties from ssl-config-map which is a map where keys are
Clojure keywords in kebab case. These keys are converted to Kafka properties by set-property-fn.
SSL properties are set only if key sequence [:ziggurat :ssl :enabled] returns true.
Creates JAAS template if values are provided in the map provided agains this key sequence
[:ziggurat :ssl :jaas].
Example of a ssl-config-map
{:enabled true
:ssl-keystore-location <>
:ssl-keystore-password <>
{:jaas {:username <>
:password <>
:mechanism <>}}}
"
(let [ssl-configs-enabled (:enabled ssl-config-map)
jaas-config (get ssl-config-map :jaas)]
(if (true? ssl-configs-enabled)
(as-> properties pr
(add-jaas-properties pr jaas-config)
(reduce-kv set-property-fn pr ssl-config-map))
properties)))

(defn build-properties
[set-property-fn m]
(reduce-kv set-property-fn (Properties.) m))
"Builds Properties object from the provided config-map which is a map where keys are
Clojure keywords in kebab case. These keys are converted to Kafka properties by set-property-fn.
First, ssl properties are set using the config map returned by a call to `ssl-config`
Then, properties from the provided `config-map` are applied. `config-map` can carry properties for Streams,
Consumer or Producer APIs
The method allows individual Streams, Producer, Consumer APIs to override SSL configs
if ssl properties are provided in `config-map`
Example of a config-map for streams
{:auto-offset-reset :latest
:replication-factor 2
:group-id \"foo\"}
"
[set-property-fn config-map]
(as-> (Properties.) pr
(build-ssl-properties pr set-property-fn (ssl-config))
(reduce-kv set-property-fn pr config-map)))

(def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table)))

Expand Down
111 changes: 109 additions & 2 deletions test/ziggurat/config_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
rabbitmq-config
set-property
statsd-config
ziggurat-config]]
ziggurat-config
ssl-config
create-jaas-properties]]
[ziggurat.fixtures :as f])
(:import (java.util ArrayList Properties)))

Expand Down Expand Up @@ -239,7 +241,112 @@
not-found "NOT FOUND!"
v (.getProperty props string-key not-found)]
(is (= v not-found))))
config-map))))))
config-map))))
(testing "should set ssl properties for streams if enabled is set to true"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest
:group-id "foo"}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
group-id (.getProperty props "group.id")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")]
(is (= auto-offset-reset "latest"))
(is (= group-id "foo"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password")))))
(testing "should set ssl properties for consumer API if enabled is set to true"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:max-poll-records 500
:enable-auto-commit true}
props (build-consumer-config-properties streams-config-map)
max-poll-records (.getProperty props "max.poll.records")
enable-auto-comit (.getProperty props "enable.auto.commit")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")]
(is (= max-poll-records "500"))
(is (= enable-auto-comit "true"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password")))))
(testing "should set ssl properties for producer API if enabled is set to true"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:batch.size 500
:acks 1}
props (build-consumer-config-properties streams-config-map)
batch-size (.getProperty props "batch.size")
acks (.getProperty props "acks")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")]
(is (= batch-size "500"))
(is (= acks "1"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password")))))
(testing "should not set ssl properties for streams if eenabled is set to false"
(with-redefs [ssl-config (constantly {:enabled false
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest
:group-id "foo"}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
group-id (.getProperty props "group.id")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")]
(is (= auto-offset-reset "latest"))
(is (= group-id "foo"))
(is (nil? ssl-ks-location))
(is (nil? ssl-ks-password)))))
(testing "ssl properties from streams config map overrides the ssl properties provided in [:ziggurat :ssl]"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest
:ssl-keystore-location "/some/different/location"
:ssl-keystore-password "different-password"}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")]
(is (= auto-offset-reset "latest"))
(is (= ssl-ks-location "/some/different/location"))
(is (= ssl-ks-password "different-password")))))
(testing "ssl properties create jaas template from the map provided in [:ziggurat :ssl :jaas]"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"
:jaas {:username "myuser"
:password "mypassword"
:mechanism "SCRAM-SHA-512"}})]
(let [streams-config-map {:auto-offset-reset :latest}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")
sasl-jaas-config (.getProperty props "sasl.jaas.config")]
(is (= auto-offset-reset "latest"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password"))
(is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "SCRAM-SHA-512"))))))
(testing "ssl properties DO NOT create jaas template if no value is provided for key sequence [:ziggurat :ssl :jaas]"
(with-redefs [ssl-config (constantly {:enabled true
:ssl-keystore-location "/some/location"
:ssl-keystore-password "some-password"})]
(let [streams-config-map {:auto-offset-reset :latest}
props (build-streams-config-properties streams-config-map)
auto-offset-reset (.getProperty props "auto.offset.reset")
ssl-ks-location (.getProperty props "ssl.keystore.location")
ssl-ks-password (.getProperty props "ssl.keystore.password")
sasl-jaas-config (.getProperty props "sasl.jaas.config")]
(is (= auto-offset-reset "latest"))
(is (= ssl-ks-location "/some/location"))
(is (= ssl-ks-password "some-password"))
(is (nil? sasl-jaas-config)))))))

(deftest test-set-property
(testing "set-property with empty (with spaces) value"
Expand Down

0 comments on commit c35f83b

Please sign in to comment.