Skip to content

Commit

Permalink
using DNS address resolver for resolving RabbitMQ connection (#258)
Browse files Browse the repository at this point in the history
* temp dns resolver changes

* Using DNSAddressResolver as default for creating rmq connection

* Using DnsRecordAddressResolver for creating rabbitmq connections

* linting fix

Co-authored-by: prateek.khatri <prateek.khatri@gojek.com>
  • Loading branch information
mjayprateek and prateek.khatri committed Apr 22, 2022
1 parent eb6ba35 commit 94f4619
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 66 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,18 @@ Ziggurat Config | Default Value | Description | Mandatory?
:prefetch-count [3 :int]
:username "guest"
:password "guest"
:channel-timeout [2000 :int]}}}}
:channel-timeout [2000 :int]
:address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]. Default is :dns
:channel-pool {:max-wait-ms [5000 :int]
:min-idle [10 :int]
:max-idle [20 :int]}}}}}
```

- `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs).
- `:port` specifies the port number on which the RabbitMQ nodes are running.
- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster
- `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing
- `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud
be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided.

## Exponential Backoff based Retries

Expand Down
3 changes: 2 additions & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
:port [5672 :int]
:username "guest"
:password "guest"
:channel-timeout [2000 :int]}
:channel-timeout [2000 :int]
:address-resolver [:dns :keyword]} ;;possible values [:dns :ip-list]
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:rabbit-mq {:delay {:queue-name "application_name_delay_queue_test"
Expand Down
26 changes: 13 additions & 13 deletions src/ziggurat/messaging/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[ziggurat.messaging.util :as util]
[clojure.string :as str]
[ziggurat.util.error :refer [report-error]])
(:import [com.rabbitmq.client ShutdownListener Address ListAddressResolver]
(:import [com.rabbitmq.client ShutdownListener Address ListAddressResolver ConnectionFactory DnsRecordIpAddressResolver AddressResolver]
[java.util.concurrent Executors ExecutorService]
[io.opentracing.contrib.rabbitmq TracingConnectionFactory]
[com.rabbitmq.client.impl DefaultCredentialsProvider]))
Expand Down Expand Up @@ -38,20 +38,19 @@
batch-routes-instant-workers
stream-routes)))

(defn- create-traced-connection [config]
(let [connection-factory (TracingConnectionFactory. tracer)]
(.setCredentialsProvider connection-factory (DefaultCredentialsProvider. (:username config) (:password config)))
(if (some? (:executor config))
(.newConnection connection-factory ^ExecutorService (:executor config) ^ListAddressResolver (ListAddressResolver. (map #(Address. %) (util/list-of-hosts config))))
(.newConnection connection-factory ^ListAddressResolver (ListAddressResolver. (map #(Address. %) (util/list-of-hosts config)))))))

(defn- get-tracer-config []
(get-in (ziggurat-config) [:tracer :enabled]))
(defn create-rmq-connection
[connection-factory rabbitmq-config]
(.setCredentialsProvider connection-factory (DefaultCredentialsProvider. (:username rabbitmq-config) (:password rabbitmq-config)))
(if (some? (:executor rabbitmq-config))
(.newConnection connection-factory
^ExecutorService (:executor rabbitmq-config)
^AddressResolver (util/create-address-resolver rabbitmq-config))
(.newConnection connection-factory ^AddressResolver (util/create-address-resolver rabbitmq-config))))

(defn create-connection [config tracer-enabled]
(if tracer-enabled
(create-traced-connection config)
(rmq/connect (assoc config :hosts (util/list-of-hosts config)))))
(create-rmq-connection (TracingConnectionFactory. tracer) config)
(create-rmq-connection (ConnectionFactory.) config)))

(defn- get-connection-config
[is-producer?]
Expand All @@ -70,7 +69,8 @@
(when (is-connection-required?)
(try
(let
[connection (create-connection (get-connection-config is-producer?) (get-tracer-config))]
[is-tracer-enabled? (get-in (ziggurat-config) [:tracer :enabled])
connection (create-connection (get-connection-config is-producer?) is-tracer-enabled?)]
(println "Connection created " connection)
(doto connection
(.addShutdownListener
Expand Down
21 changes: 16 additions & 5 deletions src/ziggurat/messaging/util.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns ziggurat.messaging.util
(:require [clojure.string :as str]))
(:require [clojure.string :as str])
(:import (com.rabbitmq.client DnsRecordIpAddressResolver ListAddressResolver Address)))

(defn prefixed-queue-name [topic-entity value]
(str (name topic-entity) "_" value))
Expand All @@ -18,7 +19,17 @@
keys))

(defn list-of-hosts [config]
(let [{:keys [host hosts]} config]
(if hosts
(str/split hosts #",")
[host])))
(let [{:keys [host hosts]} config
rabbitmq-hosts (if (some? hosts)
hosts
host)]
(str/split rabbitmq-hosts #",")))

(defn create-address-resolver
[rabbitmq-config]
(let [host (:hosts rabbitmq-config)
port (:port rabbitmq-config)
address-resolver (get rabbitmq-config :address-resolver :dns)]
(if (= address-resolver :dns)
(DnsRecordIpAddressResolver. ^String host ^int port)
(ListAddressResolver. (map #(Address. %) (list-of-hosts rabbitmq-config))))))
88 changes: 44 additions & 44 deletions test/ziggurat/messaging/connection_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
[langohr.core :as rmq]
[mount.core :as mount]
[ziggurat.config :as config]
[ziggurat.messaging.connection :as mc :refer [producer-connection, consumer-connection, create-connection]]
[ziggurat.messaging.connection :as mc :refer [producer-connection, consumer-connection, create-connection create-rmq-connection]]
[ziggurat.util.error :refer [report-error]]))

(use-fixtures :once fix/mount-config-with-tracer)

(deftest connection-test
(testing "creates thread-pool for consumer connection"
(let [executor-present? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :channel-1)
:channel-1 (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(with-redefs [create-rmq-connection (fn [cf provided-config]
(when (some? (:executor provided-config))
(reset! executor-present? true))
(orig-rmq-connect provided-config))
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:jobs {:instant {:worker-count 4}}
:retry {:enabled true}
Expand All @@ -32,14 +32,14 @@
(is @executor-present?))))
(testing "does not create thread-pool for producer connection"
(let [executor-present? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :channel-1)
:channel-1 (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(when (some? (:executor provided-config))
(reset! executor-present? true))
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(when (some? (:executor provided-config))
(reset! executor-present? true))
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:jobs {:instant {:worker-count 4}}
:retry {:enabled true}
Expand All @@ -54,15 +54,15 @@
(deftest start-connection-test-with-tracer-disabled
(testing "[consumer-connection] should provide the correct number of threads for the thread pool if channels are present"
(let [thread-count (atom 0)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
rmq-connect-called? (atom false)
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :channel-1)
:channel-1 (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(reset! rmq-connect-called? true)
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! rmq-connect-called? true)
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:jobs {:instant {:worker-count 4}}
:retry {:enabled true}
Expand All @@ -78,14 +78,14 @@
(testing "[consumer-connection] if retry is enabled and channels are not present it should create connection"
(let [thread-count (atom 0)
rmq-connect-called? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(.getCorePoolSize (:executor provided-config))
(reset! rmq-connect-called? true)
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(.getCorePoolSize (:executor provided-config))
(reset! rmq-connect-called? true)
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:jobs {:instant {:worker-count 4}}
:retry {:enabled true}
Expand All @@ -100,12 +100,12 @@

(testing "if retry is enabled and channels are not present it should create connection"
(let [rmq-connect-called? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:retry {:enabled true}
:tracer {:enabled false}))]
Expand All @@ -117,12 +117,12 @@

(testing "if retry is disabled and channels are not present it should not create connection"
(let [rmq-connect-called? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (-> ziggurat-config
(assoc :retry {:enabled false})
(dissoc :tracer)))]
Expand All @@ -134,12 +134,12 @@

(testing "[consumer-connection] if retry is disabled and channels are not present it should not create connection"
(let [rmq-connect-called? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (-> ziggurat-config
(assoc :retry {:enabled false})
(dissoc :tracer)))]
Expand All @@ -151,15 +151,15 @@

(testing "if retry is disabled and channels are present it should create connection"
(let [rmq-connect-called? (atom false)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)
stream-routes {:default {:handler-fn (constantly :channel-1)
:channel-1 (constantly :success)}
:default-1 {:handler-fn (constantly :channel-3)
:channel-3 (constantly :success)}}]
(with-redefs [rmq/connect (fn [provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! rmq-connect-called? true)
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:retry {:enabled false}
:tracer {:enabled false}))]
Expand All @@ -171,11 +171,11 @@

(testing "[consumer-connection] should provide the correct number of threads for the thread pool when channels are not present"
(let [thread-count (atom 0)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)]
(with-redefs [rmq/connect (fn [provided-config]
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:jobs {:instant {:worker-count 4}}
:retry {:enabled true}
Expand All @@ -187,11 +187,11 @@

(testing "should provide the correct number of threads for the thread pool for multiple stream routes"
(let [thread-count (atom 0)
orig-rmq-connect rmq/connect
orig-rmq-connect create-rmq-connection
ziggurat-config (config/ziggurat-config)]
(with-redefs [rmq/connect (fn [provided-config]
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect provided-config))
(with-redefs [create-rmq-connection (fn [cf provided-config]
(reset! thread-count (.getCorePoolSize (:executor provided-config)))
(orig-rmq-connect cf provided-config))
config/ziggurat-config (constantly (assoc ziggurat-config
:jobs {:instant {:worker-count 4}}
:retry {:enabled true}
Expand Down

0 comments on commit 94f4619

Please sign in to comment.