-
Notifications
You must be signed in to change notification settings - Fork 64
/
init.clj
98 lines (86 loc) · 3.41 KB
/
init.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
(ns ziggurat.init
"Contains the entry point for your application."
(:require [clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.metrics :as metrics]
[ziggurat.messaging.connection :as messaging-connection]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.nrepl-server :as nrepl-server]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]))
(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
(:env (ziggurat-config))
(:app-name (ziggurat-config)))
:stop (metrics/stop-statsd-reporter statsd-reporter))
(defn- start*
([states]
(start* states nil))
([states args]
(-> (mount/only states)
(mount/with-args args)
(mount/start))))
(defn start
"Starts up Ziggurat's config, actor fn, rabbitmq connection and then streams, server etc"
[actor-start-fn stream-routes actor-routes]
(start* #{#'config/config})
(actor-start-fn)
(start* #{#'messaging-connection/connection} {:stream-routes stream-routes})
(messaging-producer/make-queues stream-routes)
(messaging-consumer/start-subscribers stream-routes) ;; We want subscribers to start after creating queues on RabbitMQ.
(start* #{#'statsd-reporter
#'server/server
#'nrepl-server/server
#'streams/stream
#'sentry-reporter}
{:stream-routes stream-routes
:actor-routes actor-routes}))
(defn stop
"Calls the Ziggurat's state stop fns and then actor-stop-fn."
[actor-stop-fn]
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'server/server
#'nrepl-server/server
#'streams/stream)
(actor-stop-fn)
(mount/stop #'config/config))
(defn- add-shutdown-hook [actor-stop-fn]
(.addShutdownHook
(Runtime/getRuntime)
(Thread. ^Runnable #(do (stop actor-stop-fn)
(shutdown-agents))
"Shutdown-handler")))
(s/defschema StreamRoute
(s/conditional
#(and (seq %)
(map? %))
{s/Keyword {:handler-fn (s/pred #(fn? %))
s/Keyword (s/pred #(fn? %))}}))
(defn validate-stream-routes [stream-routes]
(s/validate StreamRoute stream-routes))
(defn main
"The entry point for your application.
Accepts stream-routes as a nested map keyed by the topic entities.
Each topic entity is a map with a handler-fn described. For eg.,
{:default {:handler-fn (fn [message] :success)}}
:handler-fn must return :success, :retry or :skip
start-fn takes no parameters, and will be run on application startup.
stop-fn takes no parameters, and will be run on application shutdown."
([start-fn stop-fn stream-routes]
(main start-fn stop-fn stream-routes []))
([start-fn stop-fn stream-routes actor-routes]
(try
(validate-stream-routes stream-routes)
(add-shutdown-hook stop-fn)
(start start-fn stream-routes actor-routes)
(catch Exception e
(log/error e)
(stop stop-fn)
(System/exit 1)))))