Skip to content

Commit

Permalink
Merge 23165fc into 3fc8d25
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang93 committed Apr 13, 2022
2 parents 3fc8d25 + 23165fc commit d7c1aa4
Show file tree
Hide file tree
Showing 16 changed files with 546 additions and 264 deletions.
5 changes: 3 additions & 2 deletions project.clj
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.5.3"
(defproject tech.gojek/ziggurat "4.6.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down Expand Up @@ -57,7 +57,8 @@
[ch.qos.logback/logback-classic "1.2.9"]
[ch.qos.logback.contrib/logback-json-classic "0.1.5"]
[ch.qos.logback.contrib/logback-jackson "0.1.5"]
[net.logstash.logback/logstash-logback-encoder "6.6"]]
[net.logstash.logback/logstash-logback-encoder "6.6"]
[org.apache.commons/commons-pool2 "2.11.1"]]
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
:username :env/clojars_username
:password :env/clojars_password
Expand Down
50 changes: 50 additions & 0 deletions src/com/gojek/rabbitmq/channel_pool/RabbitMQChannelFactory.java
@@ -0,0 +1,50 @@
package gojek.rabbitmq.channel_pool;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.DestroyMode;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitMQChannelFactory extends BasePooledObjectFactory<Channel> {
Logger log = LoggerFactory.getLogger(this.getClass());

private final Connection connection;

public RabbitMQChannelFactory(Connection connection) {
this.connection = connection;
}

@Override
public Channel create() throws Exception {
Channel channel = connection.createChannel();
log.info("Created a new channel with id: " + channel.getChannelNumber());
return channel;
}


@Override
public boolean validateObject(PooledObject<Channel> p) {
boolean open = p.getObject().isOpen();
if (!open) {
log.info("Channel is closed, invalidating channel with id " + p.getObject().getChannelNumber());
}
return open;
}

@Override
public PooledObject<Channel> wrap(Channel obj) {
return new DefaultPooledObject<>(obj);
}


@Override
public void destroyObject(PooledObject<Channel> p, DestroyMode destroyMode) throws Exception {
super.destroyObject(p, destroyMode);
log.info("Closing the channel with id: " + p.getObject().getChannelNumber());
p.getObject().close();
}
}
6 changes: 3 additions & 3 deletions src/ziggurat/config.clj
Expand Up @@ -167,7 +167,7 @@
(str/trim
(cond
(keyword? v) (name v)
:else (str v))))
:else (str v))))

(defn set-property
[mapping-table p k v]
Expand All @@ -191,8 +191,8 @@
(defn- add-jaas-properties
[properties jaas-config]
(if (some? jaas-config)
(let [username (get jaas-config :username)
password (get jaas-config :password)
(let [username (get jaas-config :username)
password (get jaas-config :password)
mechanism (get jaas-config :mechanism)]
(doto properties
(.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism))))
Expand Down
24 changes: 16 additions & 8 deletions src/ziggurat/init.clj
Expand Up @@ -8,22 +8,24 @@
[mount.core :as mount]
[schema.core :as s]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver]
[ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.messaging.connection :as messaging-connection :refer [connection]]
[ziggurat.messaging.connection :as messaging-connection :refer [producer-connection, consumer-connection]]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.metrics :as metrics]
[ziggurat.nrepl-server :as nrepl-server]
[ziggurat.producer :as producer :refer [kafka-producers]]
[ziggurat.producer :refer [kafka-producers]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.tracer :as tracer]
[ziggurat.util.java-util :as util])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
:name tech.gojek.ziggurat.internal.Init))
:name tech.gojek.ziggurat.internal.Init)
(:import (clojure.lang ExceptionInfo)))

(defn- event-routes [args]
(merge (:stream-routes args) (:batch-routes args)))
Expand All @@ -37,7 +39,9 @@
(mount/start))))

(defn- start-rabbitmq-connection [args]
(start* #{#'messaging-connection/connection} args))
(start* #{#'consumer-connection
#'producer-connection
#'cpool/channel-pool} args))

(defn- start-rabbitmq-consumers [args]
(start-rabbitmq-connection args)
Expand Down Expand Up @@ -76,7 +80,10 @@
(start-rabbitmq-consumers args))

(defn- stop-rabbitmq-connection []
(mount/stop #'connection))
(-> (mount/only #{#'producer-connection
#'cpool/channel-pool
#'consumer-connection})
(mount/stop)))

(defn stop-kafka-producers []
(mount/stop #'kafka-producers))
Expand Down Expand Up @@ -149,7 +156,8 @@
(defn stop-common-states []
(mount/stop #'config/config
#'metrics/statsd-reporter
#'connection
#'producer-connection
#'consumer-connection
#'nrepl-server/server
#'tracer/tracer))

Expand Down Expand Up @@ -225,8 +233,8 @@
(throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args")))
(cond-> base-modes
(some? stream-routes) (conj :stream-worker)
(some? batch-routes) (conj :batch-worker)
(some? actor-routes) (conj :api-server))))
(some? batch-routes) (conj :batch-worker)
(some? actor-routes) (conj :api-server))))

(defn validate-modes [modes stream-routes batch-routes actor-routes]
(let [derived-modes (if-not (empty? modes)
Expand Down
51 changes: 51 additions & 0 deletions src/ziggurat/messaging/channel_pool.clj
@@ -0,0 +1,51 @@
(ns ziggurat.messaging.channel_pool
(:require [mount.core :refer [defstate]]
[ziggurat.config :as zc]
[ziggurat.messaging.connection :as c]
[cambium.core :as clog]
[clojure.tools.logging :as log])
(:import (com.rabbitmq.client Connection)
(org.apache.commons.pool2.impl GenericObjectPool GenericObjectPoolConfig)
(java.time Duration)
(gojek.rabbitmq.channel_pool RabbitMQChannelFactory)))

(defn calc-total-thread-count []
(let [rmq-thread-count (c/total-thread-count)
stream-router-config (get (zc/ziggurat-config) :stream-router)
batch-routes-config (get (zc/ziggurat-config) :batch-routes)
batch-consumer-thread-count (reduce (fn [sum config]
(+ sum (:thread-count config))) 0 (vals batch-routes-config))
stream-thread-count (reduce (fn [sum config]
(+ sum (:stream-threads-count config))) 0 (vals stream-router-config))]
(clog/info {:channel-threads rmq-thread-count
:batch-consumer-threads batch-consumer-thread-count
:stream-threads stream-thread-count} "Thread counts")
(+ stream-thread-count rmq-thread-count batch-consumer-thread-count)))

(defn create-object-pool-config [config]
(let [standby-size 10
total-thread-count (calc-total-thread-count)
merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle total-thread-count} config)]
(doto (GenericObjectPoolConfig.)
(.setMaxWait (Duration/ofMillis (:max-wait-ms merged-config)))
(.setMinIdle (:min-idle merged-config))
(.setMaxIdle (:max-idle merged-config))
(.setMaxTotal (+ (:min-idle merged-config) total-thread-count))
(.setTestOnBorrow true)
(.setJmxNamePrefix "zig-rabbitmq-ch-pool"))))

(defn create-channel-pool [^Connection connection]
(let [pool-config (create-object-pool-config (get-in zc/ziggurat-config [:rabbit-mq-connection :channel-pool]))
rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)]
rmq-chan-pool))

(defn destroy-channel-pool [channel-pool]
(.close channel-pool))

(declare channel-pool)

(defstate channel-pool
:start (do (log/info "Creating channel pool")
(create-channel-pool c/producer-connection))
:stop (do (log/info "Stopping channel pool")
(destroy-channel-pool channel-pool)))
59 changes: 45 additions & 14 deletions src/ziggurat/messaging/connection.clj
Expand Up @@ -28,27 +28,50 @@
(reduce (fn [sum [_ channel-config]]
(+ sum (:worker-count channel-config))) 0 channels))

(defn- total-thread-count []
(let [stream-routes (:stream-router (ziggurat-config))
worker-count (get-in (ziggurat-config) [:jobs :instant :worker-count])]
(defn total-thread-count []
(let [stream-routes (:stream-router (ziggurat-config))
batch-route-count (count (:batch-routes (ziggurat-config)))
worker-count (get-in (ziggurat-config) [:jobs :instant :worker-count] 0)
batch-routes-instant-workers (* batch-route-count worker-count)]
(reduce (fn [sum [_ route-config]]
(+ sum (channel-threads (:channels route-config)) worker-count)) 0 stream-routes)))
(+ sum (channel-threads (:channels route-config)) worker-count))
batch-routes-instant-workers
stream-routes)))

(defn- create-traced-connection [config]
(let [connection-factory (TracingConnectionFactory. tracer)]
(.setCredentialsProvider connection-factory (DefaultCredentialsProvider. (:username config) (:password config)))
(.newConnection connection-factory ^ExecutorService (:executor config) ^ListAddressResolver (ListAddressResolver. (map #(Address. %) (util/list-of-hosts config))))))
(if (some? (:executor config))
(.newConnection connection-factory ^ExecutorService (:executor config) ^ListAddressResolver (ListAddressResolver. (map #(Address. %) (util/list-of-hosts config))))
(.newConnection connection-factory ^ListAddressResolver (ListAddressResolver. (map #(Address. %) (util/list-of-hosts config)))))))

(defn- get-tracer-config []
(get-in (ziggurat-config) [:tracer :enabled]))

(defn create-connection [config tracer-enabled]
(if tracer-enabled
(create-traced-connection config)
(rmq/connect (assoc config :hosts (util/list-of-hosts config)))))

(defn- start-connection []
(defn- get-connection-config
[is-producer?]
(if is-producer?
(assoc (:rabbit-mq-connection (ziggurat-config))
:connection-name "producer")
(assoc (:rabbit-mq-connection (ziggurat-config))
:executor (Executors/newFixedThreadPool (total-thread-count))
:connection-name "consumer")))

(defn- start-connection
"is-producer? - defines whether the connection is being created for producers or consumers
producer connections do not require the :executor option"
[is-producer?]
(log/info "Connecting to RabbitMQ")
(when (is-connection-required?)
(try
(let [connection (create-connection (assoc (:rabbit-mq-connection (ziggurat-config)) :executor (Executors/newFixedThreadPool (total-thread-count))) (get-in (ziggurat-config) [:tracer :enabled]))]
(let
[connection (create-connection (get-connection-config is-producer?) (get-tracer-config))]
(println "Connection created " connection)
(doto connection
(.addShutdownListener
(reify ShutdownListener
Expand All @@ -61,11 +84,19 @@

(defn- stop-connection [conn]
(when (is-connection-required?)
(if (get-in (ziggurat-config) [:tracer :enabled])
(.close conn)
(rmq/close conn))
(log/info "Disconnected from RabbitMQ")))
(log/info "Closing the RabbitMQ connection")
(rmq/close conn)))

(declare consumer-connection)
(defstate consumer-connection
:start (do (log/info "Creating consumer connection")
(start-connection false))
:stop (do (log/info "Stopping consume connection")
(stop-connection consumer-connection)))

(defstate connection
:start (start-connection)
:stop (stop-connection connection))
(declare producer-connection)
(defstate producer-connection
:start (do (log/info "Creating producer connection")
(start-connection true))
:stop (do (log/info "Stopping producer connection")
(stop-connection producer-connection)))
29 changes: 16 additions & 13 deletions src/ziggurat/messaging/consumer.clj
Expand Up @@ -5,9 +5,10 @@
[langohr.consumers :as lcons]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [get-in-config rabbitmq-config]]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[ziggurat.mapper :as mpr]
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.messaging.connection :refer [consumer-connection, producer-connection]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics]
[ziggurat.util.error :refer [report-error]]
Expand All @@ -18,14 +19,16 @@
(lb/reject ch delivery-tag))

(defn- publish-to-dead-set
[ch delivery-tag topic-entity payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange (util/prefixed-queue-name topic-entity exchange-name)]
[delivery-tag topic-entity payload]
(let [ch (.borrowObject cpool/channel-pool)
{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange (util/prefixed-queue-name topic-entity exchange-name)]
(try
(lb/publish ch exchange "" payload)
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(reject-message ch delivery-tag)))))
(reject-message ch delivery-tag))
(finally (.returnObject cpool/channel-pool ch)))))

(defn convert-and-ack-message
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
Expand All @@ -38,7 +41,7 @@
message)
(catch Exception e
(report-error e "Error while decoding message, publishing to dead queue...")
(publish-to-dead-set ch delivery-tag topic-entity payload)
(publish-to-dead-set delivery-tag topic-entity payload)
(metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)})
nil)))

Expand All @@ -56,7 +59,7 @@
(processing-fn message-payload)
(ack-message ch delivery-tag)
(catch Exception e
(publish-to-dead-set ch delivery-tag topic-entity payload)
(publish-to-dead-set delivery-tag topic-entity payload)
(report-error e "Error while processing message-payload from RabbitMQ")
(metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)}))))))

Expand Down Expand Up @@ -85,7 +88,7 @@
(get-dead-set-messages topic-entity nil count))
([topic-entity channel count]
(remove nil?
(with-open [ch (lch/open connection)]
(with-open [ch (lch/open consumer-connection)]
(doall (for [_ (range count)]
(read-message-from-queue ch (construct-queue-name topic-entity channel) topic-entity false)))))))

Expand All @@ -95,9 +98,9 @@
([topic-entity count processing-fn]
(process-dead-set-messages topic-entity nil count processing-fn))
([topic-entity channel count processing-fn]
(with-open [ch (lch/open connection)]
(with-open [ch (lch/open consumer-connection)]
(doall (for [_ (range count)]
(let [queue-name (construct-queue-name topic-entity channel)
(let [queue-name (construct-queue-name topic-entity channel)
[meta payload] (lb/get ch queue-name false)]
(when (some? payload)
(process-message-from-queue ch meta payload topic-entity processing-fn))))))))
Expand All @@ -106,7 +109,7 @@
"This method deletes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and channel
`channel`."
[topic-entity channel count]
(with-open [ch (lch/open connection)]
(with-open [ch (lch/open producer-connection)]
(let [queue-name (construct-queue-name topic-entity channel)]
(doall (for [_ (range count)]
(lb/get ch queue-name true))))))
Expand All @@ -128,7 +131,7 @@
(defn start-retry-subscriber* [handler-fn topic-entity]
(when (get-in-config [:retry :enabled])
(dotimes [_ (get-in-config [:jobs :instant :worker-count])]
(start-subscriber* (lch/open connection)
(start-subscriber* (lch/open consumer-connection)
(get-in-config [:jobs :instant :prefetch-count])
(util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
handler-fn
Expand All @@ -139,7 +142,7 @@
(let [channel-key (first channel)
channel-handler-fn (second channel)]
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])]
(start-subscriber* (lch/open connection)
(start-subscriber* (lch/open consumer-connection)
1
(util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
(mpr/channel-mapper-func channel-handler-fn channel-key)
Expand Down

0 comments on commit d7c1aa4

Please sign in to comment.