From 01c0fa977163f1169266e4521656cb8ec5f2e81c Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Mon, 17 Jul 2023 22:56:36 -0400 Subject: [PATCH] Enforce a context --- kafka/group.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/group.go b/kafka/group.go index 58ab4da..62816b9 100644 --- a/kafka/group.go +++ b/kafka/group.go @@ -15,7 +15,7 @@ type Config struct { } type wrap[A any] struct { - handler func(A) error + handler func(context.Context, A) error logger *zerolog.Logger } @@ -29,7 +29,7 @@ func (w *wrap[A]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama var a A if err := json.Unmarshal(msg.Value, &a); err != nil { w.logger.Err(err).Msg("Failed unmarshaling message.") - } else if err := w.handler(a); err != nil { + } else if err := w.handler(session.Context(), a); err != nil { w.logger.Err(err).Msg("Error processing message.") } session.MarkMessage(msg, "") @@ -39,7 +39,7 @@ func (w *wrap[A]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama } } -func Consume[A any](ctx context.Context, config Config, handler func(A) error, logger *zerolog.Logger) error { +func Consume[A any](ctx context.Context, config Config, handler func(context.Context, A) error, logger *zerolog.Logger) error { g, err := sarama.NewConsumerGroup(config.Brokers, config.Group, nil) if err != nil { return err