Skip to content

Commit

Permalink
Extended producer support to multiple producers
Browse files Browse the repository at this point in the history
* User can define producer config inside a stream-router config
* User can publish data to any producer by calling send
	method with the stream-router config key against which
	producer config is defined.
* A few unit tests
  • Loading branch information
mjayprateek committed Jun 12, 2019
1 parent 3c68b22 commit d355ae5
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 101 deletions.
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "1.1.1" :classifier "test"]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]
[org.clojure/test.check "0.9.0"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[jonase/eastwood "0.2.6"]
Expand Down
38 changes: 23 additions & 15 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,26 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:producer {:default {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
38 changes: 23 additions & 15 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,26 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:producer {:default {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
8 changes: 5 additions & 3 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.producer :refer [kafka-producer]]))
[ziggurat.producer :as producer :refer [kafka-producers]]))

(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
Expand Down Expand Up @@ -94,13 +94,15 @@
(start* #{#'config/config
#'statsd-reporter
#'sentry-reporter
#'nrepl-server/server}))
#'nrepl-server/server
#'kafka-producers}))

(defn stop-common-states []
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'nrepl-server/server))
#'nrepl-server/server
#'kafka-producers))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
138 changes: 91 additions & 47 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,36 @@
around variants of `send` methods defined in
`org.apache.kafka.clients.producer.KafkaProducer`.
Users of Ziggurat can use methods defined here to send data
to Kafka topics.
Constructs an instance of `org.apache.kafka.clients.producer.KafkaProducer` using config values
provided in `resources/config.edn`. Producer configs currently supported in Ziggurat are
mentioned below.
At the time of initialization, an instance of
`org.apache.kafka.clients.producer.KafkaProducer`
is constructed using config values provided in `resources/config.edn`.
A producer can be configured for each of the stream-routes
in config.edn. Please see the example below
`
:stream-router {:default {:application-id \"test\"\n
:bootstrap-servers \"localhost:9092\"\n
:stream-threads-count [1 :int]\n
:origin-topic \"topic\"\n
:proto-class \"flatland.protobuf.test.Example$Photo\"\n
:channels {:channel-1 {:worker-count [10 :int]\n :retry {:count [5 :int]\n :enabled [true :bool]}}}\n
:producer {:bootstrap-servers \"localhost:9092\"\n
:acks \"all\"\n
:retries-config 5\n
:max-in-flight-requests-per-connection 5\n
:enable-idempotence false\n
:value-serializer \"org.apache.kafka.common.serialization.StringSerializer\"\n
:key-serializer \"org.apache.kafka.common.serialization.StringSerializer\"}}
`
Usage:
`
Please see `send` for publishing data via Kafka producers
`
These are the KafkaProducer configs currently supported in Ziggurat.
- bootstrap.servers
- acks
- retries
Expand All @@ -28,53 +51,74 @@
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties)))

(defn producer-config [] (get-in (ziggurat-config) [:producer :default]))

(defn producer-config-properties []
(let [bootstrap-servers (:bootstrap-servers (producer-config))
acks (:acks (producer-config))
key-serializer-class (:key-serializer (producer-config))
value-serializer-class (:value-serializer (producer-config))
enable-idempotence (:enable-idempotence (producer-config))
retries-config (int (:retries-config (producer-config)))
max-in-flight-requests-per-connection (int (:max-in-flight-requests-per-connection (producer-config)))]
(doto (Properties.)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers)
(.put ProducerConfig/ACKS_CONFIG acks)
(.put ProducerConfig/RETRIES_CONFIG retries-config)
(.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence)
(.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max-in-flight-requests-per-connection)
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer-class)
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer-class))))

(defstate kafka-producer
:start (do (log/info "Starting Kafka producer ...")
(KafkaProducer. (producer-config-properties)))
:stop (do (log/info "Stopping Kafka producer ...")
(doto kafka-producer
(.flush)
(.close))))
(defn- producer-properties-from-config
  "Builds a `java.util.Properties` instance for constructing a KafkaProducer.
   Takes a producer config map as found under [:stream-router <key> :producer]
   in config.edn; :retries-config and :max-in-flight-requests-per-connection
   are coerced to ints as KafkaProducer expects."
  [config]
  (let [properties (Properties.)
        ;; Pair each KafkaProducer config constant with its value from the config map.
        entries [[ProducerConfig/BOOTSTRAP_SERVERS_CONFIG (:bootstrap-servers config)]
                 [ProducerConfig/ACKS_CONFIG (:acks config)]
                 [ProducerConfig/RETRIES_CONFIG (int (:retries-config config))]
                 [ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG (:enable-idempotence config)]
                 [ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (int (:max-in-flight-requests-per-connection config))]
                 [ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG (:key-serializer config)]
                 [ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG (:value-serializer config)]]]
    (doseq [[config-key config-value] entries]
      (.put properties config-key config-value))
    properties))

(defn producer-properties-map
  "Returns a map of stream-config-key -> java.util.Properties, built from every
   stream-router entry in config.edn that declares a :producer config.
   Entries without a :producer config are omitted."
  []
  (into {}
        (keep (fn [[stream-config-key stream-config]]
                (let [producer-config (:producer stream-config)]
                  ;; some? (not nil-check via when-let) preserves the original
                  ;; behavior for a hypothetical `false` producer config.
                  (when (some? producer-config)
                    [stream-config-key (producer-properties-from-config producer-config)]))))
        (:stream-router (ziggurat-config))))

(defstate kafka-producers
  ;; Mount state holding one KafkaProducer per stream-route that declares a
  ;; :producer config, keyed by the stream-route's config key. When no producer
  ;; configs exist, the state is nil (the value of log/info) and :stop is a no-op.
  :start (let [properties-map (producer-properties-map)] ; compute once — original rebuilt all Properties twice
           (if (not-empty properties-map)
             (do (log/info "Starting Kafka producers ...")
                 (reduce (fn [producers [stream-config-key properties]]
                           (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
                           (assoc producers stream-config-key (KafkaProducer. properties)))
                         {}
                         (seq properties-map)))
             (log/info "No producers found. Can not initiate start.")))
  :stop (if (not-empty kafka-producers)
          (do (log/info "Stopping Kafka producers ...")
              ;; doseq instead of (doall (map ...)): these calls are side effects only.
              (doseq [[stream-config-key producer] kafka-producers]
                (log/debug "Stopping Kafka producer associated with [" stream-config-key "]")
                (doto producer
                  (.flush)
                  (.close))))
          (log/info "No producers found. Can not initiate stop.")))

(defn send
"A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables
the users of ziggurat to produce data to a Kafka topic using a Kafka producer.
Please see `producer` for the list of config currently supported in ziggurat.
the users of Ziggurat to produce data to a Kafka topic using a Kafka producer
associated with a Kafka stream config key.
E.g.
(send \"test-topic\" \"value\")
(send \"test-topic\" 1 \"key\" \"value\")
"

([topic data]
(let [producer-record (ProducerRecord. topic data)]
(.send kafka-producer producer-record)))

([topic partition key data]
(let [producer-record (ProducerRecord. topic (int partition) key data)]
(.send kafka-producer producer-record))))
For publishing data to producer defined for the
stream-router config defined against the
key `:default`, use send like this.
`(send :default \"test-topic\" \"key\" \"value\")`
`(send :default \"test-topic\" 1 \"key\" \"value\")`
"

([stream-config-key topic key value]
(send stream-config-key topic nil key value))

([stream-config-key topic partition key value]
(if (some? (get kafka-producers stream-config-key))
(let [producer-record (ProducerRecord. topic partition key value)]
(.send (stream-config-key kafka-producers) producer-record))

(let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")]
(do (log/error error-msg)
(throw (ex-info error-msg {:stream-config-key stream-config-key})))))))
Loading

0 comments on commit d355ae5

Please sign in to comment.