Skip to content

Commit

Permalink
add functions to add/remove stream threads
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed May 20, 2021
1 parent 7028d0a commit b1ddc5b
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 19 deletions.
27 changes: 24 additions & 3 deletions src/ziggurat/streams.clj
Expand Up @@ -16,11 +16,9 @@
[java.time Duration]
[java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.common.errors TimeoutException]
[org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(def default-config-for-stream
Expand Down Expand Up @@ -284,3 +282,26 @@
(start-streams (:stream-routes (mount/args)) (ziggurat-config)))
:stop (do (log/info "Stopping Kafka streams")
(stop-streams stream)))

(defn add-stream-thread
[topic-entity]
(let [opt (.addStreamThread (get stream topic-entity))
v (.orElse opt "Stream thread not created, it can only be created either its in the REBALANCING or RUNNING state")]
(log/infof "%s" v)))

(defn remove-stream-thread
([topic-entity]
(let [opt (.removeStreamThread (get stream topic-entity))
v (.orElse opt "Stream thread cannot be removed")]
(log/infof "%s" v)))
([topic-entity timeout-ms]
(try
(let [opt (.removeStreamThread (get stream topic-entity) (Duration/ofMillis timeout-ms))
v (.orElse opt "Stream thread cannot be removed")]
(log/infof "%s" v))
(catch TimeoutException e
(log/infof "%s" (.getMessage e))))))

(defn get-stream-thread-count
[topic-entity]
(.size (.localThreadsMetadata (get stream topic-entity))))
78 changes: 62 additions & 16 deletions test/ziggurat/streams_test.clj
@@ -1,24 +1,22 @@
(ns ziggurat.streams-test
(:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
[protobuf.core :as proto]
[mount.core :as mount]
[ziggurat.streams :refer [start-streams stop-streams stop-stream start-stream]]
[ziggurat.fixtures :as fix]
[protobuf.core :as proto]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.fixtures :as fix]
[ziggurat.middleware.default :as default-middleware]
[ziggurat.middleware.stream-joins :as stream-joins-middleware]
[ziggurat.middleware.json :as json-middleware]
[ziggurat.tracer :refer [tracer]]
[ziggurat.mapper :refer [mapper-func]]
[ziggurat.config :as config])
[ziggurat.middleware.stream-joins :as stream-joins-middleware]
[ziggurat.streams :refer [add-stream-thread get-stream-thread-count remove-stream-thread start-streams stop-streams stop-stream start-stream]]
[ziggurat.tracer :refer [tracer]])
(:import [flatland.protobuf.test Example$Photo]
[io.opentracing.tag Tags]
[java.util Properties]
[org.apache.kafka.clients.producer ProducerConfig]
(org.apache.kafka.common.utils MockTime)
[org.apache.kafka.streams KeyValue]
[org.apache.kafka.streams KafkaStreams$State]
[org.apache.kafka.streams.integration.utils IntegrationTestUtils]
[io.opentracing.tag Tags]
(org.apache.kafka.common.utils MockTime)))
[org.apache.kafka.streams.integration.utils IntegrationTestUtils]))

(use-fixtures :once (join-fixtures [fix/mount-config-with-tracer
fix/silence-logging
Expand Down Expand Up @@ -218,8 +216,8 @@
(deftest start-stream-joins-test-with-inner-join
(testing "stream joins using inner join"
(let [orig-config (ziggurat-config)]
(with-redefs [config/ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(with-redefs [ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
Expand Down Expand Up @@ -248,8 +246,8 @@
(deftest start-stream-joins-test-with-left-join
(testing "stream joins using left join"
(let [orig-config (ziggurat-config)]
(with-redefs [config/ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(with-redefs [ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
Expand Down Expand Up @@ -278,8 +276,8 @@
(deftest start-stream-joins-test-with-outer-join
(testing "stream joins using outer join"
(let [orig-config (ziggurat-config)]
(with-redefs [config/ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(with-redefs [ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
Expand Down Expand Up @@ -386,3 +384,51 @@
(Thread/sleep 5000) ;;wating for streams to consume messages
(stop-streams streams)
(is (= times @message-received-count))))

(deftest add-remove-stream-thread-test
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count)
times 6
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :application-id] (rand-application-id))
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))
_ (Thread/sleep 20000) ;;waiting for streams to start
stream-thread-count-before-add (get-stream-thread-count :default)
_ (add-stream-thread :default)
stream-thread-count-after-add (get-stream-thread-count :default)
_ (is (= stream-thread-count-after-add (+ stream-thread-count-before-add 1)))
_ (remove-stream-thread :default)
stream-thread-count-after-remove (get-stream-thread-count :default)
_ (is (= stream-thread-count-before-add stream-thread-count-after-remove))]
(IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic])
kvs
(props)
(MockTime.))
(Thread/sleep 5000) ;;wating for streams to consume messages
(stop-streams streams)
(is (= times @message-received-count))))

(deftest remove-stream-thread-to-zero-test
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count)
times 0
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :application-id] (rand-application-id))
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))
_ (Thread/sleep 10000) ;;waiting for streams to start
_ (remove-stream-thread :default)
stream-thread-count-after-remove (get-stream-thread-count :default)
_ (is (= stream-thread-count-after-remove 0))]
(IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic])
kvs
(props)
(MockTime.))
(Thread/sleep 5000) ;;wating for streams to consume messages
(stop-streams streams)
(is (= times @message-received-count))))

0 comments on commit b1ddc5b

Please sign in to comment.