Skip to content

Commit

Permalink
moves the upgrade-from config to stream-routes
Browse files Browse the repository at this point in the history
  • Loading branch information
theanirudhvyas committed Mar 25, 2019
1 parent 323590a commit 523d60b
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
1 change: 1 addition & 0 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
1 change: 1 addition & 0 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
8 changes: 4 additions & 4 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
"Populates the upgrade.from config in kafka streams required for upgrading kafka-streams version from 1 to 2. If the
value is non-nil it sets the config (the value validation is done in the kafka streams code), to unset the value the
config needs to be set as nil "
[properties]
(if-let [upgrade-from-config (get-in config [:ziggurat :upgrade-from])]
[properties upgrade-from-config]
(if (some? upgrade-from-config)
(.put properties StreamsConfig/UPGRADE_FROM_CONFIG upgrade-from-config)))

(defn- validate-auto-offset-reset-config
[auto-offset-reset-config]
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms]}]
(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms upgrade-from]}]
(validate-auto-offset-reset-config auto-offset-reset-config)
(doto (Properties.)
(.put StreamsConfig/APPLICATION_ID_CONFIG application-id)
Expand All @@ -52,7 +52,7 @@
(.put StreamsConfig/BUFFERED_RECORDS_PER_PARTITION_CONFIG (int buffered-records-per-partition))
(.put StreamsConfig/COMMIT_INTERVAL_MS_CONFIG commit-interval-ms)
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)
(set-upgrade-from-config)))
(set-upgrade-from-config upgrade-from)))

(defn- get-metric-namespace [default topic]
(str (name topic) "." default))
Expand Down
3 changes: 2 additions & 1 deletion test/ziggurat/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
(def config-map {:stream-router {:vehicle {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count 1
:proto-class "flatland.protobuf.test.Example$Photo"}}})
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"}}})

(def props (doto (Properties.)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (get-in config-map [:stream-router :vehicle :bootstrap-servers]))
Expand Down

0 comments on commit 523d60b

Please sign in to comment.