Skip to content

Commit

Permalink
fix(mock consumer): HighWaterMarkOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
gr8web committed Mar 3, 2023
1 parent 397cee4 commit aa0f7ee
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
2 changes: 1 addition & 1 deletion mocks/consumer.go
Expand Up @@ -343,7 +343,7 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
return atomic.LoadInt64(&pc.highWaterMarkOffset)
}

// Pause implements the Pause method from the sarama.PartitionConsumer interface.
Expand Down
5 changes: 5 additions & 0 deletions mocks/consumer_test.go
Expand Up @@ -395,6 +395,11 @@ func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) {
if len(trm.errors) != 0 {
t.Errorf("Expected to not report any errors, found: %v", trm.errors)
}

if pc.HighWaterMarkOffset() != message2.Offset+1 {
diff := pc.HighWaterMarkOffset() - message2.Offset
t.Errorf("Difference between highwatermarkoffset and last message offset greater than 1, got: %v", diff)
}
}

func TestConsumerInvalidConfiguration(t *testing.T) {
Expand Down

0 comments on commit aa0f7ee

Please sign in to comment.