Skip to content

Commit

Permalink
Merge 6581fa2 into 951ee6c
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang93 committed Jun 27, 2020
2 parents 951ee6c + 6581fa2 commit 769250c
Show file tree
Hide file tree
Showing 18 changed files with 827 additions and 363 deletions.
37 changes: 18 additions & 19 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.messaging :as messaging]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.metrics :as metrics]
Expand All @@ -27,15 +27,15 @@
(mount/with-args args)
(mount/start))))

(defn- start-rabbitmq-connection [args]
(start* #{#'rmqw/connection} args))
(defn- start-messaging [args]
(messaging/start-connection config/config (:stream-routes args)))

(defn- start-rabbitmq-consumers [args]
(start-rabbitmq-connection args)
(defn- start-messaging-consumers [args]
(start-messaging args)
(messaging-consumer/start-subscribers (get args :stream-routes) (ziggurat-config)))

(defn- start-rabbitmq-producers [args]
(start-rabbitmq-connection args)
(defn- start-messaging-producers [args]
(start-messaging args)
(messaging-producer/make-queues (get args :stream-routes)))

(defn start-kafka-producers []
Expand All @@ -46,24 +46,24 @@

(defn start-stream [args]
(start-kafka-producers)
(start-rabbitmq-producers args)
(start-messaging-producers args)
(start-kafka-streams args))

(defn start-management-apis [args]
(start-rabbitmq-connection args)
(start-messaging args)
(start* #{#'server/server} (dissoc args :actor-routes)))

(defn start-server [args]
(start-rabbitmq-connection args)
(start-messaging args)
(start* #{#'server/server} args))

(defn start-workers [args]
(start-kafka-producers)
(start-rabbitmq-producers args)
(start-rabbitmq-consumers args))
(start-messaging-producers args)
(start-messaging-consumers args))

(defn- stop-rabbitmq-connection []
(mount/stop #'rmqw/connection))
(defn- stop-messaging []
(messaging/stop-connection config/config (:stream-routes mount/args)))

(defn stop-kafka-producers []
(mount/stop #'kafka-producers))
Expand All @@ -72,21 +72,21 @@
(mount/stop #'streams/stream))

(defn stop-workers []
(stop-rabbitmq-connection)
(stop-messaging)
(stop-kafka-producers))

(defn stop-server []
(mount/stop #'server/server)
(stop-rabbitmq-connection))
(stop-messaging))

(defn stop-stream []
(stop-kafka-streams)
(stop-rabbitmq-connection)
(stop-messaging)
(stop-kafka-producers))

(defn stop-management-apis []
(mount/stop #'server/server)
(stop-rabbitmq-connection))
(stop-messaging))

(def valid-modes-fns
{:api-server {:start-fn start-server :stop-fn stop-server}
Expand Down Expand Up @@ -115,7 +115,6 @@
(defn stop-common-states []
(mount/stop #'config/config
#'metrics/statsd-reporter
#'rmqw/connection
#'nrepl-server/server
#'tracer/tracer))

Expand Down
65 changes: 65 additions & 0 deletions src/ziggurat/messaging/messaging.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
(ns ziggurat.messaging.messaging
(:require [ziggurat.config :refer [ziggurat-config]]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.messaging-interface :as messaging-interface]
[clojure.tools.logging :as log]))

(def messaging-impl (atom nil))

(defn get-implementation []
(if (nil? @messaging-impl)
(throw (Exception. "Messaging Library is not initialized"))
@messaging-impl))

(defn- get-messaging-implementor-constructor []
(if-let [configured-metrics-class-constructor (get-in (ziggurat-config) [:messaging :constructor])]
(let [configured-constructor-symbol (symbol configured-metrics-class-constructor)
constructor-namespace (namespace configured-constructor-symbol)
_ (require [(symbol constructor-namespace)])
metric-constructor (resolve configured-constructor-symbol)]

(if (nil? metric-constructor)
(throw (ex-info "Incorrect messaging_interface implementation constructor configured. Please fix it." {:constructor-configured configured-constructor-symbol}))
metric-constructor))
rmqw/->RabbitMQMessaging))

(defn initialise-messaging-library []
(let [messaging-impl-constructor (get-messaging-implementor-constructor)]
(reset! messaging-impl (messaging-impl-constructor))))

(defn start-connection [config stream-routes]
(do (initialise-messaging-library)
(log/info "Initializing Messaging Library")
(messaging-interface/start-connection (get-implementation) config stream-routes)))

(defn stop-connection [config stream-routes]
(do (log/info "Stopping Messaging Library")
(messaging-interface/stop-connection (get-implementation) config stream-routes)))

(defn create-and-bind-queue
([queue-name exchange-name]
(create-and-bind-queue queue-name exchange-name nil))
([queue-name exchange-name dead-letter-exchange]
(messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name dead-letter-exchange)))

(defn publish
([exchange message-payload]
(messaging-interface/publish (get-implementation) exchange message-payload))
([exchange message-payload expiration]
(messaging-interface/publish (get-implementation) exchange message-payload expiration)))

(defn get-messages-from-queue
([queue-name ack?]
(messaging-interface/get-messages-from-queue (get-implementation) queue-name ack?))
([queue-name ack? count]
(messaging-interface/get-messages-from-queue (get-implementation) queue-name ack? count)))

(defn process-messages-from-queue [queue-name count processing-fn]
(messaging-interface/process-messages-from-queue (get-implementation) queue-name count processing-fn))

(defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name]
(messaging-interface/start-subscriber (get-implementation) prefetch-count wrapped-mapper-fn queue-name))

(defn consume-message [ch meta payload ack?]
(messaging-interface/consume-message (get-implementation) ch meta payload ack?))

79 changes: 79 additions & 0 deletions src/ziggurat/messaging/messaging_interface.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
(ns ziggurat.messaging.messaging-interface)

(defprotocol MessagingProtocol
"A protocol that defines the interface for queue-based retry implementation libraries. Any type or class that implements this protocol can be passed to the messaging namespace to be used for retrying messages.
Example implementations for this protocol:
start-connection [config stream-routes]
This is used to initialize the messaging library so that it push messages to the retry backend.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- config: It is the config map defined in the `config.edn` file. It includes the entire config map
- stream-routes: It is a map containing topic entities and their handler functions, channels and their handler functions
stop-connection [impl connection config stream-routes]
This is used to stop the messaging library and cleanup resources.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- connection: The connection object that is used which can be used to close and cleanup network connections to the retry backend
- config: It is the config map defined in the `config.edn` file. It includes the entire config map
- stream-routes: It is a map containing topic entities and their handler functions, channels and their handler functions
create-and-bind-queue [impl queue-name exchange-name dead-letter-exchange]
This is used to create a queue and bind it to an exchange.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- queue-name: The name of the queue which will be bound the exchange
- exchange-name: The name of the exchange to bind the queue to, Example if `test_application_queue` is bound to `test_application_exchange` then all messages which need to be sent to `test_application_queue`
will have to be sent to the `test_application_exchange` which will route it to the bound queue.
- dead-letter-exchange: The name of the dead-letter-exchange if specified will be bound the queue which will in turn route all messages which have exceeded the max retries to the specified dead-letter-exchange
get-messages-from-queue [impl queue-name ack? count]
This is used to fetch and deserialize them messages from the queue.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- queue-name: this is the name of the queue from where to fetch messages
- ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption.
- count: `count` number of messages will be fetched from the queue. If not provided it will default to an arbitrary value, ideally `1`.
process-messages-from-queue [impl queue-name count processing-fn]
This is used to process messages from the queue. This method does not return the messages unlike `get-messages-from-queue`, instead it runs the `processing-fn` against every message consumed from the queue
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- queue-name: the name of the queue to consume messages from
- ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption.
- count: `count` number of messages will be consumed from the queue. If not provided it will default to an arbitrary value, ideally `1`.
start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name]
This method is used to start subscribers for a specific queue
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- prefetch-count: The maximum number of unacknowledged messages that a consumer can receive at once
- wrapped-mapper-fn: This is the processing-fn which will run against every messaged consumed from the queue
- queue-name: This is the name of the queue to which a subscriber will subscribe to
consume-message [impl ch meta payload ack?]
; Todo should be removed as it is only being used for testing
This method deserializes the message consumed from the queue
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- ch: A Channel object
- meta: metadata containing information about the message payload
- payload: the message payload in byte-array format as fetched from the queue
- ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption.
"

(start-connection [impl config stream-routes])
(stop-connection [impl config stream-routes])
(publish
[impl exchange message-payload]
[impl exchange message-payload expiration])
(create-and-bind-queue
[impl queue-name exchange-name]
[impl queue-name exchange-name dead-letter-exchange])
(get-messages-from-queue
[impl queue-name ack?]
[impl queue-name ack? count])
(process-messages-from-queue [impl queue-name count processing-fn])
(start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name])
(consume-message [impl ch meta payload ack?]))
Loading

0 comments on commit 769250c

Please sign in to comment.