Skip to content

Commit

Permalink
change the order of the components when being stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Sep 22, 2021
1 parent 7b95b30 commit dbef2ac
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
(:require [cambium.codec :as codec]
[cambium.logback.json.flat-layout :as flat]
[clojure.set :as set]
[clojure.string :as str]
[clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[mount.core :as mount]
[schema.core :as s]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver]
[ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.messaging.connection :as messaging-connection :refer [connection]]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
Expand All @@ -17,9 +20,7 @@
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.tracer :as tracer]
[ziggurat.util.java-util :as util]
[ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver])
[ziggurat.util.java-util :as util])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
:name tech.gojek.ziggurat.internal.Init))
Expand Down Expand Up @@ -47,7 +48,7 @@
(messaging-producer/make-queues (event-routes args)))

(defn- set-properties-for-structured-logging []
(if (= (:log-format (ziggurat-config)) "json")
(when (= (:log-format (ziggurat-config)) "json")
(flat/set-decoder! codec/destringify-val)))

(defn start-kafka-producers []
Expand Down Expand Up @@ -186,6 +187,8 @@
{s/Keyword {:handler-fn (s/pred #(fn? %))
s/Keyword (s/pred #(fn? %))}}))

(declare BatchRoute)

(s/defschema BatchRoute
(s/conditional
#(and (seq %)
Expand All @@ -206,7 +209,7 @@
(throw (IllegalArgumentException. (format "Error! Route %s isn't present in the %s config" topic-entity route-type)))
(when-not (set/subset? channels config-channels)
(let [diff (set/difference channels config-channels)]
(throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (clojure.string/join "," diff) route-type))))))))))
(throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (str/join "," diff) route-type))))))))))

(defn validate-routes [stream-routes batch-routes modes]
(when (contains? (set modes) :stream-worker)
Expand All @@ -218,12 +221,12 @@

(defn- derive-modes [stream-routes batch-routes actor-routes]
(let [base-modes [:management-api :worker]]
(if (and (nil? stream-routes) (nil? batch-routes))
(when (and (nil? stream-routes) (nil? batch-routes))
(throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args")))
(cond-> base-modes
(some? stream-routes) (conj :stream-worker)
(some? batch-routes) (conj :batch-worker)
(some? actor-routes) (conj :api-server))))
(some? batch-routes) (conj :batch-worker)
(some? actor-routes) (conj :api-server))))

(defn validate-modes [modes stream-routes batch-routes actor-routes]
(let [derived-modes (if-not (empty? modes)
Expand Down

0 comments on commit dbef2ac

Please sign in to comment.