From c6b88d17ad2a0becd5832fd9bd38c03f94309ea0 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Tue, 19 Mar 2019 16:58:47 +0530 Subject: [PATCH 01/17] Adds a docker-compose.yml and removes embedded kafka from integration tests --- docker-compose.yml | 17 +++++++++++++++++ test/ziggurat/streams_test.clj | 16 ++++++---------- 2 files changed, 23 insertions(+), 10 deletions(-) create mode 100644 docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..6b072bcf --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +version: '3.7' + +services: + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka1: + image: 'bitnami/kafka:latest' + ports: + - '9092:9092' + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 + - ALLOW_PLAINTEXT_LISTENER=yes \ No newline at end of file diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 6987a66b..407a4478 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -32,20 +32,17 @@ (swap! message-received-count inc)) :success)] (let [topic "topic" - cluster (doto (EmbeddedKafkaCluster. 1) (.start)) - bootstrap-serves (.bootstrapServers cluster) times 6 oldest-processed-message-in-s 10 kvs (repeat times (KeyValue/pair (create-photo) (create-photo))) props (doto (Properties.) - (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-serves) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) (.put ProducerConfig/ACKS_CONFIG "all") (.put ProducerConfig/RETRIES_CONFIG (int 0)) (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")) - _ (.createTopic cluster topic) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :bootstrap-servers] bootstrap-serves) + (assoc-in [:stream-router :vehicle :bootstrap-servers] (get-in config-map [:stream-router :vehicle :bootstrap-servers])) (assoc-in [:stream-router :vehicle :oldest-processed-message-in-s] oldest-processed-message-in-s) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start @@ -54,6 +51,7 @@ (stop-streams streams) (is (= 0 @message-received-count)))))) + (deftest start-streams-test (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] @@ -61,19 +59,17 @@ (swap! message-received-count inc)) :success)] (let [topic "topic" - cluster (doto (EmbeddedKafkaCluster. 1) (.start)) - bootstrap-serves (.bootstrapServers cluster) times 6 kvs (repeat times (KeyValue/pair (create-photo) (create-photo))) + _ (spit "proto" (create-photo)) props (doto (Properties.) - (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-serves) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) (.put ProducerConfig/ACKS_CONFIG "all") (.put ProducerConfig/RETRIES_CONFIG (int 0)) (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")) - _ (.createTopic cluster topic) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :bootstrap-servers] bootstrap-serves) + (assoc-in [:stream-router :vehicle :bootstrap-servers] (get-in config-map [:stream-router :vehicle :bootstrap-servers])) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start (IntegrationTestUtils/produceKeyValuesSynchronously topic kvs props (MockTime.)) From a5ba441a1a3d3f318b55b7494e0a34f23d86856c Mon Sep 17 00:00:00 2001 From: Anirudh Date: Tue, 19 Mar 2019 17:01:44 +0530 Subject: [PATCH 02/17] Adds docker-compose commands to travis-ci --- .travis.yml | 3 +++ docker-compose.yml | 2 +- test/ziggurat/streams_test.clj | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index ca860a28..1452a476 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: clojure services: - rabbitmq +- docker stages: - test - name: deploy @@ -13,7 +14,9 @@ jobs: - lein deps - mv -fv resources/config.test.{ci.edn,edn} - lein cljfmt check + - docker-compose up - lein test-all + - docker-compose down after_script: - lein code-coverage - curl --form 'json_file=@coverage/coveralls.json' "${COVERALLS_URL}" diff --git a/docker-compose.yml b/docker-compose.yml index 6b072bcf..573d2641 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.7' +version: '3.3' services: zookeeper: diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 407a4478..50a02b06 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -51,7 +51,6 @@ (stop-streams streams) (is (= 0 @message-received-count)))))) - (deftest start-streams-test (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] From 743860fdf65fccca1b95cf0ae469a0825a558062 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Tue, 19 Mar 2019 17:14:46 +0530 Subject: [PATCH 03/17] runs docker-compose on travis in detached mode --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1452a476..9fecb285 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,7 @@ jobs: - lein deps - mv -fv resources/config.test.{ci.edn,edn} - lein cljfmt check - - docker-compose up + - docker-compose up -d - lein test-all - docker-compose down after_script: From 3dc184460ad453f823344d281a393b72da1a95bc Mon Sep 17 00:00:00 2001 From: Anirudh Date: Wed, 20 Mar 2019 11:58:22 +0530 Subject: [PATCH 04/17] fixes incorrect test --- test/ziggurat/util/map_test.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/ziggurat/util/map_test.clj b/test/ziggurat/util/map_test.clj index 7b520930..274196cd 100644 --- a/test/ziggurat/util/map_test.clj +++ b/test/ziggurat/util/map_test.clj @@ -25,4 +25,4 @@ (testing "applies a function on a key" (let [a {:something "value"} expected-result {"something" "value"}] - (is (= (nested-map-keys str expected-result)))))) + (is (= (nested-map-keys name a) expected-result))))) From dfa9838062433de57718046c5f45730497d97d36 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Wed, 20 Mar 2019 12:09:27 +0530 Subject: [PATCH 05/17] removes git hooks as they have been deprecated --- project.clj | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/project.clj b/project.clj index 0924f825..e09ea084 100644 --- a/project.clj +++ b/project.clj @@ -55,10 +55,6 @@ :dev {:plugins [[jonase/eastwood "0.2.6"] [lein-cljfmt "0.6.3"] [lein-cloverage "1.0.13"] - [lein-githooks "0.1.0"] - [lein-kibit "0.1.6"]] - :githooks {:auto-install true - :pre-commit ["lein cljfmt check && lein kibit"] - :pre-push ["lein test"]}} + [lein-kibit "0.1.6"]]} :1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]} :1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}}) From 81ff1ba6e9426601a57c29930a7140964c707554 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Wed, 20 Mar 2019 16:18:38 +0530 Subject: [PATCH 06/17] refactors stream tests --- test/ziggurat/streams_test.clj | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 50a02b06..8b3207b6 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -16,16 +16,30 @@ :stream-threads-count 1 :proto-class "flatland.protobuf.test.Example$Photo"}}}) +(def props (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer"))) + (def message {:id 7 :path "/photos/h2k3j4h9h23"}) (defn create-photo [] (proto/protobuf-dump proto-log-type message)) +(def message-key-value (KeyValue/pair (create-photo) (create-photo))) + +(defn push-dummy-message-to-create-topic + [] + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" [message-key-value] props (MockTime.))) + (defn mapped-fn [_] :success) (deftest start-streams-with-since-test + (push-dummy-message-to-create-topic) (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] (when (= message message-from-kafka) @@ -34,13 +48,7 @@ (let [topic "topic" times 6 oldest-processed-message-in-s 10 - kvs (repeat times (KeyValue/pair (create-photo) (create-photo))) - props (doto (Properties.) - (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) - (.put ProducerConfig/ACKS_CONFIG "all") - (.put ProducerConfig/RETRIES_CONFIG (int 0)) - (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") - (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")) + kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map (assoc-in [:stream-router :vehicle :bootstrap-servers] (get-in config-map [:stream-router :vehicle :bootstrap-servers])) (assoc-in [:stream-router :vehicle :oldest-processed-message-in-s] oldest-processed-message-in-s) @@ -52,6 +60,7 @@ (is (= 0 @message-received-count)))))) (deftest start-streams-test + (push-dummy-message-to-create-topic) (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] (when (= message message-from-kafka) @@ -59,14 +68,7 @@ :success)] (let [topic "topic" times 6 - kvs (repeat times (KeyValue/pair (create-photo) (create-photo))) - _ (spit "proto" (create-photo)) - props (doto (Properties.) - (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) - (.put ProducerConfig/ACKS_CONFIG "all") - (.put ProducerConfig/RETRIES_CONFIG (int 0)) - (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") - (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")) + kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map (assoc-in [:stream-router :vehicle :bootstrap-servers] (get-in config-map [:stream-router :vehicle :bootstrap-servers])) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] From 12b2d6e432cfb00bcf28a9e83f5cddb13383ea60 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Thu, 21 Mar 2019 13:55:42 +0530 Subject: [PATCH 07/17] Fixes ci: updates travis.yml to run tests for both kafka 1 and kafka 2 --- .travis.yml | 27 +++++++++++++++++++++++++++ docker-compose.yml | 2 +- test/ziggurat/streams_test.clj | 6 ------ 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9fecb285..66e5e690 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,12 @@ language: clojure + +before_install: +- sudo apt-get install tar + +before_script: +- wget http://mirrors.estointernet.in/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz +- tar -xzf kafka_2.11-2.1.0.tgz + services: - rabbitmq - docker @@ -9,12 +17,31 @@ stages: jobs: include: - stage: test + name: "kafka-1" + env: + - KAFKA_VERSION=1 + script: + - lein clean + - lein deps + - mv -fv resources/config.test.{ci.edn,edn} + - lein cljfmt check + - docker-compose up -d + - sleep 15 + - ./kafka_2.11-2.1.0/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper localhost:2181 + - lein test-all + - docker-compose down + - stage: test + name: "kafka-2" + env: + - KAFKA_VERSION=2 script: - lein clean - lein deps - mv -fv resources/config.test.{ci.edn,edn} - lein cljfmt check - docker-compose up -d + - sleep 15 + - ./kafka_2.11-2.1.0/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper localhost:2181 - lein test-all - docker-compose down after_script: diff --git a/docker-compose.yml b/docker-compose.yml index 573d2641..d8a12520 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,7 @@ services: environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka1: - image: 'bitnami/kafka:latest' + image: 'bitnami/kafka:${KAFKA_VERSION}' ports: - '9092:9092' environment: diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 8b3207b6..ab8a94f1 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -31,15 +31,10 @@ (def message-key-value (KeyValue/pair (create-photo) (create-photo))) -(defn push-dummy-message-to-create-topic - [] - (IntegrationTestUtils/produceKeyValuesSynchronously "topic" [message-key-value] props (MockTime.))) - (defn mapped-fn [_] :success) (deftest start-streams-with-since-test - (push-dummy-message-to-create-topic) (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] (when (= message message-from-kafka) @@ -60,7 +55,6 @@ (is (= 0 @message-received-count)))))) (deftest start-streams-test - (push-dummy-message-to-create-topic) (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] (when (= message message-from-kafka) From 4022ab90c18041aa1822b6caff5fd75045cadae2 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Fri, 22 Mar 2019 12:33:32 +0530 Subject: [PATCH 08/17] removes deprecated method punctuate from timestamp transformer --- src/ziggurat/timestamp_transformer.clj | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index 2b635cc1..d76eb391 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -26,7 +26,6 @@ (when (message-to-process? message-time oldest-processed-message-in-s) (calculate-and-report-kafka-delay metric-namespace message-time) (KeyValue/pair record-key record-value)))) - (punctuate [_ _] nil) (close [_] nil)) (defn create [metric-namespace process-message-since-in-s] From ad92ef891a670fab864452145557e65a60c97eb4 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Fri, 22 Mar 2019 12:42:41 +0530 Subject: [PATCH 09/17] updates kafka-streams version to 2.1.0 --- project.clj | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/project.clj b/project.clj index e09ea084..11fa4fc0 100644 --- a/project.clj +++ b/project.clj @@ -17,7 +17,7 @@ [medley "0.8.4"] [mount "0.1.10"] [org.apache.httpcomponents/fluent-hc "4.5.4"] - [org.apache.kafka/kafka-streams "1.1.1" :exclusions [org.slf4j/slf4j-log4j12 log4j]] + [org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.logging.log4j/log4j-core "2.7"] [org.apache.logging.log4j/log4j-slf4j-impl "2.7"] [org.clojure/clojure "1.10.0"] @@ -47,9 +47,9 @@ :dependencies [[com.google.protobuf/protobuf-java "3.5.1"] [io.confluent/kafka-schema-registry "4.1.1"] [junit/junit "4.12"] - [org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] - [org.apache.kafka/kafka-clients "1.1.1" :classifier "test"] - [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]] + [org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] + [org.apache.kafka/kafka-clients "2.1.0" :classifier "test"] + [org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} :dev {:plugins [[jonase/eastwood "0.2.6"] From 9ffdf8d968cfb8ca785eda07e85a057e9f234ce1 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Fri, 22 Mar 2019 15:32:07 +0530 Subject: [PATCH 10/17] Adds upgrade-from config for updating from kafka-streams 1 to 2 --- src/ziggurat/streams.clj | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index fc523af3..4df117da 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -9,6 +9,7 @@ [ziggurat.util.map :as umap] [ziggurat.mapper :as mpr] [ziggurat.timestamp-transformer :as transformer] + [ziggurat.config :refer [config]] [ziggurat.sentry :refer [sentry-reporter]]) (:import [java.util.regex Pattern] [java.util Properties] @@ -26,6 +27,14 @@ :auto-offset-reset-config "latest" :oldest-processed-message-in-s 604800}) +(defn- set-upgrade-from-config + "Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the + value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the + config needs to be set as nil " + [properties] + (if-let [upgrade-from-config (get-in config [:ziggurat :upgrade-from])] + (.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config))) + (defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}] (if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config) (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))) @@ -38,7 +47,8 @@ (.put StreamsConfig/DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG IngestionTimeExtractor) (.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition)) (.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms) - (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config))) + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config) + (set-upgrade-from-config))) (defn- get-metric-namespace [default topic] (str (name topic) "." default)) @@ -106,13 +116,13 @@ (start-streams stream-routes (ziggurat-config))) ([stream-routes stream-configs] (reduce (fn [streams stream] - (let [topic-entity (first stream) + (let [topic-entity (first stream) topic-handler-fn (-> stream second :handler-fn) - channels (chl/get-keys-for-topic stream-routes topic-entity) - stream-config (-> stream-configs - (get-in [:stream-router topic-entity]) - (umap/deep-merge default-config-for-stream)) - stream (start-stream* topic-handler-fn stream-config topic-entity channels)] + channels (chl/get-keys-for-topic stream-routes topic-entity) + stream-config (-> stream-configs + (get-in [:stream-router topic-entity]) + (umap/deep-merge default-config-for-stream)) + stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) (conj streams stream))) [] From f8683a1ca0db77aaddc328adedb471b816894743 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Fri, 22 Mar 2019 15:37:28 +0530 Subject: [PATCH 11/17] refactors streams.clj, moves offset-reset-config validation out into a function --- src/ziggurat/streams.clj | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 4df117da..55c958f0 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -35,9 +35,13 @@ (if-let [upgrade-from-config (get-in config [:ziggurat :upgrade-from])] (.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config))) -(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}] +(defn- validate-auto-offset-reset-config + [auto-offset-reset-config] (if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config) - (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))) + (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config})))) + +(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}] + (validate-auto-offset-reset-config auto-offset-reset-config) (doto (Properties.) (.put StreamsConfig/APPLICATION_ID_CONFIG application-id) (.put StreamsConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) From 24d75d8e44de0ef85e9e04cebfb6cdebf5222b0c Mon Sep 17 00:00:00 2001 From: Anirudh Date: Fri, 22 Mar 2019 15:53:42 +0530 Subject: [PATCH 12/17] adds a .env file to set default KAFKA_VERSION for docker-compose --- .env | 1 + 1 file changed, 1 insertion(+) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 00000000..457c5498 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +KAFKA_VERSION=2 \ No newline at end of file From 171d86f63723f21b1374844b17338b93e62958fe Mon Sep 17 00:00:00 2001 From: Anirudh Date: Mon, 25 Mar 2019 14:02:55 +0530 Subject: [PATCH 13/17] refactors ci: creates topic from within docker container instead of pulling kafka code --- .travis.yml | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index 66e5e690..5cef0e6e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,5 @@ language: clojure -before_install: -- sudo apt-get install tar - -before_script: -- wget http://mirrors.estointernet.in/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz -- tar -xzf kafka_2.11-2.1.0.tgz - services: - rabbitmq - docker @@ -27,7 +20,7 @@ jobs: - lein cljfmt check - docker-compose up -d - sleep 15 - - ./kafka_2.11-2.1.0/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper localhost:2181 + - docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 - lein test-all - docker-compose down - stage: test @@ -41,7 +34,7 @@ jobs: - lein cljfmt check - docker-compose up -d - sleep 15 - - ./kafka_2.11-2.1.0/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper localhost:2181 + - docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 - lein test-all - docker-compose down after_script: From 323590a72f0d4dcf0c51ca00999654f0379e04e2 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Mon, 25 Mar 2019 14:41:23 +0530 Subject: [PATCH 14/17] moves logic for running tests in ci out into a script --- .travis.yml | 20 ++------------------ bin/run_tests_in_ci.sh | 13 +++++++++++++ 2 files changed, 15 insertions(+), 18 deletions(-) create mode 100755 bin/run_tests_in_ci.sh diff --git a/.travis.yml b/.travis.yml index 5cef0e6e..b40890cb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,29 +14,13 @@ jobs: env: - KAFKA_VERSION=1 script: - - lein clean - - lein deps - - mv -fv resources/config.test.{ci.edn,edn} - - lein cljfmt check - - docker-compose up -d - - sleep 15 - - docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 - - lein test-all - - docker-compose down + - ./bin/run_tests_in_ci.sh - stage: test name: "kafka-2" env: - KAFKA_VERSION=2 script: - - lein clean - - lein deps - - mv -fv resources/config.test.{ci.edn,edn} - - lein cljfmt check - - docker-compose up -d - - sleep 15 - - docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 - - lein test-all - - docker-compose down + - ./bin/run_tests_in_ci.sh after_script: - lein code-coverage - curl --form 'json_file=@coverage/coveralls.json' "${COVERALLS_URL}" diff --git a/bin/run_tests_in_ci.sh b/bin/run_tests_in_ci.sh new file mode 100755 index 00000000..c57337ec --- /dev/null +++ b/bin/run_tests_in_ci.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -ex + +lein clean +lein deps +mv -fv resources/config.test.{ci.edn,edn} +lein cljfmt check +docker-compose up -d +sleep 15 +docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 +lein test-all +docker-compose down \ No newline at end of file From 523d60b08b4488a9fefadeb8db796e925fcd9802 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Mon, 25 Mar 2019 15:06:05 +0530 Subject: [PATCH 15/17] moves the upgrade-from config to stream-routes --- resources/config.test.ci.edn | 1 + resources/config.test.edn | 1 + src/ziggurat/streams.clj | 8 ++++---- test/ziggurat/streams_test.clj | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index c56c543c..7a830f0c 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -33,6 +33,7 @@ :stream-threads-count [1 :int] :origin-topic "kafka-topic-*" :proto-class "com.company.LogMessage" + :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}}}} \ No newline at end of file diff --git a/resources/config.test.edn b/resources/config.test.edn index 5261129a..482d85d0 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -33,6 +33,7 @@ :stream-threads-count [1 :int] :origin-topic "kafka-topic-*" :proto-class "com.company.LogMessage" + :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}}}} \ No newline at end of file diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 55c958f0..0ffdd463 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -31,8 +31,8 @@ "Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the config needs to be set as nil " - [properties] - (if-let [upgrade-from-config (get-in config [:ziggurat :upgrade-from])] + [properties upgrade-from-config] + (if (some? upgrade-from-config) (.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config))) (defn- validate-auto-offset-reset-config @@ -40,7 +40,7 @@ (if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config) (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config})))) -(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}] +(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms upgrade-from]}] (validate-auto-offset-reset-config auto-offset-reset-config) (doto (Properties.) (.put StreamsConfig/APPLICATION_ID_CONFIG application-id) @@ -52,7 +52,7 @@ (.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition)) (.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms) (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config) - (set-upgrade-from-config))) + (set-upgrade-from-config upgrade-from))) (defn- get-metric-namespace [default topic] (str (name topic) "." default)) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index ab8a94f1..35ab6a0a 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -14,7 +14,8 @@ (def config-map {:stream-router {:vehicle {:application-id "test" :bootstrap-servers "localhost:9092" :stream-threads-count 1 - :proto-class "flatland.protobuf.test.Example$Photo"}}}) + :proto-class "flatland.protobuf.test.Example$Photo" + :upgrade-from "1.1"}}}) (def props (doto (Properties.) (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) From 488abc158bfac9f54a73edfef15cdbc79620567c Mon Sep 17 00:00:00 2001 From: Anirudh Date: Mon, 25 Mar 2019 16:30:36 +0530 Subject: [PATCH 16/17] all integration stream tests now run with different application-ids --- test/ziggurat/streams_test.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 35ab6a0a..3031e10a 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -46,7 +46,7 @@ oldest-processed-message-in-s 10 kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :bootstrap-servers] (get-in config-map [:stream-router :vehicle :bootstrap-servers])) + (assoc-in [:stream-router :vehicle :application-id] (str "test" "-" (rand-int 999999999))) (assoc-in [:stream-router :vehicle :oldest-processed-message-in-s] oldest-processed-message-in-s) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start @@ -65,7 +65,7 @@ times 6 kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :bootstrap-servers] (get-in config-map [:stream-router :vehicle :bootstrap-servers])) + (assoc-in [:stream-router :vehicle :application-id] (str "test" "-" (rand-int 999999999))) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start (IntegrationTestUtils/produceKeyValuesSynchronously topic kvs props (MockTime.)) From 197c785ac435d38d7d5ab504a3751539423e236e Mon Sep 17 00:00:00 2001 From: Anirudh Date: Mon, 25 Mar 2019 16:54:15 +0530 Subject: [PATCH 17/17] refactors code and introduces lint stage in ci --- .travis.yml | 5 +++++ bin/run_tests_in_ci.sh | 1 - project.clj | 3 +-- src/ziggurat/timestamp_transformer.clj | 2 +- test/ziggurat/streams_test.clj | 7 +++++-- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index b40890cb..196bfb43 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,11 +4,16 @@ services: - rabbitmq - docker stages: + - lint-check - test - name: deploy if: (repo == gojektech/ziggurat) AND (tag IS present) jobs: include: + - stage: lint-check + script: + - lein cljfmt check + - lein kibit - stage: test name: "kafka-1" env: diff --git a/bin/run_tests_in_ci.sh b/bin/run_tests_in_ci.sh index c57337ec..ec477134 100755 --- a/bin/run_tests_in_ci.sh +++ b/bin/run_tests_in_ci.sh @@ -5,7 +5,6 @@ set -ex lein clean lein deps mv -fv resources/config.test.{ci.edn,edn} -lein cljfmt check docker-compose up -d sleep 15 docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 diff --git a/project.clj b/project.clj index 11fa4fc0..fb1c3a3b 100644 --- a/project.clj +++ b/project.clj @@ -52,8 +52,7 @@ [org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} - :dev {:plugins [[jonase/eastwood "0.2.6"] - [lein-cljfmt "0.6.3"] + :dev {:plugins [[lein-cljfmt "0.6.3"] [lein-cloverage "1.0.13"] [lein-kibit "0.1.6"]]} :1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]} diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index d76eb391..6863b304 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -13,7 +13,7 @@ (deftype IngestionTimeExtractor [] TimestampExtractor (extract [_ record _] (let [ingestion-time (get-timestamp-from-record record)] - (if (< ingestion-time 0) + (if (neg? ingestion-time) (get-current-time-in-millis) ingestion-time)))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 3031e10a..34239ea5 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -35,6 +35,9 @@ (defn mapped-fn [_] :success) +(defn rand-application-id [] + (str "test" "-" (rand-int 999999999))) + (deftest start-streams-with-since-test (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] @@ -46,7 +49,7 @@ oldest-processed-message-in-s 10 kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :application-id] (str "test" "-" (rand-int 999999999))) + (assoc-in [:stream-router :vehicle :application-id] (rand-application-id)) (assoc-in [:stream-router :vehicle :oldest-processed-message-in-s] oldest-processed-message-in-s) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start @@ -65,7 +68,7 @@ times 6 kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :application-id] (str "test" "-" (rand-int 999999999))) + (assoc-in [:stream-router :vehicle :application-id] (rand-application-id)) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start (IntegrationTestUtils/produceKeyValuesSynchronously topic kvs props (MockTime.))