Skip to content

Commit

Permalink
Running ziggurat in different modes (#46)
Browse files Browse the repository at this point in the history
* Extracts out states per mode

* Refactors initialize of states

Co-authored-by: Saptanto <saptanto.sindu@gmail.com>

* Starts and stops the state according to modes passed in main

Co-authored-by: Saptanto <saptanto.sindu@gmail.com>

* Refactors validates modes

Co-authored-by: Saptanto <saptanto.sindu@gmail.com>

* Makes sure that api-server is called management-api-server

Co-authored-by: Saptanto <saptanto.sindu@gmail.com>

* Throws a proper exception when invalid mode is passed

Co-authored-by: Saptanto <saptanto.sindu@gmail.com>

* Removed unused code and imports

Co-authored-by: Saptanto <saptanto.sindu@gmail.com>

* Makes start fn and stop fn vector as a map

* Uses ex-info instead of IllegalArgumentException

* Converts string literals to keywords for modes
  • Loading branch information
kartik7153 committed May 30, 2019
1 parent 2781286 commit b20e46d
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 58 deletions.
128 changes: 101 additions & 27 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
(:require [clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.metrics :as metrics]
[ziggurat.messaging.connection :as messaging-connection]
Expand All @@ -28,38 +27,101 @@
(mount/with-args args)
(mount/start))))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
[actor-start-fn stream-routes actor-routes]
(defn- start-rabbitmq-connection [args]
(start* #{#'messaging-connection/connection} args))

(defn- start-rabbitmq-consumers [args]
(start-rabbitmq-connection args)
(messaging-consumer/start-subscribers (get args :stream-routes)))

(defn- start-rabbitmq-producers [args]
(start-rabbitmq-connection args)
(messaging-producer/make-queues (get args :stream-routes)))

(defn start-stream [args]
(start-rabbitmq-producers args)
(start* #{#'streams/stream} args))

(defn start-management-apis [args]
(start-rabbitmq-connection args)
(start* #{#'server/server} (dissoc args :actor-routes)))

(defn start-server [args]
(start-rabbitmq-connection args)
(start* #{#'server/server} args))

(defn start-workers [args]
(start-rabbitmq-producers args)
(start-rabbitmq-consumers args))

(defn- stop-rabbitmq-connection []
(mount/stop #'messaging-connection/connection))

(defn stop-workers []
(stop-rabbitmq-connection))

(defn stop-server []
(mount/stop #'server/server)
(stop-rabbitmq-connection))

(defn stop-stream []
(mount/stop #'streams/stream)
(stop-rabbitmq-connection))

(defn stop-management-apis []
(mount/stop #'server/server)
(stop-rabbitmq-connection))

(def valid-modes-fns
{:api-server {:start-fn start-server :stop-fn stop-server}
:stream-worker {:start-fn start-stream :stop-fn stop-stream}
:worker {:start-fn start-workers :stop-fn stop-workers}
:management-api {:start-fn start-management-apis :stop-fn stop-management-apis}})

(defn- execute-function
([modes fnk]
(execute-function modes fnk nil))
([modes fnk args]
(doseq [mode (-> modes
(or (keys valid-modes-fns))
sort)]
(if (nil? args)
((fnk (get valid-modes-fns mode)))
((fnk (get valid-modes-fns mode)) args)))))

(defn start-common-states []
(start* #{#'config/config
#'statsd-reporter
#'sentry-reporter})
(actor-start-fn)
(start* #{#'messaging-connection/connection} {:stream-routes stream-routes})
(messaging-producer/make-queues stream-routes)
(messaging-consumer/start-subscribers stream-routes) ;; We want subscribers to start after creating queues on RabbitMQ.
(start* #{#'server/server
#'nrepl-server/server
#'streams/stream}
{:stream-routes stream-routes
:actor-routes actor-routes}))
#'sentry-reporter
#'nrepl-server/server}))

(defn stop
"Calls the Ziggurat's state stop fns and then actor-stop-fn."
[actor-stop-fn]
(defn stop-common-states []
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'server/server
#'nrepl-server/server
#'streams/stream)
(actor-stop-fn)
(mount/stop #'config/config))
#'nrepl-server/server))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
[actor-start-fn stream-routes actor-routes modes]
(start-common-states)
(actor-start-fn)
(execute-function modes
:start-fn
{:actor-routes actor-routes
:stream-routes stream-routes}))

(defn- add-shutdown-hook [actor-stop-fn]
(defn stop
"Calls the Ziggurat's state stop fns and then actor-stop-fn."
[actor-stop-fn modes]
(stop-common-states)
(execute-function modes :stop-fn)
(actor-stop-fn))

(defn- add-shutdown-hook [actor-stop-fn modes]
(.addShutdownHook
(Runtime/getRuntime)
(Thread. ^Runnable #(do (stop actor-stop-fn)
(Thread. ^Runnable #(do (stop actor-stop-fn modes)
(shutdown-agents))
"Shutdown-handler")))

Expand All @@ -73,6 +135,12 @@
(defn validate-stream-routes [stream-routes]
(s/validate StreamRoute stream-routes))

(defn validate-modes [modes]
(let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes)
invalid-modes-count (count invalid-modes)]
(when (pos? invalid-modes-count)
(throw (ex-info "Wrong modes arguement passed - " {:invalid-modes invalid-modes})))))

(defn main
"The entry point for your application.
Expand All @@ -88,11 +156,17 @@
([start-fn stop-fn stream-routes]
(main start-fn stop-fn stream-routes []))
([start-fn stop-fn stream-routes actor-routes]
(main {:start-fn start-fn :stop-fn stop-fn :stream-routes stream-routes :actor-routes actor-routes}))
([{:keys [start-fn stop-fn stream-routes actor-routes modes]}]
(try
(validate-modes modes)
(validate-stream-routes stream-routes)
(add-shutdown-hook stop-fn)
(start start-fn stream-routes actor-routes)
(add-shutdown-hook stop-fn modes)
(start start-fn stream-routes actor-routes modes)
(catch clojure.lang.ExceptionInfo e
(log/error e)
(System/exit 1))
(catch Exception e
(log/error e)
(stop stop-fn)
(stop stop-fn modes)
(System/exit 1)))))
8 changes: 4 additions & 4 deletions src/ziggurat/resource/dead_set.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
(ns ziggurat.resource.dead-set
(:require [schema.core :as s]
[clojure.tools.logging :as log]
(:require [clojure.tools.logging :as log]
[mount.core :as mount]
[schema.core :as s]
[ziggurat.config :refer [get-in-config channel-retry-config]]
[ziggurat.messaging.dead-set :as r]
[mount.core :as mount]))
[ziggurat.messaging.dead-set :as r]))
(def not-found-for-retry
{:status 404
:body {:error "Retry is not enabled"}})
Expand Down
1 change: 0 additions & 1 deletion src/ziggurat/retry.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ziggurat.retry
(:require [clojure.set :as set]
[sentry-clj.async :as sentry]
[ziggurat.sentry :refer [sentry-reporter]]))

(def default-wait 100)
Expand Down
2 changes: 1 addition & 1 deletion src/ziggurat/server/middleware.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

(defn wrap-hyphenate [handler & args]
(fn [request]
(let [{:keys [skip-hyphenation] :as response}
(let [response
(handler (update request
:params
#(umap/nested-map-keys (fn [k] (apply csk/->kebab-case-keyword k args)) %)))]
Expand Down
79 changes: 54 additions & 25 deletions test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ziggurat.init-test
(:require [clojure.test :refer :all]
[clojure.tools.logging :as log]
[ziggurat.config :as config]
[ziggurat.init :as init]
[ziggurat.messaging.connection :as rmqc]
Expand All @@ -17,8 +16,8 @@
rmqc/start-connection (fn [] (reset! result (* @result 2)))
rmqc/stop-connection (constantly nil)
config/config-file "config.test.edn"]
(init/start #(reset! result (+ @result 3)) {} [])
(init/stop #())
(init/start #(reset! result (+ @result 3)) {} [] nil)
(init/stop #() nil)
(is (= 16 @result))))))

(deftest stop-calls-actor-stop-fn-test
Expand All @@ -27,46 +26,57 @@
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (fn [_] (reset! result (* @result 2)))
config/config-file "config.test.edn"]
(init/start #() {} [])
(init/stop #(reset! result (+ @result 3)))
(init/start #() {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 5 @result))))))

(deftest stop-calls-idempotentcy-test
(testing "The stop function should be idempotent"
(let [result (atom 1)]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
rmqc/stop-connection (fn [_] (reset! result (* @result 2)))
config/config-file "config.test.edn"]
(init/start #() {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 5 @result))))))

(deftest start-calls-make-queues-test
(testing "Start calls make queues"
(let [make-queues-called (atom false)
(let [make-queues-called (atom 0)
expected-stream-routes {:default {:handler-fn #()}}]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
messaging-producer/make-queues (fn [stream-routes]
(swap! make-queues-called not)
(swap! make-queues-called + 1)
(is (= stream-routes expected-stream-routes)))
messaging-consumer/start-subscribers (constantly nil)
config/config-file "config.test.edn"]
(init/start #() expected-stream-routes [])
(init/stop #())
(is @make-queues-called)))))
(init/start #() expected-stream-routes [] nil)
(init/stop #() nil)
(is (= 2 @make-queues-called))))))

(deftest start-calls-start-subscribers-test
(testing "Start calls start subscribers"
(let [start-subscriber-called (atom false)
(let [start-subscriber-called (atom 0)
expected-stream-routes {:default {:handler-fn #()}}]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
messaging-consumer/start-subscribers (fn [stream-routes]
(swap! start-subscriber-called not)
(swap! start-subscriber-called + 1)
(is (= stream-routes expected-stream-routes)))
messaging-producer/make-queues (constantly nil)
config/config-file "config.test.edn"]
(init/start #() expected-stream-routes [])
(init/stop #())
(is @start-subscriber-called)))))
(init/start #() expected-stream-routes [] nil)
(init/stop #() nil)
(is (= 1 @start-subscriber-called))))))

(deftest main-test
(testing "Main function should call start"
(let [start-was-called (atom false)
expected-stream-routes {:default {:handler-fn #(constantly nil)}}]
(with-redefs [init/add-shutdown-hook (fn [_] (constantly nil))
init/start (fn [_ stream-router _]
(with-redefs [init/add-shutdown-hook (fn [_ _] (constantly nil))
init/start (fn [_ stream-router _ _]
(swap! start-was-called not)
(is (= expected-stream-routes stream-router)))]
(init/main #() #() expected-stream-routes)
Expand Down Expand Up @@ -101,28 +111,47 @@
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [["test-ping" (fn [_request] {:status 200
:body "pong"})]])
:body "pong"})]] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)
status-actor status
{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #() nil)
(is (= 200 status-actor))
(is (= 200 status)))))

(testing "Deadset management and server api modes should run both actor and deadset management routes"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [["test-ping" (fn [_request] {:status 200
:body "pong"})]] [:management-api :api-server])
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)
status-actor status
{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #())
(init/stop #() nil)
(is (= 200 status-actor))
(is (= 200 status)))))

(testing "The routes not added by actor should return 404"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [])
(let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)]
(init/stop #())
(init/start #() {} [] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)]
(init/stop #() nil)
(is (= 404 status)))))

(testing "The ziggurat routes should work fine when actor routes are not provided"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [])
(let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #())
(init/start #() {} [] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #() nil)
(is (= 200 status))))))

(deftest validate-modes-test
(testing "Validate modes should raise exception if modes have any invalid element"
(let [modes [:invalid-modes :api-server :second-invalid]]
(is (thrown? clojure.lang.ExceptionInfo (init/validate-modes modes))))))

0 comments on commit b20e46d

Please sign in to comment.