Skip to content

Commit

Permalink
Re-attempt to fetch offsets on broken pipe errors
Browse files: browse the repository at this point in the history
  • Loading branch information
atanasdinov committed May 12, 2022
1 parent fcd34ac commit 7a88ffe
Showing 1 changed file with 35 additions and 21 deletions.
56 changes: 35 additions & 21 deletions consumer_monitor.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -126,35 +126,49 @@ func (m *consumerMonitor) run(ctx context.Context, subscriptionEvents chan *subs
if err != nil {
log.WithError(err).Error("Failed to fetch consumer offsets")

m.scheduler.ticker.Reset(m.scheduler.shortenedInterval)

m.scheduler.failureCount++
if m.scheduler.failureCount >= m.scheduler.maxFailureCount {
log.Errorf("Fetching offsets failed %d times in a row. Consumer status data is stale.",
m.scheduler.failureCount)
if m.scheduler.failureCount < m.scheduler.maxFailureCount {
continue
}

m.clearConsumerStatus()
log.Errorf("Fetching offsets failed %d times in a row. Consumer status data is stale.",
m.scheduler.failureCount)

// It's necessary to restart the connection on broken pipe errors due to a
// bug in the Sarama library: https://github.com/Shopify/sarama/issues/1796
if errors.Is(err, syscall.EPIPE) {
log.Info("Attempting to re-establish monitoring connection...")
if !errors.Is(err, syscall.EPIPE) {
m.clearConsumerStatus()
continue
}

consumerFetcher, topicFetcher, err := newConsumerGroupOffsetFetchers(m.connectionString)
if err != nil {
log.WithError(err).Warn("Failed to establish new monitoring connection")
} else {
// Terminate the old connection and replace it.
_ = m.consumerOffsetFetcher.Close()
// It's necessary to restart the connection on broken pipe errors due to a
// bug in the Sarama library: https://github.com/Shopify/sarama/issues/1796
log.Info("Attempting to re-establish monitoring connection...")

m.consumerOffsetFetcher = consumerFetcher
m.topicOffsetFetcher = topicFetcher
consumerOffsetFetcher, topicOffsetFetcher, err := newConsumerGroupOffsetFetchers(m.connectionString)
if err != nil {
log.WithError(err).Warn("Failed to establish new monitoring connection")

log.Info("Established new monitoring connection")
}
}
m.clearConsumerStatus()
continue
}

m.scheduler.ticker.Reset(m.scheduler.shortenedInterval)
continue
// Terminate the old connection and replace it.
_ = m.consumerOffsetFetcher.Close()

m.consumerOffsetFetcher = consumerOffsetFetcher
m.topicOffsetFetcher = topicOffsetFetcher

log.Info("Established new monitoring connection")

// Re-attempt to fetch offsets and clear the status on failure.
offsets, err = m.fetchOffsets()
if err != nil {
log.WithError(err).Error("Failed to fetch consumer offsets after resetting the connection")

m.clearConsumerStatus()
continue
}
}

log.WithField("offsets", offsets).Debug("Offsets fetched successfully")
Expand Down

0 comments on commit 7a88ffe

Please sign in to comment.