From 31a6fc366a5f089be778a619d7a7f6943f992909 Mon Sep 17 00:00:00 2001 From: isomnath Date: Sat, 11 Sep 2021 11:33:23 +0530 Subject: [PATCH 1/5] Issue #243 | Add toggle based SSL configs support for kafka streams consumers and producers --- src/ziggurat/producer.clj | 5 ++- src/ziggurat/ssl/constants.clj | 15 +++++++ src/ziggurat/ssl/properties.clj | 44 ++++++++++++++++++ src/ziggurat/streams.clj | 5 ++- test/ziggurat/ssl/constants_test.clj | 10 +++++ test/ziggurat/ssl/properties_test.clj | 65 +++++++++++++++++++++++++++ 6 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 src/ziggurat/ssl/constants.clj create mode 100644 src/ziggurat/ssl/properties.clj create mode 100644 test/ziggurat/ssl/constants_test.clj create mode 100644 test/ziggurat/ssl/properties_test.clj diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj index 40c68cda..60be9316 100644 --- a/src/ziggurat/producer.clj +++ b/src/ziggurat/producer.clj @@ -46,7 +46,8 @@ [mount.core :refer [defstate]] [ziggurat.config :refer [build-producer-config-properties ziggurat-config]] [ziggurat.tracer :refer [tracer]] - [ziggurat.util.java-util :refer [get-key]]) + [ziggurat.util.java-util :refer [get-key]] + [ziggurat.ssl.properties :as ssl-properties]) (:import (io.opentracing.contrib.kafka TracingKafkaProducer) (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)) (:gen-class @@ -58,7 +59,7 @@ (reduce (fn [producer-map [stream-config-key stream-config]] (let [producer-config (:producer stream-config)] (if (some? producer-config) - (assoc producer-map stream-config-key (build-producer-config-properties producer-config)) + (assoc producer-map stream-config-key (ssl-properties/build-ssl-properties (build-producer-config-properties producer-config))) producer-map))) {} (seq (:stream-router (ziggurat-config))))) diff --git a/src/ziggurat/ssl/constants.clj b/src/ziggurat/ssl/constants.clj new file mode 100644 index 00000000..e4d34b4e --- /dev/null +++ b/src/ziggurat/ssl/constants.clj @@ -0,0 +1,15 @@ +(ns ziggurat.ssl.constants) + +(def mechanism-plain "PLAIN") +(def mechanism-scram-512 "SCRAM-SHA-512") + +(def protocol-plaintext "PLAINTEXT") +(def protocol_sasl_ssl "SASL_SSL") + +(def jaas-template + {mechanism-plain "org.apache.kafka.common.security.plain.PlainLoginModule" + mechanism-scram-512 "org.apache.kafka.common.security.scram.ScramLoginModule"}) + +(defn get-jaas-template + [mechanism] + (get jaas-template mechanism)) diff --git a/src/ziggurat/ssl/properties.clj b/src/ziggurat/ssl/properties.clj new file mode 100644 index 00000000..47230560 --- /dev/null +++ b/src/ziggurat/ssl/properties.clj @@ -0,0 +1,44 @@ +(ns ziggurat.ssl.properties + (:require [ziggurat.config :refer [ziggurat-config]] + [ziggurat.ssl.constants :as const]) + (:import [org.apache.kafka.streams StreamsConfig] + [org.apache.kafka.common.config SslConfigs SaslConfigs])) + +(defn- ssl-configs + [key] + (get-in (ziggurat-config) [:ssl-config key])) + +(defn- validate-mechanism + [mechanism] + (when-not (contains? #{const/mechanism-plain const/mechanism-scram-512} mechanism) + (let [message (format "SSL mechanism can either be %s or %s" const/mechanism-plain const/mechanism-scram-512)] + (throw (IllegalArgumentException. message {:mechanism mechanism}))))) + +(defn- validate-protocol + [protocol] + (when-not (contains? #{const/protocol-plaintext const/protocol_sasl_ssl} protocol) + (let [message (format "protocol can either be %s or %s" const/protocol-plaintext const/protocol_sasl_ssl)] + (throw (ex-info message {:protocol protocol}))))) + +(defn- create-jaas-properties + [user-name password mechanism] + (validate-mechanism mechanism) + (let [jaas-template (const/get-jaas-template mechanism)] + (format "%s required username=\"%s\" password=\"%s\";" jaas-template user-name password))) + +(defn build-ssl-properties + [properties] + (let [enabled (ssl-configs :enabled) + user-name (ssl-configs :user-name) + password (ssl-configs :password) + protocol (ssl-configs :protocol) + mechanism (ssl-configs :mechanism) + endpoint-algorithm-config (ssl-configs :endpoint-algorithm-config)] + (if enabled + (do (validate-protocol protocol) + (doto properties + (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties user-name password mechanism)) + (.put SaslConfigs/SASL_MECHANISM mechanism) + (.put SslConfigs/SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG endpoint-algorithm-config) + (.put StreamsConfig/SECURITY_PROTOCOL_CONFIG protocol))) + properties))) \ No newline at end of file diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index f79006af..38def93b 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -10,7 +10,8 @@ [ziggurat.timestamp-transformer :as timestamp-transformer] [ziggurat.tracer :refer [tracer]] [ziggurat.util.map :as umap] - [cambium.core :as clog]) + [cambium.core :as clog] + [ziggurat.ssl.properties :as ssl-properties]) (:import [io.opentracing.contrib.kafka TracingKafkaUtils] [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.tag Tags] @@ -209,7 +210,7 @@ (when-not (nil? top) (KafkaStreams. ^Topology top - ^Properties (properties stream-config) + ^Properties (ssl-properties/build-ssl-properties (properties stream-config)) (new TracingKafkaClientSupplier tracer))))) (defn- merge-consumer-type-config diff --git a/test/ziggurat/ssl/constants_test.clj b/test/ziggurat/ssl/constants_test.clj new file mode 100644 index 00000000..a5259534 --- /dev/null +++ b/test/ziggurat/ssl/constants_test.clj @@ -0,0 +1,10 @@ +(ns ziggurat.ssl.constants-test + (:require [clojure.test :refer :all] + [ziggurat.ssl.constants :as const])) + +(deftest get-jaas-template-test + (testing "should return jaas plain template when mechanism is PLAIN" + (is (= "org.apache.kafka.common.security.plain.PlainLoginModule" (const/get-jaas-template const/mechanism-plain)))) + + (testing "should return jaas scram 512 template when mechanism is SCRAM-SHA-512" + (is (= "org.apache.kafka.common.security.scram.ScramLoginModule" (const/get-jaas-template const/mechanism-scram-512))))) diff --git a/test/ziggurat/ssl/properties_test.clj b/test/ziggurat/ssl/properties_test.clj new file mode 100644 index 00000000..a4111f0b --- /dev/null +++ b/test/ziggurat/ssl/properties_test.clj @@ -0,0 +1,65 @@ +(ns ziggurat.ssl.properties-test + (:require [clojure.test :refer :all] + [ziggurat.fixtures :as fix] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.ssl.properties :refer [build-ssl-properties]]) + (:import [java.util Properties] + [org.apache.kafka.streams StreamsConfig] + [org.apache.kafka.common.config SslConfigs SaslConfigs])) + +(use-fixtures :once fix/mount-only-config) + +(deftest build-ssl-properties-test + (let [config (ziggurat-config) + properties (Properties.)] + (testing "should return properties when ziggurat.ssl-config is not present" + (is (= properties (build-ssl-properties properties)))) + + (testing "should return properties when ziggurat.ssl-config is not enabled" + (let [ssl-configs {:enabled true + :user-name "test-user" + :password "test-password" + :protocol "SASL_SSL" + :mechanism "SCRAM-SHA-512" + :endpoint-algorithm-config "https"}] + (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] + (is (= properties (build-ssl-properties properties)))))) + + (testing "should throw exception when ziggurat.ssl protocol is not supported" + (let [ssl-configs {:enabled true + :user-name "test-user" + :password "test-password" + :protocol "INVALID_PROTOCOL" + :mechanism "SCRAM-SHA-512" + :endpoint-algorithm-config "https"} + exception-message #"protocol can either be PLAINTEXT or SASL_SSL" + exception-cause {:protocol "INVALID_PROTOCOL"}] + (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] + (is (thrown? Exception exception-message exception-cause (build-ssl-properties properties)))))) + + (testing "should throw exception when ziggurat.ssl mechanism is not supported" + (let [ssl-configs {:enabled true + :user-name "test-user" + :password "test-password" + :protocol "SASL_SSL" + :mechanism "INVALID_MECHANISM" + :endpoint-algorithm-config "https"} + exception-message #"SSL mechanism can either be PLAIN or SCRAM-SHA-512" + exception-cause {:mechanism "INVALID_MECHANISM"}] + (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] + (is (thrown? Exception exception-message exception-cause (build-ssl-properties properties)))))) + + (testing "should build and return properties with SSL configs" + (let [ssl-configs {:enabled true + :user-name "test-user" + :password "test-password" + :protocol "SASL_SSL" + :mechanism "SCRAM-SHA-512" + :endpoint-algorithm-config "https"} + expected-properties (doto properties + (.put SaslConfigs/SASL_JAAS_CONFIG "org.apache.kafka.common.security.scram.ScramLoginModule required username=test-user password=test-password") + (.put SaslConfigs/SASL_MECHANISM "SCRAM-SHA-512") + (.put SslConfigs/SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG "https") + (.put StreamsConfig/SECURITY_PROTOCOL_CONFIG "SASL_SSL"))] + (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] + (is (= expected-properties (build-ssl-properties properties)))))))) From 95b0f1a3221347b0ae2c74cd1c0bbbe5ff5e1aa3 Mon Sep 17 00:00:00 2001 From: isomnath Date: Sat, 18 Sep 2021 20:32:59 +0530 Subject: [PATCH 2/5] Issue #243 | Remove SSL Mechanism and Protocol validations --- src/ziggurat/ssl/constants.clj | 15 ----------- src/ziggurat/ssl/properties.clj | 37 +++++++++++---------------- test/ziggurat/ssl/constants_test.clj | 10 -------- test/ziggurat/ssl/properties_test.clj | 24 ----------------- 4 files changed, 15 insertions(+), 71 deletions(-) delete mode 100644 src/ziggurat/ssl/constants.clj delete mode 100644 test/ziggurat/ssl/constants_test.clj diff --git a/src/ziggurat/ssl/constants.clj b/src/ziggurat/ssl/constants.clj deleted file mode 100644 index e4d34b4e..00000000 --- a/src/ziggurat/ssl/constants.clj +++ /dev/null @@ -1,15 +0,0 @@ -(ns ziggurat.ssl.constants) - -(def mechanism-plain "PLAIN") -(def mechanism-scram-512 "SCRAM-SHA-512") - -(def protocol-plaintext "PLAINTEXT") -(def protocol_sasl_ssl "SASL_SSL") - -(def jaas-template - {mechanism-plain "org.apache.kafka.common.security.plain.PlainLoginModule" - mechanism-scram-512 "org.apache.kafka.common.security.scram.ScramLoginModule"}) - -(defn get-jaas-template - [mechanism] - (get jaas-template mechanism)) diff --git a/src/ziggurat/ssl/properties.clj b/src/ziggurat/ssl/properties.clj index 47230560..7632d9c7 100644 --- a/src/ziggurat/ssl/properties.clj +++ b/src/ziggurat/ssl/properties.clj @@ -1,29 +1,23 @@ (ns ziggurat.ssl.properties - (:require [ziggurat.config :refer [ziggurat-config]] - [ziggurat.ssl.constants :as const]) + (:require [ziggurat.config :refer [ziggurat-config]]) (:import [org.apache.kafka.streams StreamsConfig] [org.apache.kafka.common.config SslConfigs SaslConfigs])) -(defn- ssl-configs - [key] - (get-in (ziggurat-config) [:ssl-config key])) +(def jaas-template + {"PLAIN" "org.apache.kafka.common.security.plain.PlainLoginModule" + "SCRAM-SHA-512" "org.apache.kafka.common.security.scram.ScramLoginModule"}) -(defn- validate-mechanism +(defn- get-jaas-template [mechanism] - (when-not (contains? #{const/mechanism-plain const/mechanism-scram-512} mechanism) - (let [message (format "SSL mechanism can either be %s or %s" const/mechanism-plain const/mechanism-scram-512)] - (throw (IllegalArgumentException. message {:mechanism mechanism}))))) + (get jaas-template mechanism)) -(defn- validate-protocol - [protocol] - (when-not (contains? #{const/protocol-plaintext const/protocol_sasl_ssl} protocol) - (let [message (format "protocol can either be %s or %s" const/protocol-plaintext const/protocol_sasl_ssl)] - (throw (ex-info message {:protocol protocol}))))) +(defn- ssl-configs + [key] + (get-in (ziggurat-config) [:ssl-config key])) (defn- create-jaas-properties [user-name password mechanism] - (validate-mechanism mechanism) - (let [jaas-template (const/get-jaas-template mechanism)] + (let [jaas-template (get-jaas-template mechanism)] (format "%s required username=\"%s\" password=\"%s\";" jaas-template user-name password))) (defn build-ssl-properties @@ -35,10 +29,9 @@ mechanism (ssl-configs :mechanism) endpoint-algorithm-config (ssl-configs :endpoint-algorithm-config)] (if enabled - (do (validate-protocol protocol) - (doto properties - (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties user-name password mechanism)) - (.put SaslConfigs/SASL_MECHANISM mechanism) - (.put SslConfigs/SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG endpoint-algorithm-config) - (.put StreamsConfig/SECURITY_PROTOCOL_CONFIG protocol))) + (doto properties + (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties user-name password mechanism)) + (.put SaslConfigs/SASL_MECHANISM mechanism) + (.put SslConfigs/SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG endpoint-algorithm-config) + (.put StreamsConfig/SECURITY_PROTOCOL_CONFIG protocol)) properties))) \ No newline at end of file diff --git a/test/ziggurat/ssl/constants_test.clj b/test/ziggurat/ssl/constants_test.clj deleted file mode 100644 index a5259534..00000000 --- a/test/ziggurat/ssl/constants_test.clj +++ /dev/null @@ -1,10 +0,0 @@ -(ns ziggurat.ssl.constants-test - (:require [clojure.test :refer :all] - [ziggurat.ssl.constants :as const])) - -(deftest get-jaas-template-test - (testing "should return jaas plain template when mechanism is PLAIN" - (is (= "org.apache.kafka.common.security.plain.PlainLoginModule" (const/get-jaas-template const/mechanism-plain)))) - - (testing "should return jaas scram 512 template when mechanism is SCRAM-SHA-512" - (is (= "org.apache.kafka.common.security.scram.ScramLoginModule" (const/get-jaas-template const/mechanism-scram-512))))) diff --git a/test/ziggurat/ssl/properties_test.clj b/test/ziggurat/ssl/properties_test.clj index a4111f0b..4339ef6c 100644 --- a/test/ziggurat/ssl/properties_test.clj +++ b/test/ziggurat/ssl/properties_test.clj @@ -25,30 +25,6 @@ (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] (is (= properties (build-ssl-properties properties)))))) - (testing "should throw exception when ziggurat.ssl protocol is not supported" - (let [ssl-configs {:enabled true - :user-name "test-user" - :password "test-password" - :protocol "INVALID_PROTOCOL" - :mechanism "SCRAM-SHA-512" - :endpoint-algorithm-config "https"} - exception-message #"protocol can either be PLAINTEXT or SASL_SSL" - exception-cause {:protocol "INVALID_PROTOCOL"}] - (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] - (is (thrown? Exception exception-message exception-cause (build-ssl-properties properties)))))) - - (testing "should throw exception when ziggurat.ssl mechanism is not supported" - (let [ssl-configs {:enabled true - :user-name "test-user" - :password "test-password" - :protocol "SASL_SSL" - :mechanism "INVALID_MECHANISM" - :endpoint-algorithm-config "https"} - exception-message #"SSL mechanism can either be PLAIN or SCRAM-SHA-512" - exception-cause {:mechanism "INVALID_MECHANISM"}] - (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] - (is (thrown? Exception exception-message exception-cause (build-ssl-properties properties)))))) - (testing "should build and return properties with SSL configs" (let [ssl-configs {:enabled true :user-name "test-user" From 2b5957dd6134fb13f3e90f4bafd46b4cc6938ef1 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Tue, 5 Oct 2021 16:05:08 +0530 Subject: [PATCH 3/5] moved the ssl properties building code inside config.clj and added tests cases --- src/ziggurat/config.clj | 79 +++++++++++++++++- src/ziggurat/producer.clj | 5 +- src/ziggurat/ssl/properties.clj | 37 --------- src/ziggurat/streams.clj | 5 +- test/ziggurat/config_test.clj | 111 +++++++++++++++++++++++++- test/ziggurat/ssl/properties_test.clj | 41 ---------- 6 files changed, 188 insertions(+), 90 deletions(-) delete mode 100644 src/ziggurat/ssl/properties.clj delete mode 100644 test/ziggurat/ssl/properties_test.clj diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index e1f00e0d..3c31d9fb 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -5,7 +5,8 @@ [clonfig.core :as clonfig] [mount.core :refer [defstate]] [ziggurat.util.java-util :as util]) - (:import (java.util Properties)) + (:import (java.util Properties) + [org.apache.kafka.common.config SaslConfigs]) (:gen-class :methods [^{:static true} [get [String] Object] ^{:static true} [getIn [java.lang.Iterable] Object]] @@ -79,6 +80,9 @@ (defn ziggurat-config [] (get config :ziggurat)) +(defn ssl-config [] + (get-in config [:ziggurat :ssl])) + (defn rabbitmq-config [] (get (ziggurat-config) :rabbit-mq)) @@ -142,7 +146,9 @@ :origin-topic :poll-timeout-ms-config :producer - :thread-count]) + :thread-count + :enabled + :jaas]) (defn- to-list [s] @@ -173,9 +179,74 @@ (.setProperty p sk nv)))) p) +(def jaas-template + {"PLAIN" "org.apache.kafka.common.security.plain.PlainLoginModule" + "SCRAM-SHA-512" "org.apache.kafka.common.security.scram.ScramLoginModule"}) + +(defn create-jaas-properties + [user-name password mechanism] + (let [jaas-template (get jaas-template mechanism)] + (format "%s required username=\"%s\" password=\"%s\";" jaas-template user-name password))) + +(defn- add-jaas-properties + [properties jaas-config] + (if (some? jaas-config) + (let [username (get jaas-config :username) + password (get jaas-config :password) + mechanism (get jaas-config :mechanism)] + (doto properties + (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism)))) + properties)) + +(defn build-ssl-properties + [properties set-property-fn ssl-config-map] + "Builds SSL properties from ssl-config-map which is a map where keys are + Clojure keywords in kebab case. These keys are converted to Kafka properties by set-property-fn. + + SSL properties are set only if key sequence [:ziggurat :ssl :enabled] returns true. + + Creates JAAS template if values are provided in the map provided agains this key sequence + [:ziggurat :ssl :jaas]. + + Example of a ssl-config-map + + {:enabled true + :ssl-keystore-location <> + :ssl-keystore-password <> + {:jaas {:username <> + :password <> + :mechanism <>}}} + " + (let [ssl-configs-enabled (:enabled ssl-config-map) + jaas-config (get ssl-config-map :jaas)] + (if (true? ssl-configs-enabled) + (as-> properties pr + (add-jaas-properties pr jaas-config) + (reduce-kv set-property-fn pr ssl-config-map)) + properties))) + (defn build-properties - [set-property-fn m] - (reduce-kv set-property-fn (Properties.) m)) + "Builds Properties object from the provided config-map which is a map where keys are + Clojure keywords in kebab case. These keys are converted to Kafka properties by set-property-fn. + + First, ssl properties are set using the config map returned by a call to `ssl-config` + + Then, properties from the provided `config-map` are applied. `config-map` can carry properties for Streams, + Consumer or Producer APIs + + The method allows individual Streams, Producer, Consumer APIs to override SSL configs + if ssl properties are provided in `config-map` + + Example of a config-map for streams + {:auto-offset-reset :latest + :replication-factor 2 + :group-id \"foo\"} + + " + [set-property-fn config-map] + (as-> (Properties.) pr + (build-ssl-properties pr set-property-fn (ssl-config)) + (reduce-kv set-property-fn pr config-map))) (def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table))) diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj index 60be9316..40c68cda 100644 --- a/src/ziggurat/producer.clj +++ b/src/ziggurat/producer.clj @@ -46,8 +46,7 @@ [mount.core :refer [defstate]] [ziggurat.config :refer [build-producer-config-properties ziggurat-config]] [ziggurat.tracer :refer [tracer]] - [ziggurat.util.java-util :refer [get-key]] - [ziggurat.ssl.properties :as ssl-properties]) + [ziggurat.util.java-util :refer [get-key]]) (:import (io.opentracing.contrib.kafka TracingKafkaProducer) (org.apache.kafka.clients.producer KafkaProducer ProducerRecord)) (:gen-class @@ -59,7 +58,7 @@ (reduce (fn [producer-map [stream-config-key stream-config]] (let [producer-config (:producer stream-config)] (if (some? producer-config) - (assoc producer-map stream-config-key (ssl-properties/build-ssl-properties (build-producer-config-properties producer-config))) + (assoc producer-map stream-config-key (build-producer-config-properties producer-config)) producer-map))) {} (seq (:stream-router (ziggurat-config))))) diff --git a/src/ziggurat/ssl/properties.clj b/src/ziggurat/ssl/properties.clj deleted file mode 100644 index 7632d9c7..00000000 --- a/src/ziggurat/ssl/properties.clj +++ /dev/null @@ -1,37 +0,0 @@ -(ns ziggurat.ssl.properties - (:require [ziggurat.config :refer [ziggurat-config]]) - (:import [org.apache.kafka.streams StreamsConfig] - [org.apache.kafka.common.config SslConfigs SaslConfigs])) - -(def jaas-template - {"PLAIN" "org.apache.kafka.common.security.plain.PlainLoginModule" - "SCRAM-SHA-512" "org.apache.kafka.common.security.scram.ScramLoginModule"}) - -(defn- get-jaas-template - [mechanism] - (get jaas-template mechanism)) - -(defn- ssl-configs - [key] - (get-in (ziggurat-config) [:ssl-config key])) - -(defn- create-jaas-properties - [user-name password mechanism] - (let [jaas-template (get-jaas-template mechanism)] - (format "%s required username=\"%s\" password=\"%s\";" jaas-template user-name password))) - -(defn build-ssl-properties - [properties] - (let [enabled (ssl-configs :enabled) - user-name (ssl-configs :user-name) - password (ssl-configs :password) - protocol (ssl-configs :protocol) - mechanism (ssl-configs :mechanism) - endpoint-algorithm-config (ssl-configs :endpoint-algorithm-config)] - (if enabled - (doto properties - (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties user-name password mechanism)) - (.put SaslConfigs/SASL_MECHANISM mechanism) - (.put SslConfigs/SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG endpoint-algorithm-config) - (.put StreamsConfig/SECURITY_PROTOCOL_CONFIG protocol)) - properties))) \ No newline at end of file diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 38def93b..f79006af 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -10,8 +10,7 @@ [ziggurat.timestamp-transformer :as timestamp-transformer] [ziggurat.tracer :refer [tracer]] [ziggurat.util.map :as umap] - [cambium.core :as clog] - [ziggurat.ssl.properties :as ssl-properties]) + [cambium.core :as clog]) (:import [io.opentracing.contrib.kafka TracingKafkaUtils] [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.tag Tags] @@ -210,7 +209,7 @@ (when-not (nil? top) (KafkaStreams. ^Topology top - ^Properties (ssl-properties/build-ssl-properties (properties stream-config)) + ^Properties (properties stream-config) (new TracingKafkaClientSupplier tracer))))) (defn- merge-consumer-type-config diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj index e021b72a..71ebccf3 100644 --- a/test/ziggurat/config_test.clj +++ b/test/ziggurat/config_test.clj @@ -18,7 +18,9 @@ rabbitmq-config set-property statsd-config - ziggurat-config]] + ziggurat-config + ssl-config + create-jaas-properties]] [ziggurat.fixtures :as f]) (:import (java.util ArrayList Properties))) @@ -239,7 +241,112 @@ not-found "NOT FOUND!" v (.getProperty props string-key not-found)] (is (= v not-found)))) - config-map)))))) + config-map)))) + (testing "should set ssl properties for streams if enabled is set to true" + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] + (let [streams-config-map {:auto-offset-reset :latest + :group-id "foo"} + props (build-streams-config-properties streams-config-map) + auto-offset-reset (.getProperty props "auto.offset.reset") + group-id (.getProperty props "group.id") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password")] + (is (= auto-offset-reset "latest")) + (is (= group-id "foo")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password"))))) + (testing "should set ssl properties for consumer API if enabled is set to true" + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] + (let [streams-config-map {:max-poll-records 500 + :enable-auto-commit true} + props (build-consumer-config-properties streams-config-map) + max-poll-records (.getProperty props "max.poll.records") + enable-auto-comit (.getProperty props "enable.auto.commit") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password")] + (is (= max-poll-records "500")) + (is (= enable-auto-comit "true")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password"))))) + (testing "should set ssl properties for producer API if enabled is set to true" + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] + (let [streams-config-map {:batch.size 500 + :acks 1} + props (build-consumer-config-properties streams-config-map) + batch-size (.getProperty props "batch.size") + acks (.getProperty props "acks") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password")] + (is (= batch-size "500")) + (is (= acks "1")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password"))))) + (testing "should not set ssl properties for streams if eenabled is set to false" + (with-redefs [ssl-config (constantly {:enabled false + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] + (let [streams-config-map {:auto-offset-reset :latest + :group-id "foo"} + props (build-streams-config-properties streams-config-map) + auto-offset-reset (.getProperty props "auto.offset.reset") + group-id (.getProperty props "group.id") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password")] + (is (= auto-offset-reset "latest")) + (is (= group-id "foo")) + (is (nil? ssl-ks-location)) + (is (nil? ssl-ks-password))))) + (testing "ssl properties from streams config map overrides the ssl properties provided in [:ziggurat :ssl]" + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] + (let [streams-config-map {:auto-offset-reset :latest + :ssl-keystore-location "/some/different/location" + :ssl-keystore-password "different-password"} + props (build-streams-config-properties streams-config-map) + auto-offset-reset (.getProperty props "auto.offset.reset") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password")] + (is (= auto-offset-reset "latest")) + (is (= ssl-ks-location "/some/different/location")) + (is (= ssl-ks-password "different-password"))))) + (testing "ssl properties create jaas template from the map provided in [:ziggurat :ssl :jaas]" + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :jaas {:username "myuser" + :password "mypassword" + :mechanism "SCRAM-SHA-512"}})] + (let [streams-config-map {:auto-offset-reset :latest} + props (build-streams-config-properties streams-config-map) + auto-offset-reset (.getProperty props "auto.offset.reset") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password") + sasl-jaas-config (.getProperty props "sasl.jaas.config")] + (is (= auto-offset-reset "latest")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password")) + (is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "SCRAM-SHA-512")))))) + (testing "ssl properties DO NOT create jaas template if no value is provided for key sequence [:ziggurat :ssl :jaas]" + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password"})] + (let [streams-config-map {:auto-offset-reset :latest} + props (build-streams-config-properties streams-config-map) + auto-offset-reset (.getProperty props "auto.offset.reset") + ssl-ks-location (.getProperty props "ssl.keystore.location") + ssl-ks-password (.getProperty props "ssl.keystore.password") + sasl-jaas-config (.getProperty props "sasl.jaas.config")] + (is (= auto-offset-reset "latest")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password")) + (is (nil? sasl-jaas-config))))))) (deftest test-set-property (testing "set-property with empty (with spaces) value" diff --git a/test/ziggurat/ssl/properties_test.clj b/test/ziggurat/ssl/properties_test.clj deleted file mode 100644 index 4339ef6c..00000000 --- a/test/ziggurat/ssl/properties_test.clj +++ /dev/null @@ -1,41 +0,0 @@ -(ns ziggurat.ssl.properties-test - (:require [clojure.test :refer :all] - [ziggurat.fixtures :as fix] - [ziggurat.config :refer [ziggurat-config]] - [ziggurat.ssl.properties :refer [build-ssl-properties]]) - (:import [java.util Properties] - [org.apache.kafka.streams StreamsConfig] - [org.apache.kafka.common.config SslConfigs SaslConfigs])) - -(use-fixtures :once fix/mount-only-config) - -(deftest build-ssl-properties-test - (let [config (ziggurat-config) - properties (Properties.)] - (testing "should return properties when ziggurat.ssl-config is not present" - (is (= properties (build-ssl-properties properties)))) - - (testing "should return properties when ziggurat.ssl-config is not enabled" - (let [ssl-configs {:enabled true - :user-name "test-user" - :password "test-password" - :protocol "SASL_SSL" - :mechanism "SCRAM-SHA-512" - :endpoint-algorithm-config "https"}] - (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] - (is (= properties (build-ssl-properties properties)))))) - - (testing "should build and return properties with SSL configs" - (let [ssl-configs {:enabled true - :user-name "test-user" - :password "test-password" - :protocol "SASL_SSL" - :mechanism "SCRAM-SHA-512" - :endpoint-algorithm-config "https"} - expected-properties (doto properties - (.put SaslConfigs/SASL_JAAS_CONFIG "org.apache.kafka.common.security.scram.ScramLoginModule required username=test-user password=test-password") - (.put SaslConfigs/SASL_MECHANISM "SCRAM-SHA-512") - (.put SslConfigs/SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG "https") - (.put StreamsConfig/SECURITY_PROTOCOL_CONFIG "SASL_SSL"))] - (with-redefs [ziggurat-config (fn [] (assoc-in config [:ssl-config] ssl-configs))] - (is (= expected-properties (build-ssl-properties properties)))))))) From 32adba6803c391b1414c201fd33b5510132ec1c2 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Tue, 5 Oct 2021 16:06:13 +0530 Subject: [PATCH 4/5] linting fixes --- src/ziggurat/config.clj | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 3c31d9fb..26ac217e 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -221,8 +221,8 @@ jaas-config (get ssl-config-map :jaas)] (if (true? ssl-configs-enabled) (as-> properties pr - (add-jaas-properties pr jaas-config) - (reduce-kv set-property-fn pr ssl-config-map)) + (add-jaas-properties pr jaas-config) + (reduce-kv set-property-fn pr ssl-config-map)) properties))) (defn build-properties @@ -245,8 +245,8 @@ " [set-property-fn config-map] (as-> (Properties.) pr - (build-ssl-properties pr set-property-fn (ssl-config)) - (reduce-kv set-property-fn pr config-map))) + (build-ssl-properties pr set-property-fn (ssl-config)) + (reduce-kv set-property-fn pr config-map))) (def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table))) From 65e5bc0011d30c3e1b6851522d0ed577bb331557 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Tue, 5 Oct 2021 16:47:39 +0530 Subject: [PATCH 5/5] fixing a unit tests for ssl config with producer API --- test/ziggurat/config_test.clj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj index 71ebccf3..1337bdd4 100644 --- a/test/ziggurat/config_test.clj +++ b/test/ziggurat/config_test.clj @@ -8,6 +8,7 @@ build-properties build-consumer-config-properties build-streams-config-properties + build-producer-config-properties channel-retry-config config config-file config-from-env @@ -278,7 +279,7 @@ :ssl-keystore-password "some-password"})] (let [streams-config-map {:batch.size 500 :acks 1} - props (build-consumer-config-properties streams-config-map) + props (build-producer-config-properties streams-config-map) batch-size (.getProperty props "batch.size") acks (.getProperty props "acks") ssl-ks-location (.getProperty props "ssl.keystore.location")