Skip to content

Commit

Permalink
Merge d355ae5 into 02ce64d
Browse files Browse the repository at this point in the history
  • Loading branch information
mjayprateek committed Jun 12, 2019
2 parents 02ce64d + d355ae5 commit 8a6b06b
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 21 deletions.
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "1.1.1" :classifier "test"]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]
[org.clojure/test.check "0.9.0"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[jonase/eastwood "0.2.6"]
Expand Down
31 changes: 23 additions & 8 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,26 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
31 changes: 23 additions & 8 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,26 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
9 changes: 6 additions & 3 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
[ziggurat.nrepl-server :as nrepl-server]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]))
[ziggurat.streams :as streams]
[ziggurat.producer :as producer :refer [kafka-producers]]))

(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
Expand Down Expand Up @@ -93,13 +94,15 @@
(start* #{#'config/config
#'statsd-reporter
#'sentry-reporter
#'nrepl-server/server}))
#'nrepl-server/server
#'kafka-producers}))

(defn stop-common-states []
(mount/stop #'config/config
#'statsd-reporter
#'messaging-connection/connection
#'nrepl-server/server))
#'nrepl-server/server
#'kafka-producers))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
124 changes: 124 additions & 0 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
(ns ziggurat.producer
  "This namespace defines methods for publishing data to
  Kafka topics. The methods defined here are essentially wrappers
  around variants of `send` methods defined in
  `org.apache.kafka.clients.producer.KafkaProducer`.

  At the time of initialization, an instance of
  `org.apache.kafka.clients.producer.KafkaProducer`
  is constructed using config values provided in `resources/config.edn`.

  A producer can be configured for each of the stream-routes
  in config.edn. Please see the example below

  `
  :stream-router {:default {:application-id \"test\"\n
                            :bootstrap-servers \"localhost:9092\"\n
                            :stream-threads-count [1 :int]\n
                            :origin-topic \"topic\"\n
                            :proto-class \"flatland.protobuf.test.Example$Photo\"\n
                            :channels {:channel-1 {:worker-count [10 :int]\n :retry {:count [5 :int]\n :enabled [true :bool]}}}\n
                            :producer {:bootstrap-servers \"localhost:9092\"\n
                                       :acks \"all\"\n
                                       :retries-config 5\n
                                       :max-in-flight-requests-per-connection 5\n
                                       :enable-idempotence false\n
                                       :value-serializer \"org.apache.kafka.common.serialization.StringSerializer\"\n
                                       :key-serializer \"org.apache.kafka.common.serialization.StringSerializer\"}}
  `

  Usage:

  `
  Please see `send` for publishing data via Kafka producers
  `

  These are the KafkaProducer configs currently supported in Ziggurat.
  - bootstrap.servers
  - acks
  - retries
  - key.serializer
  - value.serializer
  - max.in.flight.requests.per.connection
  - enable.idempotence

  Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
  for a complete list of all producer configs available in Kafka."

  (:require [ziggurat.config :refer [ziggurat-config]]
            [clojure.tools.logging :as log]
            [mount.core :refer [defstate]])
  (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
           (java.util Properties)))

(defn- producer-properties-from-config
  "Translates a single `:producer` config map from config.edn into a
  `java.util.Properties` instance keyed by the standard `ProducerConfig`
  constant names. `retries-config` and
  `max-in-flight-requests-per-connection` are coerced to `int` because the
  Kafka client validates these entries as Integer values."
  [{:keys [bootstrap-servers
           acks
           key-serializer
           value-serializer
           enable-idempotence
           retries-config
           max-in-flight-requests-per-connection]}]
  (let [properties (Properties.)]
    ;; Pairs of (Kafka config key, value from the ziggurat config map).
    (doseq [[config-key config-value]
            [[ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers]
             [ProducerConfig/ACKS_CONFIG acks]
             [ProducerConfig/RETRIES_CONFIG (int retries-config)]
             [ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence]
             [ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (int max-in-flight-requests-per-connection)]
             [ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer]
             [ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer]]]
      (.put properties config-key config-value))
    properties))

(defn producer-properties-map
  "Walks every stream-route under `:stream-router` in the ziggurat config and
  returns a map of stream-route key -> `java.util.Properties` for each route
  that declares a `:producer` config. Routes without a `:producer` entry are
  omitted. Returns an empty map when no route has a producer configured."
  []
  (into {}
        (keep (fn [[stream-config-key stream-config]]
                (let [producer-config (:producer stream-config)]
                  ;; `some?` (not truthiness) mirrors the original check, so an
                  ;; explicit `false` config value would still be processed.
                  (when (some? producer-config)
                    [stream-config-key (producer-properties-from-config producer-config)]))))
        (:stream-router (ziggurat-config))))

(defstate kafka-producers
  ;; Mount state holding one KafkaProducer per stream-route that declares a
  ;; :producer config. :start yields a map of stream-route key -> KafkaProducer,
  ;; or nil (the log call's return) when no producers are configured.
  ;;
  ;; Fix: the original evaluated (producer-properties-map) twice in :start —
  ;; once for the emptiness check and once for the reduce — re-reading the
  ;; config and rebuilding every Properties instance. Bind it once instead.
  :start (let [properties-map (producer-properties-map)]
           (if (not-empty properties-map)
             (do (log/info "Starting Kafka producers ...")
                 (reduce (fn [producers [stream-config-key properties]]
                           (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
                           (assoc producers stream-config-key (KafkaProducer. properties)))
                         {}
                         properties-map))
             (log/info "No producers found. Can not initiate start.")))

  ;; Flush pending records, then close each producer. doseq (side effects only)
  ;; replaces the original doall/map; mount ignores the :stop return value.
  :stop (if (not-empty kafka-producers)
          (do (log/info "Stopping Kafka producers ...")
              (doseq [[stream-config-key producer] kafka-producers]
                (log/debug "Stopping Kafka producer associated with [" stream-config-key "]")
                (doto producer
                  (.flush)
                  (.close))))
          (log/info "No producers found. Can not initiate stop.")))

(defn send
  "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables
  the users of Ziggurat to produce data to a Kafka topic using a Kafka producer
  associated with a Kafka stream config key.

  The 4-arity form delegates to the 5-arity form with a nil partition, letting
  the Kafka client choose the partition. Returns the `java.util.concurrent.Future`
  produced by `KafkaProducer#send`. Throws `ex-info` (with the stream config key
  in its data map) when no producer is configured for `stream-config-key`.

  E.g.
  For publishing data to the producer defined for the
  stream router config with key `:default`, use send like this.

  `(send :default \"test-topic\" \"key\" \"value\")`
  `(send :default \"test-topic\" 1 \"key\" \"value\")`
  "

  ([stream-config-key topic key value]
   (send stream-config-key topic nil key value))

  ([stream-config-key topic partition key value]
   ;; Fix: the original looked the producer up twice (once via `get` for the
   ;; nil-check, once via keyword invocation for the call). Bind it once;
   ;; producers are objects (never false), so if-let is equivalent to some?.
   (if-let [producer (get kafka-producers stream-config-key)]
     (.send producer (ProducerRecord. topic partition key value))
     (let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")]
       (log/error error-msg)
       (throw (ex-info error-msg {:stream-config-key stream-config-key}))))))
41 changes: 40 additions & 1 deletion test/ziggurat/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.server :refer [server]]
[ziggurat.messaging.producer :as pr]
[ziggurat.producer :as producer]
[langohr.channel :as lch]
[langohr.exchange :as le]
[langohr.queue :as lq]))
[langohr.queue :as lq])
(:import (org.apache.kafka.streams.integration.utils EmbeddedKafkaCluster)
(java.util Properties)
(org.apache.kafka.clients.producer ProducerConfig)
(org.apache.kafka.clients.consumer ConsumerConfig)))

(defn mount-config []
(-> (mount/only [#'config/config])
Expand Down Expand Up @@ -91,3 +96,37 @@
(finally
(delete-queues ~stream-routes)
(delete-exchanges ~stream-routes))))

(defn mount-producer
  "Starts only the `ziggurat.producer/kafka-producers` mount state, leaving
  every other state untouched."
  []
  (mount/start (mount/only [#'producer/kafka-producers])))

(defn construct-embedded-kafka-cluster
  "Creates a single-broker `EmbeddedKafkaCluster`, starts it, and returns the
  started cluster instance."
  []
  (let [cluster (EmbeddedKafkaCluster. 1)]
    (.start cluster)
    cluster))

;; Per-test dynamic state, bound by mount-only-config-and-producer below:
;; the running embedded Kafka cluster for the current test run.
(def ^:dynamic *embedded-kafka-cluster* nil)
;; Bootstrap-servers string obtained from the embedded cluster.
(def ^:dynamic *bootstrap-servers* nil)
;; Ready-made java.util.Properties for consumers/producers pointed at the
;; embedded cluster (String (de)serializers).
(def ^:dynamic *consumer-properties* nil)
(def ^:dynamic *producer-properties* nil)

(defn mount-only-config-and-producer
  "Test fixture: mounts only the config and kafka-producers states, starts an
  embedded Kafka cluster, binds the dynamic test vars above, runs the test fn
  `f`, then stops all mount states.

  The `binding` forms are deliberately nested rather than merged into one:
  `binding` binds in parallel, and *bootstrap-servers* depends on
  *embedded-kafka-cluster*, which the properties in turn depend on."
  [f]
  (do
    (mount-config)
    (mount-producer)
    (binding [*embedded-kafka-cluster* (construct-embedded-kafka-cluster)]
      (binding [*bootstrap-servers* (.bootstrapServers *embedded-kafka-cluster*)]
        (binding [*consumer-properties* (doto (Properties.)
                                          (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*)
                                          (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer")
                                          (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest")
                                          (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
                                          (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"))
                  *producer-properties* (doto (Properties.)
                                          (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*)
                                          (.put ProducerConfig/ACKS_CONFIG "all")
                                          (.put ProducerConfig/RETRIES_CONFIG (int 0))
                                          (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer")
                                          (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))]
          ;; NOTE(review): (mount/stop) below will not run if f throws (no
          ;; try/finally), and the embedded cluster is never explicitly shut
          ;; down — relies on JVM teardown between test runs. Consider
          ;; wrapping in try/finally; confirm cluster cleanup semantics.
          (f)))))
  (mount/stop))
Loading

0 comments on commit 8a6b06b

Please sign in to comment.