Skip to content

Commit

Permalink
upgrade kafka stream library to 2.8
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed May 6, 2021
1 parent 1ccb2f0 commit 6733145
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 14 deletions.
6 changes: 3 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
[io.opentracing.contrib/opentracing-kafka-client "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.5" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.4"]
[org.apache.kafka/kafka-streams "2.7.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.8.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.12.1"]
[org.apache.logging.log4j/log4j-slf4j-impl "2.12.1"]
[org.clojure/clojure "1.10.0"]
Expand Down Expand Up @@ -73,8 +73,8 @@
:dependencies [[com.google.protobuf/protobuf-java "3.5.1"]
[junit/junit "4.12"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka-streams "2.7.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.7.0" :classifier "test"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
[org.clojure/test.check "0.10.0"]]
:plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
Expand Down
14 changes: 3 additions & 11 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@
(metrics/multi-ns-increment-count multi-namespaces metric additional-tags))
message)

(defn store-supplier-builder []
(KeyValueStoreBuilder. (RocksDbKeyValueBytesStoreSupplier. "state-store" true)
(Serdes/ByteArray)
(Serdes/ByteArray)
(SystemTime.)))

(defn- value-mapper [f]
(reify ValueMapper
(apply [_ v] (f v))))
Expand All @@ -95,17 +89,17 @@
delay-metric-namespace "message-received-delay-histogram"
metric-namespaces [service-name topic-entity-name delay-metric-namespace]
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))
(.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array String []))))

(defn- stream-joins-delay-metric [topic topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
delay-metric-namespace "stream-joins-message-received-delay-histogram"
metric-namespaces [service-name topic-entity-name delay-metric-namespace]
additional-tags {:topic-name topic-entity-name :input-topic topic :app-name service-name}]
(.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))
(.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array String []))))

(defn- header-transform-values [stream-builder]
(.transformValues stream-builder (header-transformer-supplier) (into-array [(.name (store-supplier-builder))])))
(.transformValues stream-builder (header-transformer-supplier) (into-array String [])))

(declare stream)

Expand Down Expand Up @@ -186,7 +180,6 @@
"Its contract can change in the future releases of Ziggurat."
"Please refer to the README doc for more information.")
(let [builder (StreamsBuilder.)
_ (.addStateStore builder (store-supplier-builder))
stream-map (map (fn [[topic-key topic-value] [_ cfg]]
(let [topic-name (:name topic-value)]
{:stream (.stream builder topic-name) :cfg cfg :topic-name topic-name :topic-key topic-key})) input-topics (assoc join-cfg :end nil))
Expand All @@ -200,7 +193,6 @@
(let [builder (StreamsBuilder.)
topic-entity-name (name topic-entity)
topic-pattern (Pattern/compile origin-topic)]
(.addStateStore builder (store-supplier-builder))
(->> (.stream builder topic-pattern)
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
Expand Down

0 comments on commit 6733145

Please sign in to comment.