From 744d4766a7780e34829e512f36babfe37c459790 Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Wed, 22 Sep 2021 11:36:40 +0800 Subject: [PATCH] change the order of the components when being stopped --- src/ziggurat/init.clj | 25 ++++++++++++++----------- test/ziggurat/init_test.clj | 33 ++++++++++++++++++++------------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index d1cb4151..ef9e6184 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -3,10 +3,13 @@ (:require [cambium.codec :as codec] [cambium.logback.json.flat-layout :as flat] [clojure.set :as set] + [clojure.string :as str] [clojure.tools.logging :as log] - [mount.core :as mount :refer [defstate]] + [mount.core :as mount] [schema.core :as s] [ziggurat.config :refer [ziggurat-config] :as config] + [ziggurat.kafka-consumer.consumer-driver :as consumer-driver] + [ziggurat.kafka-consumer.executor-service :as executor-service] [ziggurat.messaging.connection :as messaging-connection :refer [connection]] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] @@ -17,9 +20,7 @@ [ziggurat.server :as server] [ziggurat.streams :as streams] [ziggurat.tracer :as tracer] - [ziggurat.util.java-util :as util] - [ziggurat.kafka-consumer.executor-service :as executor-service] - [ziggurat.kafka-consumer.consumer-driver :as consumer-driver]) + [ziggurat.util.java-util :as util]) (:gen-class :methods [^{:static true} [init [java.util.Map] void]] :name tech.gojek.ziggurat.internal.Init)) @@ -47,7 +48,7 @@ (messaging-producer/make-queues (event-routes args))) (defn- set-properties-for-structured-logging [] - (if (= (:log-format (ziggurat-config)) "json") + (when (= (:log-format (ziggurat-config)) "json") (flat/set-decoder! codec/destringify-val))) (defn start-kafka-producers [] @@ -166,9 +167,9 @@ (defn stop "Calls the Ziggurat's state stop fns and then actor-stop-fn." [actor-stop-fn modes] + (execute-function modes :stop-fn) (actor-stop-fn) - (stop-common-states) - (execute-function modes :stop-fn)) + (stop-common-states)) (defn- add-shutdown-hook [actor-stop-fn modes] (.addShutdownHook @@ -186,6 +187,8 @@ {s/Keyword {:handler-fn (s/pred #(fn? %)) s/Keyword (s/pred #(fn? %))}})) +(declare BatchRoute) + (s/defschema BatchRoute (s/conditional #(and (seq %) @@ -206,7 +209,7 @@ (throw (IllegalArgumentException. (format "Error! Route %s isn't present in the %s config" topic-entity route-type))) (when-not (set/subset? channels config-channels) (let [diff (set/difference channels config-channels)] - (throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (clojure.string/join "," diff) route-type)))))))))) + (throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (str/join "," diff) route-type)))))))))) (defn validate-routes [stream-routes batch-routes modes] (when (contains? (set modes) :stream-worker) @@ -218,12 +221,12 @@ (defn- derive-modes [stream-routes batch-routes actor-routes] (let [base-modes [:management-api :worker]] - (if (and (nil? stream-routes) (nil? batch-routes)) + (when (and (nil? stream-routes) (nil? batch-routes)) (throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args"))) (cond-> base-modes (some? stream-routes) (conj :stream-worker) - (some? batch-routes) (conj :batch-worker) - (some? actor-routes) (conj :api-server)))) + (some? batch-routes) (conj :batch-worker) + (some? actor-routes) (conj :api-server)))) (defn validate-modes [modes stream-routes batch-routes actor-routes] (let [derived-modes (if-not (empty? modes) diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 4ff482e2..5fde85c7 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -1,20 +1,17 @@ (ns ziggurat.init-test - (:require [clojure.test :refer :all] - [mount.core :refer [defstate] :as mount] + (:require [clojure.test :refer [deftest is testing]] + [mount.core :as mount] [ziggurat.config :as config] [ziggurat.init :as init] [ziggurat.messaging.connection :as rmqc] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] - [ziggurat.streams :as streams :refer [stream]] + [ziggurat.streams :as streams] [ziggurat.server.test-utils :as tu] [ziggurat.tracer :as tracer] [ziggurat.fixtures :refer [with-config]] [cambium.logback.json.flat-layout :as flat] - [cambium.codec :as codec] - [cambium.core :as clog] - [clojure.tools.logging :as log]) - + [cambium.codec :as codec]) (:import (io.opentracing.mock MockTracer))) (def valid-modes-count 4) @@ -25,8 +22,7 @@ (deftest start-calls-actor-start-fn-test (testing "The actor start fn starts before the ziggurat state and can read config" - (let [result (atom 1) - start-messaging-internal-call-count 2] + (let [result (atom 1)] (with-redefs [streams/start-streams (fn [_ _] (reset! result (* @result 2))) streams/stop-streams (constantly nil) ;; will be called valid modes number of times @@ -47,12 +43,11 @@ (with-config (do (init/start #() {} {} [] nil) (init/stop #(reset! result (+ @result 3)) nil) - (is (= 8 @result)))))))) + (is (= 5 @result)))))))) (deftest stop-calls-idempotentcy-test (testing "The stop function should be idempotent" - (let [result (atom 1) - stop-connection-internal-call-count 1] + (let [result (atom 1)] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) rmqc/stop-connection (fn [_] (reset! result (* @result 2))) @@ -60,7 +55,7 @@ (with-config (do (init/start #() {} {} [] nil) (init/stop #(reset! result (+ @result 3)) nil) - (is (= 8 @result)))))))) + (is (= 5 @result)))))))) (deftest start-calls-make-queues-with-both-streams-and-batch-routes-test (testing "Start calls make queues with both streams and batch routes" @@ -327,3 +322,15 @@ :channel-2 #()}}] (is (thrown? IllegalArgumentException (init/validate-routes stream-routes batch-routes modes))))))))) +(deftest stop-test + (testing "the following components execute-function -> actor-stop-fn -> stop-common-states should be stopped" + (let [is-execute-function-called? (atom false) + is-actor-stop-fn-called? (atom false) + is-stop-common-states-called? (atom false) + actor-stop-fn (fn [] (reset! is-actor-stop-fn-called? true))] + (with-redefs [init/execute-function (fn [_ _] (reset! is-execute-function-called? true)) + init/stop-common-states (fn [] (reset! is-stop-common-states-called? true))] + (init/stop actor-stop-fn {}) + (is (true? @is-execute-function-called?)) + (is (true? @is-actor-stop-fn-called?)) + (is (true? @is-stop-common-states-called?))))))