Skip to content

Commit

Permalink
add functions to add/remove stream threads
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed May 12, 2021
1 parent 7028d0a commit b213422
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
[java.time Duration]
[java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.common.errors TimeoutException]
[org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(def default-config-for-stream
Expand Down Expand Up @@ -284,3 +282,22 @@
(start-streams (:stream-routes (mount/args)) (ziggurat-config)))
:stop (do (log/info "Stopping Kafka streams")
(stop-streams stream)))

(defn add-stream-thread
[stream]
(let [opt (.addStreamThread stream)
v (.orElse opt "Stream thread not created, it can only be created either its in the REBALANCING or RUNNING state")]
(log/infof "%s" v)))

(defn renove-stream-thread
([stream]
(let [opt (.removeStreamThread stream)
v (.orElse opt "Stream thread cannot be removed")]
(log/infof "%s" v)))
([stream timeout-ms]
(try
(let [opt (.removeStreamThread stream (Duration/ofMillis timeout-ms))
v (.orElse opt "Stream thread cannot be removed")]
(log/infof "%s" v))
(catch TimeoutException e
(log/infof "%s" (.getMessage e))))))

0 comments on commit b213422

Please sign in to comment.