diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a84f719..e9f723ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,38 +3,52 @@ All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/). +## 4.9.1 + +- Adds graceful shutdown to the HTTP server, with the timeout configurable via `:graceful-shutdown-timeout-ms` (default 30000 ms) + ## 4.9.0 + - Improvises the publishing logic during consumption via subscribers. - Upgrades the state management for rabbitmq subscribers. ## 4.8.0 + - `rabbitmq-retry-count` is now available in `metadata` provided in user handler function. ## 4.7.6 + - Fixed a bug where kafka header with null values, throws Null Pointer Exception upon publishing to rabbitmq ## 4.7.5 + - Publishes metric to gauge time taken to send messages to rabbitmq ## 4.7.4 + - Updated dead-set APIs to replay and delete dead-set messages asynchronously ## 4.7.3 + - Fixed a bug where instantiation of channel pool leads to null pointer exception when stream route does not have :stream-threads-count defined ## 4.7.2 + - Releasing a new tag because the version 4.7.0 was already present in clojars.
## 4.7.0 + - Added a feature to retry non-recoverable exceptions during publishing messages on rabbitmq ## 4.6.4 -- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :]` + +- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :]` section of the config - Fixed a bug for overriding the default channel-pool configuration with the user provided config ## 4.6.3 + - RabbitMQ's connections use a DNS IP resolver to resolve DNS based hosts - Setting of HA policies from within ziggurat have been removed diff --git a/README.md b/README.md index cbe59c8c..3a5dfb8d 100644 --- a/README.md +++ b/README.md @@ -524,6 +524,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :http-server {:port [8010 :int] + :graceful-shutdown-timeout-ms [30000 :int] :new-relic {:report-errors [false :bool]}}}} ``` @@ -556,7 +557,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur - rabbit-mq - The queues that are part of the retry mechanism - retry - The number of times the message should be retried and if retry flow should be enabled or not - jobs - The number of consumers that should be reading from the retry queues and the prefetch count of each consumer -- http-server - Ziggurat starts an http server by default and gives apis for ping health-check and deadset management. This defines the port and the number of threads of the http server. +- http-server - Ziggurat starts an http server by default and gives apis for ping health-check and deadset management. This defines the port and the number of threads of the http server. It also controls the graceful shutdown timeout of the HTTP server. 
Default is `30000ms` - new-relic - If report-errors is true, whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing it, an error is reported to new-relic. You can skip this flow by disabling it. ## Contribution diff --git a/src/ziggurat/server.clj b/src/ziggurat/server.clj index ed511b80..41e6d558 100644 --- a/src/ziggurat/server.clj +++ b/src/ziggurat/server.clj @@ -5,11 +5,26 @@ [ring.adapter.jetty :as ring] [ziggurat.config :refer [ziggurat-config]] [ziggurat.server.routes :as routes]) - (:import (org.eclipse.jetty.server Server) - (java.time Instant))) + (:import (java.util.concurrent TimeoutException) + (org.eclipse.jetty.server Server) + (java.time Instant) + (org.eclipse.jetty.server.handler StatisticsHandler))) (add-encoder Instant encode-str) +(def default-stop-timeout-ms 30000) + +(defn configure-jetty "Jetty configurator: wraps the server's current handler in a StatisticsHandler, applies the stop timeout read from [:http-server :graceful-shutdown-timeout-ms] (falling back to default-stop-timeout-ms) and enables stop-at-shutdown." [^Server server] + (let [stop-timeout-ms (get-in (ziggurat-config) + [:http-server :graceful-shutdown-timeout-ms] + default-stop-timeout-ms) + wrapped-handler (.getHandler server) + statistics (doto (StatisticsHandler.) + (.setHandler wrapped-handler))] + (.setHandler server statistics) + (.setStopTimeout server stop-timeout-ms) + (.setStopAtShutdown server true))) + (defn- start [handler] (let [conf (:http-server (ziggurat-config)) port (:port conf) @@ -19,11 +34,14 @@ :min-threads thread-count :max-threads thread-count :join? false - :send-server-version?
false + :configurator configure-jetty}))) (defn- stop [^Server server] - (.stop server) - (log/info "Stopped server")) + (try + (.stop server) + (log/info "Stopped server gracefully") + (catch TimeoutException _ (log/warn "Graceful shutdown timed out")))) (defstate server :start (start (routes/handler (:actor-routes (mount/args)))) diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 08ccc155..d7b97c95 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -6,8 +6,10 @@ [ziggurat.messaging.connection-helper :as rmqc] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] + [ziggurat.server :as server] [ziggurat.messaging.channel-pool :as cpool] [ziggurat.streams :as streams] + [ziggurat.nrepl-server :as nrs] [ziggurat.server.test-utils :as tu] [ziggurat.tracer :as tracer] [ziggurat.fixtures :refer [with-config]] @@ -26,6 +28,10 @@ streams/stop-streams (constantly nil) ;; will be called valid modes number of times cpool/create-channel-pool (fn [_] (reset! result (* @result 3))) + server/start (constantly nil) + server/stop (constantly nil) + nrs/start (constantly nil) + nrs/stop (constantly nil) rmqc/start-connection (fn [_] (do (reset! result (* @result 2)) nil)) rmqc/stop-connection (constantly nil) cpool/destroy-channel-pool (constantly nil) @@ -39,6 +45,10 @@ (testing "The actor stop fn stops before the ziggurat state" (let [result (atom 1)] (with-redefs [streams/start-streams (constantly nil) + server/start (constantly nil) + server/stop (constantly nil) + nrs/start (constantly nil) + nrs/stop (constantly nil) streams/stop-streams (fn [_] (reset!
result (* @result 2))) tracer/create-tracer (fn [] (MockTracer.))] (with-config diff --git a/test/ziggurat/server/routes_test.clj b/test/ziggurat/server/routes_test.clj index 737e0993..d144b87c 100644 --- a/test/ziggurat/server/routes_test.clj +++ b/test/ziggurat/server/routes_test.clj @@ -13,13 +13,13 @@ (deftest router-dead-set-channel-disabled-test (let [stream-routes {:default {:handler-fn (fn []) - :channel-1 (fn [])}}] + :channel-1 (fn [])}}] (with-redefs [retry-enabled? (fn [] false)] (fix/with-start-server stream-routes (testing "should return 404 when /v1/dead_set/replay for channel is called and channel retry is disabled" (with-redefs [channel-retry-enabled? (constantly false)] - (let [params {:count "10" :topic-entity "default" :channel "channel-1"} + (let [params {:count "10" :topic-entity "default" :channel "channel-1"} {:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params) expected-body {:error "Retry is not enabled"}] (is (= 404 status)) @@ -27,7 +27,7 @@ (testing "should return 404 when /v1/dead_set for channel is called and channel retry is disabled" (with-redefs [channel-retry-enabled? (constantly false)] - (let [params {:count "10" :topic-entity "default" :channel "channel-1"} + (let [params {:count "10" :topic-entity "default" :channel "channel-1"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params) expected-body {:error "Retry is not enabled"}] (is (= 404 status)) @@ -35,7 +35,7 @@ (testing "should return 404 when delete /v1/dead_set for channel is called and channel retry is disabled" (with-redefs [channel-retry-enabled? 
(constantly false)] - (let [params {:count "10" :topic-entity "default" :channel "channel-1"} + (let [params {:count "10" :topic-entity "default" :channel "channel-1"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params) expected-body {:error "Retry is not enabled"}] (is (= 404 status)) @@ -77,15 +77,15 @@ (testing "should return 200 when /v1/dead_set/replay is called with valid count val" (with-redefs [ds/replay (fn [_ _ _] nil)] - (let [count "10" + (let [count "10" params {:count count :topic-entity "default"} {:keys [status _]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params)] (is (= 200 status))))) (testing "should return 400 when /v1/dead_set/replay is called with invalid count val" (with-redefs [ds/replay (fn [_ _ _] nil)] - (let [count "invalid-val" - topic-entity "default" + (let [count "invalid-val" + topic-entity "default" expected-body {:error "Count should be positive integer"} {:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count :topic-entity topic-entity})] (is (= 400 status)) @@ -93,7 +93,7 @@ (testing "should return 400 when /v1/dead_set/replay is called with no topic entity" (with-redefs [ds/replay (fn [_ _ _] nil)] - (let [count "10" + (let [count "10" expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} {:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count})] (is (= 400 status)) @@ -101,9 +101,9 @@ (testing "should return 400 when get /v1/dead_set is called with invalid count val" (with-redefs [ds/view (fn [_ _ _] nil)] - (let [count "avasdas" + (let [count "avasdas" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -114,9 +114,9 @@ 
(testing "should return 400 when get /v1/dead_set is called with negative count val" (with-redefs [ds/view (fn [_ _ _] nil)] - (let [count "-10" + (let [count "-10" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -128,8 +128,8 @@ (testing "should return 400 when get /v1/dead_set is called without topic entity" (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count} + count "10" + params {:count count} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -143,8 +143,8 @@ (with-redefs [channel-retry-enabled? (constantly true)] (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count :topic-entity "default" :channel "invalid"} + count "10" + params {:count count :topic-entity "default" :channel "invalid"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -158,8 +158,8 @@ (with-redefs [channel-retry-enabled? 
(constantly true)] (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Count should be positive integer"} - count "-10" - params {:count count :topic-entity "default"} + count "-10" + params {:count count :topic-entity "default"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -171,7 +171,7 @@ (testing "should return 200 when get /v1/dead_set is called with valid count val" (with-redefs [ds/view (fn [_ _ _] {:foo "bar"})] - (let [count "10" + (let [count "10" params {:count count :topic-entity "default"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" @@ -184,7 +184,7 @@ (testing "should return 200 when delete /v1/dead_set is called with valid parameters" (with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})] - (let [count "10" + (let [count "10" params {:count count :topic-entity "default"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" @@ -196,7 +196,7 @@ (testing "should return 400 when delete /v1/dead_set is called with invalid count val" (with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})] - (let [count "-10" + (let [count "-10" params {:count count :topic-entity "default"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" @@ -209,9 +209,9 @@ (testing "should return 400 when delete /v1/dead_set is called with invalid count val" (with-redefs [ds/delete (fn [_ _ _] nil)] - (let [count "avasdas" + (let [count "avasdas" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -222,9 +222,9 @@ (testing "should return 400 when delete /v1/dead_set is called with negative count val" (with-redefs [ds/delete (fn [_ _ _] nil)] - (let [count "-10" + (let [count "-10" topic-entity "default" - params {:count count :topic-name topic-entity} + 
params {:count count :topic-name topic-entity} {:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -236,8 +236,8 @@ (testing "should return 400 when delete /v1/dead_set is called without topic entity" (with-redefs [ds/delete (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count} + count "10" + params {:count count} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -251,8 +251,8 @@ (with-redefs [channel-retry-enabled? (constantly true)] (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count :topic-entity "default" :channel "invalid"} + count "10" + params {:count count :topic-entity "default" :channel "invalid"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true diff --git a/test/ziggurat/server/shutdown_test.clj b/test/ziggurat/server/shutdown_test.clj new file mode 100644 index 00000000..0c989024 --- /dev/null +++ b/test/ziggurat/server/shutdown_test.clj @@ -0,0 +1,49 @@ +(ns ziggurat.server.shutdown-test + (:require [clojure.test :refer :all] + [mount.core :as mount] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.fixtures :as fix] + [ziggurat.server :refer [server]] + [ziggurat.server.test-utils :as tu]) + (:import (org.eclipse.jetty.server Server) + (org.eclipse.jetty.server.handler StatisticsHandler))) + +(deftest http-server-graceful-shutdown-test + (testing "server should process existing requests within 30000ms when it is stopped" + (with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 3000) {:body "pong"}))] + (fix/mount-config) + (mount/start [#'server]) + (let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))] + ;; 1000
is the minimum sleep required for the future to run + (Thread/sleep 1000) + (mount/stop [#'server]) + (is (= "pong" (:body @http-fut))) + (is (= 200 (:status @http-fut)))))) + + (testing "server should discard new requests after the server is stopped, but should process the old request" + (with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 3000) {:body "pong"}))] + (fix/mount-config) + (mount/start [#'server]) + (let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))] + ;; 1000 is the minimum sleep required for the future to run + (Thread/sleep 1000) + (mount/stop [#'server]) + (is (thrown? Exception (tu/get (-> (ziggurat-config) :http-server :port) "/ping" false false))) + (is (= "pong" (:body @http-fut))) + (is (= 200 (:status @http-fut)))))) + (testing "server should stop after the graceful shutdown timeout and discard requests in progress" + (with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 4000) {:body "pong"})) + ziggurat.server/configure-jetty (fn [^Server server] + (let [stats-handler (StatisticsHandler.) + default-handler (.getHandler server)] + (.setHandler stats-handler default-handler) + (.setHandler server stats-handler) + (.setStopTimeout server 2000) + (.setStopAtShutdown server true)))] + (fix/mount-config) + (mount/start [#'server]) + (let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))] + ;; 1000 is the minimum sleep required for the future to run + (Thread/sleep 1000) + (mount/stop [#'server]) + (is (thrown? Exception @http-fut))))))