Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GOJ-87311 Add producing support in Ziggurat via Kafka Producer #50

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,11 @@
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:enabled [true :bool]}}}}}
:producer {:default {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}}
9 changes: 8 additions & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,11 @@
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:enabled [true :bool]}}}}}
:producer {:default {:bootstrap-servers "localhost:9092"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this default key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kartik7153 default just refers to the default KafkaProducer config. Let me know if this needs change.

:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}}
9 changes: 6 additions & 3 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
[ziggurat.nrepl-server :as nrepl-server]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]))
[ziggurat.streams :as streams]
[ziggurat.producer :refer [kafka-producer]]))

(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
Expand Down Expand Up @@ -40,7 +41,8 @@
(messaging-consumer/start-subscribers stream-routes) ;; We want subscribers to start after creating queues on RabbitMQ.
(start* #{#'server/server
#'nrepl-server/server
#'streams/stream}
#'streams/stream
#'kafka-producer}
{:stream-routes stream-routes
:actor-routes actor-routes}))

Expand All @@ -52,7 +54,8 @@
#'messaging-connection/connection
#'server/server
#'nrepl-server/server
#'streams/stream)
#'streams/stream
#'kafka-producer)
(actor-stop-fn)
(mount/stop #'config/config))

Expand Down
80 changes: 80 additions & 0 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
(ns ziggurat.producer
"This namespace defines methods for publishing data to
Kafka topics. The methods defined here are essentially wrapper
around variants of `send` methods defined in
`org.apache.kafka.clients.producer.KafkaProducer`.

Users of Ziggurat can use methods defined here to send data
to Kafka topics.

Constructs an instance of `org.apache.kafka.clients.producer.KafkaProducer` using config values
provided in `resources/config.edn`. Producer configs currently supported in Ziggurat are
mentioned below.

- bootstrap.servers
- acks
- retries
- key.serializer
- value.serializer
- max.in.flight.requests.per.connection
- enable.idempotencecd

Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
for a complete list of all producer configs available in Kafka."

(:require [ziggurat.config :refer [ziggurat-config]]
[clojure.tools.logging :as log]
[mount.core :refer [defstate]])
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties)))

(defn producer-config [] (get-in (ziggurat-config) [:producer :default]))

(defn producer-config-properties []
(let [bootstrap-servers (:bootstrap-servers (producer-config))
acks (:acks (producer-config))
key-serializer-class (:key-serializer (producer-config))
value-serializer-class (:value-serializer (producer-config))
enable-idempotence (:enable-idempotence (producer-config))
retries-config (int (:retries-config (producer-config)))
max-in-flight-requests-per-connection (int (:max-in-flight-requests-per-connection (producer-config)))]
(doto (Properties.)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers)
(.put ProducerConfig/ACKS_CONFIG acks)
(.put ProducerConfig/RETRIES_CONFIG retries-config)
(.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence)
(.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max-in-flight-requests-per-connection)
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer-class)
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer-class))))

(defstate kafka-producer
:start (do (log/info "Starting Kafka producer ...")
(KafkaProducer. (producer-config-properties)))
:stop (do (log/info "Stopping Kafka producer ...")
(doto kafka-producer
(.flush)
(.close))))

(defn send
"A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables
the users of ziggurat to produce data to a Kafka topic using a Kafka producer.

Please see `producer` for the list of config currently supported in ziggurat.

E.g.
(send \"test-topic\" \"value\")
(send \"test-topic\" 1 \"key\" \"value\")
"

([topic data]
(let [producer-record (ProducerRecord. topic data)]
(.send kafka-producer producer-record)))

([topic partition key data]
(let [producer-record (ProducerRecord. topic (int partition) key data)]
(.send kafka-producer producer-record))))





12 changes: 12 additions & 0 deletions test/ziggurat/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.server :refer [server]]
[ziggurat.messaging.producer :as pr]
[ziggurat.producer :as producer]
[langohr.channel :as lch]
[langohr.exchange :as le]
[langohr.queue :as lq]))
Expand Down Expand Up @@ -91,3 +92,14 @@
(finally
(delete-queues ~stream-routes)
(delete-exchanges ~stream-routes))))

(defn mount-producer []
(-> (mount/only [#'producer/kafka-producer])
(mount/start)))

(defn mount-only-config-and-producer [f]
(do
(mount-config)
(mount-producer))
(f)
(mount/stop))
27 changes: 27 additions & 0 deletions test/ziggurat/producer_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(ns ziggurat.producer-test
(:require [clojure.test :refer :all]
[flatland.protobuf.core :as proto]
[ziggurat.streams :refer [start-streams stop-streams]]
[ziggurat.fixtures :as fix]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.producer :as producer]))

(use-fixtures :once fix/mount-only-config-and-producer)

(deftest properties-map-is-generated-correctly
(is (= "localhost:9092" (.get (producer/producer-config-properties) "bootstrap.servers")))
(is (= "all" (.get (producer/producer-config-properties) "acks")))
(is (= 5 (.get (producer/producer-config-properties) "retries")))
(is (= 5 (.get (producer/producer-config-properties) "max.in.flight.requests.per.connection")))
(is (= "org.apache.kafka.common.serialization.StringSerializer" (.get (producer/producer-config-properties) "key.serializer")))
(is (= "org.apache.kafka.common.serialization.StringSerializer" (.get (producer/producer-config-properties) "value.serializer")))
(is (false? (.get (producer/producer-config-properties) "enable.idempotence"))))

(deftest send-data-with-topic-and-value-test
(let [topic "test-topic"
data "Hello World!!!"]
(let [future (producer/send topic data)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be in the last let only? like:

    (let [  topic "test-topic"
              data "Hello World!!!"
              future (producer/send topic data)]

also, what happens if the topic is nil? 🤔

Copy link
Contributor Author

@mjayprateek mjayprateek May 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@riteeks You're right. But, I think these two lets logically divide the code in two parts:

  1. First let setups the data for calling send()
  2. Second let simply acts on the result of calling send()

Mixing these two, IMHO, will lead to a less readable code.

send in ziggurat is just a wrapper around KafkaProducer#send. Thus, whatever arguments are passed will just be passed "as they are" to KafkaProducer. That class will be responsible for error handling and exceptions. Ziggurat's code is just a thin layer in-front
of KafkaProducer.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mixing these two, IMHO, will lead to a less readable code.

make sense.

send in ziggurat is just a wrapper around KafkaProducer#send. Thus, whatever arguments are passed will just be passed "as they are" to KafkaProducer. That class will be responsible for error handling and exceptions. Ziggurat's code is just a thin layer in-front
of KafkaProducer.

for this, I was just thinking of having a test for bad-path also for the ziggurat's send function. But I am not very sure how much value will this add.

(Thread/sleep 1000)
(is (-> future .get .partition (>= 0))))))