Skip to content

Commit

Permalink
Add changelog-topic-replication to test
Browse files Browse the repository at this point in the history
Co-authored-by: Kartik <rulz.gupta@gmail.com>
  • Loading branch information
Saptanto Sindu and kartik7153 committed May 16, 2019
1 parent e8991e0 commit d68d622
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ all: test-all test
topic="topic"

setup:
docker-compose down
lein deps
docker-compose up -d
sleep 10
Expand Down
9 changes: 8 additions & 1 deletion src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms upgrade-from]}]
(defn- properties [{:keys [application-id
bootstrap-servers
stream-threads-count
auto-offset-reset-config
buffered-records-per-partition
commit-interval-ms
upgrade-from
changelog-topic-replication-factor]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
Expand Down
11 changes: 8 additions & 3 deletions test/ziggurat/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
[java.util Properties]
[kafka.utils MockTime]
[org.apache.kafka.clients.producer ProducerConfig]
[org.apache.kafka.streams KeyValue]))
[org.apache.kafka.streams KeyValue]
[org.apache.kafka.streams.integration.utils IntegrationTestUtils]))

(use-fixtures :once fix/mount-only-config)

Expand Down Expand Up @@ -42,11 +43,13 @@
:success)]
(let [times 6
oldest-processed-message-in-s 10
changelog-topic-replication-factor 1
kvs (repeat times message-key-value)
streams (start-streams {:default {:handler-fn mapped-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :application-id] (rand-application-id))
(assoc-in [:stream-router :default :oldest-processed-message-in-s] oldest-processed-message-in-s)))]
(assoc-in [:stream-router :default :oldest-processed-message-in-s] oldest-processed-message-in-s)
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))]
(Thread/sleep 10000) ;;waiting for streams to start
(IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic])
kvs
Expand All @@ -63,10 +66,12 @@
(swap! message-received-count inc))
:success)]
(let [times 6
changelog-topic-replication-factor 1
kvs (repeat times message-key-value)
streams (start-streams {:default {:handler-fn mapped-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :application-id] (rand-application-id))))]
(assoc-in [:stream-router :default :application-id] (rand-application-id))
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))]
(Thread/sleep 10000) ;;waiting for streams to start
(IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic])
kvs
Expand Down

0 comments on commit d68d622

Please sign in to comment.