Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Running ziggurat in different modes #46

Merged
merged 10 commits into from
May 30, 2019
129 changes: 102 additions & 27 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
(:require [clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.metrics :as metrics]
[ziggurat.messaging.connection :as messaging-connection]
Expand All @@ -28,38 +27,101 @@
(mount/with-args args)
(mount/start))))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
[actor-start-fn stream-routes actor-routes]
(defn- start-rabbitmq-connection [args]
(start* #{#'messaging-connection/connection} args))

(defn- start-rabbitmq-consumers [args]
(start-rabbitmq-connection args)
(messaging-consumer/start-subscribers (get args :stream-routes)))

(defn- start-rabbitmq-producers [args]
(start-rabbitmq-connection args)
(messaging-producer/make-queues (get args :stream-routes)))

(defn start-stream [args]
(start-rabbitmq-producers args)
(start* #{#'streams/stream} args))

(defn start-management-apis [args]
(start-rabbitmq-connection args)
(start* #{#'server/server} (dissoc args :actor-routes)))

(defn start-server [args]
(start-rabbitmq-connection args)
(start* #{#'server/server} args))

(defn start-workers [args]
(start-rabbitmq-producers args)
(start-rabbitmq-consumers args))

(defn- stop-rabbitmq-connection []
(mount/stop #'messaging-connection/connection))

(defn stop-workers []
(stop-rabbitmq-connection))

(defn stop-server []
(mount/stop #'server/server)
(stop-rabbitmq-connection))

(defn stop-stream []
(mount/stop #'streams/stream)
(stop-rabbitmq-connection))

(defn stop-management-apis []
(mount/stop #'server/server)
(stop-rabbitmq-connection))

(def valid-modes-fns
kartik7153 marked this conversation as resolved.
Show resolved Hide resolved
{"api-server" [start-server stop-server]
kartik7153 marked this conversation as resolved.
Show resolved Hide resolved
"stream-worker" [start-stream stop-stream]
"worker" [start-workers stop-workers]
"management-api" [start-management-apis stop-management-apis]})

(defn- execute-function
([modes fnk]
(execute-function modes fnk nil))
([modes fnk args]
(doseq [mode (-> modes
(or (keys valid-modes-fns))
sort)]
(if (nil? args)
((fnk (get valid-modes-fns mode)))
((fnk (get valid-modes-fns mode)) args)))))

(defn start-common-states []
(start* #{#'config/config
#'statsd-reporter
#'sentry-reporter})
(actor-start-fn)
(start* #{#'messaging-connection/connection} {:stream-routes stream-routes})
(messaging-producer/make-queues stream-routes)
(messaging-consumer/start-subscribers stream-routes) ;; We want subscribers to start after creating queues on RabbitMQ.
(start* #{#'server/server
#'nrepl-server/server
#'streams/stream}
{:stream-routes stream-routes
:actor-routes actor-routes}))
#'sentry-reporter
#'nrepl-server/server}))

(defn stop
"Calls the Ziggurat's state stop fns and then actor-stop-fn."
[actor-stop-fn]
(defn stop-common-states []
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'server/server
#'nrepl-server/server
#'streams/stream)
(actor-stop-fn)
(mount/stop #'config/config))
#'nrepl-server/server))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
[actor-start-fn stream-routes actor-routes modes]
(start-common-states)
(actor-start-fn)
(execute-function modes
first
kartik7153 marked this conversation as resolved.
Show resolved Hide resolved
{:actor-routes actor-routes
:stream-routes stream-routes}))

(defn- add-shutdown-hook [actor-stop-fn]
(defn stop
"Calls the Ziggurat's state stop fns and then actor-stop-fn."
[actor-stop-fn modes]
(stop-common-states)
(execute-function modes second)
(actor-stop-fn))

(defn- add-shutdown-hook [actor-stop-fn modes]
(.addShutdownHook
(Runtime/getRuntime)
(Thread. ^Runnable #(do (stop actor-stop-fn)
(Thread. ^Runnable #(do (stop actor-stop-fn modes)
(shutdown-agents))
"Shutdown-handler")))

Expand All @@ -73,6 +135,13 @@
(defn validate-stream-routes [stream-routes]
(s/validate StreamRoute stream-routes))

(defn validate-modes [modes]
(let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes)
invalid-modes-count (count invalid-modes)]
(when (pos? invalid-modes-count)
(throw (IllegalArgumentException. ^String (reduce (fn [acc mode]
kartik7153 marked this conversation as resolved.
Show resolved Hide resolved
(str acc " " mode)) "Invalid modes passed:" invalid-modes))))))

(defn main
"The entry point for your application.

Expand All @@ -88,11 +157,17 @@
([start-fn stop-fn stream-routes]
(main start-fn stop-fn stream-routes []))
([start-fn stop-fn stream-routes actor-routes]
(main {:start-fn start-fn :stop-fn stop-fn :stream-routes stream-routes :actor-routes actor-routes}))
([{:keys [start-fn stop-fn stream-routes actor-routes modes]}]
(try
(validate-modes modes)
(validate-stream-routes stream-routes)
(add-shutdown-hook stop-fn)
(start start-fn stream-routes actor-routes)
(add-shutdown-hook stop-fn modes)
(start start-fn stream-routes actor-routes modes)
(catch IllegalArgumentException e
(log/error e)
(System/exit 1))
(catch Exception e
(log/error e)
(stop stop-fn)
(stop stop-fn modes)
(System/exit 1)))))
8 changes: 4 additions & 4 deletions src/ziggurat/resource/dead_set.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
(ns ziggurat.resource.dead-set
(:require [schema.core :as s]
[clojure.tools.logging :as log]
(:require [clojure.tools.logging :as log]
[mount.core :as mount]
[schema.core :as s]
[ziggurat.config :refer [get-in-config channel-retry-config]]
[ziggurat.messaging.dead-set :as r]
[mount.core :as mount]))
[ziggurat.messaging.dead-set :as r]))
(def not-found-for-retry
{:status 404
:body {:error "Retry is not enabled"}})
Expand Down
1 change: 0 additions & 1 deletion src/ziggurat/retry.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ziggurat.retry
(:require [clojure.set :as set]
[sentry-clj.async :as sentry]
[ziggurat.sentry :refer [sentry-reporter]]))

(def default-wait 100)
Expand Down
2 changes: 1 addition & 1 deletion src/ziggurat/server/middleware.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

(defn wrap-hyphenate [handler & args]
(fn [request]
(let [{:keys [skip-hyphenation] :as response}
(let [response
(handler (update request
:params
#(umap/nested-map-keys (fn [k] (apply csk/->kebab-case-keyword k args)) %)))]
Expand Down
80 changes: 55 additions & 25 deletions test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ziggurat.init-test
(:require [clojure.test :refer :all]
[clojure.tools.logging :as log]
[ziggurat.config :as config]
[ziggurat.init :as init]
[ziggurat.messaging.connection :as rmqc]
Expand All @@ -17,8 +16,8 @@
rmqc/start-connection (fn [] (reset! result (* @result 2)))
rmqc/stop-connection (constantly nil)
config/config-file "config.test.edn"]
(init/start #(reset! result (+ @result 3)) {} [])
(init/stop #())
(init/start #(reset! result (+ @result 3)) {} [] nil)
(init/stop #() nil)
(is (= 16 @result))))))

(deftest stop-calls-actor-stop-fn-test
Expand All @@ -27,46 +26,57 @@
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (fn [_] (reset! result (* @result 2)))
config/config-file "config.test.edn"]
(init/start #() {} [])
(init/stop #(reset! result (+ @result 3)))
(init/start #() {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 5 @result))))))

(deftest stop-calls-idempotentcy-test
(testing "The stop function should be idempotent"
(let [result (atom 1)]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
rmqc/stop-connection (fn [_] (reset! result (* @result 2)))
config/config-file "config.test.edn"]
(init/start #() {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 5 @result))))))

(deftest start-calls-make-queues-test
(testing "Start calls make queues"
(let [make-queues-called (atom false)
(let [make-queues-called (atom 0)
expected-stream-routes {:default {:handler-fn #()}}]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
messaging-producer/make-queues (fn [stream-routes]
(swap! make-queues-called not)
(swap! make-queues-called + 1)
(is (= stream-routes expected-stream-routes)))
messaging-consumer/start-subscribers (constantly nil)
config/config-file "config.test.edn"]
(init/start #() expected-stream-routes [])
(init/stop #())
(is @make-queues-called)))))
(init/start #() expected-stream-routes [] nil)
(init/stop #() nil)
(is (= 2 @make-queues-called))))))

(deftest start-calls-start-subscribers-test
(testing "Start calls start subscribers"
(let [start-subscriber-called (atom false)
(let [start-subscriber-called (atom 0)
expected-stream-routes {:default {:handler-fn #()}}]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
messaging-consumer/start-subscribers (fn [stream-routes]
(swap! start-subscriber-called not)
(swap! start-subscriber-called + 1)
(is (= stream-routes expected-stream-routes)))
messaging-producer/make-queues (constantly nil)
config/config-file "config.test.edn"]
(init/start #() expected-stream-routes [])
(init/stop #())
(is @start-subscriber-called)))))
(init/start #() expected-stream-routes [] nil)
(init/stop #() nil)
(is (= 1 @start-subscriber-called))))))

(deftest main-test
(testing "Main function should call start"
(let [start-was-called (atom false)
expected-stream-routes {:default {:handler-fn #(constantly nil)}}]
(with-redefs [init/add-shutdown-hook (fn [_] (constantly nil))
init/start (fn [_ stream-router _]
(with-redefs [init/add-shutdown-hook (fn [_ _] (constantly nil))
init/start (fn [_ stream-router _ _]
(swap! start-was-called not)
(is (= expected-stream-routes stream-router)))]
(init/main #() #() expected-stream-routes)
Expand Down Expand Up @@ -101,28 +111,48 @@
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [["test-ping" (fn [_request] {:status 200
:body "pong"})]])
:body "pong"})]] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)
status-actor status
{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #() nil)
(is (= 200 status-actor))
(is (= 200 status)))))

(testing "Deadset management and server api modes should run both actor and deadset management routes"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [["test-ping" (fn [_request] {:status 200
:body "pong"})]] ["management-api" "api-server"])
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)
status-actor status
{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #())
(init/stop #() nil)
(is (= 200 status-actor))
(is (= 200 status)))))

(testing "The routes not added by actor should return 404"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [])
(let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)]
(init/stop #())
(init/start #() {} [] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)]
(init/stop #() nil)
(is (= 404 status)))))

(testing "The ziggurat routes should work fine when actor routes are not provided"
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
config/config-file "config.test.edn"]
(init/start #() {} [])
(let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #())
(init/start #() {} [] nil)
(let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)]
(init/stop #() nil)
(is (= 200 status))))))

(deftest validate-modes-test
(let [exception-message "Invalid modes passed"]
(testing "Validate modes should raise exception if modes have any invalid element"
(let [modes ["invalid-modes" "api-server" "second-invalid"]]
(is (thrown-with-msg? IllegalArgumentException #"invalid-modes second-invalid" (init/validate-modes modes)))))))