Fixes Issue #26 - Upgrade to kafka streams 2.0 #27

Merged
merged 25 commits on May 17, 2019
Changes from all commits
Commits
25 commits
44f3338
Adds a docker-compose.yml and removes embedded kafka from integration…
theanirudhvyas Mar 19, 2019
8b7e0c8
Adds docker-compose commands to travis-ci
theanirudhvyas Mar 19, 2019
7758f58
runs docker-compose on travis in detached mode
theanirudhvyas Mar 19, 2019
b4b4a47
fixes incorrect test
theanirudhvyas Mar 20, 2019
6fdee7d
removes git hooks as they have been deprecated
theanirudhvyas Mar 20, 2019
e4122d4
refactors stream tests
theanirudhvyas Mar 20, 2019
62a2607
Fixes ci: updates travis.yml to run tests for both kafka 1 and kafka 2
theanirudhvyas Mar 21, 2019
d6fba90
removes deprecated method punctuate from timestamp transformer
theanirudhvyas Mar 22, 2019
64677e9
updates kafka-streams version to 2.1.0
theanirudhvyas Mar 22, 2019
a2068b5
Adds upgrade-from config for updating from kafka-streams 1 to 2
theanirudhvyas Mar 22, 2019
0919dfe
refactors streams.clj, moves offset-reset-config validation out into …
theanirudhvyas Mar 22, 2019
6aee372
adds a .env file to set default KAFKA_VERSION for docker-compose
theanirudhvyas Mar 22, 2019
77e704f
refactors ci: creates topic from within docker container instead of p…
theanirudhvyas Mar 25, 2019
4225f94
moves logic for running tests in ci out into a script
theanirudhvyas Mar 25, 2019
fd399f3
moves the upgrade-from config to stream-routes
theanirudhvyas Mar 25, 2019
a7f1e9c
all integration stream tests now run with different application-ids
theanirudhvyas Mar 25, 2019
df77bcb
refactors code and introduces lint stage in ci
theanirudhvyas Mar 25, 2019
5738774
Adds a make-file and changes container name in docker-compose
theanirudhvyas Mar 25, 2019
8c27818
Adds rabbitmq to docker-compose
theanirudhvyas Mar 25, 2019
0b8a79e
removes duplicate import from streams.clj
theanirudhvyas Mar 26, 2019
79baf58
Runs make test-all command in ci
theanirudhvyas Mar 26, 2019
1092b9d
refactors code, stream-tests get values from config.
theanirudhvyas Mar 26, 2019
e8991e0
Updates dev-setup in readme
theanirudhvyas Mar 26, 2019
d68d622
Add changelog-topic-replication to test
May 16, 2019
d0b9472
Add code-coverage stage in CI pipeline
sisinduku May 16, 2019
1 change: 1 addition & 0 deletions .env
@@ -0,0 +1 @@
KAFKA_VERSION=2
32 changes: 23 additions & 9 deletions .travis.yml
@@ -1,22 +1,36 @@
language: clojure

services:
- rabbitmq
- docker

stages:
- lint-check
- test
- coverage
- name: deploy
if: (repo == gojek/ziggurat) AND (tag IS present)
jobs:
include:
- stage: test
- stage: lint-check
script:
- lein clean
- lein deps
- mv -fv resources/config.test.{ci.edn,edn}
- lein cljfmt check
- lein test-all
after_script:
- lein code-coverage
- curl --form 'json_file=@coverage/coveralls.json' "${COVERALLS_URL}"
- lein kibit
- stage: test
name: "kafka-1"
env:
- KAFKA_VERSION=1
script:
- ./bin/run_tests_in_ci.sh
- stage: test
name: "kafka-2"
env:
- KAFKA_VERSION=2
script:
- ./bin/run_tests_in_ci.sh

- stage: coverage
script:
- make coverage

- stage: deploy
script: lein deploy clojars
26 changes: 26 additions & 0 deletions Makefile
@@ -0,0 +1,26 @@
.PHONY: all
all: test-all test

topic="topic"

setup:
docker-compose down
lein deps
docker-compose up -d
sleep 10
docker exec -it ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper

test-all: setup
ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test-all
docker-compose down

test: setup
ZIGGURAT_STREAM_ROUTER_DEFAULT_ORIGIN_TOPIC=$(topic) lein test
docker-compose down

coverage: setup
lein code-coverage
ifdef COVERALLS_URL
curl --form 'json_file=@coverage/coveralls.json' "$(COVERALLS_URL)"
endif
docker-compose down
11 changes: 7 additions & 4 deletions README.md
@@ -39,13 +39,16 @@ Refer [concepts](doc/CONCEPTS.md) to understand the concepts referred to in this
## Dev Setup
(For Mac users only)

- Install leiningen: ```brew install leiningen```
- Install Clojure: ```brew install clojure```

- Install Rabbitmq: ```brew install rabbitmq```
- Install leiningen: ```brew install leiningen```

- Start Rabbitmq: ```brew services run rabbitmq``` : ensure the default user-id and password for rabbitmq is `guest`
- Run docker-compose: ```docker-compose up```. This starts
- Kafka on localhost:9092
- ZooKeeper on localhost:2181
- RabbitMQ on localhost:5672

- Run tests: ```lein test```
- Run tests: ```make test```


## Usage
7 changes: 7 additions & 0 deletions bin/run_tests_in_ci.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

set -ex

lein clean
mv -fv resources/config.test.{ci.edn,edn}
make test-all
24 changes: 24 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,24 @@
version: '3.3'

services:
rabbitmq:
image: 'rabbitmq'
ports:
- '5672:5672'
container_name: 'ziggurat_rabbitmq'
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
container_name: 'ziggurat_zookeeper'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:${KAFKA_VERSION}'
ports:
- '9092:9092'
container_name: 'ziggurat_kafka'
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- ALLOW_PLAINTEXT_LISTENER=yes
17 changes: 6 additions & 11 deletions project.clj
@@ -17,7 +17,7 @@
[medley "0.8.4"]
[mount "0.1.10"]
[org.apache.httpcomponents/fluent-hc "4.5.4"]
[org.apache.kafka/kafka-streams "1.1.1" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.1.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.7"]
[org.apache.logging.log4j/log4j-slf4j-impl "2.7"]
[org.clojure/clojure "1.10.0"]
@@ -47,18 +47,13 @@
:dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
[io.confluent/kafka-schema-registry "4.1.1"]
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "1.1.1" :classifier "test"]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]]
[org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.1.0" :classifier "test"]
[org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[jonase/eastwood "0.2.6"]
[lein-cljfmt "0.6.3"]
:dev {:plugins [[lein-cljfmt "0.6.3"]
[lein-cloverage "1.0.13"]
[lein-githooks "0.1.0"]
[lein-kibit "0.1.6"]]
:githooks {:auto-install true
:pre-commit ["lein cljfmt check && lein kibit"]
:pre-push ["lein test"]}}
[lein-kibit "0.1.6"]]}
:1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]}
:1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}})
7 changes: 4 additions & 3 deletions resources/config.test.ci.edn
@@ -28,11 +28,12 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
7 changes: 4 additions & 3 deletions resources/config.test.edn
@@ -28,11 +28,12 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
38 changes: 29 additions & 9 deletions src/ziggurat/streams.clj
@@ -27,9 +27,28 @@
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3})

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms changelog-topic-replication-factor]}]
(defn- set-upgrade-from-config
"Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the
value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the
config needs to be set as nil "
[properties upgrade-from-config]
(if (some? upgrade-from-config)
(.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config)))
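;; Illustrative sketch (not part of this diff): how set-upgrade-from-config
;; behaves against a plain java.util.Properties instance; the upgrade.from
;; key is only put when a non-nil value is supplied.
(comment
  (let [props (java.util.Properties.)]
    (set-upgrade-from-config props "1.1")
    (.get props StreamsConfig/UPGRADE_FROM_CONFIG))  ;; => "1.1"

  (let [props (java.util.Properties.)]
    (set-upgrade-from-config props nil)
    (.get props StreamsConfig/UPGRADE_FROM_CONFIG))) ;; => nil, key left unset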

(defn- validate-auto-offset-reset-config
  [auto-offset-reset-config]
  (if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
    (throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- properties [{:keys [application-id
bootstrap-servers
stream-threads-count
auto-offset-reset-config
buffered-records-per-partition
commit-interval-ms
upgrade-from
changelog-topic-replication-factor]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
(.put StreamsConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers)
@@ -40,7 +59,8 @@
(.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition))
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
(.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int changelog-topic-replication-factor))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)
(set-upgrade-from-config upgrade-from)))

(defn- get-metric-namespace [default topic]
(str (name topic) "." default))
@@ -108,13 +128,13 @@
(start-streams stream-routes (ziggurat-config)))
([stream-routes stream-configs]
(reduce (fn [streams stream]
(let [topic-entity     (first stream)
      topic-handler-fn (-> stream second :handler-fn)
      channels         (chl/get-keys-for-topic stream-routes topic-entity)
      stream-config    (-> stream-configs
                           (get-in [:stream-router topic-entity])
                           (umap/deep-merge default-config-for-stream))
      stream           (start-stream* topic-handler-fn stream-config topic-entity channels)]
(.start stream)
(conj streams stream)))
[]
1 change: 0 additions & 1 deletion src/ziggurat/timestamp_transformer.clj
@@ -26,7 +26,6 @@
(when (message-to-process? message-time oldest-processed-message-in-s)
(calculate-and-report-kafka-delay metric-namespace message-time)
(KeyValue/pair record-key record-value))))
(punctuate [_ _] nil)
(close [_] nil))

(defn create [metric-namespace process-message-since-in-s]