-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP-42 Add producer and consumer interceptors #1730
Conversation
5664afe
to
4c63884
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks alright to me.
This PR includes: Producer: `onSend` but it doesn't implement `onAcknowledgement` Consumer: `onConsume` but it doesn't implement `onCommit` I'm not sure if I need to add the `onClose` method. Maybe in another iteration ¯\_(ツ)_/¯
f10663f
to
a299948
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@d1egoaz approved, changes LGTM. It would be good to have something new in the |
@dnwe for sure, I'll add an example soon |
KIP-42 Add producer and consumer interceptors
This PR includes:
Producer:
OnSend
but it doesn't implementonAcknowledgement
Consumer:
OnConsume
but it doesn't implementonCommit
I tried on purpose to not follow the Java design too closely. This PR tries to follow Go idioms to the degree possible.
I didn't add an
onClose
method on the interface, Go has other alternatives for this if needed.for:
specifically the section:
My implementation does the following, as I'm passing the value by reference:
If one of the interceptors in the list throws an exception from onSend(), the exception is caught, logged,and the next interceptor is called
with the record returned by the last successful interceptor in the list.The interceptor method needs to make sure to only modify the object when it's safe to do so, otherwise it can modify the object and then fail, which will make the next interceptor to use a modified value from maybe a non successful interceptor.
For some reason, Sarama
ProducerMessage
contains a channel, which makes hard to clone this object.Another alternative will be to make a copy of the body, headers, topic, partition data, and if the interceptor call fails, revert the message to this original data, but I'm not sure if it worth the effort, as anyone working with go know that if you modify an object passed as a reference, it'll modify the source object.
Closes #1710