implement stream joins
macalimlim committed Jun 4, 2020
1 parent a9bbcb4 commit 4fd2dfe
Showing 9 changed files with 173 additions and 62 deletions.
1 change: 1 addition & 0 deletions resources/config.test.edn
@@ -37,6 +37,7 @@
:stream-threads-count [1 :int]
:origin-topic "topic"
:upgrade-from "1.1"
:consumer-type :default
:channels {:channel-1 {:worker-count [10 :int]
:retry {:type [:linear :keyword]
:count [5 :int]
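
For context, :consumer-type is a new per-topic-entity key under :stream-router; the StreamRoute schema change below allows the same key on a stream route with the values :default or :stream-joins. A hypothetical config fragment opting a topic entity into stream joins (the nesting is assumed to match the test config above):

;; hypothetical fragment; only :consumer-type is new in this commit
{:ziggurat {:stream-router {:default {:origin-topic   "topic"
                                      :consumer-type  :stream-joins}}}}
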
9 changes: 7 additions & 2 deletions src/ziggurat/config.clj
@@ -64,6 +64,8 @@
(defn config-from-env [config-file]
(clonfig/read-config (edn-config config-file)))

(declare config)

(defstate config
:start (let [config-values-from-env (config-from-env config-file)
app-name (-> config-values-from-env :ziggurat :app-name)]
@@ -79,8 +81,11 @@
(let [cfg (ziggurat-config)]
(get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future

(defn get-in-config [ks]
(get-in (ziggurat-config) ks))
(defn get-in-config
([ks]
(get-in (ziggurat-config) ks))
([ks default]
(get-in (ziggurat-config) ks default)))
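
A usage sketch of the new two-argument arity; the key path is illustrative and mirrors the worker-count lookup added in consumer.clj below:

;; returns 0 when the path is absent from the config
(get-in-config [:stream-router :default :channels :channel-1 :worker-count] 0)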

(defn channel-retry-config [topic-entity channel]
(get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry]))
10 changes: 8 additions & 2 deletions src/ziggurat/init.clj
@@ -143,12 +143,18 @@
(shutdown-agents))
"Shutdown-handler")))

(declare StreamRoute)

(s/defschema StreamRoute
(s/conditional
#(and (seq %)
(map? %))
{s/Keyword {:handler-fn (s/pred #(fn? %))
s/Keyword (s/pred #(fn? %))}}))
{s/Keyword {:handler-fn (s/pred #(fn? %))
(s/optional-key :consumer-type) (s/enum :default :stream-joins)
(s/optional-key :input-topics) (s/pred vector?)
(s/optional-key :output-topic) (s/pred string?)
(s/optional-key :join-cfg) (s/pred vector?)
s/Keyword (s/pred #(fn? %))}}))
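
The new optional keys let a stream route opt into stream joins at init time. Their shapes are inferred from stream-joins-topology and join-streams later in this diff, so treat the contents as assumptions: :input-topics is a vector of maps with a :name, :join-cfg is a vector of maps carrying :join-window (in milliseconds) and :join-type (:left, :outer, or an inner join by default), and :output-topic names an optional topic for the joined records. A hypothetical route (joined-handler is a made-up handler; a sketch of one appears with the middleware change below):

;; hypothetical stream route exercising the new optional keys
{:default {:handler-fn    joined-handler
           :consumer-type :stream-joins
           :input-topics  [{:name "topic-a"} {:name "topic-b"}]
           :join-cfg      [{:join-window 5000 :join-type :left}]
           :output-topic  "joined-topic"}}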

(defn validate-stream-routes [stream-routes modes]
(when (or (empty? modes) (contains? (set modes) :stream-worker))
2 changes: 2 additions & 0 deletions src/ziggurat/mapper.clj
@@ -95,6 +95,8 @@

(defrecord MessagePayload [message topic-entity])

(declare message-payload-schema)

(s/defschema message-payload-schema
{:message s/Any
:topic-entity s/Keyword
29 changes: 15 additions & 14 deletions src/ziggurat/messaging/consumer.clj
@@ -10,7 +10,7 @@
[ziggurat.mapper :as mpr]
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.messaging.util :refer :all]
[ziggurat.messaging.util :refer [prefixed-channel-name prefixed-queue-name]]
[ziggurat.metrics :as metrics]))

(defn- convert-to-message-payload
@@ -24,14 +24,14 @@
(s/validate mpr/message-payload-schema message)
(catch Exception e
(log/info "old message format read, converting to message-payload: " message)
(let [retry-count (or (:retry-count message) 0)
(let [retry-count (or (:retry-count message) 0)
message-payload (mpr/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))]
(assoc message-payload :retry-count retry-count)))))

(defn convert-and-ack-message
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
if `ack?` is true."
[ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity]
[ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity]
(try
(let [message (nippy/thaw payload)]
(when ack?
@@ -48,8 +48,8 @@
(lb/ack ch delivery-tag))

(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(when message-payload
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
(try
@@ -109,13 +109,13 @@

(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
(lb/qos ch prefetch-count)
(let [consumer-tag (lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})]))
(lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))}))

(defn start-retry-subscriber* [mapper-fn topic-entity channels]
(when (get-in-config [:retry :enabled])
@@ -129,8 +129,9 @@
(defn start-channels-subscriber [channels topic-entity]
(doseq [channel channels]
(let [channel-key (first channel)
channel-handler-fn (second channel)]
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])]
channel-handler-fn (second channel)
worker-count (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)]
(dotimes [_ worker-count]
(start-subscriber* (lch/open connection)
1
(prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
4 changes: 3 additions & 1 deletion src/ziggurat/middleware/default.clj
@@ -30,7 +30,9 @@
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
nil)))
message))
(let [{left :left right :right} message]
{:left (deserialise-message left proto-class topic-entity-name)
:right (deserialise-message right proto-class topic-entity-name)})))
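
With the ValueJoiner added in streams.clj below, a joined record reaches this middleware as {:left <bytes> :right <bytes>}, and each side is now deserialised separately, so a handler wrapped with protobuf->hash receives a map of the two decoded messages. A hypothetical handler (assuming the usual ziggurat return convention):

;; hypothetical handler for a stream-joins route
(defn joined-handler [{:keys [left right]}]
  ;; left and right are the deserialised protobuf maps of the joined record
  (println "left:" left "right:" right)
  :success)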

(defn protobuf->hash
"This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
121 changes: 94 additions & 27 deletions src/ziggurat/streams.clj
@@ -1,7 +1,6 @@
(ns ziggurat.streams
(:require [clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[sentry-clj.async :as sentry]
[ziggurat.channel :as chl]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.header-transformer :as header-transformer]
@@ -16,13 +15,14 @@
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier ValueTransformerSupplier]
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]
[io.opentracing Tracer]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.tag Tags]))
[io.opentracing.tag Tags]
[java.time Duration]))

(def default-config-for-stream
{:buffered-records-per-partition 10000
@@ -31,6 +31,9 @@
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3
:session-timeout-ms-config 60000
;; :consumer-type :default
;; :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
;; :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"
:default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"})

@@ -46,9 +49,9 @@
"Populates `key-deserializer-encoding`, `value-deserializer-encoding` and `deserializer-encoding`
in `properties` only if non-nil."
[properties key-deserializer-encoding value-deserializer-encoding deserializer-encoding]
(let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding"
(let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding"
VALUE_DESERIALIZER_ENCODING "value.deserializer.encoding"
DESERIALIZER_ENCODING "deserializer.encoding"]
DESERIALIZER_ENCODING "deserializer.encoding"]
(if (some? key-deserializer-encoding)
(.put properties KEY_DESERIALIZER_ENCODING key-deserializer-encoding))
(if (some? value-deserializer-encoding)
@@ -126,10 +129,10 @@
(get [_] (header-transformer/create))))

(defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
(let [service-name (:app-name (ziggurat-config))
delay-metric-namespace "message-received-delay-histogram"
metric-namespaces [service-name topic-entity-name delay-metric-namespace]
additional-tags {:topic_name topic-entity-name}]
metric-namespaces [service-name topic-entity-name delay-metric-namespace]
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

(defn- header-transform-values [stream-builder]
@@ -144,14 +147,14 @@

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
(catch Exception e
@@ -160,33 +163,97 @@
(finally
(.finish span)))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
topic-entity-name (name topic-entity)
topic-pattern (Pattern/compile origin-topic)]
(.addStateStore builder (store-supplier-builder))
(->> (.stream builder topic-pattern)
(defn- join-streams
[stream-1 stream-2]
(let [cfg-1 (:cfg stream-1)
cfg-2 (:cfg stream-2)
strm-1 (:stream stream-1)
strm-2 (:stream stream-2)
join-window (JoinWindows/of (Duration/ofMillis (:join-window cfg-1)))
join-type (:join-type cfg-1)
value-joiner (reify ValueJoiner
(apply [_ left right]
{:left left :right right}))]
{:stream
(if cfg-1
(case join-type
:left (.leftJoin strm-1 strm-2 value-joiner join-window)
:outer (.outerJoin strm-1 strm-2 value-joiner join-window)
(.join strm-1 strm-2 value-joiner join-window))
strm-1)
:cfg cfg-2}))
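
To make the reduce in stream-joins-topology below easier to follow: join-streams folds a sequence of {:stream ... :cfg ...} maps left to right, joining the accumulated stream with the next one using the accumulated :cfg and carrying the next :cfg forward. A sketch, assuming stream-a, stream-b and stream-c are KStream instances built from three input topics and the window/type values are illustrative:

;; pairwise joins, left to right (evaluated inside this namespace)
(reduce join-streams
        [{:stream stream-a :cfg {:join-window 5000 :join-type :left}}
         {:stream stream-b :cfg {:join-window 1000 :join-type :outer}}
         {:stream stream-c :cfg nil}])
;; => {:stream <stream-a left-joined with stream-b, the result outer-joined with stream-c>
;;     :cfg nil}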

(defn- map-stream [stream oldest-processed-message-in-s handler-fn channels topic-entity]
(let [topic-entity-name (name topic-entity)]
(->> stream
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))))

(defn- create-stream [builder topic cfg oldest-processed-message-in-s handler-fn channels topic-entity]
(let [stream (case (:input-type cfg)
;; :stream (.stream builder topic)
;; :table (.table builder topic)
(.stream builder topic))]
(map-stream stream oldest-processed-message-in-s handler-fn channels topic-entity)
stream))

(defn- stream-joins-topology [handler-fn {:keys [input-topics join-cfg output-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
;;topic-patterns (map (fn [input-topic] (Pattern/compile (:name input-topic))) input-topics)
topic-patterns (map :name input-topics)
_ (.addStateStore builder (store-supplier-builder))
stream-map (map (fn [topic-pattern cfg] {:stream (create-stream builder topic-pattern cfg oldest-processed-message-in-s handler-fn channels topic-entity) :cfg cfg}) topic-patterns (conj join-cfg nil))
{stream :stream} (reduce join-streams stream-map)
strm (if (instance? org.apache.kafka.streams.kstream.KTable stream)
(.toStream stream)
stream)]
;; (.addStateStore builder (store-supplier-builder))
(map-stream strm oldest-processed-message-in-s handler-fn channels topic-entity)
(when output-topic
(.to strm output-topic))
(.build builder)))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
topic-pattern (Pattern/compile origin-topic)]
(.addStateStore builder (store-supplier-builder))
(create-stream builder topic-pattern nil oldest-processed-message-in-s handler-fn channels topic-entity)
(.build builder)))

(defn- start-stream* [handler-fn stream-config topic-entity channels]
(KafkaStreams. ^Topology (topology handler-fn stream-config topic-entity channels)
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer)))
(let [topology-fn (case (:consumer-type stream-config)
:stream-joins stream-joins-topology
topology)
_ (println (:consumer-type stream-config) ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
top (topology-fn handler-fn stream-config topic-entity channels)
_ (log/info (.describe top))]
(KafkaStreams. ^Topology top
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer))))

(defn- merge-stream-joins-config ;; not sure about this, :consumer-type can be from the config.edn or the map from init-fn, please comment!
[config stream]
(case (:consumer-type config)
:stream-joins (assoc config :input-topics (:input-topics stream) :output-topic (:output-topic stream) :join-cfg (:join-cfg stream) :consumer-type (:consumer-type config))
(case (:consumer-type stream)
:stream-joins (assoc config :input-topics (:input-topics stream) :output-topic (:output-topic stream) :join-cfg (:join-cfg stream) :consumer-type (:consumer-type stream))
(assoc config :consumer-type :default))))
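
On the question in the comment above: as written, a :consumer-type of :stream-joins in config.edn takes precedence, otherwise the stream route's :consumer-type is consulted, and anything else falls back to :default. A sketch of the second case, with maps trimmed to the relevant keys:

;; the route drives the merge when config.edn does not set :stream-joins
(merge-stream-joins-config
 {:consumer-type :default}
 {:consumer-type :stream-joins
  :input-topics  [{:name "topic-a"} {:name "topic-b"}]
  :output-topic  "joined-topic"
  :join-cfg      [{:join-window 5000 :join-type :left}]})
;; => {:consumer-type :stream-joins
;;     :input-topics  [{:name "topic-a"} {:name "topic-b"}]
;;     :output-topic  "joined-topic"
;;     :join-cfg      [{:join-window 5000 :join-type :left}]}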

(defn start-streams
([stream-routes]
(start-streams stream-routes (ziggurat-config)))
([stream-routes stream-configs]
(reduce (fn [streams stream]
(let [topic-entity (first stream)
topic-handler-fn (-> stream second :handler-fn)
snd (second stream)
topic-handler-fn (:handler-fn snd)
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(merge-stream-joins-config snd)
;; (assoc :input-topics (:input-topics snd) :output-topic (:output-topic snd) :join-cfg (:join-cfg snd))
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
(.start stream)
(diff truncated: the rest of src/ziggurat/streams.clj and the remaining 2 changed files were not loaded)
