Skip to content

Commit

Permalink
context.Done() may never reach if waiting on r.incoming <- msgErr
Browse files Browse the repository at this point in the history
Signed-off-by: nbajaj90 <nbajaj90@gmail.com>
  • Loading branch information
nbajaj90 committed Sep 13, 2023
1 parent 754a26b commit 6aa2742
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions protocol/kafka_sarama/v2/receiver.go
Expand Up @@ -66,15 +66,23 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
return nil
}
m := NewMessageFromConsumerMessage(msg)

r.incoming <- msgErr{
msgErrObj := msgErr{
msg: binding.WithFinish(m, func(err error) {
if protocol.IsACK(err) {
session.MarkMessage(msg, "")
}
}),
}

// Need to use select clause here, otherwise r.incoming <- msgErrObj can become a blocking operation,
// resulting in never reaching outside block's case <-session.Context().Done()
select {
case r.incoming <- msgErrObj:
// do nothing
case <-session.Context().Done():
return nil
}

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
Expand Down

0 comments on commit 6aa2742

Please sign in to comment.