Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making key and value serdes configurable for enabling JSON (de)serialization #99

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,48 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:with-key-val-serdes {:application-id "test"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have you added a separate topic-entity for this. Why not mock config in the test and add or remove values there? This is causing unnecessary repitition in config of this file.

:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:key-deserializer-encoding "UTF8"
:value-deserializer-encoding "UTF8"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}

:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}

61 changes: 40 additions & 21 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,46 @@
:enabled [true :bool]}
:http-server {:port [8010 :int]
:thread-count [100 :int]}
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:stream-router {:default {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:with-key-val-serdes {:application-id "test"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"
:key-deserializer-encoding "UTF8"
:value-deserializer-encoding "UTF8"
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:without-producer {:application-id "test"
:bootstrap-servers "localhost:9092"
:stream-threads-count [1 :int]
:origin-topic "topic"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
45 changes: 45 additions & 0 deletions src/ziggurat/middleware/json.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
(ns ziggurat.middleware.json
"This namespace defines middleware methods for parsing JSON strings.
Please see [Ziggurat Middleware](https://github.com/gojek/ziggurat#middleware-in-ziggurat) for more details.
"
(:require [cheshire.core :refer :all]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use namespaced function names instead of :refer :all https://github.com/bbatsov/clojure-style-guide#prefer-require-over-use

[sentry-clj.async :as sentry]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.metrics :as metrics]))

(defn- deserialize-json
[message topic-entity-name key-fn]
(try
(parse-string message key-fn)
(catch Exception e
(let [additional-tags {:topic_name topic-entity-name}
default-namespace "json-message-parsing"]
(sentry/report-error sentry-reporter e (str "Could not parse JSON message " message))
(metrics/increment-count default-namespace "failed" additional-tags)
nil))))

(defn parse-json
"This method returns a function which deserializes the provided message before processing it.
It uses `deserialize-json` defined in this namespace to parse JSON strings.

Takes following arguments:
`handler-fn` : A function which would process the parsed JSON string
`topic-entity-name` : Stream route topic entity (as defined in config.edn). It is only used for publishing metrics.
`key-fn` : key-fn can be either true, false of a generic function to transform keys in JSON string.
If `true`, would coerce keys to keywords, and if `false` it would leave keys as strings.
Default value is true.

An Example
For parsing a JSON string `\"{\"foo\":\"bar\"}\"` to `{\"foo\":\"bar\"}`, call the function with `key-fn` false
`(parse-json (fn [message] ()) \"topic\" false)`

For parsing a JSON string `\"{\"foo\":\"bar\"}\"` to `{:foo:\"bar\"}`, call the function without passing `key-fn`,
whose default value is true.
`(parse-json (fn [message] ()) \"topic\")`

"
([handler-fn topic-entity-name]
(parse-json handler-fn topic-entity-name true))
([handler-fn topic-entity-name key-fn]
(fn [message]
(handler-fn (deserialize-json message topic-entity-name key-fn)))))
Loading