From 3c68b2234356db46e6cc98e9b8be48e7192029fd Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Wed, 29 May 2019 16:59:37 +0530 Subject: [PATCH 1/2] Enabling producing support in Ziggurat --- resources/config.test.ci.edn | 9 +++- resources/config.test.edn | 9 +++- src/ziggurat/init.clj | 3 +- src/ziggurat/producer.clj | 80 +++++++++++++++++++++++++++++++++ test/ziggurat/fixtures.clj | 12 +++++ test/ziggurat/producer_test.clj | 27 +++++++++++ 6 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 src/ziggurat/producer.clj create mode 100644 test/ziggurat/producer_test.clj diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index c56c543c..f5038bc3 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -35,4 +35,11 @@ :proto-class "com.company.LogMessage" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :enabled [true :bool]}}}}} + :producer {:default {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}} \ No newline at end of file diff --git a/resources/config.test.edn b/resources/config.test.edn index 5261129a..0025b214 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -35,4 +35,11 @@ :proto-class "com.company.LogMessage" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :enabled [true :bool]}}}}} + :producer {:default {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}} \ No newline at end of file diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 1fb5e741..2d2e7d72 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -11,7 +11,8 @@ [ziggurat.nrepl-server :as nrepl-server] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] - [ziggurat.streams :as streams])) + [ziggurat.streams :as streams] + [ziggurat.producer :refer [kafka-producer]])) (defstate statsd-reporter :start (metrics/start-statsd-reporter (:datadog (ziggurat-config)) diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj new file mode 100644 index 00000000..8e3c2141 --- /dev/null +++ b/src/ziggurat/producer.clj @@ -0,0 +1,80 @@ +(ns ziggurat.producer + "This namespace defines methods for publishing data to + Kafka topics. The methods defined here are essentially wrapper + around variants of `send` methods defined in + `org.apache.kafka.clients.producer.KafkaProducer`. + + Users of Ziggurat can use methods defined here to send data + to Kafka topics. + + Constructs an instance of `org.apache.kafka.clients.producer.KafkaProducer` using config values + provided in `resources/config.edn`. Producer configs currently supported in Ziggurat are + mentioned below. + + - bootstrap.servers + - acks + - retries + - key.serializer + - value.serializer + - max.in.flight.requests.per.connection + - enable.idempotencecd + + Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) + for a complete list of all producer configs available in Kafka." + + (:require [ziggurat.config :refer [ziggurat-config]] + [clojure.tools.logging :as log] + [mount.core :refer [defstate]]) + (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig) + (java.util Properties))) + +(defn producer-config [] (get-in (ziggurat-config) [:producer :default])) + +(defn producer-config-properties [] + (let [bootstrap-servers (:bootstrap-servers (producer-config)) + acks (:acks (producer-config)) + key-serializer-class (:key-serializer (producer-config)) + value-serializer-class (:value-serializer (producer-config)) + enable-idempotence (:enable-idempotence (producer-config)) + retries-config (int (:retries-config (producer-config))) + max-in-flight-requests-per-connection (int (:max-in-flight-requests-per-connection (producer-config)))] + (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) + (.put ProducerConfig/ACKS_CONFIG acks) + (.put ProducerConfig/RETRIES_CONFIG retries-config) + (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence) + (.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max-in-flight-requests-per-connection) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer-class) + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer-class)))) + +(defstate kafka-producer + :start (do (log/info "Starting Kafka producer ...") + (KafkaProducer. (producer-config-properties))) + :stop (do (log/info "Stopping Kafka producer ...") + (doto kafka-producer + (.flush) + (.close)))) + +(defn send + "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables + the users of ziggurat to produce data to a Kafka topic using a Kafka producer. + + Please see `producer` for the list of config currently supported in ziggurat. + + E.g. + (send \"test-topic\" \"value\") + (send \"test-topic\" 1 \"key\" \"value\") + " + + ([topic data] + (let [producer-record (ProducerRecord. topic data)] + (.send kafka-producer producer-record))) + + ([topic partition key data] + (let [producer-record (ProducerRecord. topic (int partition) key data)] + (.send kafka-producer producer-record)))) + + + + + diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index 72c1c05f..f5e6b3c7 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -8,6 +8,7 @@ [ziggurat.messaging.connection :refer [connection]] [ziggurat.server :refer [server]] [ziggurat.messaging.producer :as pr] + [ziggurat.producer :as producer] [langohr.channel :as lch] [langohr.exchange :as le] [langohr.queue :as lq])) @@ -91,3 +92,14 @@ (finally (delete-queues ~stream-routes) (delete-exchanges ~stream-routes)))) + +(defn mount-producer [] + (-> (mount/only [#'producer/kafka-producer]) + (mount/start))) + +(defn mount-only-config-and-producer [f] + (do + (mount-config) + (mount-producer)) + (f) + (mount/stop)) \ No newline at end of file diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj new file mode 100644 index 00000000..4bf91f87 --- /dev/null +++ b/test/ziggurat/producer_test.clj @@ -0,0 +1,27 @@ +(ns ziggurat.producer-test + (:require [clojure.test :refer :all] + [flatland.protobuf.core :as proto] + [ziggurat.streams :refer [start-streams stop-streams]] + [ziggurat.fixtures :as fix] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.producer :as producer])) + +(use-fixtures :once fix/mount-only-config-and-producer) + +(deftest properties-map-is-generated-correctly + (is (= "localhost:9092" (.get (producer/producer-config-properties) "bootstrap.servers"))) + (is (= "all" (.get (producer/producer-config-properties) "acks"))) + (is (= 5 (.get (producer/producer-config-properties) "retries"))) + (is (= 5 (.get (producer/producer-config-properties) "max.in.flight.requests.per.connection"))) + (is (= "org.apache.kafka.common.serialization.StringSerializer" (.get (producer/producer-config-properties) "key.serializer"))) + (is (= "org.apache.kafka.common.serialization.StringSerializer" (.get (producer/producer-config-properties) "value.serializer"))) + (is (false? (.get (producer/producer-config-properties) "enable.idempotence")))) + +(deftest send-data-with-topic-and-value-test + (let [topic "test-topic" + data "Hello World!!!"] + (let [future (producer/send topic data)] + (Thread/sleep 1000) + (is (-> future .get .partition (>= 0)))))) + + From 0831bd520246c34866496bd01bf7617a03579b8a Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Tue, 4 Jun 2019 17:55:30 +0530 Subject: [PATCH 2/2] Extended producing support to multiple producers * User can define producer config inside a stream-router config * User can publish data to any producer by calling send method with the stream-router config key against which producer config is defined. * Few unit tests --- project.clj | 3 +- resources/config.test.ci.edn | 38 +++++---- resources/config.test.edn | 38 +++++---- src/ziggurat/init.clj | 8 +- src/ziggurat/producer.clj | 138 +++++++++++++++++++++----------- test/ziggurat/fixtures.clj | 35 +++++++- test/ziggurat/producer_test.clj | 73 +++++++++++++---- 7 files changed, 232 insertions(+), 101 deletions(-) diff --git a/project.clj b/project.clj index 4a685bf7..90c678f5 100644 --- a/project.clj +++ b/project.clj @@ -49,7 +49,8 @@ [junit/junit "4.12"] [org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.kafka/kafka-clients "1.1.1" :classifier "test"] - [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]] + [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"] + [org.clojure/test.check "0.9.0"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} :dev {:plugins [[jonase/eastwood "0.2.6"] diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index f5038bc3..39962a8c 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -28,18 +28,26 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "application-name-test-02-multiple" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "kafka-topic-*" - :proto-class "com.company.LogMessage" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :producer {:default {:bootstrap-servers "localhost:9092" - :acks "all" - :retries-config 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 0025b214..9609011b 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -28,18 +28,26 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "application-name-test-02-multiple" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "kafka-topic-*" - :proto-class "com.company.LogMessage" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :producer {:default {:bootstrap-servers "localhost:9092" - :acks "all" - :retries-config 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 2d2e7d72..eca5d06d 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -12,7 +12,7 @@ [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] [ziggurat.streams :as streams] - [ziggurat.producer :refer [kafka-producer]])) + [ziggurat.producer :as producer :refer [kafka-producers]])) (defstate statsd-reporter :start (metrics/start-statsd-reporter (:datadog (ziggurat-config)) @@ -41,7 +41,8 @@ (defn start-stream [args] (start-rabbitmq-producers args) - (start* #{#'streams/stream} args)) + (start* #{#'streams/stream} args) + (start* #{#'producer/kafka-producers} args)) (defn start-management-apis [args] (start-rabbitmq-connection args) @@ -67,7 +68,8 @@ (defn stop-stream [] (mount/stop #'streams/stream) - (stop-rabbitmq-connection)) + (stop-rabbitmq-connection) + (mount/stop #'producer/kafka-producers)) (defn stop-management-apis [] (mount/stop #'server/server) diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj index 8e3c2141..b4ee344b 100644 --- a/src/ziggurat/producer.clj +++ b/src/ziggurat/producer.clj @@ -4,13 +4,36 @@ around variants of `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`. - Users of Ziggurat can use methods defined here to send data - to Kafka topics. - - Constructs an instance of `org.apache.kafka.clients.producer.KafkaProducer` using config values - provided in `resources/config.edn`. Producer configs currently supported in Ziggurat are - mentioned below. - + At the time of initialization, an instance of + `org.apache.kafka.clients.producer.KafkaProducer` + is constructed using config values provided in `resources/config.edn`. + + A producer can be configured for each of the stream-routes + in config.edn. Please see the example below + + ` + :stream-router {:default {:application-id \"test\"\n + :bootstrap-servers \"localhost:9092\"\n + :stream-threads-count [1 :int]\n + :origin-topic \"topic\"\n + :proto-class \"flatland.protobuf.test.Example$Photo\"\n + :channels {:channel-1 {:worker-count [10 :int]\n :retry {:count [5 :int]\n :enabled [true :bool]}}}\n + :producer {:bootstrap-servers \"localhost:9092\"\n + :acks \"all\"\n + :retries-config 5\n + :max-in-flight-requests-per-connection 5\n + :enable-idempotence false\n + :value-serializer \"org.apache.kafka.common.serialization.StringSerializer\"\n + :key-serializer \"org.apache.kafka.common.serialization.StringSerializer\"}} + ` + + Usage: + + ` + Please see `send` for publishing data via Kafka producers + ` + + These are the KafkaProducer configs currenlty supported in Ziggurat. - bootstrap.servers - acks - retries @@ -28,53 +51,74 @@ (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig) (java.util Properties))) -(defn producer-config [] (get-in (ziggurat-config) [:producer :default])) - -(defn producer-config-properties [] - (let [bootstrap-servers (:bootstrap-servers (producer-config)) - acks (:acks (producer-config)) - key-serializer-class (:key-serializer (producer-config)) - value-serializer-class (:value-serializer (producer-config)) - enable-idempotence (:enable-idempotence (producer-config)) - retries-config (int (:retries-config (producer-config))) - max-in-flight-requests-per-connection (int (:max-in-flight-requests-per-connection (producer-config)))] - (doto (Properties.) - (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) - (.put ProducerConfig/ACKS_CONFIG acks) - (.put ProducerConfig/RETRIES_CONFIG retries-config) - (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence) - (.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max-in-flight-requests-per-connection) - (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer-class) - (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer-class)))) - -(defstate kafka-producer - :start (do (log/info "Starting Kafka producer ...") - (KafkaProducer. (producer-config-properties))) - :stop (do (log/info "Stopping Kafka producer ...") - (doto kafka-producer - (.flush) - (.close)))) +(defn- producer-properties-from-config [{:keys [bootstrap-servers + acks + key-serializer + value-serializer + enable-idempotence + retries-config + max-in-flight-requests-per-connection]}] + (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) + (.put ProducerConfig/ACKS_CONFIG acks) + (.put ProducerConfig/RETRIES_CONFIG (int retries-config)) + (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence) + (.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (int max-in-flight-requests-per-connection)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer) + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer))) + +(defn producer-properties-map [] + (reduce (fn [producer-map [stream-config-key stream-config]] + (let [producer-config (:producer stream-config)] + (if (some? producer-config) + (assoc producer-map stream-config-key (producer-properties-from-config producer-config)) + producer-map))) + {} + (seq (:stream-router (ziggurat-config))))) + +(defstate kafka-producers + :start (if (not-empty (producer-properties-map)) + (do (log/info "Starting Kafka producers ...") + (reduce (fn [producers [stream-config-key properties]] + (do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ") + (assoc producers stream-config-key (KafkaProducer. properties)))) + {} + (seq (producer-properties-map)))) + (log/info "No producers found. Can not initiate start.")) + + :stop (if (not-empty kafka-producers) + (do (log/info "Stopping Kafka producers ...") + (doall (map (fn [[stream-config-key producer]] + (log/debug "Stopping Kafka producer associated with [" stream-config-key "]") + (doto producer + (.flush) + (.close))) + (seq kafka-producers)))) + (log/info "No producers found. Can not initiate stop."))) (defn send "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables - the users of ziggurat to produce data to a Kafka topic using a Kafka producer. - - Please see `producer` for the list of config currently supported in ziggurat. + the users of Ziggurat to produce data to a Kafka topic using a Kafka producer + associated with a Kafka stream config key. E.g. - (send \"test-topic\" \"value\") - (send \"test-topic\" 1 \"key\" \"value\") - " - - ([topic data] - (let [producer-record (ProducerRecord. topic data)] - (.send kafka-producer producer-record))) - - ([topic partition key data] - (let [producer-record (ProducerRecord. topic (int partition) key data)] - (.send kafka-producer producer-record)))) + For publishing data to producer defined for the + stream router config with defined agains + key `:default`, use send like this. + `(send :default \"test-topic\" \"key\" \"value\")` + `(send :default \"test-topic\" 1 \"key\" \"value\")` + " + ([stream-config-key topic key value] + (send stream-config-key topic nil key value)) + ([stream-config-key topic partition key value] + (if (some? (get kafka-producers stream-config-key)) + (let [producer-record (ProducerRecord. topic partition key value)] + (.send (stream-config-key kafka-producers) producer-record)) + (let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")] + (do (log/error error-msg) + (throw (ex-info error-msg {:stream-config-key stream-config-key}))))))) diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index f5e6b3c7..cba85fab 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -11,7 +11,11 @@ [ziggurat.producer :as producer] [langohr.channel :as lch] [langohr.exchange :as le] - [langohr.queue :as lq])) + [langohr.queue :as lq]) + (:import (org.apache.kafka.streams.integration.utils EmbeddedKafkaCluster) + (java.util Properties) + (org.apache.kafka.clients.producer ProducerConfig) + (org.apache.kafka.clients.consumer ConsumerConfig))) (defn mount-config [] (-> (mount/only [#'config/config]) @@ -94,12 +98,35 @@ (delete-exchanges ~stream-routes)))) (defn mount-producer [] - (-> (mount/only [#'producer/kafka-producer]) + (-> (mount/only [#'producer/kafka-producers]) (mount/start))) +(defn construct-embedded-kafka-cluster [] + (doto (EmbeddedKafkaCluster. 1) + (.start))) + +(def ^:dynamic *embedded-kafka-cluster* nil) +(def ^:dynamic *bootstrap-servers* nil) +(def ^:dynamic *consumer-properties* nil) +(def ^:dynamic *producer-properties* nil) + (defn mount-only-config-and-producer [f] (do (mount-config) - (mount-producer)) - (f) + (mount-producer) + (binding [*embedded-kafka-cluster* (construct-embedded-kafka-cluster)] + (binding [*bootstrap-servers* (.bootstrapServers *embedded-kafka-cluster*)] + (binding [*consumer-properties* (doto (Properties.) + (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*) + (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer") + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest") + (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")) + *producer-properties* (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))] + (f))))) (mount/stop)) \ No newline at end of file diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj index 4bf91f87..7def28f5 100644 --- a/test/ziggurat/producer_test.clj +++ b/test/ziggurat/producer_test.clj @@ -1,27 +1,68 @@ (ns ziggurat.producer-test (:require [clojure.test :refer :all] - [flatland.protobuf.core :as proto] [ziggurat.streams :refer [start-streams stop-streams]] - [ziggurat.fixtures :as fix] + [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties* *embedded-kafka-cluster*]] [ziggurat.config :refer [ziggurat-config]] - [ziggurat.producer :as producer])) + [ziggurat.producer :refer [producer-properties-map send kafka-producers]] + [clojure.test.check.generators :as gen]) + (:import (org.apache.kafka.streams.integration.utils IntegrationTestUtils) + (org.apache.kafka.clients.producer KafkaProducer))) (use-fixtures :once fix/mount-only-config-and-producer) -(deftest properties-map-is-generated-correctly - (is (= "localhost:9092" (.get (producer/producer-config-properties) "bootstrap.servers"))) - (is (= "all" (.get (producer/producer-config-properties) "acks"))) - (is (= 5 (.get (producer/producer-config-properties) "retries"))) - (is (= 5 (.get (producer/producer-config-properties) "max.in.flight.requests.per.connection"))) - (is (= "org.apache.kafka.common.serialization.StringSerializer" (.get (producer/producer-config-properties) "key.serializer"))) - (is (= "org.apache.kafka.common.serialization.StringSerializer" (.get (producer/producer-config-properties) "value.serializer"))) - (is (false? (.get (producer/producer-config-properties) "enable.idempotence")))) +(defn stream-router-config-without-producer []) +(:stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}) (deftest send-data-with-topic-and-value-test - (let [topic "test-topic" - data "Hello World!!!"] - (let [future (producer/send topic data)] - (Thread/sleep 1000) - (is (-> future .get .partition (>= 0)))))) + (with-redefs + [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [topic (gen/generate gen/string-alphanumeric) + key "message" + value "Hello World!!"] + (.createTopic *embedded-kafka-cluster* topic) + (send :default topic key value) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] + (is (= value (.value (first result)))))))) + +(deftest send-data-with-topic-key-partition-and-value-test + (with-redefs + [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [topic (gen/generate gen/string-alphanumeric) + key "message" + value "Hello World!!" + partition (int 0)] + (.createTopic *embedded-kafka-cluster* topic) + (send :default topic partition key value) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] + (is (= value (.value (first result)))))))) + +(deftest send-throws-exception-when-no-producers-are-configured + (with-redefs + [kafka-producers {}] + (let [topic "test-topic" + key "message" + value "Hello World!! from non-existant Kafka Producers"] + (is (not-empty (try (send :default topic key value) + (catch Exception e (ex-data e)))))))) + +(deftest producer-properties-map-is-empty-if-no-producers-configured + ; Here ziggurat-config has been substituted with a custom map which + ; does not have any valid producer configs. + (with-redefs + [ziggurat-config stream-router-config-without-producer] + (is (empty? (producer-properties-map))))) + +(deftest producer-properties-map-is-not-empty-if-producers-are-configured + ; Here the config is read from config.test.edn which contains + ; valid producer configs. + (is (seq (producer-properties-map)))) +