Merge 9a4e3aa into 02ce64d
mjayprateek committed Jun 13, 2019
2 parents 02ce64d + 9a4e3aa commit 699eaa7
Showing 9 changed files with 363 additions and 23 deletions.
38 changes: 36 additions & 2 deletions README.md
@@ -139,6 +139,24 @@ There are four modes supported by ziggurat
You can pass in multiple modes and it will start accordingly
If nothing passed to modes then it will start all the modes.

## Publishing data to Kafka Topics in Ziggurat
To publish data to Kafka, Ziggurat provides producer support through the `ziggurat.producer` namespace. This namespace defines functions for publishing data to Kafka topics. These functions are essentially wrappers around variants of the `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`.

At the time of initialization, an instance of `org.apache.kafka.clients.producer.KafkaProducer` is constructed using config values provided in `resources/config.edn`. A producer can be configured for each of the stream-routes in config.edn. Please see the example below.

At present, only a few configurations are supported for constructing KafkaProducer. These have been explained [here](#configuration). Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
for a complete list of all producer configs available in Kafka.

The `ziggurat.producer` namespace defines a multi-arity `send` function, which is a thin wrapper around [KafkaProducer#send](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-). This function publishes data to a Kafka topic through the Kafka producer defined in the stream-router configuration. See the [Configuration](#configuration) section below.

For example, to publish data using the producer defined for the stream-router config with key `:default`, use `send` like this:

`(send :default "test-topic" "key" "value")`

`(send :default "test-topic" 1 "key" "value")`
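The two call shapes above can be illustrated with a small, self-contained sketch. This is not Ziggurat's implementation: the `producers` map and the record maps are hypothetical stand-ins for real `KafkaProducer` instances, and the sketch assumes that the extra argument in the five-argument form is a partition number, by analogy with Kafka's `ProducerRecord(topic, partition, key, value)` constructor. Throwing when no producer is configured for a route is likewise an assumption.

```clojure
(ns example.producer-sketch
  ;; clojure.core already defines `send` (for agents), so exclude it here.
  (:refer-clojure :exclude [send]))

;; Hypothetical stand-in for the producers built from config.edn:
;; each "producer" just collects records in an atom instead of talking to Kafka.
(def producers {:default (atom [])})

(defn send
  "Illustrative multi-arity wrapper, not Ziggurat's actual implementation."
  ;; Four-argument form: no partition given, so delegate with nil.
  ([stream-config-key topic key value]
   (send stream-config-key topic nil key value))
  ;; Five-argument form: assumed order is topic, partition, key, value.
  ([stream-config-key topic partition key value]
   (if-let [producer (get producers stream-config-key)]
     (let [record {:topic topic :partition partition :key key :value value}]
       (swap! producer conj record)
       record)
     (throw (ex-info "No producer configured for stream route"
                     {:stream-config-key stream-config-key})))))

(send :default "test-topic" "key" "value")
(send :default "test-topic" 1 "key" "value")
```

Because `clojure.core/send` already exists, application code would probably want to require the real namespace under an alias (for example `[ziggurat.producer :as producer]`) and call `producer/send`, rather than referring `send` directly.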

## Configuration

All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
@@ -151,7 +169,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
:origin-topic "kafka-topic-*"
:oldest-processed-message-in-s [604800 :int]
:proto-class "proto-class"
:changelog-topic-replication-factor [3 :int]}}
:changelog-topic-replication-factor [3 :int]
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}
:datadog {:host "localhost"
:port [8125 :int]
:enabled [false :bool]}
@@ -179,7 +204,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:http-server {:port [8010 :int]
:thread-count [100 :int]}}}
:thread-count [100 :int]}}}
```
* app-name - Refers to the name of the application. Used to namespace queues and metrics.
* nrepl-server - Port on which the repl server will be hosted
@@ -192,6 +217,15 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur
* oldest-processed-message-in-s - The maximum age, in seconds, of a message that the stream will process. By default the value is 604800 (1 week)
* proto-class - The proto-class of the message so that it can be decompiled before being passed to the mapper function
* changelog-topic-replication-factor - the internal changelog topic replication factor. By default the value is 3
* producer - Configuration for KafkaProducer. Currently, only the following options are supported. Please see [Producer Configs](https://kafka.apache.org/documentation/#producerconfigs) for a detailed explanation of each configuration parameter.
    * bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
    * acks - The number of acknowledgments the producer requires the leader to have received before considering a request complete. Valid values are [all, -1, 0, 1].
    * retries - Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
    * key.serializer - Serializer class for the key; it must implement the org.apache.kafka.common.serialization.Serializer interface.
    * value.serializer - Serializer class for the value; it must implement the org.apache.kafka.common.serialization.Serializer interface.
    * max.in.flight.requests.per.connection - The maximum number of unacknowledged requests the client will send on a single connection before blocking.
    * enable.idempotence - When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.

* datadog - The statsd host and port that metrics should be sent to. Although the key name is datadog, metrics can also be sent to a plain statsd server.
* sentry - Whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing the mapper-function, an event is sent to sentry. You can skip this flow by disabling it.
* rabbit-mq-connection - The details required to make a connection to rabbitmq. We use rabbitmq for the retry mechanism.
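
The producer options listed above use Kafka's dotted property names, while the sample `config.edn` uses kebab-case keywords such as `:retries-config`. The following is a minimal sketch of how such keys might be translated into a `java.util.Properties` object; the `key->property` table and the `producer-properties` function are hypothetical names introduced for illustration, and the pairing of `:retries-config` with `retries` is an assumption, not something this commit confirms.

```clojure
(ns example.producer-props
  (:import (java.util Properties)))

;; Assumed mapping from config.edn keys to Kafka producer property names.
(def key->property
  {:bootstrap-servers                     "bootstrap.servers"
   :acks                                  "acks"
   :retries-config                        "retries"
   :max-in-flight-requests-per-connection "max.in.flight.requests.per.connection"
   :enable-idempotence                    "enable.idempotence"
   :key-serializer                        "key.serializer"
   :value-serializer                      "value.serializer"})

(defn producer-properties
  "Builds a Properties object from a :producer config map.
  Keys without an entry in key->property are ignored."
  [producer-config]
  (let [props (Properties.)]
    (doseq [[k v] producer-config
            :let [property (key->property k)]
            :when property]
      ;; Properties stores strings, so numbers and booleans are stringified.
      (.setProperty props property (str v)))
    props))

(def sample-props
  (producer-properties {:bootstrap-servers "localhost:9092"
                        :acks "all"
                        :retries-config 5
                        :enable-idempotence false}))
```

A `Properties` object built this way is the shape that the `KafkaProducer` constructor accepts, which is why the sketch stringifies every value.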
3 changes: 2 additions & 1 deletion project.clj
@@ -49,7 +49,8 @@
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "1.1.1" :classifier "test"]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]]
[org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]
[org.clojure/test.check "0.9.0"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[jonase/eastwood "0.2.6"]
31 changes: 23 additions & 8 deletions resources/config.test.ci.edn
@@ -28,11 +28,26 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
31 changes: 23 additions & 8 deletions resources/config.test.edn
@@ -28,11 +28,26 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "application-name-test-02-multiple"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "kafka-topic-*"
:proto-class "com.company.LogMessage"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
17 changes: 14 additions & 3 deletions src/ziggurat/init.clj
@@ -11,7 +11,8 @@
[ziggurat.nrepl-server :as nrepl-server]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]))
[ziggurat.streams :as streams]
[ziggurat.producer :as producer :refer [kafka-producers]]))

(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
@@ -38,7 +39,11 @@
(start-rabbitmq-connection args)
(messaging-producer/make-queues (get args :stream-routes)))

(defn start-kafka-producers []
(start* #{#'kafka-producers}))

(defn start-stream [args]
(start-kafka-producers)
(start-rabbitmq-producers args)
(start* #{#'streams/stream} args))

@@ -51,22 +56,28 @@
(start* #{#'server/server} args))

(defn start-workers [args]
(start-kafka-producers)
(start-rabbitmq-producers args)
(start-rabbitmq-consumers args))

(defn- stop-rabbitmq-connection []
(mount/stop #'messaging-connection/connection))

(defn stop-kafka-producers []
(mount/stop #'kafka-producers))

(defn stop-workers []
(stop-rabbitmq-connection))
(stop-rabbitmq-connection)
(stop-kafka-producers))

(defn stop-server []
(mount/stop #'server/server)
(stop-rabbitmq-connection))

(defn stop-stream []
(mount/stop #'streams/stream)
(stop-rabbitmq-connection))
(stop-rabbitmq-connection)
(stop-kafka-producers))

(defn stop-management-apis []
(mount/stop #'server/server)