Skip to content

Commit

Permalink
Enforce a context
Browse files Browse the repository at this point in the history
  • Loading branch information
elffjs committed Jul 18, 2023
1 parent a940420 commit 01c0fa9
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions kafka/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Config struct {
}

type wrap[A any] struct {
handler func(A) error
handler func(context.Context, A) error
logger *zerolog.Logger
}

Expand All @@ -29,7 +29,7 @@ func (w *wrap[A]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama
var a A
if err := json.Unmarshal(msg.Value, &a); err != nil {
w.logger.Err(err).Msg("Failed unmarshaling message.")
} else if err := w.handler(a); err != nil {
} else if err := w.handler(session.Context(), a); err != nil {
w.logger.Err(err).Msg("Error processing message.")
}
session.MarkMessage(msg, "")
Expand All @@ -39,7 +39,7 @@ func (w *wrap[A]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama
}
}

func Consume[A any](ctx context.Context, config Config, handler func(A) error, logger *zerolog.Logger) error {
func Consume[A any](ctx context.Context, config Config, handler func(context.Context, A) error, logger *zerolog.Logger) error {
g, err := sarama.NewConsumerGroup(config.Brokers, config.Group, nil)
if err != nil {
return err
Expand Down

0 comments on commit 01c0fa9

Please sign in to comment.