diff --git a/.env b/.env new file mode 100644 index 00000000..457c5498 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +KAFKA_VERSION=2 \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index ca860a28..196bfb43 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,19 +1,31 @@ language: clojure + services: - rabbitmq +- docker stages: + - lint-check - test - name: deploy if: (repo == gojektech/ziggurat) AND (tag IS present) jobs: include: - - stage: test + - stage: lint-check script: - - lein clean - - lein deps - - mv -fv resources/config.test.{ci.edn,edn} - lein cljfmt check - - lein test-all + - lein kibit + - stage: test + name: "kafka-1" + env: + - KAFKA_VERSION=1 + script: + - ./bin/run_tests_in_ci.sh + - stage: test + name: "kafka-2" + env: + - KAFKA_VERSION=2 + script: + - ./bin/run_tests_in_ci.sh after_script: - lein code-coverage - curl --form 'json_file=@coverage/coveralls.json' "${COVERALLS_URL}" diff --git a/bin/run_tests_in_ci.sh b/bin/run_tests_in_ci.sh new file mode 100755 index 00000000..ec477134 --- /dev/null +++ b/bin/run_tests_in_ci.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -ex + +lein clean +lein deps +mv -fv resources/config.test.{ci.edn,edn} +docker-compose up -d +sleep 15 +docker exec ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1 +lein test-all +docker-compose down \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..d8a12520 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +version: '3.3' + +services: + zookeeper: + image: 'bitnami/zookeeper:latest' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka1: + image: 'bitnami/kafka:${KAFKA_VERSION}' + ports: + - '9092:9092' + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 + - ALLOW_PLAINTEXT_LISTENER=yes \ No newline at 
end of file diff --git a/project.clj b/project.clj index 0924f825..fb1c3a3b 100644 --- a/project.clj +++ b/project.clj @@ -17,7 +17,7 @@ [medley "0.8.4"] [mount "0.1.10"] [org.apache.httpcomponents/fluent-hc "4.5.4"] - [org.apache.kafka/kafka-streams "1.1.1" :exclusions [org.slf4j/slf4j-log4j12 log4j]] + [org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.logging.log4j/log4j-core "2.7"] [org.apache.logging.log4j/log4j-slf4j-impl "2.7"] [org.clojure/clojure "1.10.0"] @@ -47,18 +47,13 @@ :dependencies [[com.google.protobuf/protobuf-java "3.5.1"] [io.confluent/kafka-schema-registry "4.1.1"] [junit/junit "4.12"] - [org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] - [org.apache.kafka/kafka-clients "1.1.1" :classifier "test"] - [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]] + [org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] + [org.apache.kafka/kafka-clients "2.1.0" :classifier "test"] + [org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} - :dev {:plugins [[jonase/eastwood "0.2.6"] - [lein-cljfmt "0.6.3"] + :dev {:plugins [[lein-cljfmt "0.6.3"] [lein-cloverage "1.0.13"] - [lein-githooks "0.1.0"] - [lein-kibit "0.1.6"]] - :githooks {:auto-install true - :pre-commit ["lein cljfmt check && lein kibit"] - :pre-push ["lein test"]}} + [lein-kibit "0.1.6"]]} :1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]} :1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}}) diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index c56c543c..7a830f0c 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -33,6 +33,7 @@ :stream-threads-count [1 :int] :origin-topic "kafka-topic-*" :proto-class "com.company.LogMessage" + :upgrade-from "1.1" :channels {:channel-1 
{:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}}}} \ No newline at end of file diff --git a/resources/config.test.edn b/resources/config.test.edn index 5261129a..482d85d0 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -33,6 +33,7 @@ :stream-threads-count [1 :int] :origin-topic "kafka-topic-*" :proto-class "com.company.LogMessage" + :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}}}} \ No newline at end of file diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index fc523af3..0ffdd463 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -9,6 +9,7 @@ [ziggurat.util.map :as umap] [ziggurat.mapper :as mpr] [ziggurat.timestamp-transformer :as transformer] + [ziggurat.config :refer [config]] [ziggurat.sentry :refer [sentry-reporter]]) (:import [java.util.regex Pattern] [java.util Properties] @@ -26,9 +27,21 @@ :auto-offset-reset-config "latest" :oldest-processed-message-in-s 604800}) -(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}] +(defn- set-upgrade-from-config + "Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the + value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the + config needs to be set as nil " + [properties upgrade-from-config] + (when (some? upgrade-from-config) + (.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config))) + +(defn- validate-auto-offset-reset-config + [auto-offset-reset-config] (if-not (contains? 
#{"latest" "earliest" nil} auto-offset-reset-config) - (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))) + (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config})))) + +(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms upgrade-from]}] + (validate-auto-offset-reset-config auto-offset-reset-config) (doto (Properties.) (.put StreamsConfig/APPLICATION_ID_CONFIG application-id) (.put StreamsConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) @@ -38,7 +51,8 @@ (.put StreamsConfig/DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG IngestionTimeExtractor) (.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition)) (.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms) - (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config))) + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config) + (set-upgrade-from-config upgrade-from))) (defn- get-metric-namespace [default topic] (str (name topic) "." 
default)) @@ -106,13 +120,13 @@ (start-streams stream-routes (ziggurat-config))) ([stream-routes stream-configs] (reduce (fn [streams stream] - (let [topic-entity (first stream) + (let [topic-entity (first stream) topic-handler-fn (-> stream second :handler-fn) - channels (chl/get-keys-for-topic stream-routes topic-entity) - stream-config (-> stream-configs - (get-in [:stream-router topic-entity]) - (umap/deep-merge default-config-for-stream)) - stream (start-stream* topic-handler-fn stream-config topic-entity channels)] + channels (chl/get-keys-for-topic stream-routes topic-entity) + stream-config (-> stream-configs + (get-in [:stream-router topic-entity]) + (umap/deep-merge default-config-for-stream)) + stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) (conj streams stream))) [] diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index 2b635cc1..6863b304 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -13,7 +13,7 @@ (deftype IngestionTimeExtractor [] TimestampExtractor (extract [_ record _] (let [ingestion-time (get-timestamp-from-record record)] - (if (< ingestion-time 0) + (if (neg? ingestion-time) (get-current-time-in-millis) ingestion-time)))) @@ -26,7 +26,6 @@ (when (message-to-process? 
message-time oldest-processed-message-in-s) (calculate-and-report-kafka-delay metric-namespace message-time) (KeyValue/pair record-key record-value)))) - (punctuate [_ _] nil) (close [_] nil)) (defn create [metric-namespace process-message-since-in-s] diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 6987a66b..34239ea5 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -14,7 +14,15 @@ (def config-map {:stream-router {:vehicle {:application-id "test" :bootstrap-servers "localhost:9092" :stream-threads-count 1 - :proto-class "flatland.protobuf.test.Example$Photo"}}}) + :proto-class "flatland.protobuf.test.Example$Photo" + :upgrade-from "1.1"}}}) + +(def props (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers])) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer"))) (def message {:id 7 :path "/photos/h2k3j4h9h23"}) @@ -22,9 +30,14 @@ (defn create-photo [] (proto/protobuf-dump proto-log-type message)) +(def message-key-value (KeyValue/pair (create-photo) (create-photo))) + (defn mapped-fn [_] :success) +(defn rand-application-id [] + (str "test" "-" (rand-int 999999999))) + (deftest start-streams-with-since-test (let [message-received-count (atom 0)] (with-redefs [mapped-fn (fn [message-from-kafka] @@ -32,20 +45,11 @@ (swap! message-received-count inc)) :success)] (let [topic "topic" - cluster (doto (EmbeddedKafkaCluster. 1) (.start)) - bootstrap-serves (.bootstrapServers cluster) times 6 oldest-processed-message-in-s 10 - kvs (repeat times (KeyValue/pair (create-photo) (create-photo))) - props (doto (Properties.) 
- (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-serves) - (.put ProducerConfig/ACKS_CONFIG "all") - (.put ProducerConfig/RETRIES_CONFIG (int 0)) - (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") - (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")) - _ (.createTopic cluster topic) + kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :bootstrap-servers] bootstrap-serves) + (assoc-in [:stream-router :vehicle :application-id] (rand-application-id)) (assoc-in [:stream-router :vehicle :oldest-processed-message-in-s] oldest-processed-message-in-s) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start @@ -61,19 +65,10 @@ (swap! message-received-count inc)) :success)] (let [topic "topic" - cluster (doto (EmbeddedKafkaCluster. 1) (.start)) - bootstrap-serves (.bootstrapServers cluster) times 6 - kvs (repeat times (KeyValue/pair (create-photo) (create-photo))) - props (doto (Properties.) 
- (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-serves) - (.put ProducerConfig/ACKS_CONFIG "all") - (.put ProducerConfig/RETRIES_CONFIG (int 0)) - (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer") - (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")) - _ (.createTopic cluster topic) + kvs (repeat times message-key-value) streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map - (assoc-in [:stream-router :vehicle :bootstrap-servers] bootstrap-serves) + (assoc-in [:stream-router :vehicle :application-id] (rand-application-id)) (assoc-in [:stream-router :vehicle :origin-topic] topic)))] (Thread/sleep 20000) ;;waiting for streams to start (IntegrationTestUtils/produceKeyValuesSynchronously topic kvs props (MockTime.)) diff --git a/test/ziggurat/util/map_test.clj b/test/ziggurat/util/map_test.clj index 7b520930..274196cd 100644 --- a/test/ziggurat/util/map_test.clj +++ b/test/ziggurat/util/map_test.clj @@ -25,4 +25,4 @@ (testing "applies a function on a key" (let [a {:something "value"} expected-result {"something" "value"}] - (is (= (nested-map-keys str expected-result)))))) + (is (= (nested-map-keys name a) expected-result)))))