
Json middleware #104

Merged: 8 commits merged into gojek:master on Oct 22, 2019

Conversation

mjayprateek (Contributor)

This PR achieves the following:

  1. Introduces JSON middleware (a usage sketch follows below)
  2. Externalizes Kafka Serde and String encoding configs
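
Given the parse-json arities shown further down in this diff (handler-fn, topic-entity, and a key-fn flag passed to cheshire), a hedged usage sketch could look like the following; the namespace alias, handler body, and topic entity are illustrative assumptions, not taken from this PR.

(ns my-app.core
  (:require [ziggurat.middleware.json :as json-mw]))   ;; namespace path is an assumption

(defn message-handler
  [message]
  ;; `message` arrives as a Clojure map parsed from the JSON string
  (println "received" message)
  :success)

(def wrapped-handler
  ;; wrap the handler so JSON payloads are parsed before it runs;
  ;; :using-string-serde is the topic entity, true keywordizes the JSON keys
  (json-mw/parse-json message-handler :using-string-serde true))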

@mjayprateek force-pushed the json_middleware branch 3 times, most recently from 0d3e4ab to 3c484ee on October 16, 2019 11:54
:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:using-string-serde {:application-id "test"
Contributor

Why are we duplicating config here? Why can't we redef it in the tests? That way the tests would also be independent of the config file.

Contributor Author

I thought about it, but adding/deleting existing configs in the code looks messy. Creating a new config lets us simply use its keyword everywhere in the tests.

That way the config definition is separated from its usage. I agree, though, that more than two would make for a bad config file.

Additionally, since these are reference configs, having multiple entries is useful for a new user, who can instantly see how Ziggurat supports multiple streams.

Contributor

How is it messy? There is just one redef you need to do for it.

Also, if we agree that more than two will not look good, what happens when another topic-entity is needed in the tests? Do we use redef there? If we do, won't the tests become inconsistent?

We can always document the usage well for users instead of duplicating values in our code.
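
For context, a minimal sketch of the redef approach being discussed, assuming a config accessor along the lines of ziggurat.config/ziggurat-config; the test name, config path, and values are illustrative, not taken from this PR.

(ns ziggurat.middleware.json-test
  (:require [clojure.test :refer [deftest is]]
            [ziggurat.config :as config]))   ;; accessor name is an assumption

(deftest json-parsing-with-string-serde
  ;; serve a modified config to the code under test instead of adding a
  ;; near-duplicate entry to the test config file
  (let [test-config {:stream-router {:default {:value-serializer
                                               "org.apache.kafka.common.serialization.StringSerializer"}}}]
    (with-redefs [config/ziggurat-config (constantly test-config)]
      (is (= "org.apache.kafka.common.serialization.StringSerializer"
             (get-in (config/ziggurat-config)
                     [:stream-router :default :value-serializer]))))))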

Contributor Author (@mjayprateek, Oct 17, 2019)

Okay, but we had two configs (:without-producer) even before this change, so at least this change is consistent with the previous code in the number of configs. I would argue that I haven't added a new config here, just changed the old one.


setup:
docker-compose down
lein deps
docker-compose up -d
sleep 10
docker exec -it ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
docker exec -it ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
Contributor

If we redef the config, creating this other topic will not be required.

:enable-idempotence false
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}
:using-string-serde {:application-id "test"
Contributor

Same here.

@@ -31,6 +31,10 @@
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3})

(def KEY_DESERIALIZER_ENCODING "key.deserializer.encoding")
(def VALUE_DESERIALIZER_ENCODING "value.deserializer.encoding")
(def DESERIALIZER_ENCODING "deserializer.encoding")
Contributor

Instead of def, define these with let in the function where they are required.
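
Roughly what that suggestion amounts to, as a sketch (the function name and destructured keys are illustrative, following the constants in the diff above):

(defn- add-encoding-configs
  [^java.util.Properties properties {:keys [key-deserializer-encoding
                                            value-deserializer-encoding
                                            deserializer-encoding]}]
  ;; the property names live in a let instead of top-level defs
  (let [key-encoding-prop   "key.deserializer.encoding"
        value-encoding-prop "value.deserializer.encoding"
        encoding-prop       "deserializer.encoding"]
    (when (some? key-deserializer-encoding)
      (.put properties key-encoding-prop key-deserializer-encoding))
    (when (some? value-deserializer-encoding)
      (.put properties value-encoding-prop value-deserializer-encoding))
    (when (some? deserializer-encoding)
      (.put properties encoding-prop deserializer-encoding))
    properties))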

Contributor Author

This is done.

(if (some? value-deserializer-encoding)
(.put properties VALUE_DESERIALIZER_ENCODING value-deserializer-encoding))
(if (some? deserializer-encoding)
(.put properties DESERIALIZER_ENCODING deserializer-encoding)))
Contributor

What are the default values for these?

Contributor

Is this config only valid when the serializer is a string serializer?

Contributor Author

The default value is "UTF8", which is hard-coded in both StringSerializer and StringDeserializer, so we don't need to provide our own. Please see [1], for example.

Yes, only StringSerializer uses this configuration.

[1] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
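
To make the fallback chain concrete: StringSerializer/StringDeserializer look for the key- or value-specific encoding property first, then the generic serializer.encoding/deserializer.encoding property, and finally default to UTF-8, so these properties only need to be set for a non-default charset. A small illustrative snippet (the UTF-16 value is just an example):

(import 'java.util.Properties)

(def consumer-props
  ;; only needed when a charset other than the hard-coded UTF-8 default is wanted
  (doto (Properties.)
    (.put "key.deserializer.encoding"   "UTF-16")
    (.put "value.deserializer.encoding" "UTF-16")))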

(defn- validate-auto-offset-reset-config
[auto-offset-reset-config]
(if-not (contains? #{"latest" "earliest" nil} auto-offset-reset-config)
(throw (ex-info "Stream offset can only be latest or earliest" {:offset auto-offset-reset-config}))))

(defn- get-serde [default-serde]
Contributor

You can use the default config obtained via the deep-merge instead of defining a new function here.
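
A sketch of that suggestion: carry the fallback serde class in the default map that is deep-merged with user config, so no separate get-serde helper is needed (the key names, the StringSerde class, and the deep-merge helper below are assumptions for illustration).

(defn- deep-merge
  "Recursively merge maps; right-hand values win (illustrative helper)."
  [a b]
  (if (and (map? a) (map? b))
    (merge-with deep-merge a b)
    b))

(def default-config-for-stream
  {:changelog-topic-replication-factor 3
   :default-key-serde   "org.apache.kafka.common.serialization.Serdes$StringSerde"
   :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"})

(def user-stream-config
  {:application-id "test"})   ;; no serde keys supplied by the user

(def effective-config
  ;; the merged result picks up the default serde classes automatically
  (deep-merge default-config-for-stream user-stream-config))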

Contributor Author

Done.

"This namespace defines middleware methods for parsing JSON strings.
Please see [Ziggurat Middleware](https://github.com/gojek/ziggurat#middleware-in-ziggurat) for more details.
"
(:require [cheshire.core :refer :all]
Contributor

Instead of :refer :all, refer only the functions required: https://github.com/bbatsov/clojure-style-guide#prefer-require-over-use
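
For instance, since parse-string is the only cheshire function visible in this diff, the require could be narrowed to:

(:require [cheshire.core :refer [parse-string]])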

Contributor Author

Done.


"
([handler-fn topic-entity-name]
(parse-json handler-fn topic-entity-name true))
Contributor

This function expects (name topic-entity), i.e. a string, rather than the topic-entity itself. By default, users always pass the topic-entity. We should start expecting the topic-entity and convert it inside, or accept either a string or a keyword and handle it here.
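
A small sketch of the second option, accepting either a keyword or a string (the helper name is illustrative):

(defn- topic-entity->string
  "Accepts either a keyword like :using-string-serde or a string like \"using-string-serde\"."
  [topic-entity]
  (if (keyword? topic-entity)
    (name topic-entity)
    (str topic-entity)))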

Contributor Author

Actually, this is the topic-entity; I've just added the -name suffix. I'll remove it to prevent any confusion.

Contributor Author

Done.

(try
(parse-string message key-fn)
(catch Exception e
(let [additional-tags {:topic_name topic-entity}
Contributor

The topic-entity being passed here, is that a string or a keyword? We want to send a string in the metric, right? That's what I was referring to in the previous comment.

Contributor Author

Okay. Done.

@@ -51,20 +67,26 @@
buffered-records-per-partition
commit-interval-ms
upgrade-from
changelog-topic-replication-factor]}]
changelog-topic-replication-factor
default-key-serde
Contributor

Can we rename this to something that makes more sense? If we wish to keep it the same as Kafka, it should be default-key-serde-class. I'd recommend something else like key-serde-class.

Contributor Author

Hmm. Actually, default-key-serde is consistent with the default.key.serde config defined here: https://docs.confluent.io/current/streams/developer-guide/config-streams.html.
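
For reference, these map onto the Kafka Streams properties exposed via StreamsConfig; a sketch assuming default-key-serde (and a matching default-value-serde) hold fully qualified serde class names:

(import 'java.util.Properties
        'org.apache.kafka.streams.StreamsConfig)

(defn- set-default-serdes
  [^Properties properties default-key-serde default-value-serde]
  ;; DEFAULT_KEY_SERDE_CLASS_CONFIG is "default.key.serde",
  ;; DEFAULT_VALUE_SERDE_CLASS_CONFIG is "default.value.serde"
  (when (some? default-key-serde)
    (.put properties StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG default-key-serde))
  (when (some? default-value-serde)
    (.put properties StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG default-value-serde))
  properties)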

@mjayprateek merged commit 6448a3f into gojek:master on Oct 22, 2019