Merge ea6fd6b into 7028d0a
macalimlim committed May 12, 2021
2 parents 7028d0a + ea6fd6b commit 35ff370
Showing 4 changed files with 69 additions and 60 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -479,14 +479,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat]
:origin-topic "kafka-topic-*"
:oldest-processed-message-in-s [604800 :int]
:changelog-topic-replication-factor [3 :int]
:stream-thread-exception-response [:shutdown-client :keyword]
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}
:enable-streams-uncaught-exception-handling [true :bool]
:default-api-timeout-ms-config [600000 :int]
:statsd {:host "localhost"
:port [8125 :int]
@@ -523,14 +523,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur

- app-name - Refers to the name of the application. Used to namespace queues and metrics.
- nrepl-server - Port on which the repl server will be hosted
- enable-streams-uncaught-exception-handling - If set to true, it adds a handler to Kafka Streams for dealing with uncaught exceptions. Currently, this handler just logs the name of the stream thread and the exception message.
- default-api-timeout-ms-config - Specifies the timeout (in milliseconds) for client APIs. This configuration is used as the default timeout for all client operations that do not specify a timeout parameter. The recommended value for Ziggurat-based apps is 600000 ms (10 minutes).
- stream-router - Configs related to all the Kafka streams the application is reading from

- stream-id - The identifier of a stream as mentioned in main.clj. Each stream can therefore read from different Kafka brokers and run a different number of threads (depending on the throughput of the stream).
- application-id - The Kafka consumer group id. [Documentation](https://kafka.apache.org/intro#intro_consumers)
- bootstrap-servers - The Kafka brokers that the application will read from. It accepts a comma-separated value.
- stream-threads-count - The number of parallel threads that should read messages from Kafka. This can scale up to the number of partitions on the topic you wish to read from.
- stream-thread-exception-response - Specifies which action is triggered when an uncaught exception is handled. Possible values are :shutdown-client (default), :shutdown-application and :replace-thread (see the sketch after this list). The 3 responses are documented [here](https://kafka-tutorials.confluent.io/error-handling/kstreams.html?_ga=2.107379330.1454767099.1620795696-1044723812.1563788148).
- origin-topic - The topic that the stream should read from. This can be a regex that enables you to read from multiple streams and handle the messages in the same way. Keep in mind that messages from different streams will be passed to the same mapper-function.
- oldest-processed-message-in-s - The oldest message, in seconds, that will be processed by the stream. By default the value is 604800 (1 week).
- changelog-topic-replication-factor - The replication factor of the internal changelog topic. By default the value is 3.
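
For example, to have a failing stream thread replaced instead of shutting down the client, the response can be set per stream. The snippet below is a minimal, illustrative sketch only: the stream id, application id and topic name are hypothetical, and the remaining keys follow the config example above.

```clojure
{:ziggurat {:stream-router
            {:my-stream {:application-id       "my-app-consumer-group" ;; hypothetical
                         :bootstrap-servers    "localhost:9092"
                         :stream-threads-count [1 :int]
                         :origin-topic         "my-topic"              ;; hypothetical
                         ;; the keyword is mapped to the Kafka Streams response;
                         ;; unrecognised values fall back to :shutdown-client
                         :stream-thread-exception-response [:replace-thread :keyword]}}}}
```

Internally, the keyword is mapped to the corresponding `StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse` value, with `SHUTDOWN_CLIENT` used as the fallback (see `handle-uncaught-exception` in the `src/ziggurat/streams.clj` diff below).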
45 changes: 22 additions & 23 deletions resources/config.test.edn
@@ -5,7 +5,6 @@
:port [8126 :int]
:enabled [false :bool]}
:metrics {:constructor "ziggurat.dropwizard-metrics-wrapper/->DropwizardMetrics"}

:alpha-features {}
:sentry {:enabled [false :bool]
:dsn "dummy"
@@ -43,6 +42,7 @@
:retry {:type [:linear :keyword]
:count [5 :int]
:enabled [true :bool]}}}
:stream-thread-exception-response :shutdown-client
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries 5
@@ -62,28 +62,27 @@
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:batch-routes {:consumer-1 {:consumer-group-id "test-consumer-1002"
:bootstrap-servers "localhost:9092"
:max-poll-records [1000 :int]
:origin-topic "topic"
:commit-interval-ms [5000 :int]
:poll-timeout-ms-config [1000 :int]
:thread-count [2 :int]
:session-timeout-ms-config [60000 :int]
:default-api-timeout-ms-config [60000 :int]
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}
:consumer-2 {:consumer-group-id "test-consumer-2002"
:bootstrap-servers "localhost:9092"
:max-poll-records [2000 :int]
:origin-topic "topic"
:poll-timeout-ms-config [1000 :int]
:thread-count [4 :int]
:session-timeout-ms-config [60000 :int]
:default-api-timeout-ms-config [60000 :int]
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:custom-provider ""}
:new-relic {:report-errors false}}}

37 changes: 18 additions & 19 deletions src/ziggurat/streams.clj
@@ -2,7 +2,7 @@
(:require [clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[ziggurat.channel :as chl]
[ziggurat.config :refer [get-in-config ziggurat-config build-streams-config-properties]]
[ziggurat.config :refer [build-streams-config-properties get-in-config ziggurat-config]]
[ziggurat.header-transformer :as header-transformer]
[ziggurat.mapper :refer [mapper-func]]
[ziggurat.message-payload :refer [->MessagePayload]]
@@ -16,11 +16,9 @@
[java.time Duration]
[java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse]
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(def default-config-for-stream
@@ -217,14 +215,14 @@
:stream-joins (assoc config :consumer-type (:consumer-type config))
(assoc config :consumer-type :default)))

(defn- handle-uncaught-exception
[^Thread thread ^Throwable error]
(log/infof "Ziggurat Uncaught Handler Invoked for Thread: [%s] because of this exception: [%s]"
(.getName thread) (.getMessage error)))

(defn- uncaught-exception-handling-enabled? []
(or (:enable-streams-uncaught-exception-handling (ziggurat-config))
false))
(defn handle-uncaught-exception
[stream-thread-exception-response ^Throwable error]
(log/infof "Ziggurat Streams Uncaught Exception Handler Invoked: [%s]"
(.getMessage error))
(case stream-thread-exception-response
:shutdown-application StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/SHUTDOWN_APPLICATION
:replace-thread StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/REPLACE_THREAD
StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/SHUTDOWN_CLIENT))

(defn start-streams
([stream-routes]
@@ -240,14 +238,15 @@
(merge-consumer-type-config)
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
(when-not (nil? stream)
(when (uncaught-exception-handling-enabled?)
(if-not (nil? stream)
(do
(.setUncaughtExceptionHandler stream
(reify Thread$UncaughtExceptionHandler
(^void uncaughtException [_ ^Thread thread, ^Throwable error]
(handle-uncaught-exception thread error topic-entity)))))
(.start stream)
(assoc streams topic-entity stream))))
(reify StreamsUncaughtExceptionHandler
(^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error]
(handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error))))
(.start stream)
(assoc streams topic-entity stream))
streams)))
{}
stream-routes)))

43 changes: 27 additions & 16 deletions test/ziggurat/streams_test.clj
@@ -1,24 +1,23 @@
(ns ziggurat.streams-test
(:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
[protobuf.core :as proto]
[mount.core :as mount]
[ziggurat.streams :refer [start-streams stop-streams stop-stream start-stream]]
[ziggurat.fixtures :as fix]
[protobuf.core :as proto]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.fixtures :as fix]
[ziggurat.middleware.default :as default-middleware]
[ziggurat.middleware.stream-joins :as stream-joins-middleware]
[ziggurat.middleware.json :as json-middleware]
[ziggurat.tracer :refer [tracer]]
[ziggurat.mapper :refer [mapper-func]]
[ziggurat.config :as config])
[ziggurat.middleware.stream-joins :as stream-joins-middleware]
[ziggurat.streams :refer [handle-uncaught-exception start-stream start-streams stop-stream stop-streams]]
[ziggurat.tracer :refer [tracer]])
(:import [flatland.protobuf.test Example$Photo]
[io.opentracing.tag Tags]
[java.util Properties]
[org.apache.kafka.clients.producer ProducerConfig]
(org.apache.kafka.common.utils MockTime)
[org.apache.kafka.streams KeyValue]
[org.apache.kafka.streams KafkaStreams$State]
[org.apache.kafka.streams.integration.utils IntegrationTestUtils]
[io.opentracing.tag Tags]
(org.apache.kafka.common.utils MockTime)))
[org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse]
[org.apache.kafka.streams.integration.utils IntegrationTestUtils]))

(use-fixtures :once (join-fixtures [fix/mount-config-with-tracer
fix/silence-logging
@@ -218,8 +217,8 @@
(deftest start-stream-joins-test-with-inner-join
(testing "stream joins using inner join"
(let [orig-config (ziggurat-config)]
(with-redefs [config/ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(with-redefs [ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
@@ -248,8 +247,8 @@
(deftest start-stream-joins-test-with-left-join
(testing "stream joins using left join"
(let [orig-config (ziggurat-config)]
(with-redefs [config/ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(with-redefs [ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
@@ -278,8 +277,8 @@
(deftest start-stream-joins-test-with-outer-join
(testing "stream joins using outer join"
(let [orig-config (ziggurat-config)]
(with-redefs [config/ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(with-redefs [ziggurat-config (fn [] (-> orig-config
(assoc-in [:alpha-features :stream-joins] true)))]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
@@ -386,3 +385,15 @@
(Thread/sleep 5000) ;; waiting for streams to consume messages
(stop-streams streams)
(is (= times @message-received-count))))

(deftest handle-uncaught-exception-test
(let [t (Throwable. "foobar")]
(testing "should return SHUTDOWN_CLIENT"
(let [r (handle-uncaught-exception :shutdown-client t)]
(is (= r StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/SHUTDOWN_CLIENT))))
(testing "should return SHUTDOWN_APPLICATION"
(let [r (handle-uncaught-exception :shutdown-application t)]
(is (= r StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/SHUTDOWN_APPLICATION))))
(testing "should return REPLACE_THREAD"
(let [r (handle-uncaught-exception :replace-thread t)]
(is (= r StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/REPLACE_THREAD))))))
