Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/gojek/ziggurat
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang.balkundi committed Jun 29, 2020
2 parents dbc257f + 5278d43 commit ae952f5
Show file tree
Hide file tree
Showing 22 changed files with 1,065 additions and 379 deletions.
40 changes: 20 additions & 20 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.messaging :as messaging]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.metrics :as metrics]
Expand All @@ -27,15 +27,15 @@
(mount/with-args args)
(mount/start))))

(defn- start-rabbitmq-connection [args]
(start* #{#'rmqw/connection} args))
(defn- start-messaging-connection [args]
(messaging/start-connection config/config (:stream-routes args)))

(defn- start-rabbitmq-consumers [args]
(start-rabbitmq-connection args)
(defn- start-messaging-consumers [args]
(start-messaging-connection args)
(messaging-consumer/start-subscribers (get args :stream-routes) (ziggurat-config)))

(defn- start-rabbitmq-producers [args]
(start-rabbitmq-connection args)
(defn- start-messaging-producers [args]
(start-messaging-connection args)
(messaging-producer/make-queues (get args :stream-routes)))

(defn start-kafka-producers []
Expand All @@ -46,24 +46,24 @@

(defn start-stream [args]
(start-kafka-producers)
(start-rabbitmq-producers args)
(start-messaging-producers args)
(start-kafka-streams args))

(defn start-management-apis [args]
(start-rabbitmq-connection args)
(start-messaging-connection args)
(start* #{#'server/server} (dissoc args :actor-routes)))

(defn start-server [args]
(start-rabbitmq-connection args)
(start-messaging-connection args)
(start* #{#'server/server} args))

(defn start-workers [args]
(start-kafka-producers)
(start-rabbitmq-producers args)
(start-rabbitmq-consumers args))
(start-messaging-producers args)
(start-messaging-consumers args))

(defn- stop-rabbitmq-connection []
(mount/stop #'rmqw/connection))
(defn- stop-messaging []
(messaging/stop-connection config/config (:stream-routes mount/args)))

(defn stop-kafka-producers []
(mount/stop #'kafka-producers))
Expand All @@ -72,21 +72,21 @@
(mount/stop #'streams/stream))

(defn stop-workers []
(stop-rabbitmq-connection)
(stop-messaging)
(stop-kafka-producers))

(defn stop-server []
(mount/stop #'server/server)
(stop-rabbitmq-connection))
(stop-messaging))

(defn stop-stream []
(stop-kafka-streams)
(stop-rabbitmq-connection)
(stop-messaging)
(stop-kafka-producers))

(defn stop-management-apis []
(mount/stop #'server/server)
(stop-rabbitmq-connection))
(stop-messaging))

(def valid-modes-fns
{:api-server {:start-fn start-server :stop-fn stop-server}
Expand Down Expand Up @@ -115,9 +115,9 @@
(defn stop-common-states []
(mount/stop #'config/config
#'metrics/statsd-reporter
#'rmqw/connection
#'nrepl-server/server
#'tracer/tracer))
#'tracer/tracer)
(stop-messaging))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
18 changes: 9 additions & 9 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(:require [ziggurat.mapper :as mpr]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.config :refer [ziggurat-config get-in-config]]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.messaging :as messaging]
[ziggurat.messaging.util :refer :all]
[ziggurat.metrics :as metrics]
[clojure.tools.logging :as log]
Expand Down Expand Up @@ -40,7 +40,7 @@
(assoc message-payload :retry-count retry-count)))))

(defn read-messages-from-queue [queue-name topic-entity ack? count]
(let [messages (rmqw/get-messages-from-queue queue-name ack? count)]
(let [messages (messaging/get-messages-from-queue queue-name ack? count)]
(for [message messages]
(if-not (nil? message)
(convert-to-message-payload message topic-entity)
Expand All @@ -56,7 +56,7 @@
(read-messages-from-queue (get-dead-set-queue-name topic-entity (ziggurat-config) channel) topic-entity false count))))

(defn process-messages-from-queue [queue-name count processing-fn]
(rmqw/process-messages-from-queue queue-name count processing-fn))
(messaging/process-messages-from-queue queue-name count processing-fn))

(defn process-dead-set-messages
"This method reads and processes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and
Expand All @@ -70,19 +70,19 @@
(when (get-in ziggurat-config [:retry :enabled])
(dotimes [_ (get-in ziggurat-config [:jobs :instant :worker-count])]
(let [queue-name (get-instant-queue-name topic-entity ziggurat-config)]
(rmqw/start-subscriber (get-in ziggurat-config [:jobs :instant :prefetch-count])
(mpr/mapper-func mapper-fn channels)
queue-name)))))
(messaging/start-subscriber (get-in ziggurat-config [:jobs :instant :prefetch-count])
(mpr/mapper-func mapper-fn channels)
queue-name)))))

(defn start-channels-subscriber [channels topic-entity ziggurat-config]
(doseq [channel channels]
(let [channel-key (first channel)
channel-handler-fn (second channel)]
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)]
(let [queue-name (get-channel-instant-queue-name topic-entity channel-key ziggurat-config)]
(rmqw/start-subscriber 1
(mpr/channel-mapper-func channel-handler-fn channel-key)
queue-name))))))
(messaging/start-subscriber 1
(mpr/channel-mapper-func channel-handler-fn channel-key)
queue-name))))))

(defn start-subscribers
"Starts the subscriber to the instant queue of the rabbitmq"
Expand Down
64 changes: 64 additions & 0 deletions src/ziggurat/messaging/messaging.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
(ns ziggurat.messaging.messaging
(:require [ziggurat.config :refer [ziggurat-config]]
[ziggurat.messaging.rabbitmq-wrapper :as rmqw]
[ziggurat.messaging.messaging-interface :as messaging-interface]
[clojure.tools.logging :as log]))

(def messaging-impl (atom nil))

(defn get-implementation []
(if (nil? @messaging-impl)
(throw (Exception. "Messaging Library has not been initialized, please make sure the messaging library has been initialized"))
@messaging-impl))

(defn- get-messaging-implementor-constructor []
(if-let [configured-metrics-class-constructor (get-in (ziggurat-config) [:messaging :constructor])]
(let [configured-constructor-symbol (symbol configured-metrics-class-constructor)
constructor-namespace (namespace configured-constructor-symbol)
_ (require [(symbol constructor-namespace)])
metric-constructor (resolve configured-constructor-symbol)]
(if (nil? metric-constructor)
(throw (ex-info "Incorrect messaging_interface implementation constructor configured. Please fix it." {:constructor-configured configured-constructor-symbol}))
metric-constructor))
rmqw/->RabbitMQMessaging))

(defn initialise-messaging-library []
(let [messaging-impl-constructor (get-messaging-implementor-constructor)]
(reset! messaging-impl (messaging-impl-constructor))))

(defn start-connection [config stream-routes]
(initialise-messaging-library)
(log/info "Initialized Messaging Library")
(messaging-interface/start-connection (get-implementation) config stream-routes))

(defn stop-connection [config stream-routes]
(log/info "Stopping Messaging Library")
(messaging-interface/stop-connection (get-implementation) config stream-routes))

(defn create-and-bind-queue
([queue-name exchange-name]
(messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name))
([queue-name exchange-name dead-letter-exchange]
(messaging-interface/create-and-bind-queue (get-implementation) queue-name exchange-name dead-letter-exchange)))

(defn publish
([exchange message-payload]
(messaging-interface/publish (get-implementation) exchange message-payload))
([exchange message-payload expiration]
(messaging-interface/publish (get-implementation) exchange message-payload expiration)))

(defn get-messages-from-queue
([queue-name ack?]
(messaging-interface/get-messages-from-queue (get-implementation) queue-name ack?))
([queue-name ack? count]
(messaging-interface/get-messages-from-queue (get-implementation) queue-name ack? count)))

(defn process-messages-from-queue [queue-name count processing-fn]
(messaging-interface/process-messages-from-queue (get-implementation) queue-name count processing-fn))

(defn start-subscriber [prefetch-count wrapped-mapper-fn queue-name]
(messaging-interface/start-subscriber (get-implementation) prefetch-count wrapped-mapper-fn queue-name))

(defn consume-message [ch meta payload ack?]
(messaging-interface/consume-message (get-implementation) ch meta payload ack?))

79 changes: 79 additions & 0 deletions src/ziggurat/messaging/messaging_interface.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
(ns ziggurat.messaging.messaging-interface)

(defprotocol MessagingProtocol
"A protocol that defines the interface for queue-based retry implementation libraries. Any type or class that implements this protocol can be passed to the messaging namespace to be used for retrying messages.
Example implementations for this protocol:
start-connection [config stream-routes]
This is used to initialize the messaging library so that it push messages to the retry backend.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- config: It is the config map defined in the `config.edn` file. It includes the entire config map
- stream-routes: It is a map containing topic entities and their handler functions, channels and their handler functions
stop-connection [impl connection config stream-routes]
This is used to stop the messaging library and cleanup resources.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- connection: The connection object that is used which can be used to close and cleanup network connections to the retry backend
- config: It is the config map defined in the `config.edn` file. It includes the entire config map
- stream-routes: It is a map containing topic entities and their handler functions, channels and their handler functions
create-and-bind-queue [impl queue-name exchange-name dead-letter-exchange]
This is used to create a queue and bind it to an exchange.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- queue-name: The name of the queue which will be bound the exchange
- exchange-name: The name of the exchange to bind the queue to, Example if `test_application_queue` is bound to `test_application_exchange` then all messages which need to be sent to `test_application_queue`
will have to be sent to the `test_application_exchange` which will route it to the bound queue.
- dead-letter-exchange: The name of the dead-letter-exchange if specified will be bound the queue which will in turn route all messages which have exceeded the max retries to the specified dead-letter-exchange
get-messages-from-queue [impl queue-name ack? count]
This is used to fetch and deserialize them messages from the queue.
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- queue-name: this is the name of the queue from where to fetch messages
- ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption.
- count: `count` number of messages will be fetched from the queue. If not provided it will default to an arbitrary value, ideally `1`.
process-messages-from-queue [impl queue-name count processing-fn]
This is used to process messages from the queue. This method does not return the messages unlike `get-messages-from-queue`, instead it runs the `processing-fn` against every message consumed from the queue
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- queue-name: the name of the queue to consume messages from
- ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption.
- count: `count` number of messages will be consumed from the queue. If not provided it will default to an arbitrary value, ideally `1`.
start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name]
This method is used to start subscribers for a specific queue
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- prefetch-count: The maximum number of unacknowledged messages that a consumer can receive at once
- wrapped-mapper-fn: This is the processing-fn which will run against every messaged consumed from the queue
- queue-name: This is the name of the queue to which a subscriber will subscribe to
consume-message [impl ch meta payload ack?]
; Todo should be removed as it is only being used for testing
This method deserializes the message consumed from the queue
Args:
- impl: the class object that implements this protocol, i.e. an instance of the deftype for example
- ch: A Channel object
- meta: metadata containing information about the message payload
- payload: the message payload in byte-array format as fetched from the queue
- ack?: a boolean flag which indicates if the message will be acknowledged upon consumption, ack-ing the message should discard it from the queue preventing a re-consumption.
"

(start-connection [impl config stream-routes])
(stop-connection [impl config stream-routes])
(publish
[impl exchange message-payload]
[impl exchange message-payload expiration])
(create-and-bind-queue
[impl queue-name exchange-name]
[impl queue-name exchange-name dead-letter-exchange])
(get-messages-from-queue
[impl queue-name ack?]
[impl queue-name ack? count])
(process-messages-from-queue [impl queue-name count processing-fn])
(start-subscriber [impl prefetch-count wrapped-mapper-fn queue-name])
(consume-message [impl ch meta payload ack?]))
Loading

0 comments on commit ae952f5

Please sign in to comment.