-
Notifications
You must be signed in to change notification settings - Fork 1
Conversation
if err := h.handleMessage(ctx, msg, claim); err != nil { | ||
if c.consumer.config.Discarded != nil { | ||
c.consumer.config.Discarded(msg, err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't that naming imply that the message will always get "rejected"? How would retries get handled for instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the new handler API, the handler itself can handle retries - in my view retrying is more domain-specific and hence closer to kafka-go's domain than felice's. The plan is to add a RetryHandler type (essentially middleware for handlers) that will handle retries like the current logic does, but because it's just an arbitrary wrapper, it becomes easy to tweak the behaviour (for example to retry for a period depending on the message contents).
The default behaviour of kafka-go would be to use a RetryHandler (the API is planned to remain backwardly compatible). This is my current idea as to what the RetryHandler API might look like:
// RetryHandler implements Handler by retrying
type RetryHandler struct {
// Handler holds the handler that will be retried.
Handler Handler
// Strategy is used to determine how the handler
// retries HandleMessage calls. If there are no more available
// attempts, the last error returned from HandleMessage
// will be returnefd
Strategy func(m *Message) retry.Strategy
// Logf is used to log a message when an attempt
// to send the message failed.
Logf func(format string, args ...interface{})
// IsFatal reports whether the given
// error should be considered fatal. Fatal
// errors will cause the retry loop to abort.
IsFatal func(err error) bool
}
func (h RetryHandler) HandleMessage(ctx context.Context, m *Message) error
On reflection, I designed that a while ago, and if this logic lives in kafka-go, the Logf field is probably unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense! 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(re: Strategy I would maybe pass both message and error to the strategy func and let it decide which error to return)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, there's bound to be room for changing things there. We'll probably look at some real scenarios and see how it fits. Also, the nice thing about this is it doesn't have to be one-size-fits-all - if you have a special requirement, you can always just roll your own handler or middleware.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for that @rogpeppe, I left you few questions.
c.handlers.Set(topic, HandlerConfig{ | ||
Handler: h, | ||
Converter: converter, | ||
c.mu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why do we need a mutex here. Handle
will never be called concurrently IIUC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we call it within the go routines (one per parition)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. Handle
is called to setup the handlers between creating the Consumer and calling Serve
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just for consistency - we document that the mutex guards the consumerGroup field, so it's good to use it for access to that, even if we don't think that Handle will ever be called concurrently.
consumer/consumer.go
Outdated
// can achieve by calling Close. It returns an error only if Close | ||
// was called and the consumer shut down cleanly. | ||
// | ||
// Serve will return an error if it is called more than once, | ||
// the Consumer has been closed, or no handlers have | ||
// been registered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure to understand well when Serve
should return an error or not. I think that this comment can be improved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improved comment
func (h *topicHandler) handleMessage( | ||
ctx context.Context, | ||
sm *sarama.ConsumerMessage, | ||
claim sarama.ConsumerGroupClaim, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which case the sarama.ConsumerGroupClaim
parameter is needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the claim so that the Message type can implement HighWaterMarkOffset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work. A few small comments inline.
// KafkaAddrs holds kafka brokers addresses. There must be at least | ||
// one entry in the slice. | ||
// The default value is "localhost:9092". | ||
// KafkaAddrs holds the kafka broker addresses in host:port |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🌷 Incidentally, I'd love to know if there's a proper name for "host:port" format. I know it's defined as part of URI, but it is used outside that context so often.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Me too. In previous places, I've tried be consistent and use "HostPort" and "Host" where appropriate, but URLs mess everything up by allowing both :)
consumer/consumer.go
Outdated
quit chan struct{} | ||
// consumerGroup holds the consumerGroup started by Serve. | ||
consumerGroup sarama.ConsumerGroup | ||
// handlers holds the set of handlers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment isn't really adding much, IMHO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
c.handlers.Set(topic, HandlerConfig{ | ||
Handler: h, | ||
Converter: converter, | ||
c.mu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we call it within the go routines (one per parition)?
consumer/consumer.go
Outdated
// Serve runs the consumer and listens for new messages on the given | ||
// topics. Serve will block until it is instructed to stop, which you | ||
// can achieve by calling Close. It returns an error only if Close | ||
// was called and the consumer shut down cleanly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This second sentence here is somewhat surprising.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean you'd expect "Close" to be named "Stop" ?
You may be right in that case, but I generally think that Close
is the name we use in Go for closing resources, and this is really just another resource to be cleaned up.
// h := handler.HandlerFunc(func(msg *consumer.Message) error { | ||
// fmt.Printf("%+v", *msg) | ||
// h := handler.HandlerFunc(func(ctx context.Context, m *Message) error { | ||
// fmt.Printf("%+v", msg) | ||
// }) | ||
package consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we're shifting responsibility for error handling and retry to then user now, perhaps we should add some indications of how that is achieved in the docs here.
We'd like to make it easier to have a more flexible strategy for retrying message sending (e.g. retry only on some errors or for a message-dependant time), so remove the retry logic from felice, leaving it up to the caller to determine. This means that we need to pass the Context to the handler (so that a retrying handler can know when to abort when the context is done) and also enough information that appropriate metrics can be sent. Doing this means we can also simplify the configuration, because we don't need to log or send metrics here any more (that can be done in handler wrappers themselves). We also a address couple of issues in the existing implementation: - there is currently a race between Serve and Stop - Serve needs to be called in its own goroutine, so there's a race between setting Consumer.consumer and Stop calling Consumer.consumer.Close. - Return.Errors is set to true, but there's nothing that reads from the errors channel, which makes that pointless. - All the other types use Close instead of Stop, so we rename Stop to Close accordingly. - Don't require the full config for creating a MessageConverterV1. Fixes issue #50.
2f73a59
to
1d0a529
Compare
We'd like to make it easier to have a more flexible strategy for
retrying message sending (e.g. retry only on some errors
or for a message-dependant time), so remove
the retry logic from felice, leaving it up to the caller
to determine.
This means that we need to pass the Context to the handler
(so that a retrying handler can know when to abort when the
context is done) and also enough information that appropriate
metrics can be sent.
Doing this means we can also simplify the configuration, because
we don't need to log or send metrics here any more (that can
be done in handler wrappers themselves).
We also a address couple of issues in the existing implementation:
be called in its own goroutine, so there's a race between setting
Consumer.consumer and Stop calling Consumer.consumer.Close.
Fixes issue #50.