Merge e007309 into b962435
macalimlim committed Jun 15, 2020
2 parents b962435 + e007309 commit b3192e9
Showing 10 changed files with 306 additions and 80 deletions.
54 changes: 41 additions & 13 deletions README.md
@@ -80,15 +80,15 @@ To start a stream (a thread that reads messages from Kafka), add this to your co
[message]
(println message)
:success)

(def handler-fn
(-> main-fn
(middleware/protobuf->hash ProtoClass :stream-id)))
;; Here ProtoClass refers to the fully qualified name of the Java class used to de-serialize the message.

(ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}})
```
_NOTE: this example assumes that the message is serialized in Protobuf format_

Please refer to the [Middleware section](#middleware-in-ziggurat) to understand `handler-fn` here.

@@ -129,15 +129,15 @@ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding
[message]
(println message)
:success)

(def handler-fn
(-> main-fn
(middleware/protobuf->hash ProtoClass :stream-id)))

(ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}} routes)

```
_NOTE: this example assumes that the message is serialized in Protobuf format_

Ziggurat also sets up an HTTP server by default, and you can pass in your own routes for it to serve. The above example demonstrates
how you can pass in your own route.
@@ -211,9 +211,9 @@ It can be used like this.
(parse-json :stream-route-config)))
```

Here, `message-handler-fn` calls `parse-json` with a message handler function
`actual-message-handler-function` as the first argument and the key of a stream-route
config (as defined in `config.edn`) as the second argument.
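
The wrapping pattern described above can be sketched with plain Clojure. This is a minimal illustration, not Ziggurat's `parse-json`: `parse-edn-sketch` is a hypothetical middleware, and EDN parsing from `clojure.edn` stands in for JSON parsing so that the example needs no extra dependencies.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical middleware (an assumption for illustration only).
;; It takes the handler as the first argument and a stream-route key as the
;; second, and returns a new function that parses the raw message before
;; passing it on. The key is accepted only to mirror the call shape described
;; above; this sketch does not use it.
(defn parse-edn-sketch
  [handler-fn _stream-route-key]
  (fn [message]
    (handler-fn (edn/read-string message))))

(defn actual-message-handler-function
  [message]
  (println "handling" message)
  :success)

;; `->` threads the handler in as the first argument of the middleware call,
;; so this is equivalent to
;; (parse-edn-sketch actual-message-handler-function :stream-route-config).
(def message-handler-fn
  (-> actual-message-handler-function
      (parse-edn-sketch :stream-route-config)))

(message-handler-fn "{:order-id 42}")
```

Because each middleware returns a function with the same one-argument shape as the handler it wraps, further middlewares can be appended to the same `->` chain.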

## Publishing data to Kafka Topics in Ziggurat
To enable publishing data to Kafka, Ziggurat provides producer support through the `ziggurat.producer` namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrappers around variants of the `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`.
@@ -245,8 +245,8 @@ Tracing has been added to the following flows:
3. Produce to rabbitmq channel
4. Produce to another kafka topic

By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer to [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment)
and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables.
To enable a custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed at runtime to create a tracer. If any error occurs while executing the custom tracer provider, a Noop tracer will be created.

To enable tracing, the following config needs to be added to the `config.edn` under the `:ziggurat` key.
@@ -263,6 +263,34 @@ JAEGER_AGENT_HOST: "localhost"
JAEGER_AGENT_PORT: 6831
```

## Stream Joins
Stream joins allow an actor to join messages from two topics into a single result. To use stream joins, add the configuration below to your `config.edn`:
```clojure
{:ziggurat {:stream-router {:stream-id {
:consumer-type :stream-joins
:input-topics {:topic-1 {} :topic-2 {}}
:join-cfg {:topic-1-and-topic-2 {:join-window-ms 5000 :join-type :inner}}
}}}}
```
* `consumer-type` - enables stream joins when set to `:stream-joins`; the other possible value, `:default`, keeps the default actor behavior
* `input-topics` - a map of the topics you want to join
* `join-cfg` - a map of join configurations, each defining the `join-window-ms` and the `join-type` (`:inner`, `:left` or `:outer`)

Your actor's handler function would then look like this:
```clojure
(def handler-func
(-> main-func
(mw/protobuf->hash [com.gojek.esb.booking.BookingLogMessage com.gojek.esb.booking.BookingLogMessage] :booking)))
```

_NOTE: the proto classes are passed as a vector; the first class is used to deserialize the `:left` message and the second one the `:right` message_

Your handler function will receive a message in the following structure:

```clojure
{:left "message-from-1st-topic" :right "message-from-2nd-topic"}
```
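
A handler for this message shape can destructure both sides directly. The sketch below is only illustrative: `describe-join` is a hypothetical helper, and the handling of a missing side is an assumption about `:left` and `:outer` joins, where one side may have no match within the join window.

```clojure
;; Hypothetical helper: reports which sides of the join carry a message.
;; Treating a missing side as nil is an assumption made for this sketch.
(defn describe-join
  [{:keys [left right]}]
  (cond
    (and left right) :both
    left             :left-only
    right            :right-only
    :else            :empty))

;; The handler receives the joined map and destructures it like any other map.
(defn main-func
  [message]
  (println "join result:" (describe-join message) message)
  :success)

(main-func {:left "message-from-1st-topic" :right "message-from-2nd-topic"})
```

Keeping the side inspection in a separate pure function makes it easy to exercise the handler logic without a running stream.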

## Configuration

All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.
@@ -344,7 +372,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur

## Alpha (Experimental) Features
The contract and interface for experimental features in Ziggurat can be changed as we iterate towards better designs for that feature.
For all purposes these features should be considered unstable and should only be used after understanding their risks and implementations.

### Exponential Backoff based Retries
In addition to linear retries, Ziggurat users can now use exponential backoff strategy for retries. This means that the message
@@ -359,11 +387,11 @@ Exponential retries can be configured as described below.
:ziggurat {:stream-router {:default {:application-id "application_name"...}}}
:retry {:type [:exponential :keyword]
:count [10 :int]
:enable [true :bool]}

```

Exponential retries can be configured for channels too. Additionally, a user can specify a custom `queue-timeout-ms` value per channel.
Timeouts for exponential backoffs are calculated using `queue-timeout-ms`. This implies that each channel can have a separate retry count
and different timeout values.
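
The text above does not spell out how a timeout is derived from `queue-timeout-ms`. As a purely illustrative sketch, assuming each retry doubles the previous wait and the first wait equals `queue-timeout-ms` (an assumption, not Ziggurat's confirmed formula), the per-attempt waits could be computed as follows. `backoff-timeout-ms` is a hypothetical helper.

```clojure
;; Hypothetical helper: wait before the given retry attempt (1-based),
;; assuming a simple doubling scheme. This is NOT taken from Ziggurat's source.
(defn backoff-timeout-ms
  [queue-timeout-ms attempt]
  (* queue-timeout-ms (bit-shift-left 1 (dec attempt))))

;; Waits for the first four attempts with :queue-timeout-ms 2000
(mapv #(backoff-timeout-ms 2000 %) (range 1 5))
;; => [2000 4000 8000 16000]
```

Under this assumption, two channels with different `queue-timeout-ms` values follow the same doubling curve but start from different base waits.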

@@ -373,7 +401,7 @@ and different timeout values.
:retry {:type [:exponential :keyword]
:count [10 :int]
:queue-timeout-ms 2000
:enable [true :bool]}}}}}
```

## Deprecation Notice
3 changes: 3 additions & 0 deletions resources/config.test.edn
@@ -39,6 +39,9 @@
:stream-threads-count [1 :int]
:origin-topic "topic"
:upgrade-from "1.1"
:consumer-type :default
:input-topics {:topic {} :another-test-topic {}}
:join-cfg {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :inner}}
:channels {:channel-1 {:worker-count [10 :int]
:retry {:type [:linear :keyword]
:count [5 :int]
9 changes: 7 additions & 2 deletions src/ziggurat/config.clj
@@ -65,6 +65,8 @@
(defn config-from-env [config-file]
(clonfig/read-config (edn-config config-file)))

(declare config)

(defstate config
:start (let [config-values-from-env (config-from-env config-file)
app-name (-> config-values-from-env :ziggurat :app-name)]
@@ -80,8 +82,11 @@
(let [cfg (ziggurat-config)]
(get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future

(defn get-in-config [ks]
(get-in (ziggurat-config) ks))
(defn get-in-config
([ks]
(get-in (ziggurat-config) ks))
([ks default]
(get-in (ziggurat-config) ks default)))

(defn channel-retry-config [topic-entity channel]
(get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry]))
11 changes: 7 additions & 4 deletions src/ziggurat/init.clj
@@ -139,16 +139,19 @@
(defn- add-shutdown-hook [actor-stop-fn modes]
(.addShutdownHook
(Runtime/getRuntime)
(Thread. ^Runnable #(do (stop actor-stop-fn modes)
(shutdown-agents))
(Thread. ^Runnable #((stop actor-stop-fn modes)
(shutdown-agents))
"Shutdown-handler")))

(declare StreamRoute)

(s/defschema StreamRoute
(s/conditional
#(and (seq %)
(map? %))
{s/Keyword {:handler-fn (s/pred #(fn? %))
s/Keyword (s/pred #(fn? %))}}))
{s/Keyword {:handler-fn (s/pred #(fn? %))
(s/optional-key :consumer-type) (s/enum :default :stream-joins)
s/Keyword (s/pred #(fn? %))}}))

(defn validate-stream-routes [stream-routes modes]
(when (or (empty? modes) (contains? (set modes) :stream-worker))
2 changes: 2 additions & 0 deletions src/ziggurat/mapper.clj
@@ -95,6 +95,8 @@

(defrecord MessagePayload [message topic-entity])

(declare message-payload-schema)

(s/defschema message-payload-schema
{:message s/Any
:topic-entity s/Keyword
29 changes: 15 additions & 14 deletions src/ziggurat/messaging/consumer.clj
@@ -10,7 +10,7 @@
[ziggurat.mapper :as mpr]
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.messaging.util :refer :all]
[ziggurat.messaging.util :refer [prefixed-channel-name prefixed-queue-name]]
[ziggurat.metrics :as metrics]))

(defn- convert-to-message-payload
@@ -24,14 +24,14 @@
(s/validate mpr/message-payload-schema message)
(catch Exception e
(log/info "old message format read, converting to message-payload: " message)
(let [retry-count (or (:retry-count message) 0)
(let [retry-count (or (:retry-count message) 0)
message-payload (mpr/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))]
(assoc message-payload :retry-count retry-count)))))

(defn convert-and-ack-message
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
if `ack?` is true."
[ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity]
[ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity]
(try
(let [message (nippy/thaw payload)]
(when ack?
@@ -48,8 +48,8 @@
(lb/ack ch delivery-tag))

(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(when message-payload
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
(try
@@ -109,13 +109,13 @@

(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
(lb/qos ch prefetch-count)
(let [consumer-tag (lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})]))
(lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))}))

(defn start-retry-subscriber* [mapper-fn topic-entity channels]
(when (get-in-config [:retry :enabled])
@@ -129,8 +129,9 @@
(defn start-channels-subscriber [channels topic-entity]
(doseq [channel channels]
(let [channel-key (first channel)
channel-handler-fn (second channel)]
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])]
channel-handler-fn (second channel)
worker-count (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)]
(dotimes [_ worker-count]
(start-subscriber* (lch/open connection)
1
(prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
14 changes: 8 additions & 6 deletions src/ziggurat/middleware/default.clj
@@ -1,10 +1,9 @@
(ns ziggurat.middleware.default
(:require [protobuf.impl.flatland.mapdef :as protodef]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.config :refer [get-in-config ziggurat-config]]
[flatland.protobuf.core :as proto]
[ziggurat.metrics :as metrics]
[ziggurat.config :as config]
[ziggurat.sentry :refer [sentry-reporter]]))

(defn- deserialise-message
@@ -32,7 +31,9 @@
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
nil)))
message))
(let [{:keys [left right]} message]
{:left (deserialise-message left (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) ;; TODO: convert proto-class into a vector only on the next version
:right (deserialise-message right (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)})))

(defn- deserialise-message-deprecated
"This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a
@@ -59,15 +60,16 @@
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
nil)))
message))
(let [{:keys [left right]} message]
{:left (deserialise-message left (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) ;; TODO: convert proto-class into a vector only on the next version
:right (deserialise-message right (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)})))

(defn get-deserializer []
(if (config/get-in-config [:alpha-features :protobuf-middleware :enabled])
(if (get-in-config [:alpha-features :protobuf-middleware :enabled])
deserialise-message
deserialise-message-deprecated))

(defn protobuf->hash
"This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
[handler-fn proto-class topic-entity-name]
(fn [message] (handler-fn ((get-deserializer) message proto-class topic-entity-name))))
