diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index e4c24ccb..063a0963 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -59,6 +59,22 @@ jobs:
       env:
         KAFKA_VERSION: 2

+  run_tests_with_kafka_cluster:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/setup-java@v1
+        with:
+          java-version: "1.8"
+      - uses: DeLaGuardo/setup-clojure@master
+        with:
+          lein: "2.8.1"
+      - uses: actions/checkout@v2
+
+      - name: Run Tests on Kafka Cluster
+        run: ./bin/run_cluster_tests_in_ci.sh
+        env:
+          KAFKA_VERSION: 2
+
   calculate_coverage:
     runs-on: ubuntu-latest
     steps:
@@ -85,6 +101,7 @@ jobs:
         lint_check,
         run_tests_with_kafka_1,
         run_tests_with_kafka_2,
+        run_tests_with_kafka_cluster,
         calculate_coverage,
       ]
     if: ${{ startsWith(github.ref, 'refs/tags/') }}
diff --git a/Makefile b/Makefile
index 0d6b4eed..73237cba 100644
--- a/Makefile
+++ b/Makefile
@@ -11,11 +11,21 @@ setup:
 	sleep 10
 	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
 	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
-
 test: setup
 	ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test
 	docker-compose down
-
+setup-cluster:
+	rm -rf /tmp/ziggurat_kafka_cluster_data
+	docker-compose -f docker-compose-cluster.yml down
+	lein deps
+	docker-compose -f docker-compose-cluster.yml up -d
+	# Sleep for 30s to allow the cluster to come up
+	sleep 30
+	docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
+	docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
+test-cluster: setup-cluster
+	ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test-cluster
+	docker-compose -f docker-compose-cluster.yml down
+	rm -rf /tmp/ziggurat_kafka_cluster_data
 coverage: setup
 	lein code-coverage
 	docker-compose down
diff --git a/README.md b/README.md
index 0406281b..e8b059e1 100644
--- a/README.md
+++ b/README.md
@@ -57,9 +57,19 @@ Refer [concepts](doc/CONCEPTS.md) to understand the concepts referred to in this
   - Kafka on localhost:9092
   - ZooKeeper on localhost:2181
   - RabbitMQ on localhost:5672
-
 - Run tests: `make test`
+
+#### Running a cluster setup locally
+
+- `make setup-cluster` clears any existing cluster volume and starts
+  - 3 Kafka brokers on localhost:9091, localhost:9092 and localhost:9093
+  - ZooKeeper on localhost:2181
+  - RabbitMQ on localhost:5672
+
+#### Running tests against the cluster
+
+- `make test-cluster`
+  - This uses `config.test.cluster.edn` instead of `config.test.edn`
+
 ## Usage

 Add this to your project.clj
diff --git a/bin/run_cluster_tests_in_ci.sh b/bin/run_cluster_tests_in_ci.sh
new file mode 100755
index 00000000..9124dcf3
--- /dev/null
+++ b/bin/run_cluster_tests_in_ci.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+set -ex
+
+lein clean
+mv -fv resources/config.test.{cluster.ci.edn,cluster.edn}
+sudo make test-cluster
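Note on the script above: `mv -fv resources/config.test.{cluster.ci.edn,cluster.edn}` uses bash brace expansion to rename the CI variant of the cluster config over the local one before `make test-cluster` runs. In this patch the two files are identical, so the swap only matters once the CI config diverges. A minimal Clojure sketch of the same rename (file names are the ones added in this diff):

```clojure
(require '[clojure.java.io :as io])

;; Equivalent of `mv -fv resources/config.test.{cluster.ci.edn,cluster.edn}`:
;; overwrite the local cluster config with the CI variant.
(.renameTo (io/file "resources/config.test.cluster.ci.edn")
           (io/file "resources/config.test.cluster.edn"))
```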
diff --git a/docker-compose-cluster.yml b/docker-compose-cluster.yml
new file mode 100644
index 00000000..7dcde5ca
--- /dev/null
+++ b/docker-compose-cluster.yml
@@ -0,0 +1,85 @@
+version: '3'
+services:
+  rabbitmq:
+    image: 'rabbitmq:3.8.2-management-alpine'
+    ports:
+      - '5672:5672'
+      - '15672:15672'
+    container_name: 'ziggurat_rabbitmq'
+
+  zookeeper:
+    image: zookeeper:3.4.9
+    hostname: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOO_MY_ID: 1
+      ZOO_PORT: 2181
+      ZOO_SERVERS: server.1=zookeeper:2888:3888
+      ZOO_TICK_TIME: 2000
+    volumes:
+      - /tmp/ziggurat_kafka_cluster_data/zookeeper/data:/data
+      - /tmp/ziggurat_kafka_cluster_data/zookeeper/datalog:/datalog
+
+  kafka1:
+    image: confluentinc/cp-kafka:5.5.3
+    cap_add:
+      - NET_ADMIN
+      - SYS_ADMIN
+    hostname: kafka1
+    ports:
+      - "9091:9091"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_BROKER_ID: 1
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+    volumes:
+      - /tmp/ziggurat_kafka_cluster_data/kafka1/data:/var/lib/kafka/data
+    depends_on:
+      - zookeeper
+
+  kafka2:
+    image: confluentinc/cp-kafka:5.5.3
+    cap_add:
+      - NET_ADMIN
+      - SYS_ADMIN
+    hostname: kafka2
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_BROKER_ID: 2
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+    volumes:
+      - /tmp/ziggurat_kafka_cluster_data/kafka2/data:/var/lib/kafka/data
+    depends_on:
+      - zookeeper
+
+  kafka3:
+    image: confluentinc/cp-kafka:5.5.3
+    cap_add:
+      - NET_ADMIN
+      - SYS_ADMIN
+    hostname: kafka3
+    ports:
+      - "9093:9093"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_BROKER_ID: 3
+      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
+      KAFKA_NUM_PARTITIONS: 3
+    volumes:
+      - /tmp/ziggurat_kafka_cluster_data/kafka3/data:/var/lib/kafka/data
+    depends_on:
+      - zookeeper
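The compose file gives each broker an internal listener for inter-broker traffic and one external listener advertised on localhost:9091, 9092, or 9093, so host-side clients should bootstrap with all three addresses. A hedged sketch of matching client properties (it mirrors the consumer properties built in `test/ziggurat/fixtures.clj` later in this diff; the group id is illustrative):

```clojure
(import '(java.util Properties)
        '(org.apache.kafka.clients.consumer ConsumerConfig))

;; Bootstrap against the three advertised external listeners of the cluster.
(def cluster-consumer-properties
  (doto (Properties.)
    (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9091,localhost:9092,localhost:9093")
    (.put ConsumerConfig/GROUP_ID_CONFIG "ziggurat-consumer")))
```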
diff --git a/project.clj b/project.clj
index 6afa4645..877d9b07 100644
--- a/project.clj
+++ b/project.clj
@@ -54,23 +54,26 @@
                                      :username :env/clojars_username
                                      :password :env/clojars_password
                                      :sign-releases false}]]
+  :plugins [[lein-shell "0.5.0"]]
   :pedantic? :warn
   :java-source-paths ["src/com"]
-  :aliases {"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--lcov"]}
+  :aliases {"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--lcov"]
+            "test-cluster" ["shell" "lein" "test"]}
+  :shell {:env {"TEST_CONFIG_FILE" "config.test.cluster.edn"}}
   :aot [ziggurat.kafka-consumer.invalid-return-type-exception]
   :profiles {:uberjar {:aot :all
                        :global-vars {*warn-on-reflection* true}
                        :pedantic? :abort}
              :test {:java-source-paths ["src/com" "test/com"]
-                    :jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
-                    :dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
-                                   [junit/junit "4.12"]
-                                   [org.hamcrest/hamcrest-core "2.2"]
-                                   [org.apache.kafka/kafka-streams "2.4.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
-                                   [org.apache.kafka/kafka-clients "2.4.1" :classifier "test"]
-                                   [org.clojure/test.check "0.10.0"]]
-                    :plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
-                    :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
+                    :jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
+                    :dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
+                                   [junit/junit "4.12"]
+                                   [org.hamcrest/hamcrest-core "2.2"]
+                                   [org.apache.kafka/kafka-streams "2.4.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
+                                   [org.apache.kafka/kafka-clients "2.4.1" :classifier "test"]
+                                   [org.clojure/test.check "0.10.0"]]
+                    :plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
+                    :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
              :dev {:plugins [[lein-ancient "0.6.15"]
                              [lein-cljfmt "0.6.4"]
                              [lein-cloverage "1.0.13"]
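The new `test-cluster` alias shells out to a fresh `lein test` via lein-shell, whose `:shell {:env ...}` map injects `TEST_CONFIG_FILE=config.test.cluster.edn` into that child process. The test fixtures then resolve the config file from that variable; a sketch mirroring `get-config-file-name` from `test/ziggurat/fixtures.clj` later in this diff:

```clojure
;; Falls back to the single-broker config when TEST_CONFIG_FILE is unset.
(defn get-config-file-name []
  (or (System/getenv "TEST_CONFIG_FILE") "config.test.edn"))

(get-config-file-name)
;; => "config.test.edn"         under plain `lein test`
;; => "config.test.cluster.edn" under `lein test-cluster`
```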
diff --git a/resources/config.test.cluster.ci.edn b/resources/config.test.cluster.ci.edn
new file mode 100644
index 00000000..b19b1767
--- /dev/null
+++ b/resources/config.test.cluster.ci.edn
@@ -0,0 +1,87 @@
+{:ziggurat {:app-name "application_name"
+            :env [:dev :keyword]
+            :nrepl-server {:port [7011 :int]}
+            :datadog {:host "localhost"
+                      :port [8126 :int]
+                      :enabled [false :bool]}
+            :statsd {:host "localhost"
+                     :port [8125 :int]
+                     :enabled [false :bool]}
+            :sentry {:enabled [false :bool]
+                     :dsn "dummy"
+                     :worker-count [10 :int]
+                     :queue-size [10 :int]
+                     :thread-termination-wait-s [1 :int]}
+            :rabbit-mq-connection {:host "127.0.0.1"
+                                   :port [5672 :int]
+                                   :username "guest"
+                                   :password "guest"
+                                   :channel-timeout [2000 :int]}
+            :jobs {:instant {:worker-count [4 :int]
+                             :prefetch-count [4 :int]}}
+            :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test"
+                                :exchange-name "application_name_delay_exchange_test"
+                                :dead-letter-exchange "application_name_instant_exchange_test"
+                                :queue-timeout-ms [100 :int]}
+                        :instant {:queue-name "application_name_instant_queue_test"
+                                  :exchange-name "application_name_instant_exchange_test"}
+                        :dead-letter {:queue-name "application_name_dead_letter_queue_test"
+                                      :exchange-name "application_name_dead_letter_exchange_test"}}
+            :retry {:type [:linear :keyword]
+                    :count [5 :int]
+                    :enabled [true :bool]}
+            :http-server {:middlewares {:swagger {:enabled false}}
+                          :port [8010 :int]
+                          :thread-count [100 :int]}
+            :stream-router {:default {:application-id "test"
+                                      :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                      :stream-threads-count [1 :int]
+                                      :origin-topic "topic"
+                                      :upgrade-from "1.1"
+                                      :changelog-topic-replication-factor [1 :int]
+                                      :channels {:channel-1 {:worker-count [10 :int]
+                                                             :retry {:type [:linear :keyword]
+                                                                     :count [5 :int]
+                                                                     :enabled [true :bool]}}}
+                                      :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                                 :acks "all"
+                                                 :retries 5
+                                                 :max-in-flight-requests-per-connection 5
+                                                 :enable-idempotence false
+                                                 :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
+                                                 :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
+                            :using-string-serde {:application-id "test"
+                                                 :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                                 :stream-threads-count [1 :int]
+                                                 :origin-topic "another-test-topic"
+                                                 :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
+                                                 :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
+                                                 :key-deserializer-encoding "UTF8"
+                                                 :value-deserializer-encoding "UTF8"
+                                                 :deserializer-encoding "UTF8"
+                                                 :upgrade-from "1.1"
+                                                 :channels {:channel-1 {:worker-count [10 :int]
+                                                                        :retry {:count [5 :int]
+                                                                                :enabled [true :bool]}}}}}
+            :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002"
+                                        :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                        :max-poll-records [1000 :int]
+                                        :origin-topic "topic"
+                                        :commit-interval-ms [5000 :int]
+                                        :poll-timeout-ms-config [1000 :int]
+                                        :thread-count [2 :int]
+                                        :session-timeout-ms-config [60000 :int]
+                                        :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+                                        :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
+                           :consumer-2 {:consumer-group-id "test-consumer-2002"
+                                        :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                        :max-poll-records [2000 :int]
+                                        :origin-topic "topic"
+                                        :poll-timeout-ms-config [1000 :int]
+                                        :thread-count [4 :int]
+                                        :session-timeout-ms-config [60000 :int]
+                                        :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+                                        :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
+            :tracer {:enabled [true :bool]
+                     :custom-provider ""}
+            :new-relic {:report-errors false}}}
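Throughout these config files, a `[default :type]` vector is a clonfig coercion hint: the first element is the default value and the keyword tells clonfig what type to coerce an environment-variable override to. A hedged illustration (the env-var name below is an assumption; exact naming follows clonfig's mapping rules):

```clojure
(require '[ziggurat.config :refer [get-in-config]])

;; With no override, the declared default comes back already coerced:
(get-in-config [:http-server :port]) ;; => 8010, an int, per [8010 :int]

;; An environment override such as ZIGGURAT_HTTP_SERVER_PORT=9000 would take
;; precedence and still be coerced to an int.
```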
diff --git a/resources/config.test.cluster.edn b/resources/config.test.cluster.edn
new file mode 100644
index 00000000..b19b1767
--- /dev/null
+++ b/resources/config.test.cluster.edn
@@ -0,0 +1,87 @@
+{:ziggurat {:app-name "application_name"
+            :env [:dev :keyword]
+            :nrepl-server {:port [7011 :int]}
+            :datadog {:host "localhost"
+                      :port [8126 :int]
+                      :enabled [false :bool]}
+            :statsd {:host "localhost"
+                     :port [8125 :int]
+                     :enabled [false :bool]}
+            :sentry {:enabled [false :bool]
+                     :dsn "dummy"
+                     :worker-count [10 :int]
+                     :queue-size [10 :int]
+                     :thread-termination-wait-s [1 :int]}
+            :rabbit-mq-connection {:host "127.0.0.1"
+                                   :port [5672 :int]
+                                   :username "guest"
+                                   :password "guest"
+                                   :channel-timeout [2000 :int]}
+            :jobs {:instant {:worker-count [4 :int]
+                             :prefetch-count [4 :int]}}
+            :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test"
+                                :exchange-name "application_name_delay_exchange_test"
+                                :dead-letter-exchange "application_name_instant_exchange_test"
+                                :queue-timeout-ms [100 :int]}
+                        :instant {:queue-name "application_name_instant_queue_test"
+                                  :exchange-name "application_name_instant_exchange_test"}
+                        :dead-letter {:queue-name "application_name_dead_letter_queue_test"
+                                      :exchange-name "application_name_dead_letter_exchange_test"}}
+            :retry {:type [:linear :keyword]
+                    :count [5 :int]
+                    :enabled [true :bool]}
+            :http-server {:middlewares {:swagger {:enabled false}}
+                          :port [8010 :int]
+                          :thread-count [100 :int]}
+            :stream-router {:default {:application-id "test"
+                                      :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                      :stream-threads-count [1 :int]
+                                      :origin-topic "topic"
+                                      :upgrade-from "1.1"
+                                      :changelog-topic-replication-factor [1 :int]
+                                      :channels {:channel-1 {:worker-count [10 :int]
+                                                             :retry {:type [:linear :keyword]
+                                                                     :count [5 :int]
+                                                                     :enabled [true :bool]}}}
+                                      :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                                 :acks "all"
+                                                 :retries 5
+                                                 :max-in-flight-requests-per-connection 5
+                                                 :enable-idempotence false
+                                                 :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
+                                                 :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
+                            :using-string-serde {:application-id "test"
+                                                 :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                                 :stream-threads-count [1 :int]
+                                                 :origin-topic "another-test-topic"
+                                                 :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
+                                                 :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
+                                                 :key-deserializer-encoding "UTF8"
+                                                 :value-deserializer-encoding "UTF8"
+                                                 :deserializer-encoding "UTF8"
+                                                 :upgrade-from "1.1"
+                                                 :channels {:channel-1 {:worker-count [10 :int]
+                                                                        :retry {:count [5 :int]
+                                                                                :enabled [true :bool]}}}}}
+            :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002"
+                                        :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                        :max-poll-records [1000 :int]
+                                        :origin-topic "topic"
+                                        :commit-interval-ms [5000 :int]
+                                        :poll-timeout-ms-config [1000 :int]
+                                        :thread-count [2 :int]
+                                        :session-timeout-ms-config [60000 :int]
+                                        :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+                                        :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
+                           :consumer-2 {:consumer-group-id "test-consumer-2002"
+                                        :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
+                                        :max-poll-records [2000 :int]
+                                        :origin-topic "topic"
+                                        :poll-timeout-ms-config [1000 :int]
+                                        :thread-count [4 :int]
+                                        :session-timeout-ms-config [60000 :int]
+                                        :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+                                        :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
+            :tracer {:enabled [true :bool]
+                     :custom-provider ""}
+            :new-relic {:report-errors false}}}
diff --git a/resources/config.test.edn b/resources/config.test.edn
index c64ec290..e204d90c 100644
--- a/resources/config.test.edn
+++ b/resources/config.test.edn
@@ -36,24 +36,24 @@
    :http-server {:middlewares {:swagger {:enabled false}}
                  :port [8010 :int]
                  :thread-count [100 :int]}
-   :stream-router {:default {:application-id "test"
-                             :bootstrap-servers "localhost:9092"
-                             :stream-threads-count [1 :int]
-                             :origin-topic "topic"
-                             :upgrade-from "1.1"
-                             :consumer-type :default
+   :stream-router {:default {:application-id "test"
+                             :bootstrap-servers "localhost:9092"
+                             :stream-threads-count [1 :int]
+                             :origin-topic "topic"
+                             :upgrade-from "1.1"
+                             :consumer-type :default
                              :changelog-topic-replication-factor [1 :int]
-                             :channels {:channel-1 {:worker-count [10 :int]
-                                                    :retry {:type [:linear :keyword]
-                                                            :count [5 :int]
-                                                            :enabled [true :bool]}}}
-                             :producer {:bootstrap-servers "localhost:9092"
-                                        :acks "all"
-                                        :retries 5
-                                        :max-in-flight-requests-per-connection 5
-                                        :enable-idempotence false
-                                        :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
-                                        :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
+                             :channels {:channel-1 {:worker-count [10 :int]
+                                                    :retry {:type [:linear :keyword]
+                                                            :count [5 :int]
+                                                            :enabled [true :bool]}}}
+                             :producer {:bootstrap-servers "localhost:9092"
+                                        :acks "all"
+                                        :retries 5
+                                        :max-in-flight-requests-per-connection 5
+                                        :enable-idempotence false
+                                        :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
+                                        :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
    :using-string-serde {:application-id "test"
                         :bootstrap-servers "localhost:9092"
                         :stream-threads-count [1 :int]
diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj
index c1cf35fd..40ecaa03 100644
--- a/src/ziggurat/producer.clj
+++ b/src/ziggurat/producer.clj
@@ -54,8 +54,8 @@
    (io.opentracing.contrib.kafka TracingKafkaProducer))
   (:gen-class
    :name tech.gojek.ziggurat.internal.Producer
-   :methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
-             ^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]))
+   :methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
+             ^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]))

(defn *implements-serializer?* [serializer-class]
  (contains? (set (.getInterfaces (Class/forName serializer-class)))
diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj
index a180f3b1..216698fa 100644
--- a/test/ziggurat/config_test.clj
+++ b/test/ziggurat/config_test.clj
@@ -2,6 +2,7 @@
   (:require [clojure.test :refer [deftest is testing]]
             [clonfig.core :as clonfig]
             [mount.core :as mount]
+            [ziggurat.fixtures :as f]
             [ziggurat.config :refer [-get
                                      -getIn
                                      channel-retry-config
@@ -17,11 +18,11 @@
   (testing "calls clonfig"
     (let [config-values-from-env {:key "val"}]
       (with-redefs [clonfig/read-config (fn [_] config-values-from-env)]
-        (is (= config-values-from-env (config-from-env "config.test.edn")))))))
+        (is (= config-values-from-env (config-from-env (f/get-config-file-name))))))))

 (deftest config-test
   (testing "returns merged config from env variables and default values with env variables taking higher precedence"
-    (let [config-filename "config.test.edn"
+    (let [config-filename (f/get-config-file-name)
           config-values-from-env (-> (config-from-env config-filename)
                                      (update-in [:ziggurat] dissoc :nrepl-server))]
       (with-redefs [config-from-env (fn [_] config-values-from-env)
@@ -34,7 +35,7 @@

   (testing "returns default interpolated rabbitmq config when not present in env variables"
     (let [app-name "application_name"
-          config-filename "config.test.edn"
+          config-filename (f/get-config-file-name)
           config-values-from-env (-> (config-from-env config-filename)
                                      (update-in [:ziggurat :rabbit-mq] dissoc :delay)
                                      (assoc-in [:ziggurat :app-name] app-name))
@@ -50,7 +51,7 @@

 (deftest ziggurat-config-test
   (testing "returns ziggurat config"
-    (let [config-filename "config.test.edn"
+    (let [config-filename (f/get-config-file-name)
           config-values-from-env (config-from-env config-filename)]
       (with-redefs [config-from-env (fn [_] config-values-from-env)
                     config-file config-filename]
@@ -60,7 +61,7 @@

 (deftest rabbitmq-config-test
   (testing "returns rabbitmq config"
-    (let [config-filename "config.test.edn"
+    (let [config-filename (f/get-config-file-name)
           config-values-from-env (config-from-env config-filename)]
       (with-redefs [config-from-env (fn [_] config-values-from-env)
                     config-file config-filename]
@@ -70,7 +71,7 @@

 (deftest statsd-config-test
   (testing "returns statsd config using the :statsd key or :datadog key"
-    (let [config-filename "config.test.edn" ;; inside config.test.edn, both :datadog and :statsd keys are present
+    (let [config-filename (f/get-config-file-name) ;; both :datadog and :statsd keys are present in the test configs
           config-values-from-env (config-from-env config-filename)]
       (with-redefs [config-from-env (fn [_] config-values-from-env)
                     config-file config-filename]
@@ -96,7 +97,7 @@

 (deftest get-in-config-test
   (testing "returns config for key passed"
-    (let [config-filename "config.test.edn"
+    (let [config-filename (f/get-config-file-name)
           config-values-from-env (config-from-env config-filename)]
       (with-redefs [config-from-env (fn [_] config-values-from-env)
                     config-file config-filename]
@@ -104,7 +105,7 @@
         (is (= (-> config-values-from-env :ziggurat :http-server :port) (get-in-config [:http-server :port])))
         (mount/stop))))
   (testing "returns config for key passed with default"
-    (let [config-filename "config.test.edn"
+    (let [config-filename (f/get-config-file-name)
           config-values-from-env (config-from-env config-filename)
           default "test"]
       (with-redefs [config-from-env (fn [_] config-values-from-env)
@@ -115,7 +116,7 @@

 (deftest channel-retry-config-test
   (testing "returns channel retry config"
-    (let [config-filename "config.test.edn"
+    (let [config-filename (f/get-config-file-name)
           config-values-from-env (config-from-env config-filename)
           topic-entity :default
           channel :channel-1]
diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj
index 3c5b78fe..e654299e 100644
--- a/test/ziggurat/fixtures.clj
+++ b/test/ziggurat/fixtures.clj
@@ -24,9 +24,12 @@
              ^{:static true} [mountProducer [] void]
              ^{:static true} [unmountAll [] void]]))

+(defn get-config-file-name []
+  (or (System/getenv "TEST_CONFIG_FILE") "config.test.edn"))
+
 (defn mount-config []
   (-> (mount/only [#'config/config])
-      (mount/swap {#'config/config (config/config-from-env "config.test.edn")})
+      (mount/swap {#'config/config (config/config-from-env (get-config-file-name))})
       (mount/start)))

 (defn mount-only-config [f]
@@ -145,7 +148,7 @@
   (mount-config)
   (mount-tracer)
   (mount-producer)
-  (binding [*bootstrap-servers* "localhost:9092"]
+  (binding [*bootstrap-servers* (get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])]
     (binding [*consumer-properties* (doto (Properties.)
                                       (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*)
                                       (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer")
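With `*bootstrap-servers*` now read from the mounted config instead of being hardcoded, the same fixtures serve both the single-broker and cluster runs; which value it takes depends solely on the config file `mount-config` loaded. Illustrative values:

```clojure
(require '[ziggurat.config :as config])

;; Resolves according to whichever config file is mounted:
(get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])
;; => "localhost:9092"                                (config.test.edn)
;; => "localhost:9091,localhost:9092,localhost:9093"  (config.test.cluster.edn)
```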
diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj
index 26624b95..b586e6b3 100644
--- a/test/ziggurat/producer_test.clj
+++ b/test/ziggurat/producer_test.clj
@@ -16,11 +16,11 @@

 (def valid-config {:key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
                    :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
-                   :bootstrap-servers "localhost:8000"})
+                   :bootstrap-servers "valid_bootstrap_server1, valid_bootstrap_server2, valid_bootstrap_server3"})

 (defn stream-router-config-without-producer [])
  (:stream-router {:default {:application-id "test"
-                            :bootstrap-servers "localhost:9092"
+                            :bootstrap-servers (get-in (ziggurat-config) [:stream-router :default :bootstrap-servers])
                             :stream-threads-count [1 :int]
                             :origin-topic "topic"
                             :channels {:channel-1 {:worker-count [10 :int]
@@ -34,7 +34,7 @@
         key "message"
         value "Hello World!!"]
     (send :default topic key value)
-    (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)]
+    (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 5000)]
       (is (= value (.value (first result)))))))

 (deftest send-data-with-topic-key-partition-and-value-test
@@ -45,7 +45,7 @@
         value "Hello World!!"
         partition (int 0)]
     (send :default topic partition key value)
-    (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)]
+    (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 5000)]
       (is (= value (.value (first result)))))))

 (deftest send-throws-exception-when-no-producers-are-configured
@@ -75,7 +75,7 @@
         value "Hello World!!"]
     (.reset tracer)
     (send :default topic key value)
-    (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)
+    (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 5000)
           finished-spans (.finishedSpans tracer)]
       (is (= value (.value (first result))))
       (is (= 1 (.size finished-spans)))
@@ -128,9 +128,9 @@
   (testing "with incorrect config"
     (let [valid-config (assoc valid-config :linger-ms-foo "1")]
       (is (thrown? java.lang.RuntimeException (producer-properties valid-config))))
-    (let [valid-config (update valid-config :key-serializer-class (constantly "java.time.Clock"))]
+    (let [valid-config (update valid-config :key-serializer-class (constantly "java.time.Clock"))]
       (is (thrown? java.lang.RuntimeException (producer-properties valid-config))))
-    (let [valid-config (update valid-config :key-serializer-class (constantly "java.foo.Bar"))]
+    (let [valid-config (update valid-config :key-serializer-class (constantly "java.foo.Bar"))]
       (is (thrown? java.lang.RuntimeException (producer-properties valid-config))))
     (let [valid-config (dissoc valid-config :bootstrap-servers)]
       (is (thrown? java.lang.RuntimeException (producer-properties valid-config))))))
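For completeness, a hedged usage sketch: once the cluster config is mounted, publishing through the existing producer API is unchanged, since only `:bootstrap-servers` differs. The topic and payload here are illustrative:

```clojure
(require '[ziggurat.producer :as producer])

;; Publishes via the :default stream-router producer from the mounted config;
;; returns a java.util.concurrent.Future, which the tests above observe
;; indirectly by polling with IntegrationTestUtils.
(producer/send :default "topic" "some-key" "Hello World!!")
```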