From 2757d44f3e232d1d6e9a13d2b0203879d692eb6d Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Wed, 12 May 2021 18:17:51 +0800 Subject: [PATCH] add functions to add/remove stream threads --- src/ziggurat/streams.clj | 27 ++++++++- test/ziggurat/streams_test.clj | 100 +++++++++++++++++++++++++++------ 2 files changed, 108 insertions(+), 19 deletions(-) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 50135c0f..1476fda5 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -16,11 +16,9 @@ [java.time Duration] [java.util Properties] [java.util.regex Pattern] - [org.apache.kafka.common.serialization Serdes] - [org.apache.kafka.common.utils SystemTime] + [org.apache.kafka.common.errors TimeoutException] [org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology] [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier] - [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (def default-config-for-stream @@ -284,3 +282,26 @@ (start-streams (:stream-routes (mount/args)) (ziggurat-config))) :stop (do (log/info "Stopping Kafka streams") (stop-streams stream))) + +(defn add-stream-thread + [topic-entity] + (let [opt (.addStreamThread (get stream topic-entity)) + v (.orElse opt "Stream thread not created, it can only be created either its in the REBALANCING or RUNNING state")] + (log/infof "%s" v))) + +(defn remove-stream-thread + ([topic-entity] + (let [opt (.removeStreamThread (get stream topic-entity)) + v (.orElse opt "Stream thread cannot be removed")] + (log/infof "%s" v))) + ([topic-entity timeout-ms] + (try + (let [opt (.removeStreamThread (get stream topic-entity) (Duration/ofMillis timeout-ms)) + v (.orElse opt "Stream thread cannot be removed")] + (log/infof "%s" v)) + (catch TimeoutException e + (log/infof "%s" (.getMessage e)))))) + +(defn get-stream-thread-count + [topic-entity] + (.size (.localThreadsMetadata (get stream topic-entity)))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index bde15ded..94ce9851 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -1,24 +1,22 @@ (ns ziggurat.streams-test (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] - [protobuf.core :as proto] [mount.core :as mount] - [ziggurat.streams :refer [start-streams stop-streams stop-stream start-stream]] - [ziggurat.fixtures :as fix] + [protobuf.core :as proto] [ziggurat.config :refer [ziggurat-config]] + [ziggurat.fixtures :as fix] [ziggurat.middleware.default :as default-middleware] - [ziggurat.middleware.stream-joins :as stream-joins-middleware] [ziggurat.middleware.json :as json-middleware] - [ziggurat.tracer :refer [tracer]] - [ziggurat.mapper :refer [mapper-func]] - [ziggurat.config :as config]) + [ziggurat.middleware.stream-joins :as stream-joins-middleware] + [ziggurat.streams :refer [add-stream-thread get-stream-thread-count remove-stream-thread start-streams stop-streams stop-stream start-stream]] + [ziggurat.tracer :refer [tracer]]) (:import [flatland.protobuf.test Example$Photo] + [io.opentracing.tag Tags] [java.util Properties] [org.apache.kafka.clients.producer ProducerConfig] + (org.apache.kafka.common.utils MockTime) [org.apache.kafka.streams KeyValue] [org.apache.kafka.streams KafkaStreams$State] - [org.apache.kafka.streams.integration.utils IntegrationTestUtils] - [io.opentracing.tag Tags] - (org.apache.kafka.common.utils MockTime))) + [org.apache.kafka.streams.integration.utils IntegrationTestUtils])) (use-fixtures :once (join-fixtures [fix/mount-config-with-tracer fix/silence-logging @@ -218,8 +216,8 @@ (deftest start-stream-joins-test-with-inner-join (testing "stream joins using inner join" (let [orig-config (ziggurat-config)] - (with-redefs [config/ziggurat-config (fn [] (-> orig-config - (assoc-in [:alpha-features :stream-joins] true)))] + (with-redefs [ziggurat-config (fn [] (-> orig-config + (assoc-in [:alpha-features :stream-joins] true)))] (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 @@ -248,8 +246,8 @@ (deftest start-stream-joins-test-with-left-join (testing "stream joins using left join" (let [orig-config (ziggurat-config)] - (with-redefs [config/ziggurat-config (fn [] (-> orig-config - (assoc-in [:alpha-features :stream-joins] true)))] + (with-redefs [ziggurat-config (fn [] (-> orig-config + (assoc-in [:alpha-features :stream-joins] true)))] (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 @@ -278,8 +276,8 @@ (deftest start-stream-joins-test-with-outer-join (testing "stream joins using outer join" (let [orig-config (ziggurat-config)] - (with-redefs [config/ziggurat-config (fn [] (-> orig-config - (assoc-in [:alpha-features :stream-joins] true)))] + (with-redefs [ziggurat-config (fn [] (-> orig-config + (assoc-in [:alpha-features :stream-joins] true)))] (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 @@ -386,3 +384,73 @@ (Thread/sleep 5000) ;;wating for streams to consume messages (stop-streams streams) (is (= times @message-received-count)))) + +(deftest add-remove-stream-thread-test + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count) + times 6 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor))) + _ (Thread/sleep 20000) ;;waiting for streams to start + stream-thread-count-before-add (get-stream-thread-count :default) + _ (add-stream-thread :default) + stream-thread-count-after-add (get-stream-thread-count :default) + _ (is (= stream-thread-count-after-add (+ stream-thread-count-before-add 1))) + _ (remove-stream-thread :default) + stream-thread-count-after-remove (get-stream-thread-count :default) + _ (is (= stream-thread-count-before-add stream-thread-count-after-remove))] + (IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic]) + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + +(deftest remove-stream-thread-to-zero-test + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count) + times 0 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor))) + _ (Thread/sleep 10000) ;;waiting for streams to start + _ (remove-stream-thread :default) + stream-thread-count-after-remove (get-stream-thread-count :default) + _ (is (= stream-thread-count-after-remove 0))] + (IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic]) + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + +(deftest remove-stream-thread-with-timeout-to-zero-test + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count) + times 0 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor))) + _ (Thread/sleep 10000) ;;waiting for streams to start + _ (remove-stream-thread :default 5000) + stream-thread-count-after-remove (get-stream-thread-count :default) + _ (is (= stream-thread-count-after-remove 0))] + (IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic]) + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count))))