Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Aug 13, 2021
1 parent 6dfcb14 commit ce2d6b6
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
9 changes: 5 additions & 4 deletions test/ziggurat/messaging/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,10 @@

(deftest convert-and-ack-message-test
(let [message {:id 7 :path "/photos/h2k3j4h9h23"}
metadata {:topic "x" :partition 1 :timestamp 123}
proto-class Example$Photo
proto-message (proto/->bytes (proto/create proto-class message))
message-payload {:message proto-message :topic-entity topic-entity :retry-count 3}]
message-payload {:message proto-message :topic-entity topic-entity :retry-count 3 :metadata metadata}]
(testing "should return deserialized message-payload if serialized using protobuf"
(let [proto-serialized-message-payload (zmd/serialize-to-message-payload-proto message-payload)
converted-message-payload (consumer/convert-and-ack-message nil {:delivery-tag 1} proto-serialized-message-payload false "default")]
Expand All @@ -333,13 +334,13 @@
(let [nippy-message-payload (assoc (zmp/->MessagePayload proto-message topic-entity) :retry-count 3)
nippy-serialized-message-payload (nippy/freeze nippy-message-payload)
converted-message-payload (consumer/convert-and-ack-message nil {:delivery-tag 1} nippy-serialized-message-payload false "default")]
(is (= (bytes-to-str converted-message-payload) (bytes-to-str message-payload)))))
(is (= (bytes-to-str converted-message-payload) (bytes-to-str (into {} nippy-message-payload))))))
(testing "should return a map and not ziggurat.mapper/->MessagePayload if original message was a ziggurat.mapper/->MessagePayload and serialized using nippy"
(let [nippy-message-payload (assoc (mpr/->MessagePayload proto-message topic-entity) :retry-count 3)
nippy-serialized-message-payload (nippy/freeze nippy-message-payload)
converted-message-payload (consumer/convert-and-ack-message nil {:delivery-tag 1} nippy-serialized-message-payload false "default")]
(is (= (bytes-to-str converted-message-payload) (bytes-to-str message-payload)))))
(testing "should return a clojure map if serialized using nippy"
(is (= (bytes-to-str converted-message-payload) (bytes-to-str (into {} nippy-message-payload))))))
(testing "should return nil if the message is serialized using neither proto or nippy"
(let [random-bytes-as-message-payload (.getBytes (String. "Hello World"))
converted-message-payload (consumer/convert-and-ack-message nil {:delivery-tag 1} random-bytes-as-message-payload false "default")]
(is (= converted-message-payload nil))))))
5 changes: 4 additions & 1 deletion test/ziggurat/messaging/dead_set_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
fix/mount-metrics]))

(def topic-entity :default)
(def default-message-payload {:message (.getBytes "foo-bar") :topic-entity topic-entity :retry-count 1})
(def default-message-payload {:message (.getBytes "foo-bar")
:topic-entity topic-entity
:retry-count 1
:metadata {:topic "x" :partition 1 :timestamp 123}})

(deftest replay-test
(testing "puts message from dead set to instant queue"
Expand Down
17 changes: 15 additions & 2 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
fix/silence-logging]))

(def topic-entity :default)
(def message-payload {:message (.getBytes "hello-world") :topic-entity topic-entity})
(def message-payload {:message (.getBytes "hello-world") :topic-entity topic-entity :metadata {:topic "x" :partition 1 :timestamp 123}})
(defn retry-count-config [] (-> (ziggurat-config) :retry :count))

(deftest retry-for-channel-test
Expand Down Expand Up @@ -158,6 +158,8 @@
(let [message-from-mq (rmq/get-msg-from-dead-queue "default")]
(is (= nil message-from-mq)))
(let [message-from-mq (rmq/get-msg-from-delay-queue "default")]
(is (= nil message-from-mq)))
(let [message-from-mq (rmq/get-msg-from-instant-queue "default")]
(is (= nil message-from-mq))))))

(testing "publish to delay queue publishes with expiration from config"
Expand Down Expand Up @@ -220,6 +222,17 @@
:retry {:count 5
:enabled true
:type :exponential}))]

(testing "message with available retry counts same as that of ziggurat-config will be published to delay queue with suffix 1"
(fix/with-queues
{:default {:handler-fn #(constantly nil)}}
(let [retry-message-payload (assoc message-payload :retry-count (get-in (ziggurat-config) [:retry :count]))
expected-message (assoc message-payload :retry-count 4)]
(producer/retry retry-message-payload)
(let [message-from-mq (rmq/get-message-from-retry-queue "default" 1)]
(is (= (bytes-to-str message-from-mq) (bytes-to-str expected-message)))))))


(testing "message with available retry counts as 4 will be published to delay queue with suffix 2"
(fix/with-queues
{:default {:handler-fn #(constantly nil)}}
Expand Down Expand Up @@ -612,7 +625,7 @@
(let [serialize-called (atom false)]
(with-redefs [zmd/serialize-to-message-payload-proto (fn [_] (reset! serialize-called true))
lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _])]
lb/publish (fn [_ _ _ _ _] nil)]
(producer/publish "exchange" message-payload)
(is (true? @serialize-called))))))

Expand Down

0 comments on commit ce2d6b6

Please sign in to comment.