diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index c825690b..9a9457a0 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -4,7 +4,7 @@ [mount.core :as mount :refer [defstate]] [schema.core :as s] [ziggurat.config :refer [ziggurat-config] :as config] - [ziggurat.messaging.rabbitmq-wrapper :as rmqw] + [ziggurat.messaging.messaging :as messaging] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] [ziggurat.metrics :as metrics] @@ -27,15 +27,15 @@ (mount/with-args args) (mount/start)))) -(defn- start-rabbitmq-connection [args] - (start* #{#'rmqw/connection} args)) +(defn- start-messaging-connection [args] + (messaging/start-connection config/config (:stream-routes args))) -(defn- start-rabbitmq-consumers [args] - (start-rabbitmq-connection args) +(defn- start-messaging-consumers [args] + (start-messaging-connection args) (messaging-consumer/start-subscribers (get args :stream-routes) (ziggurat-config))) -(defn- start-rabbitmq-producers [args] - (start-rabbitmq-connection args) +(defn- start-messaging-producers [args] + (start-messaging-connection args) (messaging-producer/make-queues (get args :stream-routes))) (defn start-kafka-producers [] @@ -46,24 +46,24 @@ (defn start-stream [args] (start-kafka-producers) - (start-rabbitmq-producers args) + (start-messaging-producers args) (start-kafka-streams args)) (defn start-management-apis [args] - (start-rabbitmq-connection args) + (start-messaging-connection args) (start* #{#'server/server} (dissoc args :actor-routes))) (defn start-server [args] - (start-rabbitmq-connection args) + (start-messaging-connection args) (start* #{#'server/server} args)) (defn start-workers [args] (start-kafka-producers) - (start-rabbitmq-producers args) - (start-rabbitmq-consumers args)) + (start-messaging-producers args) + (start-messaging-consumers args)) -(defn- stop-rabbitmq-connection [] - (mount/stop #'rmqw/connection)) +(defn- stop-messaging [] + (messaging/stop-connection config/config (:stream-routes mount/args))) (defn stop-kafka-producers [] (mount/stop #'kafka-producers)) @@ -72,21 +72,21 @@ (mount/stop #'streams/stream)) (defn stop-workers [] - (stop-rabbitmq-connection) + (stop-messaging) (stop-kafka-producers)) (defn stop-server [] (mount/stop #'server/server) - (stop-rabbitmq-connection)) + (stop-messaging)) (defn stop-stream [] (stop-kafka-streams) - (stop-rabbitmq-connection) + (stop-messaging) (stop-kafka-producers)) (defn stop-management-apis [] (mount/stop #'server/server) - (stop-rabbitmq-connection)) + (stop-messaging)) (def valid-modes-fns {:api-server {:start-fn start-server :stop-fn stop-server} @@ -115,9 +115,9 @@ (defn stop-common-states [] (mount/stop #'config/config #'metrics/statsd-reporter - #'rmqw/connection #'nrepl-server/server - #'tracer/tracer)) + #'tracer/tracer) + (stop-messaging)) (defn start "Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc" diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 8f3f0d59..eefead6a 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -2,7 +2,7 @@ (:require [ziggurat.mapper :as mpr] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.config :refer [ziggurat-config get-in-config]] - [ziggurat.messaging.rabbitmq-wrapper :as rmqw] + [ziggurat.messaging.messaging :as messaging] [ziggurat.messaging.util :refer :all] [ziggurat.metrics :as metrics] [clojure.tools.logging :as log] @@ -40,7 +40,7 @@ (assoc message-payload :retry-count retry-count))))) (defn read-messages-from-queue [queue-name topic-entity ack? count] - (let [messages (rmqw/get-messages-from-queue queue-name ack? count)] + (let [messages (messaging/get-messages-from-queue queue-name ack? count)] (for [message messages] (if-not (nil? message) (convert-to-message-payload message topic-entity) @@ -56,7 +56,7 @@ (read-messages-from-queue (get-dead-set-queue-name topic-entity (ziggurat-config) channel) topic-entity false count)))) (defn process-messages-from-queue [queue-name count processing-fn] - (rmqw/process-messages-from-queue queue-name count processing-fn)) + (messaging/process-messages-from-queue queue-name count processing-fn)) (defn process-dead-set-messages "This method reads and processes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and @@ -70,9 +70,9 @@ (when (get-in ziggurat-config [:retry :enabled]) (dotimes [_ (get-in ziggurat-config [:jobs :instant :worker-count])] (let [queue-name (get-instant-queue-name topic-entity ziggurat-config)] - (rmqw/start-subscriber (get-in ziggurat-config [:jobs :instant :prefetch-count]) - (mpr/mapper-func mapper-fn channels) - queue-name))))) + (messaging/start-subscriber (get-in ziggurat-config [:jobs :instant :prefetch-count]) + (mpr/mapper-func mapper-fn channels) + queue-name))))) (defn start-channels-subscriber [channels topic-entity ziggurat-config] (doseq [channel channels] @@ -80,9 +80,9 @@ channel-handler-fn (second channel)] (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)] (let [queue-name (get-channel-instant-queue-name topic-entity channel-key ziggurat-config)] - (rmqw/start-subscriber 1 - (mpr/channel-mapper-func channel-handler-fn channel-key) - queue-name)))))) + (messaging/start-subscriber 1 + (mpr/channel-mapper-func channel-handler-fn channel-key) + queue-name)))))) (defn start-subscribers "Starts the subscriber to the instant queue of the rabbitmq" diff --git a/src/ziggurat/messaging/messaging.clj b/src/ziggurat/messaging/messaging.clj new file mode 100644 index 00000000..ac142fda --- /dev/null +++ b/src/ziggurat/messaging/messaging.clj @@ -0,0 +1,64 @@ +(ns ziggurat.messaging.messaging + (:require [ziggurat.config :refer [ziggurat-config]] + [ziggurat.messaging.rabbitmq-wrapper :as rmqw] + [ziggurat.messaging.messaging-interface :as messaging-interface] + [clojure.tools.logging :as log])) + +(def messaging-impl (atom nil)) + +(defn get-implementation [] + (if (nil? @messaging-impl) + (throw (Exception. "Messaging Library has not been initialized, please make sure the messaging library has been initialized")) + @messaging-impl)) + +(defn- get-messaging-implementor-constructor [] + (if-let [configured-metrics-class-constructor (get-in (ziggurat-config) [:messaging :constructor])] + (let [configured-constructor-symbol (symbol configured-metrics-class-constructor) + constructor-namespace (namespace configured-constructor-symbol) + _ (require [(symbol constructor-namespace)]) + metric-constructor (resolve configured-constructor-symbol)] + (if (nil? metric-constructor) + (throw (ex-info "Incorrect messaging_interface implementation constructor configured. Please fix it." {:constructor-configured configured-constructor-symbol})) + metric-constructor)) + rmqw/->RabbitMQMessaging)) + +(defn initialise-messaging-library [] + (let [messaging-impl-constructor (get-messaging-implementor-constructor)] + (reset! messaging-impl (messaging-impl-constructor)))) + +(defn start-connection [config stream-routes] + (initialise-messaging-library) + (log/info "Initialized Messaging Library") + (messaging-interface/start-connection (get-implementation) config stream-routes)) + +(defn stop-connection [config stream-routes] + (log/info "Stopping Messaging Library") + (messaging-interface/stop-connection (get-implementation) config stream-routes)) + +(defn create-and-bind-queue + ([queue-name exchange-name] + (messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name)) + ([queue-name exchange-name dead-letter-exchange] + (messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name dead-letter-exchange))) + +(defn publish + ([exchange message-payload] + (messaging-interface/publish (get-implementation) exchange message-payload)) + ([exchange message-payload expiration] + (messaging-interface/publish (get-implementation) exchange message-payload expiration))) + +(defn get-messages-from-queue + ([queue-name ack?] + (messaging-interface/get-messages-from-queue (get-implementation) queue-name ack?)) + ([queue-name ack? count] + (messaging-interface/get-messages-from-queue (get-implementation) queue-name ack? count))) + +(defn process-messages-from-queue [queue-name count processing-fn] + (messaging-interface/process-messages-from-queue (get-implementation) queue-name count processing-fn)) + +(defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name] + (messaging-interface/start-subscriber (get-implementation) prefetch-count wrapped-mapper-fn queue-name)) + +(defn consume-message [ch meta payload ack?] + (messaging-interface/consume-message (get-implementation) ch meta payload ack?)) + diff --git a/src/ziggurat/messaging/messaging_interface.clj b/src/ziggurat/messaging/messaging_interface.clj new file mode 100644 index 00000000..4d02ec54 --- /dev/null +++ b/src/ziggurat/messaging/messaging_interface.clj @@ -0,0 +1,79 @@ +(ns ziggurat.messaging.messaging-interface) + +(defprotocol MessagingProtocol + "A protocol that defines the interface for queue-based retry implementation libraries. Any type or class that implements this protocol can be passed to the messaging namespace to be used for retrying messages. + Example implementations for this protocol: + + start-connection [config stream-routes] + This is used to initialize the messaging library so that it push messages to the retry backend. + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - config: It is the config map defined in the `config.edn` file. It includes the entire config map + - stream-routes: It is a map containing topic entities and their handler functions, channels and their handler functions + + stop-connection [impl connection config stream-routes] + This is used to stop the messaging library and cleanup resources. + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - connection: The connection object that is used which can be used to close and cleanup network connections to the retry backend + - config: It is the config map defined in the `config.edn` file. It includes the entire config map + - stream-routes: It is a map containing topic entities and their handler functions, channels and their handler functions + + create-and-bind-queue [impl queue-name exchange-name dead-letter-exchange] + This is used to create a queue and bind it to an exchange. + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - queue-name: The name of the queue which will be bound the exchange + - exchange-name: The name of the exchange to bind the queue to, Example if `test_application_queue` is bound to `test_application_exchange` then all messages which need to be sent to `test_application_queue` + will have to be sent to the `test_application_exchange` which will route it to the bound queue. + - dead-letter-exchange: The name of the dead-letter-exchange if specified will be bound the queue which will in turn route all messages which have exceeded the max retries to the specified dead-letter-exchange + + get-messages-from-queue [impl queue-name ack? count] + This is used to fetch and deserialize them messages from the queue. + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - queue-name: this is the name of the queue from where to fetch messages + - ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption. + - count: `count` number of messages will be fetched from the queue. If not provided it will default to an arbitrary value, ideally `1`. + + process-messages-from-queue [impl queue-name count processing-fn] + This is used to process messages from the queue. This method does not return the messages unlike `get-messages-from-queue`, instead it runs the `processing-fn` against every message consumed from the queue + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - queue-name: the name of the queue to consume messages from + - ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption. + - count: `count` number of messages will be consumed from the queue. If not provided it will default to an arbitrary value, ideally `1`. + + start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name] + This method is used to start subscribers for a specific queue + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - prefetch-count: The maximum number of unacknowledged messages that a consumer can receive at once + - wrapped-mapper-fn: This is the processing-fn which will run against every messaged consumed from the queue + - queue-name: This is the name of the queue to which a subscriber will subscribe to + + consume-message [impl ch meta payload ack?] + ; Todo should be removed as it is only being used for testing + This method deserializes the message consumed from the queue + Args: + - impl: the class object that implements this protocol, i.e. an instance of the deftype for example + - ch: A Channel object + - meta: metadata containing information about the message payload + - payload: the message payload in byte-array format as fetched from the queue + - ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption. + " + + (start-connection [impl config stream-routes]) + (stop-connection [impl config stream-routes]) + (publish + [impl exchange message-payload] + [impl exchange message-payload expiration]) + (create-and-bind-queue + [impl queue-name exchange-name] + [impl queue-name exchange-name dead-letter-exchange]) + (get-messages-from-queue + [impl queue-name ack?] + [impl queue-name ack? count]) + (process-messages-from-queue [impl queue-name count processing-fn]) + (start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name]) + (consume-message [impl ch meta payload ack?])) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 6a0480aa..c2edbb55 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -3,9 +3,8 @@ [ziggurat.config :refer [ziggurat-config rabbitmq-config channel-retry-config]] [ziggurat.messaging.util :refer :all] [ziggurat.retry :refer [with-retry]] - [ziggurat.messaging.rabbitmq-wrapper :as rmqw] + [ziggurat.messaging.messaging :as messaging] [ziggurat.messaging.util :as util] - [ziggurat.messaging.rabbitmq.connection :as rm-conn] [ziggurat.sentry :refer [sentry-reporter]])) (def MAX_EXPONENTIAL_RETRIES 25) @@ -62,8 +61,8 @@ (defn get-queue-timeout-ms [message-payload] "Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." - (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) - retry-count (-> (ziggurat-config) :retry :count) + (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) + retry-count (-> (ziggurat-config) :retry :count) message-retry-count (:retry-count message-payload)] (if (= :exponential (-> (ziggurat-config) :retry :type)) (get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms) @@ -72,8 +71,8 @@ (defn get-channel-queue-timeout-ms [topic-entity channel message-payload] "Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." (let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel) - message-retry-count (:retry-count message-payload) - channel-retry-count (get-channel-retry-count topic-entity channel)] + message-retry-count (:retry-count message-payload) + channel-retry-count (get-channel-retry-count topic-entity channel)] (if (= :exponential (channel-retry-type topic-entity channel)) (get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms) channel-queue-timeout-ms))) @@ -82,17 +81,17 @@ "This function return delay exchange name for retry when using flow without channel. It will return exchange name with retry count as suffix if exponential backoff enabled." (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) exchange-name (prefixed-queue-name topic-entity exchange-name) - retry-count (-> (ziggurat-config) :retry :count)] + retry-count (-> (ziggurat-config) :retry :count)] (if (= :exponential (-> (ziggurat-config) :retry :type)) (let [message-retry-count (:retry-count message-payload) - backoff-exponent (get-backoff-exponent retry-count message-retry-count)] + backoff-exponent (get-backoff-exponent retry-count message-retry-count)] (prefixed-queue-name exchange-name backoff-exponent)) exchange-name))) (defn get-channel-delay-exchange-name [topic-entity channel message-payload] "This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled." (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) - exchange-name (prefixed-channel-name topic-entity channel exchange-name) + exchange-name (prefixed-channel-name topic-entity channel exchange-name) channel-retry-count (get-channel-retry-count topic-entity channel)] (if (= :exponential (channel-retry-type topic-entity channel)) (let [message-retry-count (:retry-count message-payload) @@ -101,40 +100,40 @@ exchange-name))) (defn publish-to-delay-queue [message-payload] - (let [topic-entity (:topic-entity message-payload) - exchange-name (get-delay-exchange-name topic-entity message-payload) + (let [topic-entity (:topic-entity message-payload) + exchange-name (get-delay-exchange-name topic-entity message-payload) queue-timeout-ms (get-queue-timeout-ms message-payload)] - (rmqw/publish exchange-name message-payload queue-timeout-ms))) + (messaging/publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-dead-queue [message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) topic-entity (:topic-entity message-payload) exchange-name (prefixed-queue-name topic-entity exchange-name)] - (rmqw/publish exchange-name message-payload))) + (messaging/publish exchange-name message-payload))) (defn publish-to-instant-queue [message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) topic-entity (:topic-entity message-payload) exchange-name (prefixed-queue-name topic-entity exchange-name)] - (rmqw/publish exchange-name message-payload))) + (messaging/publish exchange-name message-payload))) (defn publish-to-channel-delay-queue [channel message-payload] - (let [topic-entity (:topic-entity message-payload) - exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) + (let [topic-entity (:topic-entity message-payload) + exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)] - (rmqw/publish exchange-name message-payload queue-timeout-ms))) + (messaging/publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-channel-dead-queue [channel message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) topic-entity (:topic-entity message-payload) exchange-name (prefixed-channel-name topic-entity channel exchange-name)] - (rmqw/publish exchange-name message-payload))) + (messaging/publish exchange-name message-payload))) (defn publish-to-channel-instant-queue [channel message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) - topic-entity (:topic-entity message-payload) + topic-entity (:topic-entity message-payload) exchange-name (prefixed-channel-name topic-entity channel exchange-name)] - (rmqw/publish exchange-name message-payload))) + (messaging/publish exchange-name message-payload))) (defn retry [{:keys [retry-count topic-entity] :as message-payload}] (when (-> (ziggurat-config) :retry :enabled) @@ -155,7 +154,7 @@ queue-name (delay-queue-name topic-entity queue-name) exchange-name (prefixed-queue-name topic-entity exchange-name) dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)] - (rmqw/create-and-bind-queue queue-name exchange-name dead-letter-exchange-name))) + (messaging/create-and-bind-queue queue-name exchange-name dead-letter-exchange-name))) (defn- make-delay-queue-with-retry-count [topic-entity retry-count] (let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config)) @@ -164,7 +163,7 @@ dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange) sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))] (doseq [s (range 1 sequence)] - (rmqw/create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name)))) + (messaging/create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name)))) (defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count] (make-delay-queue-with-retry-count (with-channel-name topic-entity channel) retry-count)) @@ -176,7 +175,7 @@ (let [{:keys [queue-name exchange-name]} (queue-type (rabbitmq-config)) queue-name (prefixed-queue-name topic-identifier queue-name) exchange-name (prefixed-queue-name topic-identifier exchange-name)] - (rmqw/create-and-bind-queue queue-name exchange-name))) + (messaging/create-and-bind-queue queue-name exchange-name))) (defn- make-channel-queue [topic-entity channel-name queue-type] (make-queue (with-channel-name topic-entity channel-name) queue-type)) @@ -206,7 +205,7 @@ (defn make-queues [stream-routes] (when (util/is-connection-required? (ziggurat-config) stream-routes) (doseq [topic-entity (keys stream-routes)] - (let [channels (get-channel-names stream-routes topic-entity) + (let [channels (get-channel-names stream-routes topic-entity) retry-type (retry-type)] (make-channel-queues channels topic-entity) (when (-> (ziggurat-config) :retry :enabled) @@ -218,12 +217,12 @@ "Please use it only after understanding its risks and implications." "Its contract can change in the future releases of Ziggurat.") (make-delay-queue-with-retry-count topic-entity (-> (ziggurat-config) :retry :count))) - (= :linear retry-type) (make-delay-queue topic-entity) - (nil? retry-type) (do - (log/warn "[Deprecation Notice]: Please note that the configuration for retries has changed." - "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" - "Use :type to specify the type of retry mechanism in the config.") - (make-delay-queue topic-entity)) + (= :linear retry-type) (make-delay-queue topic-entity) + (nil? retry-type) (do + (log/warn "[Deprecation Notice]: Please note that the configuration for retries has changed." + "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" + "Use :type to specify the type of retry mechanism in the config.") + (make-delay-queue topic-entity)) :else (do (log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity) (make-delay-queue topic-entity)))))))) diff --git a/src/ziggurat/messaging/rabbitmq_wrapper.clj b/src/ziggurat/messaging/rabbitmq_wrapper.clj index de2c3115..57bfcfad 100644 --- a/src/ziggurat/messaging/rabbitmq_wrapper.clj +++ b/src/ziggurat/messaging/rabbitmq_wrapper.clj @@ -1,6 +1,5 @@ (ns ziggurat.messaging.rabbitmq-wrapper (:require [ziggurat.config :refer [get-in-config]] - [mount.core :refer [defstate]] [ziggurat.config :refer [ziggurat-config]] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.retry :refer [with-retry]] @@ -9,44 +8,71 @@ [ziggurat.messaging.rabbitmq.connection :as rmq-connection] [ziggurat.messaging.rabbitmq.producer :as rmq-producer] [ziggurat.messaging.rabbitmq.consumer :as rmq-consumer] - [mount.core :as mount])) + [ziggurat.messaging.messaging-interface :refer [MessagingProtocol]])) -(defn start-connection [config stream-routes] - (when (is-connection-required? (:ziggurat config) stream-routes) - (rmq-connection/start-connection config))) +(def connection (atom nil)) + +(defn get-connection [] @connection) -(defn stop-connection [connection config stream-routes] - (when (is-connection-required? (:ziggurat config) stream-routes) - (rmq-connection/stop-connection connection config))) +(defn start-connection [config stream-routes] + (when (and (is-connection-required? (:ziggurat config) stream-routes) + (nil? (get-connection))) + (reset! connection (rmq-connection/start-connection config)))) -(defstate connection - :start (start-connection ziggurat.config/config (:stream-routes (mount/args))) - :stop (stop-connection connection ziggurat.config/config (:stream-routes (mount/args)))) +(defn stop-connection [config stream-routes] + (when (and (is-connection-required? (:ziggurat config) stream-routes) + (not (nil? (get-connection)))) + (rmq-connection/stop-connection (get-connection) config) + (reset! connection nil))) (defn publish ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] - (rmq-producer/publish connection exchange message-payload expiration))) + (rmq-producer/publish (get-connection) exchange message-payload expiration))) (defn create-and-bind-queue ([queue-name exchange-name] (create-and-bind-queue queue-name exchange-name nil)) ([queue-name exchange-name dead-letter-exchange] - (rmq-producer/create-and-bind-queue connection queue-name exchange-name dead-letter-exchange))) + (rmq-producer/create-and-bind-queue (get-connection) queue-name exchange-name dead-letter-exchange))) (defn get-messages-from-queue ([queue-name ack?] (get-messages-from-queue queue-name ack? 1)) ([queue-name ack? count] - (rmq-consumer/get-messages-from-queue connection queue-name ack? count))) + (rmq-consumer/get-messages-from-queue (get-connection) queue-name ack? count))) (defn process-messages-from-queue [queue-name count processing-fn] - (rmq-consumer/process-messages-from-queue connection queue-name count processing-fn)) + (rmq-consumer/process-messages-from-queue (get-connection) queue-name count processing-fn)) (defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name] - (rmq-consumer/start-subscriber connection prefetch-count wrapped-mapper-fn queue-name)) + (rmq-consumer/start-subscriber (get-connection) prefetch-count wrapped-mapper-fn queue-name)) (defn consume-message [ch meta payload ack?] (rmq-consumer/consume-message ch meta payload ack?)) +(deftype RabbitMQMessaging [] MessagingProtocol + (start-connection [impl config stream-routes] + (start-connection config stream-routes)) + (stop-connection [impl config stream-routes] + (stop-connection config stream-routes)) + (create-and-bind-queue [impl queue-name exchange-name] + (create-and-bind-queue queue-name exchange-name)) + (create-and-bind-queue [impl queue-name exchange-name dead-letter-exchange] + (create-and-bind-queue queue-name exchange-name dead-letter-exchange)) + (publish [impl exchange message-payload] + (publish exchange message-payload)) + (publish [impl exchange message-payload expiration] + (publish exchange message-payload expiration)) + (get-messages-from-queue [impl queue-name ack?] + (get-messages-from-queue queue-name ack?)) + (get-messages-from-queue [impl queue-name ack? count] + (get-messages-from-queue queue-name ack? count)) + (process-messages-from-queue [impl queue-name count processing-fn] + (process-messages-from-queue queue-name count processing-fn)) + (start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name] + (start-subscriber prefetch-count wrapped-mapper-fn queue-name)) + (consume-message [impl ch meta payload ack?] + (consume-message ch meta payload ack?))) + diff --git a/src/ziggurat/middleware/metrics/stream_joins_diff.clj b/src/ziggurat/middleware/metrics/stream_joins_diff.clj new file mode 100644 index 00000000..b9ddd48d --- /dev/null +++ b/src/ziggurat/middleware/metrics/stream_joins_diff.clj @@ -0,0 +1,24 @@ +(ns ziggurat.middleware.metrics.stream-joins-diff + (:require [ziggurat.metrics :as metrics] + [ziggurat.config :refer [ziggurat-config]])) + +(defn- publish-diff-between-joined-messages-helper + [message] + (let [keys (keys message) + values (vals message) + left-topic-key (first keys) + left (first values) + right-topic-key (second keys) + right (second values) + service-name (:app-name (ziggurat-config)) + leftEventTimeStamp (:nanos (:event-timestamp left)) + rightEventTimeStamp (:nanos (:event-timestamp right)) + diff (Math/abs ^Integer (- leftEventTimeStamp rightEventTimeStamp))] + (metrics/report-histogram [service-name "stream-joins-message-diff"] diff {:left (name left-topic-key) :right (name right-topic-key)}))) + +(defn publish-diff-between-joined-messages + [handler-fn] + (fn [message] + (do (publish-diff-between-joined-messages-helper message) + (handler-fn message)))) + diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index 92749194..09f0416d 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -5,7 +5,8 @@ [mount.core :as mount] [ziggurat.config :as config] [ziggurat.messaging.util :as util] - [ziggurat.messaging.rabbitmq-wrapper :refer [connection]] + [ziggurat.messaging.rabbitmq-wrapper :as rmqw :refer [connection]] + [ziggurat.messaging.messaging :as messaging] [ziggurat.server :refer [server]] [ziggurat.messaging.producer :as pr] [ziggurat.producer :as producer] @@ -61,10 +62,10 @@ (:exchange-name (exchange-type (config/rabbitmq-config)))) (defn delete-queues [stream-routes] - (with-open [ch (lch/open connection)] + (with-open [ch (lch/open @connection)] (doseq [topic-entity (keys stream-routes)] (let [topic-identifier (name topic-entity) - channels (util/get-channel-names stream-routes topic-entity)] + channels (util/get-channel-names stream-routes topic-entity)] (lq/delete ch (util/prefixed-queue-name topic-identifier (get-queue-name :instant))) (lq/delete ch (util/prefixed-queue-name topic-identifier (get-queue-name :dead-letter))) (lq/delete ch (pr/delay-queue-name topic-identifier (get-queue-name :delay))) @@ -74,10 +75,10 @@ (lq/delete ch (util/prefixed-channel-name topic-identifier channel (get-queue-name :delay)))))))) (defn delete-exchanges [stream-routes] - (with-open [ch (lch/open connection)] + (with-open [ch (lch/open @connection)] (doseq [topic-entity (keys stream-routes)] (let [topic-identifier (name topic-entity) - channels (util/get-channel-names stream-routes topic-entity)] + channels (util/get-channel-names stream-routes topic-entity)] (le/delete ch (util/prefixed-queue-name topic-identifier (get-exchange-name :instant))) (le/delete ch (util/prefixed-queue-name topic-identifier (get-exchange-name :dead-letter))) (le/delete ch (util/prefixed-queue-name topic-identifier (get-exchange-name :delay))) @@ -86,16 +87,15 @@ (lq/delete ch (util/prefixed-channel-name topic-identifier channel (get-exchange-name :dead-letter))) (lq/delete ch (util/prefixed-channel-name topic-identifier channel (get-exchange-name :delay)))))))) -(defn init-rabbit-mq [f] +(defn init-messaging [f] (let [stream-routes {:default {:handler-fn #(constantly nil) :channel-1 #(constantly nil)}}] (mount-config) (mount-tracer) - (-> - (mount/only [#'connection]) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) + (mount/start) ;;TODO move it to mapper_test.clj, removal of this causes mapper_test to fail + (messaging/start-connection config/config stream-routes) (f) + (messaging/stop-connection config/config stream-routes) (mount/stop))) (defn with-start-server* [stream-routes f] diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index b5029a12..098b4002 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -3,26 +3,35 @@ [mount.core :refer [defstate] :as mount] [ziggurat.config :as config] [ziggurat.init :as init] - [ziggurat.messaging.rabbitmq-wrapper :as rmqw] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] [ziggurat.streams :as streams :refer [stream]] [ziggurat.server.test-utils :as tu] - [ziggurat.tracer :as tracer]) + [ziggurat.tracer :as tracer] + [ziggurat.messaging.messaging :as messaging]) (:import (io.opentracing.mock MockTracer))) +(def valid-modes-count 4) + +(defn exp [x n] + (if (zero? n) 1 + (* x (exp x (dec n))))) + (deftest start-calls-actor-start-fn-test (testing "The actor start fn starts before the ziggurat state and can read config" - (let [result (atom 1)] - (with-redefs [streams/start-streams (fn [_ _] (reset! result (* @result 2))) - streams/stop-streams (constantly nil) - rmqw/start-connection (fn [_ _] (reset! result (* @result 2))) - rmqw/stop-connection (constantly nil) - config/config-file "config.test.edn" - tracer/create-tracer (fn [] (MockTracer.))] + (let [result (atom 1) + start-messaging-internal-call-count 2] + (with-redefs [streams/start-streams (fn [_ _] (reset! result (* @result 2))) + streams/stop-streams (constantly nil) + ;; will be called valid modes number of times + messaging/start-connection (fn [_ _] (reset! result (* @result 2))) + messaging/stop-connection (constantly nil) + config/config-file "config.test.edn" + tracer/create-tracer (fn [] (MockTracer.))] (init/start #(reset! result (+ @result 3)) {} [] nil) (init/stop #() nil) - (is (= 16 @result)))))) + ;; some of the functions which call start-messaging, are called again. + (is (= (* 4 (exp 2 (+ start-messaging-internal-call-count valid-modes-count))) @result)))))) (deftest stop-calls-actor-stop-fn-test (testing "The actor stop fn stops before the ziggurat state" @@ -35,17 +44,18 @@ (init/stop #(reset! result (+ @result 3)) nil) (is (= 8 @result)))))) -(deftest stop-calls-idempotentcy-test +(deftest stop-calls-idempotency-test (testing "The stop function should be idempotent" - (let [result (atom 1)] - (with-redefs [streams/start-streams (constantly nil) - streams/stop-streams (constantly nil) - rmqw/stop-connection (fn [_ _ _] (reset! result (* @result 2))) - config/config-file "config.test.edn" - tracer/create-tracer (fn [] (MockTracer.))] + (let [result (atom 1) + stop-connection-internal-call-count 1] + (with-redefs [streams/start-streams (constantly nil) + streams/stop-streams (constantly nil) + messaging/stop-connection (fn [_ _] (reset! result (* @result 2))) + config/config-file "config.test.edn" + tracer/create-tracer (fn [] (MockTracer.))] (init/start #() {} [] nil) (init/stop #(reset! result (+ @result 3)) nil) - (is (= 8 @result)))))) + (is (= (* 4 (exp 2 (+ stop-connection-internal-call-count valid-modes-count))) @result)))))) (deftest start-calls-make-queues-test (testing "Start calls make queues" @@ -58,7 +68,7 @@ (is (= stream-routes expected-stream-routes))) messaging-consumer/start-subscribers (constantly nil) config/config-file "config.test.edn" - tracer/create-tracer (fn [] (MockTracer.))] + tracer/create-tracer (fn [] (MockTracer.))] (init/start #() expected-stream-routes [] nil) (init/stop #() nil) (is (= 2 @make-queues-called)))))) @@ -74,7 +84,7 @@ (is (= stream-routes expected-stream-routes))) messaging-producer/make-queues (constantly nil) config/config-file "config.test.edn" - tracer/create-tracer (fn [] (MockTracer.))] + tracer/create-tracer (fn [] (MockTracer.))] (init/start #() expected-stream-routes [] nil) (init/stop #() nil) (is (= 1 @start-subscriber-called)))))) @@ -175,11 +185,11 @@ (is (thrown? clojure.lang.ExceptionInfo (init/validate-modes modes)))))) (deftest kafka-producers-should-start - (let [args {:actor-routes [] - :stream-routes []} + (let [args {:actor-routes [] + :stream-routes []} producer-has-started (atom false)] (with-redefs [init/start-kafka-producers (fn [] (reset! producer-has-started true)) - init/start-kafka-streams (constantly nil)] + init/start-kafka-streams (constantly nil)] (testing "Starting the streams should start kafka-producers as well" (init/start-stream args) (is (= true @producer-has-started))) @@ -191,7 +201,7 @@ (deftest kafka-producers-should-stop (let [producer-has-stopped (atom false)] (with-redefs [init/stop-kafka-producers (fn [] (reset! producer-has-stopped true)) - init/stop-kafka-streams (constantly nil)] + init/stop-kafka-streams (constantly nil)] (testing "Stopping the streams should stop kafka-producers as well" (init/stop-stream) (is (= true @producer-has-stopped))) @@ -201,7 +211,3 @@ (is (= true @producer-has-stopped))) (mount/stop)))) - - - - diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index ebdc17b4..fe686c36 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -10,10 +10,10 @@ [ziggurat.util.rabbitmq :as rmq] [langohr.basic :as lb])) -(use-fixtures :once (join-fixtures [fix/init-rabbit-mq +(use-fixtures :once (join-fixtures [fix/init-messaging fix/silence-logging])) -(deftest mapper-func-test +(deftest ^:integration mapper-func-test (let [service-name (:app-name (ziggurat-config)) stream-routes {:default {:handler-fn #(constantly nil)}} topic-entity (name (first (keys stream-routes))) @@ -27,16 +27,16 @@ (let [successfully-processed? (atom false) successfully-reported-time? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [expected-metric-namespace])) - (= metric expected-metric) - (= additional-tags expected-additional-tags)) - (reset! successfully-processed? true))) - metrics/report-histogram (fn [metric-namespaces _ _] - (when (or (= metric-namespaces expected-report-time-namespaces) - (= metric-namespaces [report-time-namespace])) - (reset! successfully-reported-time? true)))] + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [expected-metric-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) + (reset! successfully-processed? true))) + metrics/report-histogram (fn [metric-namespaces _ _] + (when (or (= metric-namespaces expected-report-time-namespaces) + (= metric-namespaces [report-time-namespace])) + (reset! successfully-reported-time? true)))] ((mapper-func (constantly :success) []) message-payload) (is @successfully-processed?) (is @successfully-reported-time?)))) @@ -122,8 +122,8 @@ (is @sentry-report-fn-called?)))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "handler-fn-execution-time" + (let [reported-execution-time? (atom false) + expected-metric-namespace "handler-fn-execution-time" expected-metric-namespaces [service-name "default" expected-metric-namespace]] (with-redefs [metrics/report-histogram (fn [metric-namespaces _ _] (when (or (= metric-namespaces expected-metric-namespaces) @@ -132,19 +132,19 @@ ((mapper-func (constantly :success) []) message-payload) (is @reported-execution-time?)))))) -(deftest channel-mapper-func-test - (let [channel :channel-1 - channel-name (name channel) - service-name (:app-name (ziggurat-config)) - stream-routes {:default {:handler-fn #(constantly nil) - channel #(constantly nil)}} - topic (first (keys stream-routes)) - message-payload {:message {:foo "bar"} - :retry-count (:count (:retry (ziggurat-config))) - :topic-entity topic} - expected-topic-entity-name (name topic) - expected-additional-tags {:topic_name expected-topic-entity-name :channel_name channel-name} - increment-count-namespace "message-processing" +(deftest ^:integration channel-mapper-func-test + (let [channel :channel-1 + channel-name (name channel) + service-name (:app-name (ziggurat-config)) + stream-routes {:default {:handler-fn #(constantly nil) + channel #(constantly nil)}} + topic (first (keys stream-routes)) + message-payload {:message {:foo "bar"} + :retry-count (:count (:retry (ziggurat-config))) + :topic-entity topic} + expected-topic-entity-name (name topic) + expected-additional-tags {:topic_name expected-topic-entity-name :channel_name channel-name} + increment-count-namespace "message-processing" expected-increment-count-namespaces [service-name topic channel-name increment-count-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) @@ -213,8 +213,8 @@ (is @sentry-report-fn-called?)))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - execution-time-namespace "execution-time" + (let [reported-execution-time? (atom false) + execution-time-namespace "execution-time" expected-execution-time-namespaces [service-name expected-topic-entity-name channel-name execution-time-namespace]] (with-redefs [metrics/report-histogram (fn [metric-namespaces _ _] (when (or (= metric-namespaces expected-execution-time-namespaces) diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index 817ae1dd..a66b35e9 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -3,18 +3,18 @@ [langohr.channel :as lch] [ziggurat.config :refer [ziggurat-config rabbitmq-config]] [ziggurat.fixtures :as fix] - [ziggurat.messaging.rabbitmq-wrapper :refer [connection]] + [ziggurat.messaging.rabbitmq-wrapper :as rmqw] [ziggurat.messaging.producer :as producer] [ziggurat.retry :as retry] [ziggurat.tracer :refer [tracer]] [ziggurat.util.rabbitmq :as util] [ziggurat.messaging.consumer :as consumer])) -(use-fixtures :once (join-fixtures [fix/init-rabbit-mq +(use-fixtures :once (join-fixtures [fix/init-messaging fix/silence-logging fix/mount-metrics])) (defn- gen-message-payload [topic-entity] - {:message {:gen-key (apply str (take 10 (repeatedly #(char (+ (rand 26) 65)))))} + {:message {:gen-key (apply str (take 10 (repeatedly #(char (+ (rand 26) 65)))))} :topic-entity topic-entity}) (defn- mock-mapper-fn [{:keys [retry-counter-atom @@ -45,7 +45,7 @@ (def topic-entity :default) (deftest ^:integration process-dead-set-messages-test - (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] + (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] (testing "it maps the process-message-from-queue over all the messages fetched from the queue for a topic" (fix/with-queues {topic-entity {:handler-fn (constantly nil)}} (let [count 5 @@ -74,7 +74,7 @@ (is (empty? (consumer/get-dead-set-messages topic-entity channel count)))))))) (deftest ^:integration get-dead-set-messages-test - (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] + (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] (testing "get the dead set messages from dead set queue and don't pop the messages from the queue" (fix/with-queues {topic-entity {:handler-fn (constantly nil)}} (let [count-of-messages 10 @@ -103,7 +103,7 @@ retry-count 5 message-payload (assoc (gen-message-payload topic-entity) :retry-count 2) original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open (rmqw/get-connection))] (with-redefs [ziggurat-config (fn [] (-> original-zig-config (update-in [:retry :count] (constantly retry-count)) @@ -132,7 +132,7 @@ retry-count 5 message-payload (assoc (assoc-in (gen-message-payload topic-entity) [:message :msg] "skip") :retry-count 2) original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open (rmqw/get-connection))] (with-redefs [ziggurat-config (fn [] (-> original-zig-config (update-in [:retry :count] (constantly retry-count)) @@ -160,7 +160,7 @@ retry-count 5 no-of-msgs 2 original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open (rmqw/get-connection))] (with-redefs [ziggurat-config (fn [] (-> original-zig-config (update-in [:retry :count] (constantly retry-count)) @@ -188,12 +188,12 @@ (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} (let [no-of-workers 3 original-zig-config (ziggurat-config) - ch (lch/open connection) + ch (lch/open (rmqw/get-connection)) counter (atom 0)] - (with-redefs [ziggurat-config (fn [] (-> original-zig-config - (update-in [:retry :enabled] (constantly true)) - (update-in [:jobs :instant :worker-count] (constantly no-of-workers)))) + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:retry :enabled] (constantly true)) + (update-in [:jobs :instant :worker-count] (constantly no-of-workers)))) consumer/start-retry-subscriber* (fn [_ _ _ _] (swap! counter inc))] (consumer/start-subscribers nil (ziggurat-config)) @@ -204,13 +204,13 @@ (testing "start subscribers should call start-subscriber* according to the product of worker and mapper-fns in stream-routes" (let [no-of-workers 3 original-zig-config (ziggurat-config) - ch (lch/open connection) + ch (lch/open (rmqw/get-connection)) counter (atom 0) stream-routes {topic-entity {:handler-fn #(constantly nil)} - :test {:handler-fn #(constantly nil)}}] - (with-redefs [ziggurat-config (fn [] (-> original-zig-config - (update-in [:retry :enabled] (constantly true)) - (update-in [:jobs :instant :worker-count] (constantly no-of-workers)))) + :test {:handler-fn #(constantly nil)}}] + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:retry :enabled] (constantly true)) + (update-in [:jobs :instant :worker-count] (constantly no-of-workers)))) consumer/start-retry-subscriber* (fn [_ _ _ _] (swap! counter inc))] (consumer/start-subscribers stream-routes (ziggurat-config)) (is (= (count stream-routes) @counter)) @@ -229,7 +229,7 @@ :retry-limit 2 :success-promise success-promise}) original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open (rmqw/get-connection))] (fix/with-queues {topic-entity {:handler-fn #(constantly nil) channel channel-fn}} (with-redefs [ziggurat-config (fn [] (-> original-zig-config @@ -256,7 +256,7 @@ :retry-limit 2 :success-promise success-promise}) original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open (rmqw/get-connection))] (fix/with-queues {topic-entity {:handler-fn #(constantly nil) channel channel-fn}} (with-redefs [ziggurat-config (fn [] (-> original-zig-config @@ -271,13 +271,13 @@ (deftest ^:integration start-retry-subscriber-test (testing "creates a span when tracer is enabled" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [retry-counter (atom 0) - call-counter (atom 0) - success-promise (promise) - retry-count 3 - message-payload (assoc (gen-message-payload topic-entity) :retry-count 3) + (let [retry-counter (atom 0) + call-counter (atom 0) + success-promise (promise) + retry-count 3 + message-payload (assoc (gen-message-payload topic-entity) :retry-count 3) original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open (rmqw/get-connection))] (.reset tracer) (with-redefs [ziggurat-config (fn [] (-> original-zig-config (update-in [:retry :count] (constantly retry-count)) diff --git a/test/ziggurat/messaging/dead_set_test.clj b/test/ziggurat/messaging/dead_set_test.clj index 61e0f24b..0ea947cc 100644 --- a/test/ziggurat/messaging/dead_set_test.clj +++ b/test/ziggurat/messaging/dead_set_test.clj @@ -5,7 +5,7 @@ [ziggurat.messaging.producer :as producer] [ziggurat.util.rabbitmq :as rmq])) -(use-fixtures :once (join-fixtures [fix/init-rabbit-mq +(use-fixtures :once (join-fixtures [fix/init-messaging fix/silence-logging fix/mount-metrics])) diff --git a/test/ziggurat/messaging/messaging_test.clj b/test/ziggurat/messaging/messaging_test.clj new file mode 100644 index 00000000..bbc0b74e --- /dev/null +++ b/test/ziggurat/messaging/messaging_test.clj @@ -0,0 +1,225 @@ +(ns ziggurat.messaging.messaging-test + (:require [clojure.test :refer :all] + [ziggurat.messaging.messaging :as messaging] + [ziggurat.util.mock-messaging-implementation] + [ziggurat.util.mock-messaging-implementation :as mock-messaging] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.config :as config]) + (:import (ziggurat.messaging.rabbitmq_wrapper RabbitMQMessaging) + (ziggurat.util.mock_messaging_implementation MockMessaging))) + +(defn reset-impl [] (reset! messaging/messaging-impl nil)) + +(def ziggurat-config-for-mock-impl (constantly {:messaging + {:constructor "ziggurat.util.mock-messaging-implementation/->MockMessaging"}})) + +(deftest initialise-messaging-library-test + (testing "it should default to RabbitMQMessaging library if no implementation is provided" + (with-redefs [ziggurat-config (constantly {:messaging {:constructor nil}})] + (messaging/initialise-messaging-library) + (is (instance? RabbitMQMessaging (deref messaging/messaging-impl))) + (reset-impl))) + + (testing "it should initialise the messaging library as provided in the config" + (with-redefs [ziggurat-config (constantly {:messaging + {:constructor "ziggurat.util.mock-messaging-implementation/->MockMessaging"}})] + (messaging/initialise-messaging-library) + (is (instance? MockMessaging (deref messaging/messaging-impl))) + (reset-impl))) + + (testing "It raises an exception when incorrect constructor has been configured" + (with-redefs [ziggurat-config (constantly {:messaging {:constructor "incorrect-constructor"}})] + (is (thrown? RuntimeException (messaging/initialise-messaging-library)))))) + +(deftest get-implementation-test + (testing "it should throw an Exception if `messaging-impl` atom is nil" + (with-redefs [messaging/messaging-impl (atom nil)] + (is (thrown? Exception (messaging/get-implementation))))) + + (testing "it should return `messaging-impl` atom if it is not nil" + (let [atom-val {:foo "bar"}] + (with-redefs [messaging/messaging-impl (atom atom-val)] + (is (= atom-val (messaging/get-implementation))) + (reset-impl))))) + +(deftest start-connection-test + (testing "it should call the mock-messaging/start-connection function with the correct arguments" + (let [test-stream-routes {:default {:handler-fn (constantly nil)}} + start-connection-called? (atom false)] + (with-redefs [ziggurat-config ziggurat-config-for-mock-impl + mock-messaging/start-connection (fn [config stream-routes] + (when (and (= config config/config) + (= test-stream-routes stream-routes)) + (reset! start-connection-called? true)))] + (messaging/start-connection config/config test-stream-routes) + (is (= true @start-connection-called?)) + (reset-impl))))) + +(deftest stop-connection-test + (testing "it should mock-messaging/stop-connection function with the correct arguments" + (let [test-stream-routes {:default {:handler-fn (constantly nil)}} + stop-connection-called? (atom false)] + (with-redefs [ziggurat-config ziggurat-config-for-mock-impl + mock-messaging/stop-connection (fn [config stream-routes] + (when (and (= config config/config) + (= test-stream-routes stream-routes)) + (reset! stop-connection-called? true)))] + (messaging/start-connection config/config test-stream-routes) + (messaging/stop-connection config/config test-stream-routes) + (is (= true @stop-connection-called?)) + (reset-impl))))) + +(deftest create-and-bind-queue-test + (testing "it should call the mock-messaging-create-and-bind-queue function when dead-letter-exchange is not passed" + (let [stream-routes {:default {:handler-fn (constantly nil)}} + test-queue-name "test-queue" + test-exchange-name "test-exchange-name" + create-and-bind-queue-called? (atom false)] + (with-redefs [ziggurat-config ziggurat-config-for-mock-impl + mock-messaging/create-and-bind-queue (fn [queue-name exchange-name] + (when (and (= queue-name test-queue-name) + (= exchange-name test-exchange-name)) + (reset! create-and-bind-queue-called? true)))] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/create-and-bind-queue test-queue-name test-exchange-name) + (is (= true @create-and-bind-queue-called?)) + (reset-impl)))) + + (testing "it should call the mock-messaging-create-and-bind-queue function when dead-letter-exchange is passed" + (let [stream-routes {:default {:handler-fn (constantly nil)}} + test-queue-name "test-queue" + test-exchange-name "test-exchange-name" + test-dead-letter-exchange "test-dlx" + create-and-bind-queue-called? (atom false)] + (with-redefs [ziggurat-config ziggurat-config-for-mock-impl + mock-messaging/create-and-bind-queue (fn [queue-name exchange-name dead-letter-exchange] + (when (and (= queue-name test-queue-name) + (= exchange-name test-exchange-name) + (= dead-letter-exchange test-dead-letter-exchange)) + (reset! create-and-bind-queue-called? true)))] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/create-and-bind-queue test-queue-name test-exchange-name test-dead-letter-exchange) + (is (= true @create-and-bind-queue-called?)) + (reset-impl))))) + +(deftest publish-test + (testing "it should call `mock-messaging/publish` without expiration" + (let [test-exchange-name "test-exchange" + stream-routes {:default {:handler-fn (constantly nil)}} + test-message-payload {:foo "bar"} + publish-called? (atom false)] + (with-redefs [mock-messaging/publish (fn [exchange message-payload] + (when (and (= exchange test-exchange-name) + (= message-payload test-message-payload)) + (reset! publish-called? true))) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/publish test-exchange-name test-message-payload) + (is (= @publish-called? true)) + (reset-impl)))) + + (testing "It should call `mock-messaging/publish` with expiration" + (let [test-exchange-name "test-exchange" + stream-routes {:default {:handler-fn (constantly nil)}} + test-message-payload {:foo "bar"} + test-expiration "42" + publish-called? (atom false)] + (with-redefs [mock-messaging/publish (fn [exchange message-payload expiration] + (when (and (= exchange test-exchange-name) + (= message-payload test-message-payload) + (= expiration test-expiration)) + (reset! publish-called? true))) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/publish test-exchange-name test-message-payload test-expiration) + (is (= @publish-called? true)) + (reset-impl))))) + +(deftest get-messages-from-queue-test + (testing "it should call `mock-messaging/get-messages-from-queue` without `count`" + (let [test-queue-name "test-queue" + stream-routes {:default {:handler-fn (constantly nil)}} + get-messages-from-queue-called? (atom false)] + (with-redefs [mock-messaging/get-messages-from-queue (fn [queue-name ack?] + (when (and (= test-queue-name queue-name) + (= ack? true))) + (reset! get-messages-from-queue-called? true)) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/get-messages-from-queue test-queue-name true) + (reset-impl)))) + + (testing "it should call `mock-messaging/get-messages-from-queue` when `count` is specified" + (let [test-queue-name "test-queue" + test-count 5 + stream-routes {:default {:handler-fn (constantly nil)}} + get-messages-from-queue-called? (atom false)] + (with-redefs [mock-messaging/get-messages-from-queue (fn [queue-name ack? count] + (when (and (= test-queue-name queue-name) + (= test-count count) + (= ack? true))) + (reset! get-messages-from-queue-called? true)) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/get-messages-from-queue test-queue-name true test-count) + (reset-impl))))) + +(deftest process-messages-from-queue-test + (testing "It should call `mock-messaging/process-messages-from-queue` with the correct arguments" + (let [test-queue "test-queue" + test-count 5 + test-processing-fn (constantly {:foo "bar"}) + stream-routes {:default {:handler-fn (constantly nil)}} + process-messages-from-queue-called? (atom false)] + (with-redefs [mock-messaging/process-messages-from-queue (fn [queue-name count processing-fn] + (when (and (= queue-name test-queue) + (= count test-count) + (= {:foo "bar"} (processing-fn))) + (reset! process-messages-from-queue-called? true))) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/process-messages-from-queue test-queue test-count test-processing-fn) + (is (= true @process-messages-from-queue-called?)) + (reset-impl))))) + +(deftest start-subscriber-test + (testing "it should call `mock-messaging/start-subscriber` with the right arguments" + (let [test-prefetch-count 5 + test-queue-name "test-queue" + test-mapper-fn (constantly {:foo "bar"}) + stream-routes {:default {:handler-fn (constantly nil)}} + start-subscriber-called? (atom false)] + (with-redefs [mock-messaging/start-subscriber (fn [prefetch-count wrapped-mapper-fn queue-name] + (when (and (= queue-name test-queue-name) + (= test-prefetch-count prefetch-count) + (= (wrapped-mapper-fn) {:foo "bar"})) + (reset! start-subscriber-called? true))) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/start-subscriber test-prefetch-count test-mapper-fn test-queue-name) + (reset-impl))))) + +(deftest consume-message-test + (testing "it should call `mock-messaging/consume-message` with the correct arguments" + (let [test-meta {:meta "bar"} + test-payload {:foo "bar"} + test-ack? false + stream-routes {:default {:handler-fn (constantly nil)}}] + (with-redefs [mock-messaging/consume-message (fn [_ meta payload ack?] + (when (and (= meta test-meta) + (= payload test-payload) + (= ack? test-ack?)))) + ziggurat-config ziggurat-config-for-mock-impl] + (messaging/start-connection config/config stream-routes) + (messaging/stop-connection config/config stream-routes) + (messaging/consume-message nil test-meta test-payload test-ack?) + (reset-impl))))) + diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index ab6a7126..39c081cc 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -6,6 +6,7 @@ [ziggurat.config :refer [rabbitmq-config ziggurat-config channel-retry-config]] [ziggurat.fixtures :as fix] [ziggurat.messaging.rabbitmq-wrapper :as rmqw] + [ziggurat.messaging.messaging :as messaging] [ziggurat.messaging.producer :as producer] [ziggurat.messaging.util :as util] [ziggurat.util.rabbitmq :as rmq] @@ -16,7 +17,7 @@ [mount.core :as mount]) (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader])) -(use-fixtures :once (join-fixtures [fix/init-rabbit-mq +(use-fixtures :once (join-fixtures [fix/init-messaging fix/silence-logging])) (def topic-entity :default) @@ -31,9 +32,9 @@ :type :linear}))] (testing "it does not create queues when stream-routes are not passed" (let [counter (atom 0)] - (with-redefs [rmqw/create-and-bind-queue (fn - ([_ _] (swap! counter inc)) - ([_ _ _] (swap! counter inc)))] + (with-redefs [messaging/create-and-bind-queue (fn + ([_ _] (swap! counter inc)) + ([_ _ _] (swap! counter inc)))] (producer/make-queues nil) (producer/make-queues []) (is (= 0 @counter))))) @@ -42,9 +43,9 @@ (let [counter (atom 0) stream-routes {:test {:handler-fn #(constantly nil)} :test2 {:handler-fn #(constantly nil)}}] - (with-redefs [rmqw/create-and-bind-queue (fn - ([_ _] (swap! counter inc)) - ([_ _ _] (swap! counter inc)))] + (with-redefs [messaging/create-and-bind-queue (fn + ([_ _] (swap! counter inc)) + ([_ _ _] (swap! counter inc)))] (producer/make-queues stream-routes) (is (= (* (count stream-routes) 3) @counter))))) @@ -55,11 +56,11 @@ instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config))))] - (with-redefs [rmqw/create-and-bind-queue (fn - ([queue-name exchange-name] - (swap! counter inc)) - ([queue-name exchange-name dead-letter-exchange] - (swap! counter inc)))] + (with-redefs [messaging/create-and-bind-queue (fn + ([queue-name exchange-name] + (swap! counter inc)) + ([queue-name exchange-name dead-letter-exchange] + (swap! counter inc)))] (producer/make-queues stream-routes) (is (= (* (count stream-routes) 3) @counter))))) @@ -75,19 +76,19 @@ dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config))))] - (with-redefs [rmqw/create-and-bind-queue (fn - ([queue-name exchange-name] - (is - (or - (and - (= queue-name instant-queue-name) - (= exchange-name instant-exchange-name)) - (and (= queue-name dead-queue-name) - (= exchange-name dead-exchange-name))))) - ([queue-name exchange-name dead-letter-exchange] - (is (and (= queue-name delay-queue-name) - (= exchange-name delay-exchange-name) - (= dead-letter-exchange instant-exchange-name)))))] + (with-redefs [messaging/create-and-bind-queue (fn + ([queue-name exchange-name] + (is + (or + (and + (= queue-name instant-queue-name) + (= exchange-name instant-exchange-name)) + (and (= queue-name dead-queue-name) + (= exchange-name dead-exchange-name))))) + ([queue-name exchange-name dead-letter-exchange] + (is (and (= queue-name delay-queue-name) + (= exchange-name delay-exchange-name) + (= dead-letter-exchange instant-exchange-name)))))] (producer/make-queues stream-routes)))) @@ -105,23 +106,23 @@ exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] - (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:retry :type] :exponential)) - rmqw/create-and-bind-queue (fn - ([queue-name exchange-name] - (is - (or - (and - (= queue-name instant-queue-name) - (= exchange-name instant-exchange-name)) - (and (= queue-name dead-queue-name) - (= exchange-name dead-exchange-name))))) - ;; Verifying that delay queues with appropriate suffixes are created - ([queue-name exchange-name dead-letter-exchange] - (let [exponential-delay-queues (map exponential-delay-queue-name (range 1 (inc retry-count))) - exponential-delay-exchanges (map exponential-delay-exchange-name (range 1 (inc retry-count)))] - (is (and (some #{queue-name} exponential-delay-queues) - (some #{exchange-name} exponential-delay-exchanges) - (= dead-letter-exchange instant-exchange-name))))))] + (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:retry :type] :exponential)) + messaging/create-and-bind-queue (fn + ([queue-name exchange-name] + (is + (or + (and + (= queue-name instant-queue-name) + (= exchange-name instant-exchange-name)) + (and (= queue-name dead-queue-name) + (= exchange-name dead-exchange-name))))) + ;; Verifying that delay queues with appropriate suffixes are created + ([queue-name exchange-name dead-letter-exchange] + (let [exponential-delay-queues (map exponential-delay-queue-name (range 1 (inc retry-count))) + exponential-delay-exchanges (map exponential-delay-exchange-name (range 1 (inc retry-count)))] + (is (and (some #{queue-name} exponential-delay-queues) + (some #{exchange-name} exponential-delay-exchanges) + (= dead-letter-exchange instant-exchange-name))))))] (producer/make-queues stream-routes)))) (testing "it creates queues with suffixes in the range [1, 25] when exponential backoff is enabled and retry-count is more than 25" @@ -136,25 +137,25 @@ exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] - (with-redefs [config/ziggurat-config (constantly (-> ziggurat-config - (assoc-in [:retry :type] :exponential) - (assoc-in [:retry :count] 50))) - rmqw/create-and-bind-queue (fn - ([queue-name exchange-name] - (is - (or - (and - (= queue-name instant-queue-name) - (= exchange-name instant-exchange-name)) - (and (= queue-name dead-queue-name) - (= exchange-name dead-exchange-name))))) - ;; Verifying that delay queues with appropriate suffixes are created - ([queue-name exchange-name dead-letter-exchange] - (let [exponential-delay-queues (map exponential-delay-queue-name (range 1 (inc 25))) - exponential-delay-exchanges (map exponential-delay-exchange-name (range 1 (inc 25)))] - (is (and (some #{queue-name} exponential-delay-queues) - (some #{exchange-name} exponential-delay-exchanges) - (= dead-letter-exchange instant-exchange-name))))))] + (with-redefs [config/ziggurat-config (constantly (-> ziggurat-config + (assoc-in [:retry :type] :exponential) + (assoc-in [:retry :count] 50))) + messaging/create-and-bind-queue (fn + ([queue-name exchange-name] + (is + (or + (and + (= queue-name instant-queue-name) + (= exchange-name instant-exchange-name)) + (and (= queue-name dead-queue-name) + (= exchange-name dead-exchange-name))))) + ;; Verifying that delay queues with appropriate suffixes are created + ([queue-name exchange-name dead-letter-exchange] + (let [exponential-delay-queues (map exponential-delay-queue-name (range 1 (inc 25))) + exponential-delay-exchanges (map exponential-delay-exchange-name (range 1 (inc 25)))] + (is (and (some #{queue-name} exponential-delay-queues) + (some #{exchange-name} exponential-delay-exchanges) + (= dead-letter-exchange instant-exchange-name))))))] (producer/make-queues stream-routes)))) (testing "it creates delay queue for linear retries when retry type is not defined in the config" @@ -202,9 +203,9 @@ :retry {:enabled false}))] (testing "it does not create queues when stream-routes are not passed" (let [counter (atom 0)] - (with-redefs [rmqw/create-and-bind-queue (fn - ([_ _] (swap! counter inc)) - ([_ _ _] (swap! counter inc)))] + (with-redefs [messaging/create-and-bind-queue (fn + ([_ _] (swap! counter inc)) + ([_ _ _] (swap! counter inc)))] (producer/make-queues {:default {:handler-fn #(constantly :success)}}) (is (= 0 @counter))))) @@ -225,19 +226,19 @@ channel1-dead-exchange-name (util/prefixed-queue-name prefix-name dead-letter-exchange-suffix) expected-queue-status {:message-count 0 :consumer-count 0}] - (with-redefs [rmqw/create-and-bind-queue (fn - ([queue-name exchange-name] - (is - (or - (and - (= queue-name channel1-instant-queue-name) - (= exchange-name channel1-instant-exchange-name)) - (and (= queue-name channel1-dead-queue-name) - (= exchange-name channel1-dead-exchange-name))))) - ([queue-name exchange-name dead-letter-exchange] - (is (and (= queue-name channel1-delay-queue-name) - (= exchange-name channel1-delay-exchange-name) - (= dead-letter-exchange channel1-instant-exchange-name)))))] + (with-redefs [messaging/create-and-bind-queue (fn + ([queue-name exchange-name] + (is + (or + (and + (= queue-name channel1-instant-queue-name) + (= exchange-name channel1-instant-exchange-name)) + (and (= queue-name channel1-dead-queue-name) + (= exchange-name channel1-dead-exchange-name))))) + ([queue-name exchange-name dead-letter-exchange] + (is (and (= queue-name channel1-delay-queue-name) + (= exchange-name channel1-delay-exchange-name) + (= dead-letter-exchange channel1-instant-exchange-name)))))] (producer/make-queues stream-routes)))))) (testing "when retries are disabled" @@ -254,17 +255,17 @@ channel1-instant-exchange-name (util/prefixed-queue-name prefix-name instant-exchange-suffix) expected-queue-status {:message-count 0 :consumer-count 0}] - (with-redefs [rmqw/create-and-bind-queue (fn - ([queue-name exchange-name] - (is - (and - (= queue-name channel1-instant-queue-name) - (= exchange-name channel1-instant-exchange-name)))))]) + (with-redefs [messaging/create-and-bind-queue (fn + ([queue-name exchange-name] + (is + (and + (= queue-name channel1-instant-queue-name) + (= exchange-name channel1-instant-exchange-name)))))]) (producer/make-queues stream-routes))))))) (deftest ^:integration make-queues-integration-tests (testing "it creates queues with topic entity from stream routes" - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (let [stream-routes {:default {:handler-fn #(constantly :success)}} instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) @@ -292,17 +293,17 @@ (le/delete ch dead-exchange-name)))) (testing "it creates queues with suffixes in the range [1, retry-count] when exponential backoff is enabled" - (with-open [ch (lch/open rmqw/connection)] - (let [stream-routes {:default {:handler-fn #(constantly :success)}} - retry-count (get-in (ziggurat-config) [:retry :count]) - instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) - instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) - delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) - delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) - dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) - dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) - expected-queue-status {:message-count 0 :consumer-count 0} - exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) + (with-open [ch (lch/open (rmqw/get-connection))] + (let [stream-routes {:default {:handler-fn #(constantly :success)}} + retry-count (get-in (ziggurat-config) [:retry :count]) + instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) + instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) + delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) + delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) + dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) + dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) + expected-queue-status {:message-count 0 :consumer-count 0} + exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] (with-redefs [config/ziggurat-config (constantly (assoc-in (ziggurat-config) [:retry :type] :exponential))] @@ -333,7 +334,7 @@ (is (= 0 @counter))))) (testing "it creates queues with topic entity for channels only" - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (let [stream-routes {:default {:handler-fn #(constantly :success) :channel-1 #(constantly :success)}} instant-queue-suffix (:queue-name (:instant (rabbitmq-config))) instant-exchange-suffix (:exchange-name (:instant (rabbitmq-config))) @@ -368,7 +369,7 @@ :stream-router {:default {:channels {:channel-1 {:retry {:enabled false}}}}}))] (testing "it creates instant queues with topic entity for channels only" - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (let [stream-routes {:default {:handler-fn #(constantly :success) :channel-1 #(constantly :success)}} instant-queue-suffix (:queue-name (:instant (rabbitmq-config))) instant-exchange-suffix (:exchange-name (:instant (rabbitmq-config))) @@ -644,7 +645,7 @@ (is (true? @prefixed-queue-name-called?)) (is (true? @publish-called?))))) (testing "An exception is raised, if publishing to RabbitMQ fails even after retries" - (mount/stop #'rmqw/connection) + (rmqw/stop-connection config/config (:stream-routes (mount/args))) (is (thrown? clojure.lang.ExceptionInfo (producer/publish-to-instant-queue message-payload))))) (deftest publish-to-delay-queue-test diff --git a/test/ziggurat/messaging/rabbitmq/connection_test.clj b/test/ziggurat/messaging/rabbitmq/connection_test.clj index 93bed7c8..5e81796f 100644 --- a/test/ziggurat/messaging/rabbitmq/connection_test.clj +++ b/test/ziggurat/messaging/rabbitmq/connection_test.clj @@ -3,7 +3,6 @@ [clojure.test :refer :all] [ziggurat.fixtures :as fix] [langohr.core :as rmq] - [mount.core :as mount] [ziggurat.config :as config :refer [ziggurat-config]] [ziggurat.messaging.rabbitmq-wrapper :as rmqw] [ziggurat.messaging.rabbitmq.connection :as rmq-conn])) @@ -12,26 +11,24 @@ (deftest ^:integration start-connection-test-with-tracer-disabled (testing "should provide the correct number of threads for the thread pool if channels are present" - (let [thread-count (atom 0) - orig-rmq-connect rmq/connect - rmq-connect-called? (atom false) - stream-routes {:default {:handler-fn (constantly :channel-1) - :channel-1 (constantly :success)}} + (let [thread-count (atom 0) + orig-rmq-connect rmq/connect + rmq-connect-called? (atom false) + stream-routes {:default {:handler-fn (constantly :channel-1) + :channel-1 (constantly :success)}} overriden-default-config (assoc config/config :ziggurat (assoc (ziggurat-config) :jobs {:instant {:worker-count 4}} :retry {:enabled true} :stream-router {:default {:channels {:channel-1 {:worker-count 10}}}} :tracer {}))] - (with-redefs [rmq/connect (fn [provided-config] - (reset! rmq-connect-called? true) - (reset! thread-count (.getCorePoolSize (:executor provided-config))) - (orig-rmq-connect provided-config)) - config/config overriden-default-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + (with-redefs [rmq/connect (fn [provided-config] + (reset! rmq-connect-called? true) + (reset! thread-count (.getCorePoolSize (:executor provided-config))) + (orig-rmq-connect provided-config)) + config/config overriden-default-config] + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (= @thread-count 14)) (is @rmq-connect-called?)))) @@ -43,14 +40,12 @@ :ziggurat (assoc (ziggurat-config) :retry {:enabled true} :tracer {:enabled false}))] - (with-redefs [rmq/connect (fn [provided-config] - (reset! rmq-connect-called? true) - (orig-rmq-connect provided-config)) + (with-redefs [rmq/connect (fn [provided-config] + (reset! rmq-connect-called? true) + (orig-rmq-connect provided-config)) config/config overridden-default-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is @rmq-connect-called?)))) (testing "if retry is disabled and channels are not present it should not create connection" @@ -61,14 +56,12 @@ :ziggurat (-> (ziggurat-config) (assoc :retry {:enabled false}) (dissoc :tracer)))] - (with-redefs [rmq/connect (fn [provided-config] - (reset! rmq-connect-called? true) - (orig-rmq-connect provided-config)) + (with-redefs [rmq/connect (fn [provided-config] + (reset! rmq-connect-called? true) + (orig-rmq-connect provided-config)) config/config overridden-default-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (not @rmq-connect-called?))))) (testing "if retry is disabled and channels are present it should create connection" @@ -86,10 +79,8 @@ (reset! rmq-connect-called? true) (orig-rmq-connect provided-config)) config/config overridden-default-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is @rmq-connect-called?)))) (testing "should provide the correct number of threads for the thread pool for multiple channels" @@ -108,10 +99,8 @@ (reset! thread-count (.getCorePoolSize (:executor provided-config))) (orig-rmq-connect provided-config)) config/config overriden-default-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (= @thread-count 19))))) (testing "should provide the correct number of threads for the thread pool when channels are not present" @@ -122,13 +111,15 @@ :jobs {:instant {:worker-count 4}} :retry {:enabled true} :stream-router {:default {}} - :tracer {:enabled false}))] + :tracer {:enabled false})) + stream-routes {:default {}}] (with-redefs [rmq/connect (fn [provided-config] (reset! thread-count (.getCorePoolSize (:executor provided-config))) (orig-rmq-connect provided-config)) config/config overriden-config] - (mount/start (mount/only [#'rmqw/connection])) - (mount/stop #'rmqw/connection) + + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (= @thread-count 4))))) (testing "should provide the correct number of threads for the thread pool for multiple stream routes" @@ -139,13 +130,14 @@ :retry {:enabled true} :stream-router {:default {:channels {:channel-1 {:worker-count 10}}} :default-1 {:channels {:channel-1 {:worker-count 8}}}} - :tracer {:enabled false}))] - (with-redefs [rmq/connect (fn [provided-config] - (reset! thread-count (.getCorePoolSize (:executor provided-config))) - (orig-rmq-connect provided-config)) + :tracer {:enabled false})) + stream-routes {}] + (with-redefs [rmq/connect (fn [provided-config] + (reset! thread-count (.getCorePoolSize (:executor provided-config))) + (orig-rmq-connect provided-config)) config/config overridden-config] - (mount/start (mount/only [#'rmqw/connection])) - (mount/stop #'rmqw/connection) + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (= @thread-count 26)))))) (deftest ^:integration start-connection-test-with-tracer-enabled @@ -165,11 +157,9 @@ (reset! create-connect-called? true) (reset! thread-count (.getCorePoolSize (:executor provided-config))) (orig-create-conn provided-config tracer-enabled)) - config/config overriden-default-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + config/config overriden-default-config] + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (= @thread-count 14)) (is @create-connect-called?)))) @@ -183,11 +173,9 @@ (with-redefs [rmq-conn/create-connection (fn [provided-config tracer-enabled] (reset! create-connect-called? true) (orig-create-conn provided-config tracer-enabled)) - config/config overridden-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + config/config overridden-config] + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is @create-connect-called?)))) (testing "if retry is disabled and channels are not present it should not create connection" @@ -201,11 +189,9 @@ (with-redefs [rmq-conn/create-connection (fn [provided-config tracer-enabled] (reset! create-connect-called? true) (orig-create-conn provided-config tracer-enabled)) - config/config overriden-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + config/config overriden-config] + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (not @create-connect-called?))))) (testing "if retry is disabled and channels are present it should create connection" @@ -221,12 +207,9 @@ (with-redefs [rmq-conn/create-connection (fn [provided-config tracer-enabled] (reset! create-connect-called? true) (orig-create-conn provided-config tracer-enabled)) - config/config overridden-config] - - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + config/config overridden-config] + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is @create-connect-called?)))) (testing "should provide the correct number of threads for the thread pool for multiple channels" @@ -242,11 +225,9 @@ (with-redefs [rmq-conn/create-connection (fn [provided-config tracer-enabled] (reset! thread-count (.getCorePoolSize (:executor provided-config))) (orig-create-conn provided-config tracer-enabled)) - config/config overridden-config] - (-> (mount/only #{#'rmqw/connection}) - (mount/with-args {:stream-routes stream-routes}) - (mount/start)) - (mount/stop #'rmqw/connection) + config/config overridden-config] + (rmqw/start-connection config/config stream-routes) + (rmqw/stop-connection config/config stream-routes) (is (= @thread-count 19))))) (testing "should provide the correct number of threads for the thread pool when channels are not present" @@ -261,9 +242,10 @@ (with-redefs [rmq-conn/create-connection (fn [provided-config tracer-enabled] (reset! thread-count (.getCorePoolSize (:executor provided-config))) (orig-create-conn provided-config tracer-enabled)) - config/config overridden-default-config] - (mount/start (mount/only [#'rmqw/connection])) - (mount/stop #'rmqw/connection) + config/config overridden-default-config] + + (rmqw/start-connection config/config {}) + (rmqw/stop-connection config/config {}) (is (= @thread-count 4))))) (testing "should provide the correct number of threads for the thread pool for multiple stream routes" @@ -279,8 +261,9 @@ (with-redefs [rmq-conn/create-connection (fn [provided-config tracer-enabled] (reset! thread-count (.getCorePoolSize (:executor provided-config))) (orig-create-conn provided-config tracer-enabled)) - config/config overridden-config] - (mount/start (mount/only [#'rmqw/connection])) - (mount/stop #'rmqw/connection) + config/config overridden-config] + + (rmqw/start-connection config/config {}) + (rmqw/stop-connection config/config {}) (is (= @thread-count 26)))))) diff --git a/test/ziggurat/messaging/rabbitmq/consumer_test.clj b/test/ziggurat/messaging/rabbitmq/consumer_test.clj index 4ce7cbbb..76b1c6d5 100644 --- a/test/ziggurat/messaging/rabbitmq/consumer_test.clj +++ b/test/ziggurat/messaging/rabbitmq/consumer_test.clj @@ -16,13 +16,13 @@ [ziggurat.mapper :as mpr]) (:import (com.rabbitmq.client Channel Connection))) -(use-fixtures :once (join-fixtures [fix/init-rabbit-mq +(use-fixtures :once (join-fixtures [fix/init-messaging fix/silence-logging])) (def topic-entity :default) (defn- gen-message-payload [topic-entity] - {:message {:gen-key (apply str (take 10 (repeatedly #(char (+ (rand 26) 65)))))} + {:message {:gen-key (apply str (take 10 (repeatedly #(char (+ (rand 26) 65)))))} :topic-entity topic-entity}) (defn- create-mock-channel [] (reify Channel @@ -128,8 +128,8 @@ mock-mapper-fn (fn [message] (when (= message-payload message) (reset! is-mocked-mpr-fn-called? true)))] - (rmq-producer/create-and-bind-queue connection queue-name exchange-name false) - (rmq-producer/publish connection exchange-name message-payload nil) + (rmq-producer/create-and-bind-queue (rmqw/get-connection) queue-name exchange-name false) + (rmq-producer/publish (rmqw/get-connection) exchange-name message-payload nil) (rmqw/start-subscriber 1 mock-mapper-fn queue-name) (Thread/sleep 5000) (is (true? @is-mocked-mpr-fn-called?))))) @@ -137,33 +137,33 @@ (deftest ^:integration process-message-test (testing "process-message function should ack message after once processing finishes" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) - processing-fn (fn [message-arg] - (is (= message-arg message))) + (let [message (gen-message-payload topic-entity) + processing-fn (fn [message-arg] + (is (= message-arg message))) topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) - [meta payload] (lb/get ch prefixed-queue-name false) + [meta payload] (lb/get ch prefixed-queue-name false) _ (rmq-cons/process-message-from-queue ch meta payload processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message nil))))))) (testing "process-message function not process a message if convert-message returns nil" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) + (let [message (gen-message-payload topic-entity) processing-fn-called (atom false) - processing-fn (fn [message-arg] - (if (nil? message-arg) - (reset! processing-fn-called true))) - topic-entity-name (name topic-entity)] + processing-fn (fn [message-arg] + (if (nil? message-arg) + (reset! processing-fn-called true))) + topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) (with-redefs [consumer/read-messages-from-queue (fn [_ _ _ _] nil)] - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) - [meta payload] (lb/get ch prefixed-queue-name false) + [meta payload] (lb/get ch prefixed-queue-name false) _ (rmq-cons/process-message-from-queue ch meta payload processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= false @processing-fn-called)) @@ -171,31 +171,31 @@ (testing "process-message function should reject and re-queue a message if processing fails" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) - processing-fn (fn [message-arg] - (is (= message-arg message)) - (throw (Exception. "exception message"))) + (let [message (gen-message-payload topic-entity) + processing-fn (fn [message-arg] + (is (= message-arg message)) + (throw (Exception. "exception message"))) topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) - [meta payload] (lb/get ch prefixed-queue-name false) + [meta payload] (lb/get ch prefixed-queue-name false) _ (rmq-cons/process-message-from-queue ch meta payload processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message message))))))) (testing "process-message function should reject and discard a message if message conversion fails" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) - processing-fn (fn [_] ()) + (let [message (gen-message-payload topic-entity) + processing-fn (fn [_] ()) topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) - (with-open [ch (lch/open rmqw/connection)] + (with-open [ch (lch/open (rmqw/get-connection))] (with-redefs [ziggurat.messaging.consumer/convert-to-message-payload (fn [] (throw (Exception. "exception message")))] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) - [meta payload] (lb/get ch prefixed-queue-name false) + [meta payload] (lb/get ch prefixed-queue-name false) _ (rmq-cons/process-message-from-queue ch meta payload processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message nil))))))))) @@ -204,12 +204,12 @@ (testing "While constructing a MessagePayload, adds topic-entity as a keyword and retry-count as 0 if message does not already has :retry-count" (let [message {:foo "bar"} expected-message-payload (assoc (mpr/->MessagePayload (dissoc message :retry-count) topic-entity) :retry-count 0) - consumed-message (rmq-cons/consume-message nil {:delivery-tag "delivery-tag"} (nippy/freeze message) false) + consumed-message (rmq-cons/consume-message nil {:delivery-tag "delivery-tag"} (nippy/freeze message) false) converted-message-payload (consumer/convert-to-message-payload consumed-message "default")] (is (= converted-message-payload expected-message-payload)))) (testing "While constructing a MessagePayload, adds topic-entity as a keyword and retry-count as it exists in the message" (let [message {:foo "bar" :retry-count 4} expected-message-payload (assoc (mpr/->MessagePayload (dissoc message :retry-count) topic-entity) :retry-count 4) - consumed-message (rmq-cons/consume-message nil {:delivery-tag "delivery-tag"} (nippy/freeze message) false) + consumed-message (rmq-cons/consume-message nil {:delivery-tag "delivery-tag"} (nippy/freeze message) false) converted-message-payload (consumer/convert-to-message-payload consumed-message "default")] (is (= converted-message-payload expected-message-payload))))) \ No newline at end of file diff --git a/test/ziggurat/messaging/rabbitmq/producer_test.clj b/test/ziggurat/messaging/rabbitmq/producer_test.clj index c1a82c1c..e55c04a0 100644 --- a/test/ziggurat/messaging/rabbitmq/producer_test.clj +++ b/test/ziggurat/messaging/rabbitmq/producer_test.clj @@ -10,7 +10,7 @@ (:import (com.rabbitmq.client Channel Connection) (org.apache.kafka.common.header Header))) -(use-fixtures :once (join-fixtures [fix/init-rabbit-mq +(use-fixtures :once (join-fixtures [fix/init-messaging fix/silence-logging])) (defn- create-mock-channel [] (reify Channel diff --git a/test/ziggurat/messaging/rabbitmq/wrapper_test.clj b/test/ziggurat/messaging/rabbitmq/wrapper_test.clj deleted file mode 100644 index ac95f6e4..00000000 --- a/test/ziggurat/messaging/rabbitmq/wrapper_test.clj +++ /dev/null @@ -1,2 +0,0 @@ -(ns ziggurat.messaging.rabbitmq.wrapper-test - (:require [clojure.test :refer :all])) diff --git a/test/ziggurat/messaging/rabbitmq_wrapper_test.clj b/test/ziggurat/messaging/rabbitmq_wrapper_test.clj new file mode 100644 index 00000000..8a12ec6a --- /dev/null +++ b/test/ziggurat/messaging/rabbitmq_wrapper_test.clj @@ -0,0 +1,210 @@ +(ns ziggurat.messaging.rabbitmq-wrapper-test + (:require [clojure.test :refer :all] + [ziggurat.config :as config] + [ziggurat.messaging.rabbitmq-wrapper :as rmqw] + [ziggurat.messaging.rabbitmq.connection :as rmq-connection] + [ziggurat.messaging.rabbitmq.producer :as rmq-producer] + [ziggurat.messaging.rabbitmq.consumer :as rmq-consumer] + [ziggurat.fixtures :as fix])) + +(use-fixtures :once fix/mount-only-config) + +(defn- create-mock-object [] (reify Object + (toString [this] ""))) + +(defn reset-connection-atom [] (reset! rmqw/connection nil)) + +(deftest start-connection-test + (testing "start-connection should call the `rmq-connection/start-connection` and set the connection atom" + (let [default-config config/config + start-connection-called-count (atom false) + stream-routes {:default {:handler-fn (constantly nil)}} + config (assoc default-config + :ziggurat {:retry {:enabled true}})] + (with-redefs [rmq-connection/start-connection (fn [_] (reset! start-connection-called-count true) {:foo "bar"})] + (rmqw/start-connection config stream-routes) + (is (= true @start-connection-called-count)) + (is (= {:foo "bar"} (rmqw/get-connection)))) + (reset-connection-atom))) + + (testing "start-connection should not call `rmq-connection/start-connection` function if retries are disabled and connection atom is nil" + (let [default-config config/config + start-connection-called? (atom false) + stream-routes {:default {:handler-fn (constantly nil)}} + config (assoc default-config + :ziggurat {:retry {:enabled false}})] + (with-redefs [rmq-connection/start-connection (fn [_] (reset! start-connection-called? true))] + (rmqw/start-connection config stream-routes) + (is (= false @start-connection-called?)) + (is (= (rmqw/get-connection) nil)))) + (reset-connection-atom))) + +(deftest stop-connection-test + (testing "stop-connection should stop the connection if retries are enabled and connection is not nil" + (let [default-config config/config + stop-connection-called? (atom false) + stream-routes {:default {:handler-fn (constantly nil)}} + config (assoc default-config + :ziggurat {:retry {:enabled true}})] + (with-redefs [rmq-connection/stop-connection (fn [_ _] (reset! stop-connection-called? true)) + rmqw/get-connection (constantly {:foo "bar"})] + (rmqw/stop-connection config stream-routes) + (is (= true @stop-connection-called?)))) + (reset-connection-atom)) + + (testing "stop-connection should not call the `rmq-connection/stop-connection` function if connection atom is nil" + (let [default-config config/config + stop-connection-called? (atom false) + stream-routes {:default {:handler-fn (constantly nil)}} + config (assoc default-config + :ziggurat {:retry {:enabled true}})] + (with-redefs [rmq-connection/stop-connection (fn [_ _] (reset! stop-connection-called? true)) + rmqw/get-connection (constantly nil)] + (rmqw/stop-connection config stream-routes) + (is (= false @stop-connection-called?)))) + (reset-connection-atom))) + +(deftest start-connection-idempotency-test + (testing "It should not set the connection atom if it has already been set" + (let [default-config config/config + start-connection-called-count (atom false) + mock-object (create-mock-object) + stream-routes {:default {:handler-fn (constantly nil)}} + config (assoc default-config + :ziggurat {:retry {:enabled true}})] + (with-redefs [rmq-connection/start-connection (fn [_] (reset! start-connection-called-count true) mock-object)] + (rmqw/start-connection config stream-routes) + (rmqw/start-connection config stream-routes) + (rmqw/start-connection config stream-routes) + (is (= true @start-connection-called-count)) + (is (= mock-object (rmqw/get-connection)))) + (reset-connection-atom)))) + +(deftest stop-connection-idempotency-test + (testing "It should not reset the connection atom if connection has already been stopped" + (let [default-config config/config + stop-connection-called-count (atom 0) + stream-routes {:default {:handler-fn (constantly nil)}} + config (assoc default-config + :ziggurat {:retry {:enabled true}})] + (with-redefs [rmqw/get-connection (constantly nil) + rmq-connection/stop-connection (fn [_ _] (swap! stop-connection-called-count inc))] + (rmqw/stop-connection config stream-routes) + (rmqw/stop-connection config stream-routes) + (rmqw/stop-connection config stream-routes) + (is (= 0 @stop-connection-called-count))) + (reset-connection-atom)))) + +(deftest create-and-bind-queue-test + (testing "It should call the create `rmq-producer/create-and-bind-queue` function without dead-letter-exchange" + (let [test-queue-name "test-queue" + test-exchange-name "test-exchange" + create-and-bind-queue-called? (atom false)] + (with-redefs [rmq-producer/create-and-bind-queue (fn [_ queue-name exchange-name dead-letter-exchange] + (when (and (= queue-name test-queue-name) + (= exchange-name test-exchange-name) + (= dead-letter-exchange nil)) + (reset! create-and-bind-queue-called? true)))] + (rmqw/create-and-bind-queue test-queue-name test-exchange-name) + (is (= @create-and-bind-queue-called? true))))) + + (testing "It should call the `create rmq-producer/create-and-bind-queue` function with dead-letter-exchange" + (let [test-queue-name "test-queue" + test-exchange-name "test-exchange" + dead-letter-exchange-name "test-dead-letter-exchange" + create-and-bind-queue-called? (atom false)] + (with-redefs [rmq-producer/create-and-bind-queue (fn [_ queue-name exchange-name dead-letter-exchange] + (when (and (= queue-name test-queue-name) + (= exchange-name test-exchange-name) + (= dead-letter-exchange dead-letter-exchange-name)) + (reset! create-and-bind-queue-called? true)))] + (rmqw/create-and-bind-queue test-queue-name test-exchange-name dead-letter-exchange-name) + (is (= @create-and-bind-queue-called? true)))))) + +(deftest publish-test + (testing "it should call `rmq-producer/publish` without expiration" + (let [test-exchange-name "test-exchange" + test-message-payload {:foo "bar"} + publish-called? (atom false)] + (with-redefs [rmq-producer/publish (fn [_ exchange message-payload expiration] + (when (and (= exchange test-exchange-name) + (= message-payload test-message-payload) + (= expiration nil)) + (reset! publish-called? true)))] + (rmqw/publish test-exchange-name test-message-payload) + (is (= @publish-called? true))))) + + (testing "It should call `rmq-producer/publish` with expiration" + (let [test-exchange-name "test-exchange" + test-message-payload {:foo "bar"} + test-expiration "42" + publish-called? (atom false)] + (with-redefs [rmq-producer/publish (fn [_ exchange message-payload expiration] + (when (and (= exchange test-exchange-name) + (= message-payload test-message-payload) + (= expiration test-expiration)) + (reset! publish-called? true)))] + (rmqw/publish test-exchange-name test-message-payload test-expiration) + (is (= @publish-called? true)))))) + +(deftest get-messages-from-queue-test + (testing "it should call `rmq-consumer/get-messages-from-queue` with a default `count` of 1 when count is not specified" + (let [test-queue-name "test-queue" + default-count 1 + get-messages-from-queue-called? (atom false)] + (with-redefs [rmq-consumer/get-messages-from-queue (fn [_ queue-name ack? count] + (when (and (= test-queue-name queue-name) + (= default-count count) + (= ack? true))) + (reset! get-messages-from-queue-called? true))] + (rmqw/get-messages-from-queue test-queue-name true)))) + + (testing "it should call `rmq-consumer/get-messages-from-queue` when `count` is specified" + (let [test-queue-name "test-queue" + test-count 5 + get-messages-from-queue-called? (atom false)] + (with-redefs [rmq-consumer/get-messages-from-queue (fn [_ queue-name ack? count] + (when (and (= test-queue-name queue-name) + (= test-count count) + (= ack? true))) + (reset! get-messages-from-queue-called? true))] + (rmqw/get-messages-from-queue test-queue-name true test-count))))) + +(deftest process-messages-from-queue-test + (testing "It should call `rmq-consumer/process-messages-from-queue` with the correct arguments" + (let [test-queue "test-queue" + test-count 5 + test-processing-fn (constantly {:foo "bar"}) + process-messages-from-queue-called? (atom false)] + (with-redefs [rmq-consumer/process-messages-from-queue (fn [_ queue-name count processing-fn] + (when (and (= queue-name test-queue) + (= count test-count) + (= {:foo "bar"} (processing-fn))) + (reset! process-messages-from-queue-called? true)))] + (rmqw/process-messages-from-queue test-queue test-count test-processing-fn) + (is (= true @process-messages-from-queue-called?)))))) + +(deftest start-subscriber-test + (testing "it should call `rmq-consumer/start-subscriber` with the right arguments" + (let [test-prefetch-count 5 + test-queue-name "test-queue" + test-mapper-fn (constantly {:foo "bar"}) + start-subscriber-called? (atom false)] + (with-redefs [rmq-consumer/start-subscriber (fn [_ prefetch-count wrapped-mapper-fn queue-name] + (when (and (= queue-name test-queue-name) + (= test-prefetch-count prefetch-count) + (= (wrapped-mapper-fn) {:foo "bar"})) + (reset! start-subscriber-called? true)))] + (rmqw/start-subscriber test-prefetch-count test-mapper-fn test-queue-name))))) + +(deftest consume-message-test + (testing "it should call `rmq-consumer/consume-message` with the correct arguments" + (let [test-meta {:meta "bar"} + test-payload {:foo "bar"} + test-ack? false] + (with-redefs [rmq-consumer/consume-message (fn [_ meta ^bytes payload ack?] + (when (and (= meta test-meta) + (= payload test-payload) + (= ack? test-ack?))))] + (rmqw/consume-message nil test-meta test-payload test-ack?))))) + diff --git a/test/ziggurat/middleware/metrics/stream_joins_diff_test.clj b/test/ziggurat/middleware/metrics/stream_joins_diff_test.clj new file mode 100644 index 00000000..7d42db3a --- /dev/null +++ b/test/ziggurat/middleware/metrics/stream_joins_diff_test.clj @@ -0,0 +1,27 @@ +(ns ziggurat.middleware.metrics.stream-joins-diff-test + (:require [clojure.test :refer :all] + [ziggurat.middleware.metrics.stream-joins-diff :as mw] + [ziggurat.metrics :as metrics] + [ziggurat.fixtures :as fix])) + +(use-fixtures :once (join-fixtures [fix/mount-only-config + fix/silence-logging])) + +(deftest stream-joins-diff-test + (testing "this middleware publishes the metrics and then calls the provided handler function" + (let [handler-fn-called? (atom false) + metrics-published? (atom false) + message {:topic-1 {:event-timestamp {:nanos 123}} :topic-2 {:event-timestamp {:nanos 456}}} + handler-fn (fn [msg] + (if (and (true? @metrics-published?) (= msg message)) + (reset! handler-fn-called? true)))] + (with-redefs [metrics/report-histogram (fn [metric-namespaces val tags] + (if (and (= "application_name" (first metric-namespaces)) + (= "stream-joins-message-diff" (second metric-namespaces)) + (= val (- 456 123)) + (= tags {:left "topic-1" :right "topic-2"}) + (false? @metrics-published?)) + (reset! metrics-published? true)))] + ((mw/publish-diff-between-joined-messages handler-fn) message) + (is (true? @handler-fn-called?)))))) + diff --git a/test/ziggurat/util/mock_messaging_implementation.clj b/test/ziggurat/util/mock_messaging_implementation.clj new file mode 100644 index 00000000..50e4b094 --- /dev/null +++ b/test/ziggurat/util/mock_messaging_implementation.clj @@ -0,0 +1,44 @@ +(ns ziggurat.util.mock-messaging-implementation + (:require [clojure.test :refer :all] + [ziggurat.messaging.messaging-interface :refer [MessagingProtocol]])) + +(defn start-connection [config stream-routes] nil) + +(defn stop-connection [config stream-routes] nil) + +(defn publish + ([exchange message-payload] + (publish exchange message-payload nil)) + ([exchange message-payload expiration] nil)) + +(defn create-and-bind-queue + ([queue-name exchange-name] + (create-and-bind-queue queue-name exchange-name nil)) + ([queue-name exchange-name dead-letter-exchange] nil)) + +(defn get-messages-from-queue + ([queue-name ack?] (get-messages-from-queue queue-name ack? 1)) + ([queue-name ack? count] nil)) + +(defn process-messages-from-queue [queue-name count processing-fn] nil) + +(defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name] nil) + +(defn consume-message [ch meta payload ack?] nil) + +(deftype MockMessaging [] + MessagingProtocol + (start-connection [impl config stream-routes] (start-connection config stream-routes)) + (stop-connection [impl config stream-routes] (stop-connection config stream-routes)) + (create-and-bind-queue [impl queue-name exchange-name] + (create-and-bind-queue queue-name exchange-name)) + (create-and-bind-queue [impl queue-name exchange-name dead-letter-exchange] + (create-and-bind-queue queue-name exchange-name dead-letter-exchange)) + (publish [impl exchange message-payload] (publish exchange message-payload)) + (publish [impl exchange message-payload expiration] (publish exchange message-payload expiration)) + (get-messages-from-queue [impl queue-name ack?] (get-messages-from-queue queue-name ack?)) + (get-messages-from-queue [impl queue-name ack? count] (get-messages-from-queue queue-name ack? count)) + (process-messages-from-queue [impl queue-name count processing-fn] (process-messages-from-queue queue-name count processing-fn)) + (start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name] (start-subscriber prefetch-count wrapped-mapper-fn queue-name)) + (consume-message [impl ch meta payload ack?] (consume-message ch meta payload ack?))) + diff --git a/test/ziggurat/util/rabbitmq.clj b/test/ziggurat/util/rabbitmq.clj index 14caf1a5..b9f1599c 100644 --- a/test/ziggurat/util/rabbitmq.clj +++ b/test/ziggurat/util/rabbitmq.clj @@ -12,7 +12,7 @@ (:import (com.rabbitmq.client AlreadyClosedException Channel))) (defn- get-msg-from-rabbitmq [queue-name topic-name] - (with-open [ch (lch/open connection)] + (with-open [ch (lch/open @connection)] (try (let [[meta payload] (lb/get ch queue-name false)] (when (seq payload) @@ -21,7 +21,7 @@ nil)))) (defn- get-msg-from-rabbitmq-without-ack [queue-name topic-name] - (with-open [ch (lch/open connection)] + (with-open [ch (lch/open @connection)] (try (let [[meta payload] (lb/get ch queue-name false)] (when (seq payload) @@ -67,13 +67,13 @@ (defn get-message-from-retry-queue [topic sequence] (let [{:keys [queue-name]} (:delay (rabbitmq-config)) delay-queue-name (delay-queue-name topic queue-name) - queue-name (rutil/prefixed-queue-name delay-queue-name sequence)] + queue-name (rutil/prefixed-queue-name delay-queue-name sequence)] (get-msg-from-rabbitmq queue-name topic))) (defn get-message-from-channel-retry-queue [topic channel sequence] (let [{:keys [queue-name]} (:delay (rabbitmq-config)) delay-queue-name (delay-queue-name (rutil/with-channel-name topic channel) queue-name) - queue-name (rutil/prefixed-queue-name delay-queue-name sequence)] + queue-name (rutil/prefixed-queue-name delay-queue-name sequence)] (get-msg-from-rabbitmq queue-name topic))) (defn close [^Channel channel]