From b86e0e929eab649690bb024b2a7f5e3e1bfc7285 Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Wed, 12 May 2021 18:17:51 +0800 Subject: [PATCH] add functions to add/remove stream threads --- src/ziggurat/streams.clj | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 50135c0f..f0fe03f1 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -16,11 +16,9 @@ [java.time Duration] [java.util Properties] [java.util.regex Pattern] - [org.apache.kafka.common.serialization Serdes] - [org.apache.kafka.common.utils SystemTime] + [org.apache.kafka.common.errors TimeoutException] [org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology] [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier] - [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (def default-config-for-stream @@ -284,3 +282,22 @@ (start-streams (:stream-routes (mount/args)) (ziggurat-config))) :stop (do (log/info "Stopping Kafka streams") (stop-streams stream))) + +(defn add-stream-thread + [stream] + (let [opt (.addStreamThread stream) + v (.orElse opt "Stream thread not created, it can only be created either its in the REBALANCING or RUNNING state")] + (log/infof "%s" v))) + +(defn renove-stream-thread + ([stream] + (let [opt (.removeStreamThread stream) + v (.orElse opt "Stream thread cannot be removed")] + (log/infof "%s" v))) + ([stream timeout-ms] + (try + (let [opt (.removeStreamThread stream (Duration/ofMillis timeout-ms)) + v (.orElse opt "Stream thread cannot be removed")] + (log/infof "%s" v)) + (catch TimeoutException e + (log/infof "%s" (.getMessage e))))))