diff --git a/Makefile b/Makefile index 1f97ed89..7f9dc759 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,9 @@ setup: sleep 10 docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper + test: setup - ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test + TESTING_TYPE=default lein test docker-compose down setup-cluster: @@ -24,10 +25,12 @@ setup-cluster: # Sleeping for 30s to allow the cluster to come up docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1 docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1 + test-cluster: setup-cluster - ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test-cluster + TESTING_TYPE=cluster lein test-cluster docker-compose -f docker-compose-cluster.yml down rm -rf /tmp/ziggurat_kafka_cluster_data + coverage: setup lein code-coverage docker-compose down diff --git a/bin/run_cluster_tests_in_ci.sh b/bin/run_cluster_tests_in_ci.sh index 9124dcf3..85a927b9 100755 --- a/bin/run_cluster_tests_in_ci.sh +++ b/bin/run_cluster_tests_in_ci.sh @@ -3,5 +3,4 @@ set -ex lein clean -mv -fv resources/config.test.{cluster.ci.edn,cluster.edn} sudo make test-cluster diff --git a/bin/run_tests_in_ci.sh b/bin/run_tests_in_ci.sh index 45a0c800..c290e8aa 100755 --- a/bin/run_tests_in_ci.sh +++ b/bin/run_tests_in_ci.sh @@ -3,5 +3,4 @@ set -ex lein clean -mv -fv resources/config.test.{ci.edn,edn} -make test \ No newline at end of file +make test diff --git a/project.clj b/project.clj index 89519522..ff67293f 100644 --- a/project.clj +++ b/project.clj @@ -63,7 +63,6 @@ :java-source-paths ["src/com"] :aliases {"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--lcov"] "test-cluster" ["shell" "lein" "test"]} - :shell {:env {"TEST_CONFIG_FILE" "config.test.cluster.edn"}} :aot [ziggurat.kafka-consumer.invalid-return-type-exception] :profiles {:uberjar {:aot :all :global-vars {*warn-on-reflection* true} diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn deleted file mode 100644 index 4537fa5e..00000000 --- a/resources/config.test.ci.edn +++ /dev/null @@ -1,87 +0,0 @@ -{:ziggurat {:app-name "application_name" - :env [:dev :keyword] - :nrepl-server {:port [7011 :int]} - :datadog {:host "localhost" - :port [8126 :int] - :enabled [false :bool]} - :statsd {:host "localhost" - :port [8125 :int] - :enabled [false :bool]} - :sentry {:enabled [false :bool] - :dsn "dummy" - :worker-count [10 :int] - :queue-size [10 :int] - :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "127.0.0.1" - :port [5672 :int] - :username "guest" - :password "guest" - :channel-timeout [2000 :int]} - :jobs {:instant {:worker-count [4 :int] - :prefetch-count [4 :int]}} - :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" - :exchange-name "application_name_delay_exchange_test" - :dead-letter-exchange "application_name_instant_exchange_test" - :queue-timeout-ms [100 :int]} - :instant {:queue-name "application_name_instant_queue_test" - :exchange-name "application_name_instant_exchange_test"} - :dead-letter {:queue-name "application_name_dead_letter_queue_test" - :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]} - :http-server {:middlewares {:swagger {:enabled false}} - :port [8010 :int] - :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "topic" - :changelog-topic-replication-factor [1 :int] - :channels {:channel-1 {:worker-count [10 :int] - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]}}} - :producer {:bootstrap-servers "localhost:9092" - :acks "all" - :retries 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}} - :using-string-serde {:application-id "test" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "another-test-topic" - :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :key-deserializer-encoding "UTF8" - :value-deserializer-encoding "UTF8" - :deserializer-encoding "UTF8" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9092" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9092" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] - :custom-provider ""} - :new-relic {:report-errors false}}} diff --git a/resources/config.test.cluster.ci.edn b/resources/config.test.cluster.ci.edn deleted file mode 100644 index 5f76a8b3..00000000 --- a/resources/config.test.cluster.ci.edn +++ /dev/null @@ -1,87 +0,0 @@ -{:ziggurat {:app-name "application_name" - :env [:dev :keyword] - :nrepl-server {:port [7011 :int]} - :datadog {:host "localhost" - :port [8126 :int] - :enabled [false :bool]} - :statsd {:host "localhost" - :port [8125 :int] - :enabled [false :bool]} - :sentry {:enabled [false :bool] - :dsn "dummy" - :worker-count [10 :int] - :queue-size [10 :int] - :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "127.0.0.1" - :port [5672 :int] - :username "guest" - :password "guest" - :channel-timeout [2000 :int]} - :jobs {:instant {:worker-count [4 :int] - :prefetch-count [4 :int]}} - :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" - :exchange-name "application_name_delay_exchange_test" - :dead-letter-exchange "application_name_instant_exchange_test" - :queue-timeout-ms [100 :int]} - :instant {:queue-name "application_name_instant_queue_test" - :exchange-name "application_name_instant_exchange_test"} - :dead-letter {:queue-name "application_name_dead_letter_queue_test" - :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]} - :http-server {:middlewares {:swagger {:enabled false}} - :port [8010 :int] - :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "topic" - :changelog-topic-replication-factor [1 :int] - :channels {:channel-1 {:worker-count [10 :int] - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]}}} - :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :acks "all" - :retries 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}} - :using-string-serde {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "another-test-topic" - :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :key-deserializer-encoding "UTF8" - :value-deserializer-encoding "UTF8" - :deserializer-encoding "UTF8" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] - :custom-provider ""} - :new-relic {:report-errors false}}} diff --git a/resources/config.test.cluster.edn b/resources/config.test.cluster.edn deleted file mode 100644 index 5f76a8b3..00000000 --- a/resources/config.test.cluster.edn +++ /dev/null @@ -1,87 +0,0 @@ -{:ziggurat {:app-name "application_name" - :env [:dev :keyword] - :nrepl-server {:port [7011 :int]} - :datadog {:host "localhost" - :port [8126 :int] - :enabled [false :bool]} - :statsd {:host "localhost" - :port [8125 :int] - :enabled [false :bool]} - :sentry {:enabled [false :bool] - :dsn "dummy" - :worker-count [10 :int] - :queue-size [10 :int] - :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "127.0.0.1" - :port [5672 :int] - :username "guest" - :password "guest" - :channel-timeout [2000 :int]} - :jobs {:instant {:worker-count [4 :int] - :prefetch-count [4 :int]}} - :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" - :exchange-name "application_name_delay_exchange_test" - :dead-letter-exchange "application_name_instant_exchange_test" - :queue-timeout-ms [100 :int]} - :instant {:queue-name "application_name_instant_queue_test" - :exchange-name "application_name_instant_exchange_test"} - :dead-letter {:queue-name "application_name_dead_letter_queue_test" - :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]} - :http-server {:middlewares {:swagger {:enabled false}} - :port [8010 :int] - :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "topic" - :changelog-topic-replication-factor [1 :int] - :channels {:channel-1 {:worker-count [10 :int] - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]}}} - :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :acks "all" - :retries 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}} - :using-string-serde {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "another-test-topic" - :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :key-deserializer-encoding "UTF8" - :value-deserializer-encoding "UTF8" - :deserializer-encoding "UTF8" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] - :custom-provider ""} - :new-relic {:report-errors false}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 36bf69e3..5702689b 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -15,7 +15,7 @@ :worker-count [10 :int] :queue-size [10 :int] :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "localhost" + :rabbit-mq-connection {:host "127.0.0.1" :port [5672 :int] :username "guest" :password "guest" @@ -30,8 +30,8 @@ :exchange-name "application_name_instant_exchange_test"} :dead-letter {:queue-name "application_name_dead_letter_queue_test" :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:count [5 :int] - :type [:linear :keyword] + :retry {:type [:linear :keyword] + :count [5 :int] :enabled [true :bool]} :http-server {:middlewares {:swagger {:enabled false}} :port [8010 :int] diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj index ed021f71..0d9b2eec 100644 --- a/test/ziggurat/config_test.clj +++ b/test/ziggurat/config_test.clj @@ -26,11 +26,11 @@ (testing "calls clonfig" (let [config-values-from-env {:key "val"}] (with-redefs [clonfig/read-config (fn [_] config-values-from-env)] - (is (= config-values-from-env (config-from-env (f/get-config-file-name)))))))) + (is (= config-values-from-env (config-from-env f/test-config-file-name))))))) (deftest config-test (testing "returns merged config from env variables and default values with env variables taking higher precedence" - (let [config-filename (f/get-config-file-name) + (let [config-filename f/test-config-file-name config-values-from-env (-> (config-from-env config-filename) (update-in [:ziggurat] dissoc :nrepl-server))] (with-redefs [config-from-env (fn [_] config-values-from-env) @@ -43,7 +43,7 @@ (testing "returns default interpolated rabbitmq config when not present in env variables" (let [app-name "application_name" - config-filename (f/get-config-file-name) + config-filename f/test-config-file-name config-values-from-env (-> (config-from-env config-filename) (update-in [:ziggurat :rabbit-mq] dissoc :delay) (assoc-in [:ziggurat :app-name] app-name)) @@ -59,7 +59,7 @@ (deftest ziggurat-config-test (testing "returns ziggurat config" - (let [config-filename (f/get-config-file-name) + (let [config-filename f/test-config-file-name config-values-from-env (config-from-env config-filename)] (with-redefs [config-from-env (fn [_] config-values-from-env) config-file config-filename] @@ -69,7 +69,7 @@ (deftest rabbitmq-config-test (testing "returns rabbitmq config" - (let [config-filename (f/get-config-file-name) + (let [config-filename f/test-config-file-name config-values-from-env (config-from-env config-filename)] (with-redefs [config-from-env (fn [_] config-values-from-env) config-file config-filename] @@ -79,7 +79,7 @@ (deftest statsd-config-test (testing "returns statsd config using the :statsd key or :datadog key" - (let [config-filename (f/get-config-file-name) ;; inside config.test.edn, both :datadog and :statsd keys are present + (let [config-filename f/test-config-file-name ;; inside config.test.edn, both :datadog and :statsd keys are present config-values-from-env (config-from-env config-filename)] (with-redefs [config-from-env (fn [_] config-values-from-env) config-file config-filename] @@ -105,7 +105,7 @@ (deftest get-in-config-test (testing "returns config for key passed" - (let [config-filename (f/get-config-file-name) + (let [config-filename f/test-config-file-name config-values-from-env (config-from-env config-filename)] (with-redefs [config-from-env (fn [_] config-values-from-env) config-file config-filename] @@ -113,7 +113,7 @@ (is (= (-> config-values-from-env :ziggurat :http-server :port) (get-in-config [:http-server :port]))) (mount/stop)))) (testing "returns config for key passed with default" - (let [config-filename (f/get-config-file-name) + (let [config-filename f/test-config-file-name config-values-from-env (config-from-env config-filename) default "test"] (with-redefs [config-from-env (fn [_] config-values-from-env) @@ -124,7 +124,7 @@ (deftest channel-retry-config-test (testing "returns channel retry config" - (let [config-filename (f/get-config-file-name) + (let [config-filename f/test-config-file-name config-values-from-env (config-from-env config-filename) topic-entity :default channel :channel-1] diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index e654299e..751c0173 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -1,35 +1,50 @@ (ns ziggurat.fixtures - (:require [clojure.test :refer :all] - [clojure.tools.logging :as log] - [clojure.stacktrace :as st] + (:require [clojure.tools.logging :as log] + [langohr.channel :as lch] + [langohr.exchange :as le] + [langohr.queue :as lq] [mount.core :as mount] [ziggurat.config :as config] - [ziggurat.messaging.util :as util] + [ziggurat.kafka-consumer.executor-service :refer [thread-pool]] [ziggurat.messaging.connection :refer [connection]] - [ziggurat.server :refer [server]] [ziggurat.messaging.producer :as pr] + [ziggurat.messaging.util :as util] + [ziggurat.metrics :as metrics] [ziggurat.producer :as producer] - [langohr.channel :as lch] - [langohr.exchange :as le] - [langohr.queue :as lq] - [ziggurat.tracer :as tracer] - [ziggurat.kafka-consumer.executor-service :refer [thread-pool]]) - (:import (java.util Properties) - (org.apache.kafka.clients.producer ProducerConfig) + [ziggurat.server :refer [server]] + [ziggurat.tracer :as tracer]) + (:import (io.opentracing.mock MockTracer) + (java.util Properties) (org.apache.kafka.clients.consumer ConsumerConfig) - (io.opentracing.mock MockTracer)) + (org.apache.kafka.clients.producer ProducerConfig)) (:gen-class - :name tech.gojek.ziggurat.internal.test.Fixtures :methods [^{:static true} [mountConfig [] void] ^{:static true} [mountProducer [] void] - ^{:static true} [unmountAll [] void]])) - -(defn get-config-file-name [] - (or (System/getenv "TEST_CONFIG_FILE") "config.test.edn")) + ^{:static true} [unmountAll [] void]] + :name tech.gojek.ziggurat.internal.test.Fixtures)) + +(def test-config-file-name "config.test.edn") + +(def ^:private bootstrap-servers + (if (= (System/getenv "TESTING_TYPE") "cluster") + "localhost:9091,localhost:9092,localhost:9093" + "localhost:9092")) + +(defn- get-default-or-cluster-config + [m] + (let [keys [[:ziggurat :stream-router :default :bootstrap-servers] + [:ziggurat :stream-router :using-string-serde :bootstrap-servers] + [:ziggurat :batch-routes :consumer-1 :bootstrap-servers] + [:ziggurat :batch-routes :consumer-2 :bootstrap-servers] + [:ziggurat :stream-router :default :producer :bootstrap-servers]]] + (reduce (fn [s k] + (assoc-in s k bootstrap-servers)) + m + keys))) (defn mount-config [] (-> (mount/only [#'config/config]) - (mount/swap {#'config/config (config/config-from-env (get-config-file-name))}) + (mount/swap {#'config/config (get-default-or-cluster-config (config/config-from-env test-config-file-name))}) (mount/start))) (defn mount-only-config [f] @@ -45,9 +60,9 @@ (mount/stop))) (defn mount-metrics [f] - (mount/start (mount/only [#'ziggurat.metrics/statsd-reporter])) + (mount/start (mount/only [#'metrics/statsd-reporter])) (f) - (mount/stop #'ziggurat.metrics/statsd-reporter)) + (mount/stop #'metrics/statsd-reporter)) (defn mount-tracer [] (with-redefs [tracer/create-tracer (fn [] (MockTracer.))] @@ -144,24 +159,23 @@ (def ^:dynamic *producer-properties* nil) (defn mount-producer-with-config-and-tracer [f] - (do - (mount-config) - (mount-tracer) - (mount-producer) - (binding [*bootstrap-servers* (get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])] - (binding [*consumer-properties* (doto (Properties.) - (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*) - (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer") - (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest") - (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") - (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")) - *producer-properties* (doto (Properties.) - (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*) - (.put ProducerConfig/ACKS_CONFIG "all") - (.put ProducerConfig/RETRIES_CONFIG (int 0)) - (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer") - (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))] - (f)))) + (mount-config) + (mount-tracer) + (mount-producer) + (binding [*bootstrap-servers* (get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])] + (binding [*consumer-properties* (doto (Properties.) + (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*) + (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer") + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest") + (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")) + *producer-properties* (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))] + (f))) (mount/stop)) (defn unmount-all []