
Merge branch '2.x'
kartik7153 committed Jun 21, 2019
2 parents dc012ee + 79a583f commit 3056a4c
Showing 23 changed files with 1,038 additions and 341 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
@@ -34,3 +34,6 @@ jobs:

- stage: deploy
script: lein deploy clojars
branches:
only:
- 2.x
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -3,9 +3,21 @@ All notable changes to this project will be documented in this file. This change

## Unreleased Changes

## 3.0.0-rc1 - 2019-05-17

## 3.0.0-alpha - 2019-06-21
- Upgrades Kafka Streams to version 2.1. Please refer to [this guide](UpgradeGuide.md) when upgrading

## 2.12.0 - 2019-06-17
- Add support for providing a topic-name label in the metrics
- Multiple Kafka producers support in ziggurat (#55)
- Validate stream routes only when modes is not present or it contains stream-server (#59)

## 2.11.1 - 2019-06-04
- Actor stop fn should stop before the Ziggurat state (#53)

## 2.11.0 - 2019-05-31
- Running ziggurat in different modes (#46)

## 2.10.2 - 2019-05-03
- Adds config to change the changelog topic replication factor

60 changes: 57 additions & 3 deletions README.md
@@ -57,7 +57,7 @@ Upgrade Guide to 3.x: refer [here](UpgradeGuide.md)

Add this to your project.clj:

`[tech.gojek/ziggurat "3.0.0-rc1"]`
`[tech.gojek/ziggurat "3.0.0-alpha"]`

or for latest stable version

@@ -128,7 +128,45 @@ To start a stream (a thread that reads messages from Kafka), add this to your code:
Ziggurat also sets up a HTTP server by default and you can pass in your own routes that it will serve. The above example demonstrates
how you can pass in your own route.

or
```clojure
(ziggurat/main {:start-fn start-fn
:stop-fn stop-fn
:stream-routes {:stream-id {:handler-fn main-fn}}
:actor-routes routes
:modes [:api-server :stream-worker]})
```

This will start both the api-server and stream-worker modes.

Ziggurat supports four modes:
```
:api-server - Starts only the HTTP server, serving actor routes and management routes (dead set management)
:stream-worker - Starts only the stream workers, plus the RabbitMQ producers that push messages for retries and channels
:worker - Starts the RabbitMQ consumers for retries and channels
:management-api - Serves only the routes used for dead set management
```

You can pass in multiple modes and Ziggurat will start them accordingly.
If no modes are passed, all modes are started.
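For example, to run only the RabbitMQ consumers, a sketch along the lines of the earlier example (reusing the same placeholder vars `start-fn`, `stop-fn`, `main-fn`, and `routes`) could look like:

```clojure
;; Starts only the :worker mode, i.e. the RabbitMQ consumers
;; for retries and channels. All keys besides :modes are the
;; same placeholders used in the example above.
(ziggurat/main {:start-fn start-fn
                :stop-fn stop-fn
                :stream-routes {:stream-id {:handler-fn main-fn}}
                :actor-routes routes
                :modes [:worker]})
```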

## Publishing data to Kafka Topics in Ziggurat
To enable publishing data to Kafka, Ziggurat provides producing support through the `ziggurat.producer` namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrappers around variants of the `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`.

At the time of initialization, an instance of `org.apache.kafka.clients.producer.KafkaProducer` is constructed using config values provided in `resources/config.edn`. A producer can be configured for each of the stream-routes in config.edn. Please see the example below.

At present, only a few configurations are supported for constructing KafkaProducer. These have been explained [here](#configuration). Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
for a complete list of all producer configs available in Kafka.

The `ziggurat.producer` namespace defines a multi-arity `send` function, which is a thin wrapper around [KafkaProducer#send](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-). This function publishes data to a Kafka topic through a Kafka producer
defined in the stream router configuration. See the configuration section below.

E.g., to publish data using the producer defined for the stream router config with key `:default`, use `send` like this:

`(send :default "test-topic" "key" "value")`

`(send :default "test-topic" 1 "key" "value")`
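As an illustrative sketch, a mapper function could forward each message it handles using the `:default` producer from the stream-router config. The topic name, key, and handler below are hypothetical, not part of Ziggurat's API:

```clojure
(require '[ziggurat.producer :as producer])

;; Hypothetical handler: republishes each processed message to
;; another topic via the :default producer, then acks with :success.
(defn main-fn [message]
  (producer/send :default "processed-events" "some-key" (str message))
  :success)
```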

## Configuration

@@ -142,7 +180,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
:origin-topic "kafka-topic-*"
:oldest-processed-message-in-s [604800 :int]
:proto-class "proto-class"
:changelog-topic-replication-factor [3 :int]}}
:changelog-topic-replication-factor [3 :int]
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}}
:datadog {:host "localhost"
:port [8125 :int]
:enabled [false :bool]}
@@ -170,7 +215,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:http-server {:port [8010 :int]
:thread-count [100 :int]}}}
:thread-count [100 :int]}}}
```
* app-name - Refers to the name of the application. Used to namespace queues and metrics.
* nrepl-server - Port on which the repl server will be hosted
@@ -183,6 +228,15 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
* oldest-processed-messages-in-s - The oldest message that will be processed by the stream, in seconds. By default the value is 604800 (1 week)
* proto-class - The proto-class of the message, so that it can be deserialized before being passed to the mapper function
* changelog-topic-replication-factor - The internal changelog topic replication factor. By default the value is 3
* producer - Configuration for KafkaProducer. Currently, only the following options are supported. Please see [Producer Configs](https://kafka.apache.org/documentation/#producerconfigs) for a detailed explanation of each configuration parameter.
* bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
* acks - The number of acknowledgments the producer requires the leader to have received before considering a request complete. Valid values are [all, -1, 0, 1].
* retries - Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
* key.serializer - Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
* value.serializer - Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
* max.in.flight.requests.per.connection - The maximum number of unacknowledged requests the client will send on a single connection before blocking.
* enable.idempotence - When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.

* datadog - The StatsD host and port that metrics should be sent to. Although the key name is datadog, it supports any StatsD server for sending metrics.
* sentry - Whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing the mapper-function, an event is sent to sentry. You can skip this flow by disabling it.
* rabbit-mq-connection - The details required to make a connection to rabbitmq. We use rabbitmq for the retry mechanism.
5 changes: 3 additions & 2 deletions project.clj
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.0.0-rc1"
(defproject tech.gojek/ziggurat "3.0.0-alpha"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
@@ -49,7 +49,8 @@
[junit/junit "4.12"]
[org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.1.0" :classifier "test"]
[org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]]
[org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]
[org.clojure/test.check "0.9.0"]]
:plugins [[lein-cloverage "1.0.13"]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[lein-cljfmt "0.6.3"]
33 changes: 24 additions & 9 deletions resources/config.test.ci.edn
@@ -28,12 +28,27 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
33 changes: 24 additions & 9 deletions resources/config.test.edn
@@ -28,12 +28,27 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:proto-class "flatland.protobuf.test.Example$Photo"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}}}
