diff --git a/resources/config.test.edn b/resources/config.test.edn index 0b92edbd..b84e15c7 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -37,6 +37,7 @@ :stream-threads-count [1 :int] :origin-topic "topic" :upgrade-from "1.1" + :consumer-type :default :channels {:channel-1 {:worker-count [10 :int] :retry {:type [:linear :keyword] :count [5 :int] diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 05ab8b43..287a117b 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -64,6 +64,8 @@ (defn config-from-env [config-file] (clonfig/read-config (edn-config config-file))) +(declare config) + (defstate config :start (let [config-values-from-env (config-from-env config-file) app-name (-> config-values-from-env :ziggurat :app-name)] @@ -79,8 +81,11 @@ (let [cfg (ziggurat-config)] (get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future -(defn get-in-config [ks] - (get-in (ziggurat-config) ks)) +(defn get-in-config + ([ks] + (get-in (ziggurat-config) ks)) + ([ks default] + (get-in (ziggurat-config) ks default))) (defn channel-retry-config [topic-entity channel] (get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry])) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 7567dd63..8a81f756 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -143,12 +143,18 @@ (shutdown-agents)) "Shutdown-handler"))) +(declare StreamRoute) + (s/defschema StreamRoute (s/conditional #(and (seq %) (map? %)) - {s/Keyword {:handler-fn (s/pred #(fn? %)) - s/Keyword (s/pred #(fn? %))}})) + {s/Keyword {:handler-fn (s/pred #(fn? %)) + (s/optional-key :consumer-type) (s/enum :default :stream-joins) + (s/optional-key :input-topics) (s/pred vector?) + (s/optional-key :output-topic) (s/pred string?) + (s/optional-key :join-cfg) (s/pred vector?) + s/Keyword (s/pred #(fn? %))}})) (defn validate-stream-routes [stream-routes modes] (when (or (empty? modes) (contains? 
(set modes) :stream-worker)) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 95f4d80e..b1bb6d95 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -95,6 +95,8 @@ (defrecord MessagePayload [message topic-entity]) +(declare message-payload-schema) + (s/defschema message-payload-schema {:message s/Any :topic-entity s/Keyword diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 6fd6847e..3b73ab7d 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -10,7 +10,7 @@ [ziggurat.mapper :as mpr] [ziggurat.messaging.connection :refer [connection]] [ziggurat.sentry :refer [sentry-reporter]] - [ziggurat.messaging.util :refer :all] + [ziggurat.messaging.util :refer [prefixed-channel-name prefixed-queue-name]] [ziggurat.metrics :as metrics])) (defn- convert-to-message-payload @@ -24,14 +24,14 @@ (s/validate mpr/message-payload-schema message) (catch Exception e (log/info "old message format read, converting to message-payload: " message) - (let [retry-count (or (:retry-count message) 0) + (let [retry-count (or (:retry-count message) 0) message-payload (mpr/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))] (assoc message-payload :retry-count retry-count))))) (defn convert-and-ack-message "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message if `ack?` is true." - [ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity] + [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity] (try (let [message (nippy/thaw payload)] (when ack? 
@@ -48,8 +48,8 @@ (lb/ack ch delivery-tag)) (defn process-message-from-queue [ch meta payload topic-entity processing-fn] - (let [delivery-tag (:delivery-tag meta) - message-payload (convert-and-ack-message ch meta payload false topic-entity)] + (let [delivery-tag (:delivery-tag meta) + message-payload (convert-and-ack-message ch meta payload false topic-entity)] (when message-payload (log/infof "Processing message [%s] from RabbitMQ " message-payload) (try @@ -109,13 +109,13 @@ (defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity] (lb/qos ch prefetch-count) - (let [consumer-tag (lcons/subscribe ch - queue-name - (message-handler wrapped-mapper-fn topic-entity) - {:handle-shutdown-signal-fn (fn [consumer_tag reason] - (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) - :handle-consume-ok-fn (fn [consumer_tag] - (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})])) + (lcons/subscribe ch + queue-name + (message-handler wrapped-mapper-fn topic-entity) + {:handle-shutdown-signal-fn (fn [consumer_tag reason] + (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) + :handle-consume-ok-fn (fn [consumer_tag] + (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})) (defn start-retry-subscriber* [mapper-fn topic-entity channels] (when (get-in-config [:retry :enabled]) @@ -129,8 +129,9 @@ (defn start-channels-subscriber [channels topic-entity] (doseq [channel channels] (let [channel-key (first channel) - channel-handler-fn (second channel)] - (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])] + channel-handler-fn (second channel) + worker-count (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)] + (dotimes [_ worker-count] (start-subscriber* (lch/open connection) 1 (prefixed-channel-name topic-entity channel-key (get-in-config 
[:rabbit-mq :instant :queue-name])) diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index de33c550..6a288b29 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -30,7 +30,9 @@ (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) nil))) - message)) + (let [{left :left right :right} message] + {:left (deserialise-message left proto-class topic-entity-name) + :right (deserialise-message right proto-class topic-entity-name)}))) (defn protobuf->hash "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap" diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 4df85c8c..5c2aed0b 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -1,7 +1,6 @@ (ns ziggurat.streams (:require [clojure.tools.logging :as log] [mount.core :as mount :refer [defstate]] - [sentry-clj.async :as sentry] [ziggurat.channel :as chl] [ziggurat.config :refer [ziggurat-config]] [ziggurat.header-transformer :as header-transformer] @@ -16,13 +15,14 @@ [org.apache.kafka.common.serialization Serdes] [org.apache.kafka.common.utils SystemTime] [org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology] - [org.apache.kafka.streams.kstream ValueMapper TransformerSupplier ValueTransformerSupplier] + [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier] [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] [ziggurat.timestamp_transformer IngestionTimeExtractor] [io.opentracing Tracer] [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.contrib.kafka TracingKafkaUtils] - [io.opentracing.tag Tags])) + [io.opentracing.tag Tags] 
+ [java.time Duration])) (def default-config-for-stream {:buffered-records-per-partition 10000 @@ -31,6 +31,9 @@ :oldest-processed-message-in-s 604800 :changelog-topic-replication-factor 3 :session-timeout-ms-config 60000 + ;; :consumer-type :default + ;; :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" + ;; :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" :default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde" :default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"}) @@ -46,9 +49,9 @@ "Populates `key-deserializer-encoding`, `value-deserializer-encoding` and `deserializer-encoding` in `properties` only if non-nil." [properties key-deserializer-encoding value-deserializer-encoding deserializer-encoding] - (let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding" + (let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding" VALUE_DESERIALIZER_ENCODING "value.deserializer.encoding" - DESERIALIZER_ENCODING "deserializer.encoding"] + DESERIALIZER_ENCODING "deserializer.encoding"] (if (some? key-deserializer-encoding) (.put properties KEY_DESERIALIZER_ENCODING key-deserializer-encoding)) (if (some? 
value-deserializer-encoding) @@ -126,10 +129,10 @@ (get [_] (header-transformer/create)))) (defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] - (let [service-name (:app-name (ziggurat-config)) + (let [service-name (:app-name (ziggurat-config)) delay-metric-namespace "message-received-delay-histogram" - metric-namespaces [service-name topic-entity-name delay-metric-namespace] - additional-tags {:topic_name topic-entity-name}] + metric-namespaces [service-name topic-entity-name delay-metric-namespace] + additional-tags {:topic_name topic-entity-name}] (.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) (defn- header-transform-values [stream-builder] @@ -144,14 +147,14 @@ (defn- traced-handler-fn [handler-fn channels message topic-entity] (let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer) - span (as-> tracer t - (.buildSpan t "Message-Handler") - (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) - (.withTag t (.getKey Tags/COMPONENT) "ziggurat") - (if (nil? parent-ctx) - t - (.asChildOf t parent-ctx)) - (.start t))] + span (as-> tracer t + (.buildSpan t "Message-Handler") + (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) + (.withTag t (.getKey Tags/COMPONENT) "ziggurat") + (if (nil? parent-ctx) + t + (.asChildOf t parent-ctx)) + (.start t))] (try ((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message))) (catch Exception e @@ -160,22 +163,83 @@ (finally (.finish span))))) -(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] - (let [builder (StreamsBuilder.) 
- topic-entity-name (name topic-entity) - topic-pattern (Pattern/compile origin-topic)] - (.addStateStore builder (store-supplier-builder)) - (->> (.stream builder topic-pattern) +(defn- join-streams + [stream-1 stream-2] + (let [cfg-1 (:cfg stream-1) + cfg-2 (:cfg stream-2) + strm-1 (:stream stream-1) + strm-2 (:stream stream-2) + join-window (JoinWindows/of (Duration/ofMillis (:join-window cfg-1))) + join-type (:join-type cfg-1) + value-joiner (reify ValueJoiner + (apply [_ left right] + {:left left :right right}))] + {:stream + (if cfg-1 + (case join-type + :left (.leftJoin strm-1 strm-2 value-joiner join-window) + :outer (.outerJoin strm-1 strm-2 value-joiner join-window) + (.join strm-1 strm-2 value-joiner join-window)) + strm-1) + :cfg cfg-2})) + +(defn- map-stream [stream oldest-processed-message-in-s handler-fn channels topic-entity] + (let [topic-entity-name (name topic-entity)] + (->> stream (timestamp-transform-values topic-entity-name oldest-processed-message-in-s) (header-transform-values) (map-values #(log-and-report-metrics topic-entity-name %)) - (map-values #(traced-handler-fn handler-fn channels % topic-entity))) + (map-values #(traced-handler-fn handler-fn channels % topic-entity))))) + +(defn- create-stream [builder topic cfg oldest-processed-message-in-s handler-fn channels topic-entity] + (let [stream (case (:input-type cfg) + ;; :stream (.stream builder topic) + ;; :table (.table builder topic) + (.stream builder topic))] + (map-stream stream oldest-processed-message-in-s handler-fn channels topic-entity) + stream)) + +(defn- stream-joins-topology [handler-fn {:keys [input-topics join-cfg output-topic oldest-processed-message-in-s]} topic-entity channels] + (let [builder (StreamsBuilder.) 
+ ;;topic-patterns (map (fn [input-topic] (Pattern/compile (:name input-topic))) input-topics) + topic-patterns (map :name input-topics) + _ (.addStateStore builder (store-supplier-builder)) + stream-map (map (fn [topic-pattern cfg] {:stream (create-stream builder topic-pattern cfg oldest-processed-message-in-s handler-fn channels topic-entity) :cfg cfg}) topic-patterns (conj join-cfg nil)) + {stream :stream} (reduce join-streams stream-map) + strm (if (instance? org.apache.kafka.streams.kstream.KTable stream) + (.toStream stream) + stream)] + ;; (.addStateStore builder (store-supplier-builder)) + (map-stream strm oldest-processed-message-in-s handler-fn channels topic-entity) + (when output-topic + (.to strm output-topic)) + (.build builder))) + +(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] + (let [builder (StreamsBuilder.) + topic-pattern (Pattern/compile origin-topic)] + (.addStateStore builder (store-supplier-builder)) + (create-stream builder topic-pattern nil oldest-processed-message-in-s handler-fn channels topic-entity) (.build builder))) (defn- start-stream* [handler-fn stream-config topic-entity channels] - (KafkaStreams. ^Topology (topology handler-fn stream-config topic-entity channels) - ^Properties (properties stream-config) - (new TracingKafkaClientSupplier tracer))) + (let [topology-fn (case (:consumer-type stream-config) + :stream-joins stream-joins-topology + topology) + _ (log/info "Starting stream with consumer-type:" (:consumer-type stream-config)) + top (topology-fn handler-fn stream-config topic-entity channels) + _ (log/info (.describe top))] + (KafkaStreams. ^Topology top + ^Properties (properties stream-config) + (new TracingKafkaClientSupplier tracer)))) + +(defn- merge-stream-joins-config ;; NOTE(review): :consumer-type may be set in config.edn or in the stream map from init-fn; :stream-joins in either one enables the join settings -- confirm this is intended
+ [config stream] + (case (:consumer-type config) + :stream-joins (assoc config :input-topics (:input-topics stream) :output-topic (:output-topic stream) :join-cfg (:join-cfg stream) :consumer-type (:consumer-type config)) + (case (:consumer-type stream) + :stream-joins (assoc config :input-topics (:input-topics stream) :output-topic (:output-topic stream) :join-cfg (:join-cfg stream) :consumer-type (:consumer-type stream)) + (assoc config :consumer-type :default)))) (defn start-streams ([stream-routes] @@ -183,10 +247,13 @@ ([stream-routes stream-configs] (reduce (fn [streams stream] (let [topic-entity (first stream) - topic-handler-fn (-> stream second :handler-fn) + snd (second stream) + topic-handler-fn (:handler-fn snd) channels (chl/get-keys-for-topic stream-routes topic-entity) stream-config (-> stream-configs (get-in [:stream-router topic-entity]) + (merge-stream-joins-config snd) + ;; (assoc :input-topics (:input-topics snd) :output-topic (:output-topic snd) :join-cfg (:join-cfg snd)) (umap/deep-merge default-config-for-stream)) stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index c117e927..d174fdac 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -1,10 +1,9 @@ (ns ziggurat.middleware.default-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [flatland.protobuf.core :as proto] - [sentry-clj.async :as sentry] [ziggurat.fixtures :as fix] [ziggurat.metrics :as metrics] - [ziggurat.middleware.default :refer :all]) + [ziggurat.middleware.default :refer [protobuf->hash]]) (:import (flatland.protobuf.test Example$Photo))) (use-fixtures :once (join-fixtures [fix/mount-only-config @@ -23,6 +22,22 @@ (reset! handler-fn-called? 
true)))] ((protobuf->hash handler-fn proto-class topic-entity-name) proto-message) (is (true? @handler-fn-called?)))) + (testing "deserialize a message from a stream join" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/protobuf-dump (proto/protodef Example$Photo) left-message) + right-proto-message (proto/protobuf-dump (proto/protodef Example$Photo) right-message) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((protobuf->hash handler-fn proto-class topic-entity-name) {:left left-proto-message :right right-proto-message}) + (is (true? @handler-fn-called?)))) (testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function" (let [handler-fn-called? (atom false) metric-reporter-called? (atom false) @@ -34,15 +49,4 @@ (reset! metric-reporter-called? true))] ((protobuf->hash handler-fn nil topic-entity-name) nil)) (is (true? @handler-fn-called?)) - (is (true? @metric-reporter-called?)))) - (testing "When an already deserialised message is passed to the function it calls the handler fn without altering it" - (let [handler-fn-called? (atom false) - message {:id 7 - :path "/photos/h2k3j4h9h23"} - proto-class Example$Photo - topic-entity-name "test" - handler-fn (fn [msg] - (if (= msg message) - (reset! handler-fn-called? true)))] - ((protobuf->hash handler-fn proto-class topic-entity-name) message) - (is (true? @handler-fn-called?))))) + (is (true? 
@metric-reporter-called?))))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index cc010cf0..ad92d2af 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -1,5 +1,5 @@ (ns ziggurat.streams-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is join-fixtures use-fixtures]] [flatland.protobuf.core :as proto] [ziggurat.streams :refer [start-streams stop-streams]] [ziggurat.fixtures :as fix] @@ -99,6 +99,29 @@ (stop-streams streams) (is (= times @message-received-count)))) +(deftest start-stream-joins-test + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count) + times 6 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn + :consumer-type :stream-joins + :input-topics [{:name "topic"} {:name "another-test-topic"}] + :output-topic "output" + :join-cfg [{:join-window 5000 :join-type :inner}]}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic]) + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;waiting for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + (deftest start-streams-test-with-string-serde (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count json-message)