Add support for all KafkaProducer configs #143
Conversation
project.clj (Outdated)
@@ -1,3 +1,7 @@
(require 'cemerick.pomegranate.aether)
Why is this block required?
At the time of development, one of the dependencies was relying on a repo that wasn't TLS encrypted, which leiningen doesn't allow. This block of code can re-enable support for unsafe repositories. However, this was supposed to be in there only for the time-being, and I'm not seeing it anymore, so I can remove this now.
:retries 5
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
Why has `-class` been added here?
Because this approach generates the ProducerConfig field names from the provided config keys, the keys have to be consistent with the naming of the original fields.
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
Why `-class`?
:max-in-flight-requests-per-connection 5
:enable-idempotence false
:value-serializer-class "org.apache.kafka.common.serialization.StringSerializer"
:key-serializer-class "org.apache.kafka.common.serialization.StringSerializer"}}
Same question as above.
src/ziggurat/producer.clj (Outdated)
(catch ClassNotFoundException e
  false)))

(spec/def ::key-serializer-class implements-serializer?)
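The `implements-serializer?` check in the diff above can be sketched generically. A hedged Java rendering (the actual implementation lives in src/ziggurat/producer.clj; since Kafka's `org.apache.kafka.common.serialization.Serializer` interface is not assumed to be on the classpath here, the sketch takes the target interface as a parameter):

```java
// Hedged sketch of the serializer check discussed above: resolve the class
// by name and verify it is assignable to the given interface, returning
// false when the class cannot be found (mirroring the
// ClassNotFoundException catch in the Clojure code).
public class SerializerCheck {
    static boolean implementsInterface(String className, Class<?> iface) {
        try {
            return iface.isAssignableFrom(Class.forName(className));
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```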
This spec library is in the alpha stage of development. Plus, it forces users onto Clojure 1.10.
Let's just focus on having all KafkaProducer configs available through this PR.
Once a decision is taken on dropping compatibility with Clojure versions before 1.10, we can include this spec library.
Sure, I can use schema for validation if there are reservations about using spec.
@@ -98,7 +150,7 @@
(.flush)
(.close)))
(seq kafka-producers))))
(log/info "No producers found. Can not initiate stop.")))
(log/info "No producers found.n Can not initiate stop.")))
Extra `n`.
@murtaza0xFF Have you tested these changes locally?
Yes, these changes have been tested locally. They are cherry-picked from a library we wrote solely for this purpose, and we've been using that library for 6 months now.
@murtaza0xFF How do these changes affect actors with older configurations? Should we implement something that is also backward compatible?
@macalimlim This will be a breaking change for those actors, unfortunately. I did consider mapping the new config keys to the old ones, but that's a patchy solution to this problem. Let me know if you have something in mind.
- Remove the workaround for lein which was added to fetch deps from repositories that weren't TLS encrypted
@murtaza0xFF do you need this feature on your end?
…kward compatibility
Force-pushed from cfa6c19 to 6d2e3bd
(def implements-serializer? (s/pred *implements-serializer?* 'implements-serializer?))

(s/defschema ProducerConfigSchema {(s/required-key :bootstrap-servers) s/Any
URL validator?
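A validator along the lines the reviewer suggests could look like this. This is a hypothetical sketch, not part of the PR: it accepts a comma-separated list of host:port pairs, which is the format `bootstrap-servers` takes.

```java
// Hypothetical bootstrap-servers validator (not in the PR): accepts one
// or more comma-separated host:port pairs, e.g. "localhost:9092" or
// "broker1:9092,broker2:9093". Rejects entries missing a port.
public class BootstrapServers {
    static boolean valid(String value) {
        return value != null
            && value.matches("[^,:\\s]+:\\d{1,5}(,[^,:\\s]+:\\d{1,5})*");
    }
}
```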
(s/optional-key :value-serializer-class) implements-serializer?
(s/optional-key :key-serializer) implements-serializer?
(s/optional-key :value-serializer) implements-serializer?
(s/optional-key :retries-config) s/Any
Don't use s/Any. Put in specific validators.
Hey @kartik7153, This PR is already delayed a lot. We can address specific validations as refactorings later on.
This is not refactoring. This will break if you pass the wrong data types to the Java client, since you are using s/Any.
No, it won't. All the configs are str'd here: https://github.com/murtaza0xFF/ziggurat/blob/0c56aa6a2681b6db79dbc1d4c9d057c654477fe8/src/ziggurat/producer.clj#L124
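The str'ing referred to here can be sketched as follows. This is a simplified stand-in for the linked producer.clj line, not the actual code: every config value is stringified before being set on the `java.util.Properties` object handed to the producer, since `setProperty` only accepts string keys and values.

```java
import java.util.Map;
import java.util.Properties;

// Simplified sketch of the str'ing at the linked producer.clj line: each
// config value, whatever its type, becomes a string before reaching
// Properties.
public class ProducerProps {
    static Properties toProperties(Map<String, Object> configs) {
        Properties props = new Properties();
        configs.forEach((k, v) -> props.setProperty(k, String.valueOf(v)));
        return props;
    }
}
```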
What about the Java client? Is everything a str there?
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
I don't think everything is str. Let me know if I am wrong.
@kartik7153 yeah I think it makes sense to just validate against the types defined here instead of str'ing everything. Let me know what you think. https://kafka.apache.org/documentation/#producerconfigs
As discussed on Slack, I just verified, and it seems the current implementation is correct. Java's Properties API requires the key and the value to be strings. The properties are coerced to the correct types by KafkaProducer internally. The types in the documentation indicate what each value is parsed to internally, as opposed to the types the producer expects to receive.
Here is the function responsible for parsing. Notice that each of the functions responsible for parsing expects a string.
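The coercion described here can be illustrated with a simplified stand-in for Kafka's internal type parsing (an assumption-laden sketch of the behavior, not Kafka's actual code): each declared config type has a parser that accepts the string form of the value.

```java
// Simplified illustration of the internal coercion: the producer receives
// string values and parses each one to its declared config type. This is
// a stand-in for Kafka's ConfigDef parsing, not the real implementation.
public class ConfigCoercion {
    static Object parse(String type, String value) {
        switch (type) {
            case "INT":     return Integer.parseInt(value.trim());
            case "LONG":    return Long.parseLong(value.trim());
            case "BOOLEAN": return Boolean.parseBoolean(value.trim());
            default:        return value; // STRING, CLASS names, etc.
        }
    }
}
```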
Ziggurat presently supports a limited number of KafkaProducer configs. This change takes advantage of the fact that there's a pattern to how ProducerConfig's static fields are named. It does this by interpolating the user-defined configs into the fully-qualified package name of the ProducerConfig static fields, after processing them to match the case of the original fields, and then evaluating them to assign the config values.
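The naming transformation the description outlines can be sketched like this. This is a hypothetical Java rendering of the idea, not the Clojure implementation; note that most, but not all, ProducerConfig fields end in `_CONFIG` (`MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION` is one exception), so the suffix handling below is a simplification.

```java
// Hypothetical sketch of the naming transformation described above: a
// kebab-case user config key is upper-cased with underscores and prefixed
// with ProducerConfig's fully-qualified class name, yielding a reference
// the Clojure code can resolve and evaluate. Appending "_CONFIG"
// unconditionally is a simplification; a few real fields omit it.
public class FieldNames {
    static String toFieldReference(String kebabKey) {
        String field = kebabKey.toUpperCase().replace('-', '_') + "_CONFIG";
        return "org.apache.kafka.clients.producer.ProducerConfig/" + field;
    }
}
```

For example, `:value-serializer-class` maps to `ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG`, whose value is the `"value.serializer"` property name the Java client expects.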