From 843c2bd22ecfd8844e2e70ce74732057913e044f Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Thu, 8 Apr 2021 14:06:00 +0530 Subject: [PATCH] Updated kafka streams to 2.6.1 and some minor refactoring in streams-test --- project.clj | 6 ++--- resources/config.test.ci.edn | 40 +++++++++++++--------------- resources/config.test.cluster.ci.edn | 40 +++++++++++++--------------- resources/config.test.cluster.edn | 40 +++++++++++++--------------- resources/config.test.edn | 2 -- test/ziggurat/streams_test.clj | 14 +++++++--- 6 files changed, 70 insertions(+), 72 deletions(-) diff --git a/project.clj b/project.clj index b5d6c9ee..62d2481e 100644 --- a/project.clj +++ b/project.clj @@ -29,7 +29,7 @@ [io.opentracing.contrib/opentracing-kafka-client "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]] [io.opentracing.contrib/opentracing-rabbitmq-client "0.1.5" :exclusions [com.rabbitmq/amqp-client]] [org.apache.httpcomponents/fluent-hc "4.5.4"] - [org.apache.kafka/kafka-streams "2.5.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]] + [org.apache.kafka/kafka-streams "2.6.1" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.logging.log4j/log4j-core "2.12.1"] [org.apache.logging.log4j/log4j-slf4j-impl "2.12.1"] [org.clojure/clojure "1.10.0"] @@ -73,8 +73,8 @@ :dependencies [[com.google.protobuf/protobuf-java "3.5.1"] [junit/junit "4.12"] [org.hamcrest/hamcrest-core "2.2"] - [org.apache.kafka/kafka-streams "2.5.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] - [org.apache.kafka/kafka-clients "2.5.0" :classifier "test"] + [org.apache.kafka/kafka-streams "2.6.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] + [org.apache.kafka/kafka-clients "2.6.1" :classifier "test"] [org.clojure/test.check "0.10.0"]] :plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index 2a54190e..506b5e22 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -37,7 +37,6 @@ :bootstrap-servers "localhost:9092" :stream-threads-count [1 :int] :origin-topic "topic" - :upgrade-from "1.1" :changelog-topic-replication-factor [1 :int] :channels {:channel-1 {:worker-count [10 :int] :retry {:type [:linear :keyword] @@ -59,29 +58,28 @@ :key-deserializer-encoding "UTF8" :value-deserializer-encoding "UTF8" :deserializer-encoding "UTF8" - :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9092" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9092" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} + :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" + :bootstrap-servers "localhost:9092" + :max-poll-records [1000 :int] + :origin-topic "topic" + :commit-interval-ms [5000 :int] + :poll-timeout-ms-config [1000 :int] + :thread-count [2 :int] + :session-timeout-ms-config [60000 :int] + :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" + :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} + :consumer-2 {:consumer-group-id "test-consumer-2002" + :bootstrap-servers "localhost:9092" + :max-poll-records [2000 :int] + :origin-topic "topic" + :poll-timeout-ms-config [1000 :int] + :thread-count [4 :int] + :session-timeout-ms-config [60000 :int] + :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" + :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} :tracer {:enabled [true :bool] :custom-provider ""} :new-relic {:report-errors false}}} diff --git a/resources/config.test.cluster.ci.edn b/resources/config.test.cluster.ci.edn index b19b1767..3de57472 100644 --- a/resources/config.test.cluster.ci.edn +++ b/resources/config.test.cluster.ci.edn @@ -37,7 +37,6 @@ :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" :stream-threads-count [1 :int] :origin-topic "topic" - :upgrade-from "1.1" :changelog-topic-replication-factor [1 :int] :channels {:channel-1 {:worker-count [10 :int] :retry {:type [:linear :keyword] @@ -59,29 +58,28 @@ :key-deserializer-encoding "UTF8" :value-deserializer-encoding "UTF8" :deserializer-encoding "UTF8" - :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} + :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" + :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" + :max-poll-records [1000 :int] + :origin-topic "topic" + :commit-interval-ms [5000 :int] + :poll-timeout-ms-config [1000 :int] + :thread-count [2 :int] + :session-timeout-ms-config [60000 :int] + :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" + :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} + :consumer-2 {:consumer-group-id "test-consumer-2002" + :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" + :max-poll-records [2000 :int] + :origin-topic "topic" + :poll-timeout-ms-config [1000 :int] + :thread-count [4 :int] + :session-timeout-ms-config [60000 :int] + :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" + :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} :tracer {:enabled [true :bool] :custom-provider ""} :new-relic {:report-errors false}}} diff --git a/resources/config.test.cluster.edn b/resources/config.test.cluster.edn index b19b1767..3de57472 100644 --- a/resources/config.test.cluster.edn +++ b/resources/config.test.cluster.edn @@ -37,7 +37,6 @@ :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" :stream-threads-count [1 :int] :origin-topic "topic" - :upgrade-from "1.1" :changelog-topic-replication-factor [1 :int] :channels {:channel-1 {:worker-count [10 :int] :retry {:type [:linear :keyword] @@ -59,29 +58,28 @@ :key-deserializer-encoding "UTF8" :value-deserializer-encoding "UTF8" :deserializer-encoding "UTF8" - :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}} - :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [1000 :int] - :origin-topic "topic" - :commit-interval-ms [5000 :int] - :poll-timeout-ms-config [1000 :int] - :thread-count [2 :int] - :session-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} - :consumer-2 {:consumer-group-id "test-consumer-2002" - :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" - :max-poll-records [2000 :int] - :origin-topic "topic" - :poll-timeout-ms-config [1000 :int] - :thread-count [4 :int] - :session-timeout-ms-config [60000 :int] - :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" - :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} + :batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002" + :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" + :max-poll-records [1000 :int] + :origin-topic "topic" + :commit-interval-ms [5000 :int] + :poll-timeout-ms-config [1000 :int] + :thread-count [2 :int] + :session-timeout-ms-config [60000 :int] + :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" + :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"} + :consumer-2 {:consumer-group-id "test-consumer-2002" + :bootstrap-servers "localhost:9091,localhost:9092,localhost:9093" + :max-poll-records [2000 :int] + :origin-topic "topic" + :poll-timeout-ms-config [1000 :int] + :thread-count [4 :int] + :session-timeout-ms-config [60000 :int] + :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" + :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}} :tracer {:enabled [true :bool] :custom-provider ""} :new-relic {:report-errors false}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index e204d90c..1a9e6ba9 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -40,7 +40,6 @@ :bootstrap-servers "localhost:9092" :stream-threads-count [1 :int] :origin-topic "topic" - :upgrade-from "1.1" :consumer-type :default :changelog-topic-replication-factor [1 :int] :channels {:channel-1 {:worker-count [10 :int] @@ -63,7 +62,6 @@ :key-deserializer-encoding "UTF8" :value-deserializer-encoding "UTF8" :deserializer-encoding "UTF8" - :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}} diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 8268b808..bde15ded 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -215,7 +215,7 @@ (start-stream :invalid-topic-entity) (is (= @is-close-called 0))))) -(deftest start-stream-joins-test +(deftest start-stream-joins-test-with-inner-join (testing "stream joins using inner join" (let [orig-config (ziggurat-config)] (with-redefs [config/ziggurat-config (fn [] (-> orig-config @@ -243,7 +243,9 @@ (MockTime.)) (Thread/sleep 5000) ;;wating for streams to consume messages (stop-streams streams) - (is (= times @message-received-count)))))) + (is (= times @message-received-count))))))) + +(deftest start-stream-joins-test-with-left-join (testing "stream joins using left join" (let [orig-config (ziggurat-config)] (with-redefs [config/ziggurat-config (fn [] (-> orig-config @@ -271,7 +273,9 @@ (MockTime.)) (Thread/sleep 5000) ;;wating for streams to consume messages (stop-streams streams) - (is (= times @message-received-count)))))) + (is (= times @message-received-count))))))) + +(deftest start-stream-joins-test-with-outer-join (testing "stream joins using outer join" (let [orig-config (ziggurat-config)] (with-redefs [config/ziggurat-config (fn [] (-> orig-config @@ -299,7 +303,9 @@ (MockTime.)) (Thread/sleep 5000) ;;wating for streams to consume messages (stop-streams streams) - (is (= times @message-received-count)))))) + (is (= times @message-received-count))))))) + +(deftest start-stream-joins-test-alpha-features-test (testing "stream-joins should not start if :alpha-features for stream-joins is `false`" (let [original-config (ziggurat-config)] (with-redefs [ziggurat-config (fn [] (-> original-config