diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index d1cb4151..603adf64 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -3,10 +3,13 @@ (:require [cambium.codec :as codec] [cambium.logback.json.flat-layout :as flat] [clojure.set :as set] + [clojure.string :as str] [clojure.tools.logging :as log] - [mount.core :as mount :refer [defstate]] + [mount.core :as mount] [schema.core :as s] [ziggurat.config :refer [ziggurat-config] :as config] + [ziggurat.kafka-consumer.consumer-driver :as consumer-driver] + [ziggurat.kafka-consumer.executor-service :as executor-service] [ziggurat.messaging.connection :as messaging-connection :refer [connection]] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] @@ -17,9 +20,7 @@ [ziggurat.server :as server] [ziggurat.streams :as streams] [ziggurat.tracer :as tracer] - [ziggurat.util.java-util :as util] - [ziggurat.kafka-consumer.executor-service :as executor-service] - [ziggurat.kafka-consumer.consumer-driver :as consumer-driver]) + [ziggurat.util.java-util :as util]) (:gen-class :methods [^{:static true} [init [java.util.Map] void]] :name tech.gojek.ziggurat.internal.Init)) @@ -47,7 +48,7 @@ (messaging-producer/make-queues (event-routes args))) (defn- set-properties-for-structured-logging [] - (if (= (:log-format (ziggurat-config)) "json") + (when (= (:log-format (ziggurat-config)) "json") (flat/set-decoder! codec/destringify-val))) (defn start-kafka-producers [] @@ -186,6 +187,8 @@ {s/Keyword {:handler-fn (s/pred #(fn? %)) s/Keyword (s/pred #(fn? %))}})) +(declare BatchRoute) + (s/defschema BatchRoute (s/conditional #(and (seq %) @@ -206,7 +209,7 @@ (throw (IllegalArgumentException. (format "Error! Route %s isn't present in the %s config" topic-entity route-type))) (when-not (set/subset? channels config-channels) (let [diff (set/difference channels config-channels)] - (throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (clojure.string/join "," diff) route-type)))))))))) + (throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (str/join "," diff) route-type)))))))))) (defn validate-routes [stream-routes batch-routes modes] (when (contains? (set modes) :stream-worker) @@ -218,12 +221,12 @@ (defn- derive-modes [stream-routes batch-routes actor-routes] (let [base-modes [:management-api :worker]] - (if (and (nil? stream-routes) (nil? batch-routes)) + (when (and (nil? stream-routes) (nil? batch-routes)) (throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args"))) (cond-> base-modes (some? stream-routes) (conj :stream-worker) - (some? batch-routes) (conj :batch-worker) - (some? actor-routes) (conj :api-server)))) + (some? batch-routes) (conj :batch-worker) + (some? actor-routes) (conj :api-server)))) (defn validate-modes [modes stream-routes batch-routes actor-routes] (let [derived-modes (if-not (empty? modes)