diff --git a/Makefile b/Makefile index 1f97ed89..7f9dc759 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,9 @@ setup: sleep 10 docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper + test: setup - ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test + TESTING_TYPE=default lein test docker-compose down setup-cluster: @@ -24,10 +25,12 @@ setup-cluster: # Sleeping for 30s to allow the cluster to come up docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1 docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1 + test-cluster: setup-cluster - ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test-cluster + TESTING_TYPE=cluster lein test-cluster docker-compose -f docker-compose-cluster.yml down rm -rf /tmp/ziggurat_kafka_cluster_data + coverage: setup lein code-coverage docker-compose down diff --git a/bin/run_cluster_tests_in_ci.sh b/bin/run_cluster_tests_in_ci.sh index 9124dcf3..85a927b9 100755 --- a/bin/run_cluster_tests_in_ci.sh +++ b/bin/run_cluster_tests_in_ci.sh @@ -3,5 +3,4 @@ set -ex lein clean -mv -fv resources/config.test.{cluster.ci.edn,cluster.edn} sudo make test-cluster diff --git a/bin/run_tests_in_ci.sh b/bin/run_tests_in_ci.sh index 45a0c800..c290e8aa 100755 --- a/bin/run_tests_in_ci.sh +++ b/bin/run_tests_in_ci.sh @@ -3,5 +3,4 @@ set -ex lein clean -mv -fv resources/config.test.{ci.edn,edn} -make test \ No newline at end of file +make test diff --git a/project.clj b/project.clj index 89519522..ff67293f 100644 --- a/project.clj +++ b/project.clj @@ -63,7 +63,6 @@ :java-source-paths ["src/com"] :aliases {"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--lcov"] "test-cluster" ["shell" "lein" "test"]} - :shell {:env {"TEST_CONFIG_FILE" "config.test.cluster.edn"}} :aot [ziggurat.kafka-consumer.invalid-return-type-exception] :profiles {:uberjar {:aot :all :global-vars {*warn-on-reflection* true} diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn deleted file mode 100644 index 4537fa5e..00000000 --- a/resources/config.test.ci.edn +++ /dev/null @@ -1,87 +0,0 @@ -{:ziggurat {:app-name "application_name" - :env [:dev :keyword] - :nrepl-server {:port [7011 :int]} - :datadog {:host "localhost" - :port [8126 :int] - :enabled [false :bool]} - :statsd {:host "localhost" - :port [8125 :int] - :enabled [false :bool]} - :sentry {:enabled [false :bool] - :dsn "dummy" - :worker-count [10 :int] - :queue-size [10 :int] - :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "127.0.0.1" - :port [5672 :int] - :username "guest" - :password "guest" - :channel-timeout [2000 :int]} - :jobs {:instant {:worker-count [4 :int] - :prefetch-count [4 :int]}} - :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" - :exchange-name "application_name_delay_exchange_test" - :dead-letter-exchange "application_name_instant_exchange_test" - :queue-timeout-ms [100 :int]} - :instant {:queue-name "application_name_instant_queue_test" - :exchange-name "application_name_instant_exchange_test"} - :dead-letter {:queue-name "application_name_dead_letter_queue_test" - :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]} - :http-server {:middlewares {:swagger {:enabled false}} - :port [8010 :int] - :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "topic" - :changelog-topic-replication-factor [1 :int] - :channels {:channel-1 {:worker-count [10 :int] - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]}}} - :producer {:bootstrap-servers "localhost:9092" - :acks "all" - :retries 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}} - :using-string-serde {:application-id "test" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "another-test-topic" - :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :key-deserializer-encoding "UTF8" - :value-deserializer-encoding "UTF8" - :deserializer-encoding "UTF8" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9092" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9092" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] - :custom-provider ""} - :new-relic {:report-errors false}}} diff --git a/resources/config.test.cluster.ci.edn b/resources/config.test.cluster.ci.edn deleted file mode 100644 index 5f76a8b3..00000000 --- a/resources/config.test.cluster.ci.edn +++ /dev/null @@ -1,87 +0,0 @@ -{:ziggurat {:app-name "application_name" - :env [:dev :keyword] - :nrepl-server {:port [7011 :int]} - :datadog {:host "localhost" - :port [8126 :int] - :enabled [false :bool]} - :statsd {:host "localhost" - :port [8125 :int] - :enabled [false :bool]} - :sentry {:enabled [false :bool] - :dsn "dummy" - :worker-count [10 :int] - :queue-size [10 :int] - :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "127.0.0.1" - :port [5672 :int] - :username "guest" - :password "guest" - :channel-timeout [2000 :int]} - :jobs {:instant {:worker-count [4 :int] - :prefetch-count [4 :int]}} - :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" - :exchange-name "application_name_delay_exchange_test" - :dead-letter-exchange "application_name_instant_exchange_test" - :queue-timeout-ms [100 :int]} - :instant {:queue-name "application_name_instant_queue_test" - :exchange-name "application_name_instant_exchange_test"} - :dead-letter {:queue-name "application_name_dead_letter_queue_test" - :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]} - :http-server {:middlewares {:swagger {:enabled false}} - :port [8010 :int] - :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "topic" - :changelog-topic-replication-factor [1 :int] - :channels {:channel-1 {:worker-count [10 :int] - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]}}} - :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :acks "all" - :retries 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}} - :using-string-serde {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "another-test-topic" - :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :key-deserializer-encoding "UTF8" - :value-deserializer-encoding "UTF8" - :deserializer-encoding "UTF8" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] - :custom-provider ""} - :new-relic {:report-errors false}}} diff --git a/resources/config.test.cluster.edn b/resources/config.test.cluster.edn deleted file mode 100644 index 5f76a8b3..00000000 --- a/resources/config.test.cluster.edn +++ /dev/null @@ -1,87 +0,0 @@ -{:ziggurat {:app-name "application_name" - :env [:dev :keyword] - :nrepl-server {:port [7011 :int]} - :datadog {:host "localhost" - :port [8126 :int] - :enabled [false :bool]} - :statsd {:host "localhost" - :port [8125 :int] - :enabled [false :bool]} - :sentry {:enabled [false :bool] - :dsn "dummy" - :worker-count [10 :int] - :queue-size [10 :int] - :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "127.0.0.1" - :port [5672 :int] - :username "guest" - :password "guest" - :channel-timeout [2000 :int]} - :jobs {:instant {:worker-count [4 :int] - :prefetch-count [4 :int]}} - :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" - :exchange-name "application_name_delay_exchange_test" - :dead-letter-exchange "application_name_instant_exchange_test" - :queue-timeout-ms [100 :int]} - :instant {:queue-name "application_name_instant_queue_test" - :exchange-name "application_name_instant_exchange_test"} - :dead-letter {:queue-name "application_name_dead_letter_queue_test" - :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]} - :http-server {:middlewares {:swagger {:enabled false}} - :port [8010 :int] - :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "topic" - :changelog-topic-replication-factor [1 :int] - :channels {:channel-1 {:worker-count [10 :int] - :retry {:type [:linear :keyword] - :count [5 :int] - :enabled [true :bool]}}} - :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :acks "all" - :retries 5 - :max-in-flight-requests-per-connection 5 - :enable-idempotence false - :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer" - :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}} - :using-string-serde {:application-id "test" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :stream-threads-count [1 :int] - :origin-topic "another-test-topic" - :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" - :key-deserializer-encoding "UTF8" - :value-deserializer-encoding "UTF8" - :deserializer-encoding "UTF8" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :default-api-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} - :tracer {:enabled [true :bool] - :custom-provider ""} - :new-relic {:report-errors false}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 36bf69e3..5702689b 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -15,7 +15,7 @@ :worker-count [10 :int] :queue-size [10 :int] :thread-termination-wait-s [1 :int]} - :rabbit-mq-connection {:host "localhost" + :rabbit-mq-connection {:host "127.0.0.1" :port [5672 :int] :username "guest" :password "guest" @@ -30,8 +30,8 @@ :exchange-name "application_name_instant_exchange_test"} :dead-letter {:queue-name "application_name_dead_letter_queue_test" :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:count [5 :int] - :type [:linear :keyword] + :retry {:type [:linear :keyword] + :count [5 :int] :enabled [true :bool]} :http-server {:middlewares {:swagger {:enabled false}} :port [8010 :int] diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index e654299e..a0b2a348 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -1,7 +1,6 @@ (ns ziggurat.fixtures (:require [clojure.test :refer :all] [clojure.tools.logging :as log] - [clojure.stacktrace :as st] [mount.core :as mount] [ziggurat.config :as config] [ziggurat.messaging.util :as util] @@ -13,7 +12,8 @@ [langohr.exchange :as le] [langohr.queue :as lq] [ziggurat.tracer :as tracer] - [ziggurat.kafka-consumer.executor-service :refer [thread-pool]]) + [ziggurat.kafka-consumer.executor-service :refer [thread-pool]] + [ziggurat.metrics :as metrics]) (:import (java.util Properties) (org.apache.kafka.clients.producer ProducerConfig) (org.apache.kafka.clients.consumer ConsumerConfig) @@ -24,12 +24,27 @@ ^{:static true} [mountProducer [] void] ^{:static true} [unmountAll [] void]])) -(defn get-config-file-name [] - (or (System/getenv "TEST_CONFIG_FILE") "config.test.edn")) +(defn get-config-file-name [] "config.test.edn") + +(def bootstrap-servers (if (= (System/getenv "TESTING_TYPE") "cluster") + "localhost:9091,localhost:9092,localhost:9093" + "localhost:9092")) + +(defn get-default-or-cluster-config + [m] + (let [keys [[:ziggurat :stream-router :default :bootstrap-servers] + [:ziggurat :stream-router :using-string-serde :bootstrap-servers] + [:ziggurat :batch-routes :consumer-1 :bootstrap-servers] + [:ziggurat :batch-routes :consumer-2 :bootstrap-servers] + [:ziggurat :stream-router :default :producer :bootstrap-servers]]] + (reduce (fn [s k] + (assoc-in s k bootstrap-servers)) + m + keys))) (defn mount-config [] (-> (mount/only [#'config/config]) - (mount/swap {#'config/config (config/config-from-env (get-config-file-name))}) + (mount/swap {#'config/config (get-default-or-cluster-config (config/config-from-env (get-config-file-name)))}) (mount/start))) (defn mount-only-config [f] @@ -45,9 +60,9 @@ (mount/stop))) (defn mount-metrics [f] - (mount/start (mount/only [#'ziggurat.metrics/statsd-reporter])) + (mount/start (mount/only [#'metrics/statsd-reporter])) (f) - (mount/stop #'ziggurat.metrics/statsd-reporter)) + (mount/stop #'metrics/statsd-reporter)) (defn mount-tracer [] (with-redefs [tracer/create-tracer (fn [] (MockTracer.))]