From 5059a238df182471662be4709917597ebf413075 Mon Sep 17 00:00:00 2001 From: mjayprateek Date: Mon, 17 Jun 2019 11:57:45 +0530 Subject: [PATCH] Multiple Kafka producers support in ziggurat (#55) * Enabling producing support in Ziggurat * Extended producing support to multiple producers * User can define producer config inside a stream-router config * User can publish data to any producer by calling send method with the stream-router config key against which producer config is defined. * Few unit tests --- README.md | 38 +++++++++- project.clj | 3 +- resources/config.test.ci.edn | 31 +++++--- resources/config.test.edn | 31 +++++--- src/ziggurat/init.clj | 27 +++++-- src/ziggurat/producer.clj | 124 ++++++++++++++++++++++++++++++++ test/ziggurat/fixtures.clj | 41 ++++++++++- test/ziggurat/init_test.clj | 37 +++++++++- test/ziggurat/producer_test.clj | 68 ++++++++++++++++++ 9 files changed, 373 insertions(+), 27 deletions(-) create mode 100644 src/ziggurat/producer.clj create mode 100644 test/ziggurat/producer_test.clj diff --git a/README.md b/README.md index 35249902..69dc95e7 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,24 @@ There are four modes supported by ziggurat You can pass in multiple modes and it will start accordingly If nothing passed to modes then it will start all the modes. +## Publishing data to Kafka Topics in Ziggurat +To enable publishing data to kafka, Ziggurat provides producing support through ziggurat.producer namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrapper around variants of `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`. + +At the time of initialization, an instance of `org.apache.kafka.clients.producer.KafkaProducer` is constructed using config values provided in `resources/config.edn`. A producer can be configured for each of the stream-routes in config.edn. Please see the example below. + +At present, only a few configurations are supported for constructing KafkaProducer. These have been explained [here](#configuration). Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) +for a complete list of all producer configs available in Kafka. + +Ziggurat.producer namespace defines a multi-arity `send` function which is a thin wrapper around [KafkaProducer#send](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-). This method publishes data to a Kafka topic through a Kafka producer +defined in the stream router configuration. See configuration section below. + +E.g. +For publishing data using a producer which is defined for the stream router config with key `:default`, use send like this: + +`(send :default "test-topic" "key" "value")` + +`(send :default "test-topic" 1 "key" "value")` + ## Configuration All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key. @@ -151,7 +169,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :origin-topic "kafka-topic-*" :oldest-processed-message-in-s [604800 :int] :proto-class "proto-class" - :changelog-topic-replication-factor [3 :int]}} + :changelog-topic-replication-factor [3 :int] + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}} :datadog {:host "localhost" :port [8125 :int] :enabled [false :bool]} @@ -179,7 +204,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :http-server {:port [8010 :int] - :thread-count [100 :int]}}} + :thread-count [100 :int]}}} ``` * app-name - Refers to the name of the application. Used to namespace queues and metrics. * nrepl-server - Port on which the repl server will be hosted @@ -192,6 +217,15 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur * oldest-processed-messages-in-s - The oldest message which will be processed by stream in second. By default the value is 604800 (1 week) * proto-class - The proto-class of the message so that it can be decompiled before being passed to the mapper function * changelog-topic-replication-factor - the internal changelog topic replication factor. By default the value is 3 + * producer - Configuration for KafkaProducer. Currently, only following options are supported. Please see [Producer Configs](https://kafka.apache.org/documentation/#producerconfigs) for detailed explanation for each of the configuration parameters. + * bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + * acks - The number of acknowledgments the producer requires the leader to have received before considering a request complete. Valid values are [all, -1, 0, 1]. + * retries - Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. + * key.serializer - Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. + * value.serializer - Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. + * max.in.flight.requests.per.connection - The maximum number of unacknowledged requests the client will send on a single connection before blocking. + * enable.idempotence - When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. + * datadog - The statsd host and port that metrics should be sent to, although the key name is datadog, it supports statsd as well to send metrics. * sentry - Whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing the mapper-function, an event is sent to sentry. You can skip this flow by disabling it. * rabbit-mq-connection - The details required to make a connection to rabbitmq. We use rabbitmq for the retry mechanism. diff --git a/project.clj b/project.clj index 4a685bf7..90c678f5 100644 --- a/project.clj +++ b/project.clj @@ -49,7 +49,8 @@ [junit/junit "4.12"] [org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.kafka/kafka-clients "1.1.1" :classifier "test"] - [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]] + [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"] + [org.clojure/test.check "0.9.0"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} :dev {:plugins [[jonase/eastwood "0.2.6"] diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index c56c543c..39962a8c 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -28,11 +28,26 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "application-name-test-02-multiple" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "kafka-topic-*" - :proto-class "com.company.LogMessage" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 5261129a..9609011b 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -28,11 +28,26 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "application-name-test-02-multiple" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "kafka-topic-*" - :proto-class "com.company.LogMessage" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 1fb5e741..023c84f3 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -11,7 +11,8 @@ [ziggurat.nrepl-server :as nrepl-server] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] - [ziggurat.streams :as streams])) + [ziggurat.streams :as streams] + [ziggurat.producer :as producer :refer [kafka-producers]])) (defstate statsd-reporter :start (metrics/start-statsd-reporter (:datadog (ziggurat-config)) @@ -38,9 +39,16 @@ (start-rabbitmq-connection args) (messaging-producer/make-queues (get args :stream-routes))) +(defn start-kafka-producers [] + (start* #{#'kafka-producers})) + +(defn start-kafka-streams [args] + (start* #{#'streams/stream} args)) + (defn start-stream [args] + (start-kafka-producers) (start-rabbitmq-producers args) - (start* #{#'streams/stream} args)) + (start-kafka-streams args)) (defn start-management-apis [args] (start-rabbitmq-connection args) @@ -51,22 +59,31 @@ (start* #{#'server/server} args)) (defn start-workers [args] + (start-kafka-producers) (start-rabbitmq-producers args) (start-rabbitmq-consumers args)) (defn- stop-rabbitmq-connection [] (mount/stop #'messaging-connection/connection)) +(defn stop-kafka-producers [] + (mount/stop #'kafka-producers)) + +(defn stop-kafka-streams [] + (mount/stop #'streams/stream)) + (defn stop-workers [] - (stop-rabbitmq-connection)) + (stop-rabbitmq-connection) + (stop-kafka-producers)) (defn stop-server [] (mount/stop #'server/server) (stop-rabbitmq-connection)) (defn stop-stream [] - (mount/stop #'streams/stream) - (stop-rabbitmq-connection)) + (stop-kafka-streams) + (stop-rabbitmq-connection) + (stop-kafka-producers)) (defn stop-management-apis [] (mount/stop #'server/server) diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj new file mode 100644 index 00000000..b4ee344b --- /dev/null +++ b/src/ziggurat/producer.clj @@ -0,0 +1,124 @@ +(ns ziggurat.producer + "This namespace defines methods for publishing data to + Kafka topics. The methods defined here are essentially wrapper + around variants of `send` methods defined in + `org.apache.kafka.clients.producer.KafkaProducer`. + + At the time of initialization, an instance of + `org.apache.kafka.clients.producer.KafkaProducer` + is constructed using config values provided in `resources/config.edn`. + + A producer can be configured for each of the stream-routes + in config.edn. Please see the example below + + ` + :stream-router {:default {:application-id \"test\"\n + :bootstrap-servers \"localhost:9092\"\n + :stream-threads-count [1 :int]\n + :origin-topic \"topic\"\n + :proto-class \"flatland.protobuf.test.Example$Photo\"\n + :channels {:channel-1 {:worker-count [10 :int]\n :retry {:count [5 :int]\n :enabled [true :bool]}}}\n + :producer {:bootstrap-servers \"localhost:9092\"\n + :acks \"all\"\n + :retries-config 5\n + :max-in-flight-requests-per-connection 5\n + :enable-idempotence false\n + :value-serializer \"org.apache.kafka.common.serialization.StringSerializer\"\n + :key-serializer \"org.apache.kafka.common.serialization.StringSerializer\"}} + ` + + Usage: + + ` + Please see `send` for publishing data via Kafka producers + ` + + These are the KafkaProducer configs currenlty supported in Ziggurat. + - bootstrap.servers + - acks + - retries + - key.serializer + - value.serializer + - max.in.flight.requests.per.connection + - enable.idempotencecd + + Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) + for a complete list of all producer configs available in Kafka." + + (:require [ziggurat.config :refer [ziggurat-config]] + [clojure.tools.logging :as log] + [mount.core :refer [defstate]]) + (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig) + (java.util Properties))) + +(defn- producer-properties-from-config [{:keys [bootstrap-servers + acks + key-serializer + value-serializer + enable-idempotence + retries-config + max-in-flight-requests-per-connection]}] + (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) + (.put ProducerConfig/ACKS_CONFIG acks) + (.put ProducerConfig/RETRIES_CONFIG (int retries-config)) + (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence) + (.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (int max-in-flight-requests-per-connection)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer) + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer))) + +(defn producer-properties-map [] + (reduce (fn [producer-map [stream-config-key stream-config]] + (let [producer-config (:producer stream-config)] + (if (some? producer-config) + (assoc producer-map stream-config-key (producer-properties-from-config producer-config)) + producer-map))) + {} + (seq (:stream-router (ziggurat-config))))) + +(defstate kafka-producers + :start (if (not-empty (producer-properties-map)) + (do (log/info "Starting Kafka producers ...") + (reduce (fn [producers [stream-config-key properties]] + (do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ") + (assoc producers stream-config-key (KafkaProducer. properties)))) + {} + (seq (producer-properties-map)))) + (log/info "No producers found. Can not initiate start.")) + + :stop (if (not-empty kafka-producers) + (do (log/info "Stopping Kafka producers ...") + (doall (map (fn [[stream-config-key producer]] + (log/debug "Stopping Kafka producer associated with [" stream-config-key "]") + (doto producer + (.flush) + (.close))) + (seq kafka-producers)))) + (log/info "No producers found. Can not initiate stop."))) + +(defn send + "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables + the users of Ziggurat to produce data to a Kafka topic using a Kafka producer + associated with a Kafka stream config key. + + E.g. + For publishing data to producer defined for the + stream router config with defined agains + key `:default`, use send like this. + + `(send :default \"test-topic\" \"key\" \"value\")` + `(send :default \"test-topic\" 1 \"key\" \"value\")` + + " + + ([stream-config-key topic key value] + (send stream-config-key topic nil key value)) + + ([stream-config-key topic partition key value] + (if (some? (get kafka-producers stream-config-key)) + (let [producer-record (ProducerRecord. topic partition key value)] + (.send (stream-config-key kafka-producers) producer-record)) + + (let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")] + (do (log/error error-msg) + (throw (ex-info error-msg {:stream-config-key stream-config-key}))))))) diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index 72c1c05f..cba85fab 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -8,9 +8,14 @@ [ziggurat.messaging.connection :refer [connection]] [ziggurat.server :refer [server]] [ziggurat.messaging.producer :as pr] + [ziggurat.producer :as producer] [langohr.channel :as lch] [langohr.exchange :as le] - [langohr.queue :as lq])) + [langohr.queue :as lq]) + (:import (org.apache.kafka.streams.integration.utils EmbeddedKafkaCluster) + (java.util Properties) + (org.apache.kafka.clients.producer ProducerConfig) + (org.apache.kafka.clients.consumer ConsumerConfig))) (defn mount-config [] (-> (mount/only [#'config/config]) @@ -91,3 +96,37 @@ (finally (delete-queues ~stream-routes) (delete-exchanges ~stream-routes)))) + +(defn mount-producer [] + (-> (mount/only [#'producer/kafka-producers]) + (mount/start))) + +(defn construct-embedded-kafka-cluster [] + (doto (EmbeddedKafkaCluster. 1) + (.start))) + +(def ^:dynamic *embedded-kafka-cluster* nil) +(def ^:dynamic *bootstrap-servers* nil) +(def ^:dynamic *consumer-properties* nil) +(def ^:dynamic *producer-properties* nil) + +(defn mount-only-config-and-producer [f] + (do + (mount-config) + (mount-producer) + (binding [*embedded-kafka-cluster* (construct-embedded-kafka-cluster)] + (binding [*bootstrap-servers* (.bootstrapServers *embedded-kafka-cluster*)] + (binding [*consumer-properties* (doto (Properties.) + (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*) + (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer") + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest") + (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")) + *producer-properties* (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))] + (f))))) + (mount/stop)) \ No newline at end of file diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 497d30c5..41321e64 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -5,8 +5,10 @@ [ziggurat.messaging.connection :as rmqc] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] - [ziggurat.streams :as streams] - [ziggurat.server.test-utils :as tu])) + [ziggurat.streams :as streams :refer [stream]] + [mount.core :refer [defstate]] + [ziggurat.server.test-utils :as tu] + [mount.core :as mount])) (deftest start-calls-actor-start-fn-test (testing "The actor start fn starts before the ziggurat state and can read config" @@ -155,3 +157,34 @@ (let [modes [:invalid-modes :api-server :second-invalid]] (is (thrown? clojure.lang.ExceptionInfo (init/validate-modes modes)))))) +(deftest kafka-producers-should-start + (let [args {:actor-routes [] + :stream-routes []} + producer-has-started (atom false)] + (with-redefs [init/start-kafka-producers (fn [] (reset! producer-has-started true)) + init/start-kafka-streams (constantly nil)] + (testing "Starting the streams should start kafka-producers as well" + (init/start-stream args) + (is (= true @producer-has-started))) + (testing "Starting the workers should start kafka-producers as well" + (reset! producer-has-started false) + (init/start-workers args) + (is (= true @producer-has-started)))))) + +(deftest kafka-producers-should-stop + (let [producer-has-stopped (atom false)] + (with-redefs [init/stop-kafka-producers (fn [] (reset! producer-has-stopped true)) + init/stop-kafka-streams (constantly nil)] + (testing "Stopping the streams should stop kafka-producers as well" + (init/stop-stream) + (is (= true @producer-has-stopped))) + (testing "Stopping the workers should stop kafka-producers as well" + (reset! producer-has-stopped false) + (init/stop-workers) + (is (= true @producer-has-stopped))) + (mount/stop)))) + + + + + diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj new file mode 100644 index 00000000..2d29bad9 --- /dev/null +++ b/test/ziggurat/producer_test.clj @@ -0,0 +1,68 @@ +(ns ziggurat.producer-test + (:require [clojure.test :refer :all] + [ziggurat.streams :refer [start-streams stop-streams]] + [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties* *embedded-kafka-cluster*]] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.producer :refer [producer-properties-map send kafka-producers]] + [clojure.test.check.generators :as gen]) + (:import (org.apache.kafka.streams.integration.utils IntegrationTestUtils) + (org.apache.kafka.clients.producer KafkaProducer))) + +(use-fixtures :once fix/mount-only-config-and-producer) + +(defn stream-router-config-without-producer []) +(:stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}) + +(deftest send-data-with-topic-and-value-test + (with-redefs + [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [topic (gen/generate gen/string-alphanumeric) + key "message" + value "Hello World!!"] + (.createTopic *embedded-kafka-cluster* topic) + (send :default topic key value) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] + (is (= value (.value (first result)))))))) + +(deftest send-data-with-topic-key-partition-and-value-test + (with-redefs + [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [topic (gen/generate gen/string-alphanumeric) + key "message" + value "Hello World!!" + partition (int 0)] + (.createTopic *embedded-kafka-cluster* topic) + (send :default topic partition key value) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] + (is (= value (.value (first result)))))))) + +(deftest send-throws-exception-when-no-producers-are-configured + (with-redefs + [kafka-producers {}] + (let [topic "test-topic" + key "message" + value "Hello World!! from non-existant Kafka Producers"] + (is (not-empty (try (send :default topic key value) + (catch Exception e (ex-data e)))))))) + +(deftest producer-properties-map-is-empty-if-no-producers-configured + ; Here ziggurat-config has been substituted with a custom map which + ; does not have any valid producer configs. + (with-redefs + [ziggurat-config stream-router-config-without-producer] + (is (empty? (producer-properties-map))))) + +(deftest producer-properties-map-is-not-empty-if-producers-are-configured + ; Here the config is read from config.test.edn which contains + ; valid producer configs. + (is (seq (producer-properties-map)))) + + +