Skip to content

Commit

Permalink
Merge b9961fe into d95d0bd
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang93 committed Feb 14, 2023
2 parents d95d0bd + b9961fe commit 61f27a2
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 35 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,52 @@
All notable changes to this project will be documented in this file. This change log follows the conventions
of [keepachangelog.com](http://keepachangelog.com/).

## 4.9.1

- Adds Graceful shutdown to the http server

## 4.9.0

- Improvises the publishing logic during consumption via subscribers.
- Upgrades the state management for rabbitmq subscribers.

## 4.8.0

- `rabbitmq-retry-count` is now available in `metadata` provided in user handler function.

## 4.7.6

- Fixed a bug where kafka header with null values, throws Null Pointer Exception upon publishing to rabbitmq

## 4.7.5

- Publishes metric to gauge time taken to send messages to rabbitmq

## 4.7.4

- Updated dead-set APIs to replay and delete dead-set messages asynchronously

## 4.7.3

- Fixed a bug where instantiation of channel pool leads to null pointer exception when stream route does not have
:stream-threads-count defined

## 4.7.2

- Releasing a new tag because the version 4.7.0 was already present in clojars.

## 4.7.0

- Added a feature to retry non-recoverable exceptions during publishing messages on rabbitmq

## 4.6.4
- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :<channel_key>]`

- user can provide `:prefetch-count` for RabbitMQ channel threads in `[:stream-router :channels :<channel_key>]`
section of the config
- Fixed a bug for overriding the default channel-pool configuration with the user provided config

## 4.6.3

- RabbitMQ's connections use a DNS IP resolver to resolve DNS based hosts
- Setting of HA policies from within ziggurat have been removed

Expand Down
28 changes: 23 additions & 5 deletions src/ziggurat/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,26 @@
[ring.adapter.jetty :as ring]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.server.routes :as routes])
(:import (org.eclipse.jetty.server Server)
(java.time Instant)))
(:import (java.util.concurrent TimeoutException)
(org.eclipse.jetty.server Server)
(java.time Instant)
(org.eclipse.jetty.server.handler StatisticsHandler)))

(add-encoder Instant encode-str)

(def default-stop-timeout-ms 30000)

(defn configure-jetty [^Server server]
(let [stats-handler (StatisticsHandler.)
timeout-ms (get-in (ziggurat-config)
[:http-server :graceful-shutdown-timeout-ms]
default-stop-timeout-ms)
default-handler (.getHandler server)]
(.setHandler stats-handler default-handler)
(.setHandler server stats-handler)
(.setStopTimeout server timeout-ms)
(.setStopAtShutdown server true)))

(defn- start [handler]
(let [conf (:http-server (ziggurat-config))
port (:port conf)
Expand All @@ -19,11 +34,14 @@
:min-threads thread-count
:max-threads thread-count
:join? false
:send-server-version? false})))
:send-server-version? false
:configurator configure-jetty})))

(defn- stop [^Server server]
(.stop server)
(log/info "Stopped server"))
(try
(.stop server)
(catch TimeoutException _ (log/info "Graceful shutdown timed out")))
(log/info "Stopped server gracefully"))

(defstate server
:start (start (routes/handler (:actor-routes (mount/args))))
Expand Down
58 changes: 29 additions & 29 deletions test/ziggurat/server/routes_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,29 @@

(deftest router-dead-set-channel-disabled-test
(let [stream-routes {:default {:handler-fn (fn [])
:channel-1 (fn [])}}]
:channel-1 (fn [])}}]
(with-redefs [retry-enabled? (fn [] false)]
(fix/with-start-server
stream-routes
(testing "should return 404 when /v1/dead_set/replay for channel is called and channel retry is disabled"
(with-redefs [channel-retry-enabled? (constantly false)]
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
{:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params)
expected-body {:error "Retry is not enabled"}]
(is (= 404 status))
(is (= expected-body (json/decode body true))))))

(testing "should return 404 when /v1/dead_set for channel is called and channel retry is disabled"
(with-redefs [channel-retry-enabled? (constantly false)]
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params)
expected-body {:error "Retry is not enabled"}]
(is (= 404 status))
(is (= expected-body body)))))

(testing "should return 404 when delete /v1/dead_set for channel is called and channel retry is disabled"
(with-redefs [channel-retry-enabled? (constantly false)]
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
(let [params {:count "10" :topic-entity "default" :channel "channel-1"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port) "/v1/dead_set" true true {} params)
expected-body {:error "Retry is not enabled"}]
(is (= 404 status))
Expand Down Expand Up @@ -77,33 +77,33 @@

(testing "should return 200 when /v1/dead_set/replay is called with valid count val"
(with-redefs [ds/replay (fn [_ _ _] nil)]
(let [count "10"
(let [count "10"
params {:count count :topic-entity "default"}
{:keys [status _]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" params)]
(is (= 200 status)))))

(testing "should return 400 when /v1/dead_set/replay is called with invalid count val"
(with-redefs [ds/replay (fn [_ _ _] nil)]
(let [count "invalid-val"
topic-entity "default"
(let [count "invalid-val"
topic-entity "default"
expected-body {:error "Count should be positive integer"}
{:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count :topic-entity topic-entity})]
(is (= 400 status))
(is (= expected-body (json/decode body true))))))

(testing "should return 400 when /v1/dead_set/replay is called with no topic entity"
(with-redefs [ds/replay (fn [_ _ _] nil)]
(let [count "10"
(let [count "10"
expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
{:keys [status body]} (tu/post (-> (ziggurat-config) :http-server :port) "/v1/dead_set/replay" {:count count})]
(is (= 400 status))
(is (= expected-body (json/decode body true))))))

(testing "should return 400 when get /v1/dead_set is called with invalid count val"
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [count "avasdas"
(let [count "avasdas"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -114,9 +114,9 @@

(testing "should return 400 when get /v1/dead_set is called with negative count val"
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [count "-10"
(let [count "-10"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -128,8 +128,8 @@
(testing "should return 400 when get /v1/dead_set is called without topic entity"
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count}
count "10"
params {:count count}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -143,8 +143,8 @@
(with-redefs [channel-retry-enabled? (constantly true)]
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -158,8 +158,8 @@
(with-redefs [channel-retry-enabled? (constantly true)]
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Count should be positive integer"}
count "-10"
params {:count count :topic-entity "default"}
count "-10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -171,7 +171,7 @@

(testing "should return 200 when get /v1/dead_set is called with valid count val"
(with-redefs [ds/view (fn [_ _ _] {:foo "bar"})]
(let [count "10"
(let [count "10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/get (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
Expand All @@ -184,7 +184,7 @@

(testing "should return 200 when delete /v1/dead_set is called with valid parameters"
(with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})]
(let [count "10"
(let [count "10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
Expand All @@ -196,7 +196,7 @@

(testing "should return 400 when delete /v1/dead_set is called with invalid count val"
(with-redefs [ds/delete (fn [_ _ _] {:foo "bar"})]
(let [count "-10"
(let [count "-10"
params {:count count :topic-entity "default"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
Expand All @@ -209,9 +209,9 @@

(testing "should return 400 when delete /v1/dead_set is called with invalid count val"
(with-redefs [ds/delete (fn [_ _ _] nil)]
(let [count "avasdas"
(let [count "avasdas"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -222,9 +222,9 @@

(testing "should return 400 when delete /v1/dead_set is called with negative count val"
(with-redefs [ds/delete (fn [_ _ _] nil)]
(let [count "-10"
(let [count "-10"
topic-entity "default"
params {:count count :topic-name topic-entity}
params {:count count :topic-name topic-entity}
{:keys [status _]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -236,8 +236,8 @@
(testing "should return 400 when delete /v1/dead_set is called without topic entity"
(with-redefs [ds/delete (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count}
count "10"
params {:count count}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand All @@ -251,8 +251,8 @@
(with-redefs [channel-retry-enabled? (constantly true)]
(with-redefs [ds/view (fn [_ _ _] nil)]
(let [expected-body {:error "Topic entity/channel should be provided and must be present in stream routes"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
count "10"
params {:count count :topic-entity "default" :channel "invalid"}
{:keys [status body]} (tu/delete (-> (ziggurat-config) :http-server :port)
"/v1/dead_set"
true
Expand Down
49 changes: 49 additions & 0 deletions test/ziggurat/server/shutdown_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
(ns ziggurat.server.shutdown-test
(:require [clojure.test :refer :all]
[mount.core :as mount]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.fixtures :as fix]
[ziggurat.server :refer [server]]
[ziggurat.server.test-utils :as tu])
(:import (org.eclipse.jetty.server Server)
(org.eclipse.jetty.server.handler StatisticsHandler)))

(deftest http-server-graceful-shutdown-test
(testing "server should process existing requests within 30000ms when it is stopped"
(with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 3000) {:body "pong"}))]
(fix/mount-config)
(mount/start [#'server])
(let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))]
;; 1000 is the minimum sleep required for the future to run
(Thread/sleep 1000)
(mount/stop [#'server])
(is (:body @http-fut) "pong")
(is (:status @http-fut) 200))))

(testing "server should discard new requests after the server is stopped, but should process the old request"
(with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 3000) {:body "pong"}))]
(fix/mount-config)
(mount/start [#'server])
(let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))]
;; 1000 is the minimum sleep required for the future to run
(Thread/sleep 1000)
(mount/stop [#'server])
(is (thrown? Exception (tu/get (-> (ziggurat-config) :http-server :port) "/ping" false false)))
(is (:body @http-fut) "pong")
(is (:status @http-fut) 200))))
(testing "server should stop after the graceful shutdown timeout and discard requests in progress"
(with-redefs [ziggurat.server.routes/handler (fn [_] (fn [_] (Thread/sleep 4000) {:body "pong"}))
ziggurat.server/configure-jetty (fn [^Server server]
(let [stats-handler (StatisticsHandler.)
default-handler (.getHandler server)]
(.setHandler stats-handler default-handler)
(.setHandler server stats-handler)
(.setStopTimeout server 2000)
(.setStopAtShutdown server true)))]
(fix/mount-config)
(mount/start [#'server])
(let [http-fut (future (tu/get (-> (ziggurat-config) :http-server :port) "/ping" true false))]
;; 1000 is the minimum sleep required for the future to run
(Thread/sleep 1000)
(mount/stop [#'server])
(is (thrown? Exception @http-fut))))))

0 comments on commit 61f27a2

Please sign in to comment.