diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a84f719..e9f723ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,38 +3,52 @@ All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/). +## 4.9.1 + +- Adds Graceful shutdown to the http server + ## 4.9.0 + - Improvises the publishing logic during consumption via subscribers. - Upgrades the state management for rabbitmq subscribers. ## 4.8.0 + - `rabbitmq-retry-count` is now available in `metadata` provided in user handler function. ## 4.7.6 + - Fixed a bug where kafka header with null values, throws Null Pointer Exception upon publishing to rabbitmq ## 4.7.5 + - Publishes metric to gauge time taken to send messages to rabbitmq ## 4.7.4 + - Updated dead-set APIs to replay and delete dead-set messages asynchronously ## 4.7.3 + - Fixed a bug where instantiation of channel pool leads to null pointer exception when stream route does not have :stream-threads-count defined ## 4.7.2 + - Releasing a new tag because the version 4.7.0 was already present in clojars. ## 4.7.0 + - Added a feature to retry non-recoverable exceptions during publishing messages on rabbitmq ## 4.6.4 -- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :]` + +- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :]` section of the config - Fixed a bug for overriding the default channel-pool configuration with the user provided config ## 4.6.3 + - RabbitMQ's connections use a DNS IP resolver to resolve DNS based hosts - Setting of HA policies from within ziggurat have been removed diff --git a/src/ziggurat/server.clj b/src/ziggurat/server.clj index ed511b80..41e6d558 100644 --- a/src/ziggurat/server.clj +++ b/src/ziggurat/server.clj @@ -5,11 +5,26 @@ [ring.adapter.jetty :as ring] [ziggurat.config :refer [ziggurat-config]] [ziggurat.server.routes :as routes]) - (:import (org.eclipse.jetty.server Server) - (java.time Instant))) + (:import (java.util.concurrent TimeoutException) + (org.eclipse.jetty.server Server) + (java.time Instant) + (org.eclipse.jetty.server.handler StatisticsHandler))) (add-encoder Instant encode-str) +(def default-stop-timeout-ms 30000) + +(defn configure-jetty [^Server server] + (let [stats-handler (StatisticsHandler.) + timeout-ms (get-in (ziggurat-config) + [:http-server :graceful-shutdown-timeout-ms] + default-stop-timeout-ms) + default-handler (.getHandler server)] + (.setHandler stats-handler default-handler) + (.setHandler server stats-handler) + (.setStopTimeout server timeout-ms) + (.setStopAtShutdown server true))) + (defn- start [handler] (let [conf (:http-server (ziggurat-config)) port (:port conf) @@ -19,11 +34,14 @@ :min-threads thread-count :max-threads thread-count :join? false - :send-server-version? false}))) + :send-server-version? false + :configurator configure-jetty}))) (defn- stop [^Server server] - (.stop server) - (log/info "Stopped server")) + (try + (.stop server) + (catch TimeoutException _ (log/info "Graceful shutdown timed out"))) + (log/info "Stopped server gracefully")) (defstate server :start (start (routes/handler (:actor-routes (mount/args)))) diff --git a/test/ziggurat/server/routes_test.clj b/test/ziggurat/server/routes_test.clj index 737e0993..d144b87c 100644 --- a/test/ziggurat/server/routes_test.clj +++ b/test/ziggurat/server/routes_test.clj @@ -13,13 +13,13 @@ (deftest router-dead-set-channel-disabled-test (let [stream-routes {:default {:handler-fn (fn []) - :channel-1 (fn [])}}] + :channel-1 (fn [])}}] (with-redefs [retry-enabled? (fn [] false)] (fix/with-start-server stream-routes (testing "should return 404 when /v1/dead_set/replay for channel is called and channel retry is disabled" (with-redefs [channel-retry-enabled? (constantly false)] - (let [params {:count "10" :topic-entity "default" :channel "channel-1"} + (let [params {:count "10" :topic-entity "default" :channel "channel-1"} {:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params) expected-body {:error "Retry is not enabled"}] (is (= 404 status)) @@ -27,7 +27,7 @@ (testing "should return 404 when /v1/dead_set for channel is called and channel retry is disabled" (with-redefs [channel-retry-enabled? (constantly false)] - (let [params {:count "10" :topic-entity "default" :channel "channel-1"} + (let [params {:count "10" :topic-entity "default" :channel "channel-1"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params) expected-body {:error "Retry is not enabled"}] (is (= 404 status)) @@ -35,7 +35,7 @@ (testing "should return 404 when delete /v1/dead_set for channel is called and channel retry is disabled" (with-redefs [channel-retry-enabled? (constantly false)] - (let [params {:count "10" :topic-entity "default" :channel "channel-1"} + (let [params {:count "10" :topic-entity "default" :channel "channel-1"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params) expected-body {:error "Retry is not enabled"}] (is (= 404 status)) @@ -77,15 +77,15 @@ (testing "should return 200 when /v1/dead_set/replay is called with valid count val" (with-redefs [ds/replay (fn [_ _ _] nil)] - (let [count "10" + (let [count "10" params {:count count :topic-entity "default"} {:keys [status _]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params)] (is (= 200 status))))) (testing "should return 400 when /v1/dead_set/replay is called with invalid count val" (with-redefs [ds/replay (fn [_ _ _] nil)] - (let [count "invalid-val" - topic-entity "default" + (let [count "invalid-val" + topic-entity "default" expected-body {:error "Count should be positive integer"} {:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count :topic-entity topic-entity})] (is (= 400 status)) @@ -93,7 +93,7 @@ (testing "should return 400 when /v1/dead_set/replay is called with no topic entity" (with-redefs [ds/replay (fn [_ _ _] nil)] - (let [count "10" + (let [count "10" expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} {:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count})] (is (= 400 status)) @@ -101,9 +101,9 @@ (testing "should return 400 when get /v1/dead_set is called with invalid count val" (with-redefs [ds/view (fn [_ _ _] nil)] - (let [count "avasdas" + (let [count "avasdas" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -114,9 +114,9 @@ (testing "should return 400 when get /v1/dead_set is called with negative count val" (with-redefs [ds/view (fn [_ _ _] nil)] - (let [count "-10" + (let [count "-10" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -128,8 +128,8 @@ (testing "should return 400 when get /v1/dead_set is called without topic entity" (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count} + count "10" + params {:count count} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -143,8 +143,8 @@ (with-redefs [channel-retry-enabled? (constantly true)] (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count :topic-entity "default" :channel "invalid"} + count "10" + params {:count count :topic-entity "default" :channel "invalid"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -158,8 +158,8 @@ (with-redefs [channel-retry-enabled? (constantly true)] (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Count should be positive integer"} - count "-10" - params {:count count :topic-entity "default"} + count "-10" + params {:count count :topic-entity "default"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -171,7 +171,7 @@ (testing "should return 200 when get /v1/dead_set is called with valid count val" (with-redefs [ds/view (fn [_ _ _] {:foo "bar"})] - (let [count "10" + (let [count "10" params {:count count :topic-entity "default"} {:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" @@ -184,7 +184,7 @@ (testing "should return 200 when delete /v1/dead_set is called with valid parameters" (with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})] - (let [count "10" + (let [count "10" params {:count count :topic-entity "default"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" @@ -196,7 +196,7 @@ (testing "should return 400 when delete /v1/dead_set is called with invalid count val" (with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})] - (let [count "-10" + (let [count "-10" params {:count count :topic-entity "default"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" @@ -209,9 +209,9 @@ (testing "should return 400 when delete /v1/dead_set is called with invalid count val" (with-redefs [ds/delete (fn [_ _ _] nil)] - (let [count "avasdas" + (let [count "avasdas" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -222,9 +222,9 @@ (testing "should return 400 when delete /v1/dead_set is called with negative count val" (with-redefs [ds/delete (fn [_ _ _] nil)] - (let [count "-10" + (let [count "-10" topic-entity "default" - params {:count count :topic-name topic-entity} + params {:count count :topic-name topic-entity} {:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -236,8 +236,8 @@ (testing "should return 400 when delete /v1/dead_set is called without topic entity" (with-redefs [ds/delete (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count} + count "10" + params {:count count} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true @@ -251,8 +251,8 @@ (with-redefs [channel-retry-enabled? (constantly true)] (with-redefs [ds/view (fn [_ _ _] nil)] (let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"} - count "10" - params {:count count :topic-entity "default" :channel "invalid"} + count "10" + params {:count count :topic-entity "default" :channel "invalid"} {:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true diff --git a/test/ziggurat/server/shutdown_test.clj b/test/ziggurat/server/shutdown_test.clj new file mode 100644 index 00000000..0c989024 --- /dev/null +++ b/test/ziggurat/server/shutdown_test.clj @@ -0,0 +1,49 @@ +(ns ziggurat.server.shutdown-test + (:require [clojure.test :refer :all] + [mount.core :as mount] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.fixtures :as fix] + [ziggurat.server :refer [server]] + [ziggurat.server.test-utils :as tu]) + (:import (org.eclipse.jetty.server Server) + (org.eclipse.jetty.server.handler StatisticsHandler))) + +(deftest http-server-graceful-shutdown-test + (testing "server should process existing requests within 30000ms when it is stopped" + (with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 3000) {:body "pong"}))] + (fix/mount-config) + (mount/start [#'server]) + (let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))] + ;; 1000 is the minimum sleep required for the future to run + (Thread/sleep 1000) + (mount/stop [#'server]) + (is (:body @http-fut) "pong") + (is (:status @http-fut) 200)))) + + (testing "server should discard new requests after the server is stopped, but should process the old request" + (with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 3000) {:body "pong"}))] + (fix/mount-config) + (mount/start [#'server]) + (let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))] + ;; 1000 is the minimum sleep required for the future to run + (Thread/sleep 1000) + (mount/stop [#'server]) + (is (thrown? Exception (tu/get (-> (ziggurat-config) :http-server :port) "/ping" false false))) + (is (:body @http-fut) "pong") + (is (:status @http-fut) 200)))) + (testing "server should stop after the graceful shutdown timeout and discard requests in progress" + (with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 4000) {:body "pong"})) + ziggurat.server/configure-jetty (fn [^Server server] + (let [stats-handler (StatisticsHandler.) + default-handler (.getHandler server)] + (.setHandler stats-handler default-handler) + (.setHandler server stats-handler) + (.setStopTimeout server 2000) + (.setStopAtShutdown server true)))] + (fix/mount-config) + (mount/start [#'server]) + (let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))] + ;; 1000 is the minimum sleep required for the future to run + (Thread/sleep 1000) + (mount/stop [#'server]) + (is (thrown? Exception @http-fut))))))