Commit dc8c3c6: Merge 3d91f18 into 6684f94

anmol1vw13 committed Mar 8, 2021
2 parents: 6684f94 + 3d91f18
Showing 13 changed files with 362 additions and 50 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/build.yml
@@ -59,6 +59,22 @@ jobs:
        env:
          KAFKA_VERSION: 2

  run_tests_with_kafka_cluster:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/setup-java@v1
        with:
          java-version: "1.8"
      - uses: DeLaGuardo/setup-clojure@master
        with:
          lein: "2.8.1"
      - uses: actions/checkout@v2

      - name: Run Tests on Kafka Cluster
        run: ./bin/run_cluster_tests_in_ci.sh
        env:
          KAFKA_VERSION: 2

  calculate_coverage:
    runs-on: ubuntu-latest
    steps:
@@ -85,6 +101,7 @@ jobs:
      lint_check,
      run_tests_with_kafka_1,
      run_tests_with_kafka_2,
      run_tests_with_kafka_cluster,
      calculate_coverage,
    ]
    if: ${{ startsWith(github.ref, 'refs/tags/') }}
16 changes: 14 additions & 2 deletions Makefile
@@ -11,11 +11,23 @@ setup:
	sleep 10
	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper

setup-cluster: cleanup-cluster
	docker-compose -f docker-compose-cluster.yml down
	lein deps
	docker-compose -f docker-compose-cluster.yml up -d
	# Sleep for 30s to allow the cluster to come up
	sleep 30
	docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
	docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1

test: setup
	ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test
	docker-compose down

test-cluster: setup-cluster
	ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test-cluster
	docker-compose -f docker-compose-cluster.yml down
	make cleanup-cluster

cleanup-cluster:
	rm -rf /tmp/ziggurat_kafka_cluster_data

coverage: setup
	lein code-coverage
	docker-compose down
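
A typical local invocation of the new target (a sketch; assumes Docker, docker-compose, and Leiningen are installed):

```bash
# test-cluster depends on setup-cluster, so one command brings up ZooKeeper,
# the three brokers, and RabbitMQ, creates the test topics, runs the suite,
# and tears the stack down again
make test-cluster
```
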
12 changes: 11 additions & 1 deletion README.md
@@ -57,9 +57,19 @@ Refer [concepts](doc/CONCEPTS.md) to understand the concepts referred to in this
- Kafka on localhost:9092
- ZooKeeper on localhost:2181
- RabbitMQ on localhost:5672

- Run tests: `make test`

#### Running a cluster setup locally

- `make setup-cluster` clears the cluster data volume and starts the following (a verification snippet follows this list):
  - 3 Kafka brokers on localhost:9091, localhost:9092 and localhost:9093
  - ZooKeeper on localhost:2181
  - RabbitMQ on localhost:5672
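
To confirm the cluster came up, one option (a sketch, reusing the container names from the Makefile) is to describe the test topics through the first broker:

```bash
# Should show both test topics with 3 partitions and replication-factor 3
docker exec ziggurat_kafka1_1 kafka-topics --describe --zookeeper ziggurat_zookeeper_1
```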

#### Running tests via a cluster
- `make test-cluster` runs the test suite against this cluster, using `config.test.cluster.edn` instead of `config.test.edn`.

## Usage

Add this to your project.clj
7 changes: 7 additions & 0 deletions bin/run_cluster_tests_in_ci.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

set -ex

lein clean
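# Swap in the CI config: brace expansion renames resources/config.test.cluster.ci.edn to resources/config.test.cluster.edn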
mv -fv resources/config.test.{cluster.ci.edn,cluster.edn}
sudo make test-cluster
85 changes: 85 additions & 0 deletions docker-compose-cluster.yml
@@ -0,0 +1,85 @@
version: '3'
services:
  rabbitmq:
    image: 'rabbitmq:3.8.2-management-alpine'
    ports:
      - '5672:5672'
      - '15672:15672'
    container_name: 'ziggurat_rabbitmq'

  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
      ZOO_TICK_TIME: 2000
    volumes:
      - /tmp/ziggurat_kafka_cluster_data/zookeeper/data:/data
      - /tmp/ziggurat_kafka_cluster_data/zookeeper/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.5.3
    cap_add:
      - NET_ADMIN
      - SYS_ADMIN
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - /tmp/ziggurat_kafka_cluster_data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper

  kafka2:
    image: confluentinc/cp-kafka:5.5.3
    cap_add:
      - NET_ADMIN
      - SYS_ADMIN
    hostname: kafka2
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - /tmp/ziggurat_kafka_cluster_data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:5.5.3
    cap_add:
      - NET_ADMIN
      - SYS_ADMIN
    hostname: kafka3
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - /tmp/ziggurat_kafka_cluster_data/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
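
With the stack up, a minimal produce/consume smoke test (a sketch: `ziggurat_kafka2_1` follows the container-name pattern the Makefile uses for `ziggurat_kafka1_1`, and the console CLIs ship in the `cp-kafka` image):

```bash
# Produce one record through broker 1 on its internal listener...
docker exec ziggurat_kafka1_1 bash -c \
  'echo hello | kafka-console-producer --broker-list kafka1:19091 --topic topic'

# ...then read it back through broker 2, confirming the brokers form one cluster
docker exec ziggurat_kafka2_1 bash -c \
  'kafka-console-consumer --bootstrap-server kafka2:19092 --topic topic --from-beginning --max-messages 1'
```
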
23 changes: 13 additions & 10 deletions project.clj
@@ -54,23 +54,26 @@
                                  :username :env/clojars_username
                                  :password :env/clojars_password
                                  :sign-releases false}]]
  :plugins [[lein-shell "0.5.0"]]
  :pedantic? :warn
  :java-source-paths ["src/com"]
  :aliases {"code-coverage" ["with-profile" "test" "cloverage" "--output" "coverage" "--lcov"]
            "test-cluster" ["shell" "lein" "test"]}
  :shell {:env {"TEST_CONFIG_FILE" "config.test.cluster.edn"}}
  :aot [ziggurat.kafka-consumer.invalid-return-type-exception]
  :profiles {:uberjar {:aot :all
                       :global-vars {*warn-on-reflection* true}
                       :pedantic? :abort}
             :test {:java-source-paths ["src/com" "test/com"]
                    :jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
                    :dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
                                   [junit/junit "4.12"]
                                   [org.hamcrest/hamcrest-core "2.2"]
                                   [org.apache.kafka/kafka-streams "2.4.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
                                   [org.apache.kafka/kafka-clients "2.4.1" :classifier "test"]
                                   [org.clojure/test.check "0.10.0"]]
                    :plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
                    :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
             :dev {:plugins [[lein-ancient "0.6.15"]
                             [lein-cljfmt "0.6.4"]
                             [lein-cloverage "1.0.13"]
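
The `test-cluster` alias runs through lein-shell, with the `:shell {:env ...}` entry naming the config file for the spawned process. Assuming lein-shell applies that `:env` map to the subprocess, the alias is roughly equivalent to:

```bash
# What "lein test-cluster" boils down to: the ordinary test task, with the
# cluster config selected through the TEST_CONFIG_FILE environment variable
TEST_CONFIG_FILE=config.test.cluster.edn lein test
```
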
87 changes: 87 additions & 0 deletions resources/config.test.cluster.ci.edn
@@ -0,0 +1,87 @@
{:ziggurat {:app-name "application_name"
            :env [:dev :keyword]
            :nrepl-server {:port [7011 :int]}
            :datadog {:host "localhost"
                      :port [8126 :int]
                      :enabled [false :bool]}
            :statsd {:host "localhost"
                     :port [8125 :int]
                     :enabled [false :bool]}
            :sentry {:enabled [false :bool]
                     :dsn "dummy"
                     :worker-count [10 :int]
                     :queue-size [10 :int]
                     :thread-termination-wait-s [1 :int]}
            :rabbit-mq-connection {:host "127.0.0.1"
                                   :port [5672 :int]
                                   :username "guest"
                                   :password "guest"
                                   :channel-timeout [2000 :int]}
            :jobs {:instant {:worker-count [4 :int]
                             :prefetch-count [4 :int]}}
            :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test"
                                :exchange-name "application_name_delay_exchange_test"
                                :dead-letter-exchange "application_name_instant_exchange_test"
                                :queue-timeout-ms [100 :int]}
                        :instant {:queue-name "application_name_instant_queue_test"
                                  :exchange-name "application_name_instant_exchange_test"}
                        :dead-letter {:queue-name "application_name_dead_letter_queue_test"
                                      :exchange-name "application_name_dead_letter_exchange_test"}}
            :retry {:type [:linear :keyword]
                    :count [5 :int]
                    :enabled [true :bool]}
            :http-server {:middlewares {:swagger {:enabled false}}
                          :port [8010 :int]
                          :thread-count [100 :int]}
            :stream-router {:default {:application-id "test"
                                      :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
                                      :stream-threads-count [1 :int]
                                      :origin-topic "topic"
                                      :upgrade-from "1.1"
                                      :changelog-topic-replication-factor [1 :int]
                                      :channels {:channel-1 {:worker-count [10 :int]
                                                             :retry {:type [:linear :keyword]
                                                                     :count [5 :int]
                                                                     :enabled [true :bool]}}}
                                      :producer {:bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
                                                 :acks "all"
                                                 :retries 5
                                                 :max-in-flight-requests-per-connection 5
                                                 :enable-idempotence false
                                                 :value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
                                                 :key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
                            :using-string-serde {:application-id "test"
                                                 :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
                                                 :stream-threads-count [1 :int]
                                                 :origin-topic "another-test-topic"
                                                 :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
                                                 :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
                                                 :key-deserializer-encoding "UTF8"
                                                 :value-deserializer-encoding "UTF8"
                                                 :deserializer-encoding "UTF8"
                                                 :upgrade-from "1.1"
                                                 :channels {:channel-1 {:worker-count [10 :int]
                                                                        :retry {:count [5 :int]
                                                                                :enabled [true :bool]}}}}}
            :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002"
                                        :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
                                        :max-poll-records [1000 :int]
                                        :origin-topic "topic"
                                        :commit-interval-ms [5000 :int]
                                        :poll-timeout-ms-config [1000 :int]
                                        :thread-count [2 :int]
                                        :session-timeout-ms-config [60000 :int]
                                        :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
                                        :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
                           :consumer-2 {:consumer-group-id "test-consumer-2002"
                                        :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093"
                                        :max-poll-records [2000 :int]
                                        :origin-topic "topic"
                                        :poll-timeout-ms-config [1000 :int]
                                        :thread-count [4 :int]
                                        :session-timeout-ms-config [60000 :int]
                                        :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
                                        :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
            :tracer {:enabled [true :bool]
                     :custom-provider ""}
            :new-relic {:report-errors false}}}
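
The `[default :type]` vectors mark values that can be overridden from the environment and coerced to the tagged type; the Makefile already leans on this when it prefixes `lein test` with `ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC`, which maps onto the `[:ziggurat :stream-router :default :origin-topic]` key path. For example:

```bash
# Point the default stream router at a different origin topic for one run;
# the variable name mirrors the nested key path in the EDN above
ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=another-topic lein test-cluster
```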
