Skip to content

Commit

Permalink
Fixes Issue #26 - Upgrade to kafka streams 2.0 (#27)
Browse files Browse the repository at this point in the history
* Adds a docker-compose.yml and removes embedded kafka from integration tests

* Adds docker-compose commands to travis-ci

* runs docker-compose on travis in detached mode

* fixes incorrect test

* removes git hooks as they have been deprecated

* refactors stream tests

* Fixes ci: updates travis.yml to run tests for both kafka 1 and kafka 2

* removes deprecated method punctuate from timestamp transformer

* updates kafka-streams version to 2.1.0

* Adds upgrade-from config for updating from kafka-streams 1 to 2

* refactors streams.clj, moves offset-reset-config validation out into a function

* adds a .env file to set default KAFKA_VERSION for docker-compose

* refactors ci: creates topic from within docker container instead of pulling kafka code

* moves logic for running tests in ci out into a script

* moves the upgrade-from config to stream-routes

* all integration stream tests now run with different application-ids

* refactors code and introduces lint stage in ci

* Adds a make-file and changes container name in docker-compose

* Adds rabbitmq to docker-compose

* removes duplicate import from streams.clj

* Runs make test-all command in ci

* refactors code, stream-tests get values from config.

* Updates dev-setup in readme

* Add changelog-topic-replication to test

* Add code-coverage stage in CI pipeline
  • Loading branch information
theanirudhvyas authored and kartik7153 committed May 17, 2019
1 parent a4ff017 commit 461666e
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 86 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Default Kafka major version used by docker-compose (selects the
# bitnami/kafka image tag); override in the shell to test Kafka 1 vs 2.
KAFKA_VERSION=2
32 changes: 23 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
language: clojure

services:
- rabbitmq
- docker

stages:
- lint-check
- test
- coverage
- name: deploy
if: (repo == gojek/ziggurat) AND (tag IS present)
jobs:
include:
- stage: test
- stage: lint-check
script:
- lein clean
- lein deps
- mv -fv resources/config.test.{ci.edn,edn}
- lein cljfmt check
- lein test-all
after_script:
- lein code-coverage
- curl --form 'json_file=@coverage/coveralls.json' "${COVERALLS_URL}"
- lein kibit
- stage: test
name: "kafka-1"
env:
- KAFKA_VERSION=1
script:
- ./bin/run_tests_in_ci.sh
- stage: test
name: "kafka-2"
env:
- KAFKA_VERSION=2
script:
- ./bin/run_tests_in_ci.sh

- stage: coverage
script:
- make coverage

- stage: deploy
script: lein deploy clojars
26 changes: 26 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Drives the dockerised test setup: brings up Kafka/ZooKeeper/RabbitMQ,
# creates the test topic, then runs the lein test suites.
# None of these targets produce files, so declare them all phony
# (the original only declared `all`, so e.g. a file named `setup`
# would have silently shadowed the target).
.PHONY: all setup test-all test coverage
all: test-all test

# Kafka topic consumed by the stream integration tests.
# Override per-invocation: `make test topic=my-topic`.
topic="topic"

setup:
	docker-compose down
	lein deps
	docker-compose up -d
	# Crude wait for the brokers to come up — presumably enough on CI; TODO confirm.
	sleep 10
	# No -it: this also runs under CI (bin/run_tests_in_ci.sh), where no TTY is
	# attached and `docker exec -t` fails with "the input device is not a TTY".
	docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper

test-all: setup
	ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test-all
	docker-compose down

test: setup
	ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test
	docker-compose down

coverage: setup
	lein code-coverage
# Only publish to Coveralls when the URL is configured (i.e. on CI).
ifdef COVERALLS_URL
	curl --form 'json_file=@coverage/coveralls.json' "$(COVERALLS_URL)"
endif
	docker-compose down
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ Refer [concepts](doc/CONCEPTS.md) to understand the concepts referred to in this
## Dev Setup
(For mac users only)

- Install leiningen: ```brew install leiningen```
- Install Clojure: ```brew install clojure```

- Install Rabbitmq: ```brew install rabbitmq```
- Install leiningen: ```brew install leiningen```

- Start Rabbitmq: ```brew services run rabbitmq``` : ensure the default user-id and password for rabbitmq is `guest`
- Run docker-compose: ```docker-compose up```. This starts
- Kafka on localhost:9092
- ZooKeeper on localhost:2181
- RabbitMQ on localhost:5672

- Run tests: ```lein test```
- Run tests: ```make test```


## Usage
Expand Down
7 changes: 7 additions & 0 deletions bin/run_tests_in_ci.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

# CI entry point: cleans the workspace, swaps in the CI-specific test
# config, and runs the full suite against the docker-compose stack.
# -e: abort on the first failing command; -x: echo commands for the CI log.
set -ex

lein clean
# Promote the CI config: config.test.ci.edn -> config.test.edn
# (bash brace expansion; -f overwrites the local config, -v logs the move).
mv -fv resources/config.test.{ci.edn,edn}
make test-all
24 changes: 24 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Infrastructure for Ziggurat tests: RabbitMQ, ZooKeeper and Kafka.
# Used locally (`make test`) and on CI; replaces the embedded Kafka
# previously used by the integration tests.
version: '3.3'

services:
  rabbitmq:
    image: 'rabbitmq'
    ports:
      - '5672:5672'
    container_name: 'ziggurat_rabbitmq'
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    container_name: 'ziggurat_zookeeper'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    # KAFKA_VERSION defaults to 2 via the .env file; export it to test
    # another Kafka major (CI runs both 1 and 2).
    image: 'bitnami/kafka:${KAFKA_VERSION}'
    ports:
      - '9092:9092'
    # Fixed name so the Makefile can `docker exec` into it to create topics.
    container_name: 'ziggurat_kafka'
    environment:
      # Reaches ZooKeeper via the compose service name on the default network.
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
17 changes: 6 additions & 11 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
[medley "0.8.4"]
[mount "0.1.10"]
[org.apache.httpcomponents/fluent-hc "4.5.4"]
[org.apache.kafka/kafka-streams "1.1.1" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.7"]
[org.apache.logging.log4j/log4j-slf4j-impl "2.7"]
[org.clojure/clojure "1.10.0"]
Expand Down Expand Up @@ -47,18 +47,13 @@
:dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
[io.confluent/kafka-schema-registry "4.1.1"]
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "1.1.1" :classifier "test"]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]]
[org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.1.0" :classifier "test"]
[org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[jonase/eastwood "0.2.6"]
[lein-cljfmt "0.6.3"]
:dev {:plugins [[lein-cljfmt "0.6.3"]
[lein-cloverage "1.0.13"]
[lein-githooks "0.1.0"]
[lein-kibit "0.1.6"]]
:githooks {:auto-install true
:pre-commit ["lein cljfmt check && lein kibit"]
:pre-push ["lein test"]}}
[lein-kibit "0.1.6"]]}
:1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]}
:1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}})
7 changes: 4 additions & 3 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
7 changes: 4 additions & 3 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
38 changes: 29 additions & 9 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,28 @@
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3})

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms changelog-topic-replication-factor]}]
(defn- set-upgrade-from-config
"Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the
value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the
config needs to be set as nil "
[properties upgrade-from-config]
(if (some? upgrade-from-config)
(.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config)))

(defn- validate-auto-offset-reset-config
[auto-offset-reset-config]
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config})))
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- properties [{:keys [application-id
bootstrap-servers
stream-threads-count
auto-offset-reset-config
buffered-records-per-partition
commit-interval-ms
upgrade-from
changelog-topic-replication-factor]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
(.put StreamsConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers)
Expand All @@ -40,7 +59,8 @@
(.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition))
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
(.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int changelog-topic-replication-factor))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)
(set-upgrade-from-config upgrade-from)))

(defn- get-metric-namespace [default topic]
(str (name topic) "." default))
Expand Down Expand Up @@ -108,13 +128,13 @@
(start-streams stream-routes (ziggurat-config)))
([stream-routes stream-configs]
(reduce (fn [streams stream]
(let [topic-entity (first stream)
(let [topic-entity (first stream)
topic-handler-fn (-> stream second :handler-fn)
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
(.start stream)
(conj streams stream)))
[]
Expand Down
1 change: 0 additions & 1 deletion src/ziggurat/timestamp_transformer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
(when (message-to-process? message-time oldest-processed-message-in-s)
(calculate-and-report-kafka-delay metric-namespace message-time)
(KeyValue/pair record-key record-value))))
(punctuate [_ _] nil)
(close [_] nil))

(defn create [metric-namespace process-message-since-in-s]
Expand Down
Loading

0 comments on commit 461666e

Please sign in to comment.