Commit
Merge 197c785 into 22619da
theanirudhvyas committed Mar 25, 2019
2 parents 22619da + 197c785 commit 2197874
Showing 11 changed files with 98 additions and 51 deletions.
1 change: 1 addition & 0 deletions .env
@@ -0,0 +1 @@
KAFKA_VERSION=2
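
docker-compose reads this .env file from the project root and substitutes ${KAFKA_VERSION} into docker-compose.yml (added below); a KAFKA_VERSION exported in the shell takes precedence over .env, which is how CI overrides it per stage. A quick sanity check, assuming the compose file from this commit:

# Render the resolved compose config and check the Kafka image tag
docker-compose config | grep 'image:'
# expected with KAFKA_VERSION=2:  image: bitnami/kafka:2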
22 changes: 17 additions & 5 deletions .travis.yml
@@ -1,19 +1,31 @@
language: clojure

services:
- rabbitmq
- docker
stages:
- lint-check
- test
- name: deploy
if: (repo == gojektech/ziggurat) AND (tag IS present)
jobs:
include:
- stage: test
- stage: lint-check
script:
- lein clean
- lein deps
- mv -fv resources/config.test.{ci.edn,edn}
- lein cljfmt check
- lein test-all
- lein kibit
- stage: test
name: "kafka-1"
env:
- KAFKA_VERSION=1
script:
- ./bin/run_tests_in_ci.sh
- stage: test
name: "kafka-2"
env:
- KAFKA_VERSION=2
script:
- ./bin/run_tests_in_ci.sh
after_script:
- lein code-coverage
- curl --form 'json_file=@coverage/coveralls.json' "${COVERALLS_URL}"
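
The test stage now fans out into two jobs, kafka-1 and kafka-2, running the same script against Kafka major versions 1 and 2 via the KAFKA_VERSION variable. A sketch of reproducing the matrix locally, assuming docker-compose.yml and bin/run_tests_in_ci.sh from this commit:

# Run the CI test matrix locally; the exported variable overrides .env
for version in 1 2; do
  KAFKA_VERSION=$version ./bin/run_tests_in_ci.sh
done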
12 changes: 12 additions & 0 deletions bin/run_tests_in_ci.sh
@@ -0,0 +1,12 @@
#!/usr/bin/env bash

set -ex

lein clean
lein deps
mv -fv resources/config.test.{ci.edn,edn}
docker-compose up -d
sleep 15
docker exec -it ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic topic --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper_1
lein test-all
docker-compose down
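
The script waits a fixed 15 seconds for the broker to come up, then creates the three-partition test topic through the Kafka CLI baked into the bitnami image. A hypothetical follow-up check, using the same container and paths as above, to confirm the topic exists before lein test-all runs:

# Hypothetical verification step: describe the topic just created
docker exec ziggurat_kafka1_1 /opt/bitnami/kafka/bin/kafka-topics.sh \
  --describe --topic topic --zookeeper ziggurat_zookeeper_1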
17 changes: 17 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,17 @@
version: '3.3'

services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka1:
image: 'bitnami/kafka:${KAFKA_VERSION}'
ports:
- '9092:9092'
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- ALLOW_PLAINTEXT_LISTENER=yes
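
The broker advertises PLAINTEXT://localhost:9092, so clients running on the host (the lein test JVM) reach it at the same address as :bootstrap-servers "localhost:9092" in the test config; ALLOW_PLAINTEXT_LISTENER opts into the unauthenticated listener the bitnami image otherwise refuses to start. A minimal reachability check from the host:

# Confirm the broker port is reachable before running tests
nc -z localhost 9092 && echo 'broker reachable'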
17 changes: 6 additions & 11 deletions project.clj
@@ -17,7 +17,7 @@
[medley "0.8.4"]
[mount "0.1.10"]
[org.apache.httpcomponents/fluent-hc "4.5.4"]
[org.apache.kafka/kafka-streams "1.1.1" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.7"]
[org.apache.logging.log4j/log4j-slf4j-impl "2.7"]
[org.clojure/clojure "1.10.0"]
@@ -47,18 +47,13 @@
:dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
[io.confluent/kafka-schema-registry "4.1.1"]
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "1.1.1" :classifier "test"]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]]
[org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.1.0" :classifier "test"]
[org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[jonase/eastwood "0.2.6"]
[lein-cljfmt "0.6.3"]
:dev {:plugins [[lein-cljfmt "0.6.3"]
[lein-cloverage "1.0.13"]
[lein-githooks "0.1.0"]
[lein-kibit "0.1.6"]]
:githooks {:auto-install true
:pre-commit ["lein cljfmt check && lein kibit"]
:pre-push ["lein test"]}}
[lein-kibit "0.1.6"]]}
:1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]}
:1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}})
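
The kafka-streams dependency and the test-scoped kafka-clients and kafka_2.11 artifacts move from 1.1.1 to 2.1.0 together; mixing major versions across these artifacts is a common source of classpath breakage. A quick check that everything resolved consistently after the bump:

# List the resolved Kafka artifacts
lein deps :tree 2>/dev/null | grep org.apache.kafka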
1 change: 1 addition & 0 deletions resources/config.test.ci.edn
@@ -33,6 +33,7 @@
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
1 change: 1 addition & 0 deletions resources/config.test.edn
@@ -33,6 +33,7 @@
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
32 changes: 23 additions & 9 deletions src/ziggurat/streams.clj
@@ -9,6 +9,7 @@
[ziggurat.util.map :as umap]
[ziggurat.mapper :as mpr]
[ziggurat.timestamp-transformer :as transformer]
[ziggurat.config :refer [config]]
[ziggurat.sentry :refer [sentry-reporter]])
(:import [java.util.regex Pattern]
[java.util Properties]
@@ -26,9 +27,21 @@
:auto-offset-reset-config "latest"
:oldest-processed-message-in-s 604800})

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}]
(defn- set-upgrade-from-config
"Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the
value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the
config needs to be set as nil "
[properties upgrade-from-config]
(if (some? upgrade-from-config)
(.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config)))

(defn- validate-auto-offset-reset-config
[auto-offset-reset-config]
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config})))
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms upgrade-from]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
(.put StreamsConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers)
@@ -38,7 +51,8 @@
(.put StreamsConfig/DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG IngestionTimeExtractor)
(.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition))
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)
(set-upgrade-from-config upgrade-from)))

(defn- get-metric-namespace [default topic]
(str (name topic) "." default))
@@ -106,13 +120,13 @@
(start-streams stream-routes (ziggurat-config)))
([stream-routes stream-configs]
(reduce (fn [streams stream]
(let [topic-entity (first stream)
(let [topic-entity (first stream)
topic-handler-fn (-> stream second :handler-fn)
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
(.start stream)
(conj streams stream)))
[]
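
For illustration, a hedged REPL sketch of how the new :upgrade-from key flows through properties (the fn is private, hence the var quote; the application id and numeric values here are hypothetical, not library defaults):

;; With :upgrade-from "1.1" the resulting Properties carry upgrade.from=1.1,
;; which lets 2.1.0 instances roll out alongside still-running 1.1 instances.
;; Passing :upgrade-from nil leaves the config unset.
(#'ziggurat.streams/properties
 {:application-id                 "my-app"        ; hypothetical
  :bootstrap-servers              "localhost:9092"
  :stream-threads-count           1
  :auto-offset-reset-config       "latest"
  :buffered-records-per-partition 10000           ; hypothetical
  :commit-interval-ms             15000           ; hypothetical
  :upgrade-from                   "1.1"})
;; => a java.util.Properties including upgrade.from=1.1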
3 changes: 1 addition & 2 deletions src/ziggurat/timestamp_transformer.clj
@@ -13,7 +13,7 @@
(deftype IngestionTimeExtractor [] TimestampExtractor
(extract [_ record _]
(let [ingestion-time (get-timestamp-from-record record)]
(if (< ingestion-time 0)
(if (neg? ingestion-time)
(get-current-time-in-millis)
ingestion-time))))

@@ -26,7 +26,6 @@
(when (message-to-process? message-time oldest-processed-message-in-s)
(calculate-and-report-kafka-delay metric-namespace message-time)
(KeyValue/pair record-key record-value))))
(punctuate [_ _] nil)
(close [_] nil))

(defn create [metric-namespace process-message-since-in-s]
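
Two small cleanups in this file: (neg? ingestion-time) is the idiomatic spelling of (< ingestion-time 0), and the no-op punctuate override is dropped, presumably because Kafka Streams 2.x removed punctuate from the Transformer interface. For reference:

;; (neg? x) is equivalent to (< x 0)
(neg? -42) ;; => true
(neg? 0)   ;; => false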
41 changes: 18 additions & 23 deletions test/ziggurat/streams_test.clj
@@ -14,38 +14,42 @@
(def config-map {:stream-router {:vehicle {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count 1
:proto-class "flatland.protobuf.test.Example$Photo"}}})
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"}}})

(def props (doto (Properties.)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers]))
(.put ProducerConfig/ACKS_CONFIG "all")
(.put ProducerConfig/RETRIES_CONFIG (int 0))
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")))

(def message {:id 7
:path "/photos/h2k3j4h9h23"})

(defn create-photo []
(proto/protobuf-dump proto-log-type message))

(def message-key-value (KeyValue/pair (create-photo) (create-photo)))

(defn mapped-fn [_]
:success)

(defn rand-application-id []
(str "test" "-" (rand-int 999999999)))

(deftest start-streams-with-since-test
(let [message-received-count (atom 0)]
(with-redefs [mapped-fn (fn [message-from-kafka]
(when (= message message-from-kafka)
(swap! message-received-count inc))
:success)]
(let [topic "topic"
cluster (doto (EmbeddedKafkaCluster. 1) (.start))
bootstrap-serves (.bootstrapServers cluster)
times 6
oldest-processed-message-in-s 10
kvs (repeat times (KeyValue/pair (create-photo) (create-photo)))
props (doto (Properties.)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-serves)
(.put ProducerConfig/ACKS_CONFIG "all")
(.put ProducerConfig/RETRIES_CONFIG (int 0))
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer"))
_ (.createTopic cluster topic)
kvs (repeat times message-key-value)
streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map
(assoc-in [:stream-router :vehicle :bootstrap-servers] bootstrap-serves)
(assoc-in [:stream-router :vehicle :application-id] (rand-application-id))
(assoc-in [:stream-router :vehicle :oldest-processed-message-in-s] oldest-processed-message-in-s)
(assoc-in [:stream-router :vehicle :origin-topic] topic)))]
(Thread/sleep 20000) ;;waiting for streams to start
@@ -61,19 +65,10 @@
(swap! message-received-count inc))
:success)]
(let [topic "topic"
cluster (doto (EmbeddedKafkaCluster. 1) (.start))
bootstrap-serves (.bootstrapServers cluster)
times 6
kvs (repeat times (KeyValue/pair (create-photo) (create-photo)))
props (doto (Properties.)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-serves)
(.put ProducerConfig/ACKS_CONFIG "all")
(.put ProducerConfig/RETRIES_CONFIG (int 0))
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer"))
_ (.createTopic cluster topic)
kvs (repeat times message-key-value)
streams (start-streams {:vehicle {:handler-fn mapped-fn}} (-> config-map
(assoc-in [:stream-router :vehicle :bootstrap-servers] bootstrap-serves)
(assoc-in [:stream-router :vehicle :application-id] (rand-application-id))
(assoc-in [:stream-router :vehicle :origin-topic] topic)))]
(Thread/sleep 20000) ;;waiting for streams to start
(IntegrationTestUtils/produceKeyValuesSynchronously topic kvs props (MockTime.))
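
The shared producer props, sample message, and key-value pair move to top-level defs, and each test now generates a fresh application-id. Since Kafka Streams uses application.id as the consumer group id, a random id per run (sketched below with a hypothetical value) keeps committed offsets from one test run from leaking into the next:

;; application.id doubles as the consumer group id, so a fresh id per run
;; means the stream starts with no previously committed offsets
(rand-application-id) ;; => e.g. "test-482019377"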
2 changes: 1 addition & 1 deletion test/ziggurat/util/map_test.clj
@@ -25,4 +25,4 @@
(testing "applies a function on a key"
(let [a {:something "value"}
expected-result {"something" "value"}]
(is (= (nested-map-keys str expected-result))))))
(is (= (nested-map-keys name a) expected-result)))))
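
The old assertion wrapped everything in a single-argument =, which is always true, so the test could never fail; it also never applied the function to the input map a. A minimal illustration of the trap:

;; = with one argument always returns true
(= 42)                              ;; => true
(= (nested-map-keys str {:x "y"})) ;; => true no matter what it returns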
