Skip to content

Commit

Permalink
Add option to propagate OffsetOutOfRange error
Browse files Browse the repository at this point in the history
When consuming a partition using a consumer group, the code
handles ErrOffsetOutOfRange errors by resetting to the "initial"
position, as specified by user (i.e. either oldest or newest available
offset). This, however, can be very dangerous. Say a consumer has
consumed up to offset 100 on replica A but replica B has only
replicated up to offset 99 due to temporary under-replication. During
a rebalance, sarama can end up with an offset out-of-range error if it
fetches partition metadata from replica B since the desired offset of
100 is greater than the newest offset of 99. The sarama consumer would
reset the offset in this case, which can cause reprocessing of old
data, especially if the initial offset is configured as "oldest".

This commit adds a config flag to disable this automatic reset. In the
above case, the consumer will be able to proceed normally after the
data replicates.
  • Loading branch information
Muir Manders authored and Daria Kolistratova committed Jun 16, 2022
1 parent 23c4286 commit 0340fb6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,17 @@ type Config struct {
// coordinator for the group.
UserData []byte
}

// support KIP-345
InstanceId string

// If true, consumer offsets will be automatically reset to configured Initial value
// if the fetched consumer offset is out of range of available offsets. Out of range
// can happen if the data has been deleted from the server, or during situations of
// under-replication where a replica does not have all the data yet. It can be
// dangerous to reset the offset automatically, particularly in the latter case. Defaults
// to true to maintain existing behavior.
ResetInvalidOffsets bool
}

Retry struct {
Expand Down Expand Up @@ -499,6 +508,7 @@ func NewConfig() *Config {
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
c.Consumer.Group.ResetInvalidOffsets = false

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
Expand Down
3 changes: 2 additions & 1 deletion consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,8 @@ type consumerGroupClaim struct {

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
if errors.Is(err, ErrOffsetOutOfRange) {

if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets {
offset = sess.parent.config.Consumer.Offsets.Initial
pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
Expand Down

0 comments on commit 0340fb6

Please sign in to comment.