Skip to content

Commit

Permalink
feat: add functionality pause and resume for consumer (#89), closes (#77
Browse files Browse the repository at this point in the history
)

* feat: add functionality pause and resume for consumer

* chore: typo

* refactor: extract to the base

* feat: add unit tests

* feat: add pause/resume integration tests

* chore: add documentation

* refactor: convert pause ch in order to save cpu

* chore: fix tests

* chore: lint
  • Loading branch information
Abdulsametileri committed Jan 15, 2024
1 parent 4f33fc4 commit d0a5b26
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 5 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ manager ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer)).
- Added ability for manipulating kafka message headers.
- Added transactional retry feature. Set false if you want to use exception/retry strategy to only failed messages.
- Enable manuel commit at both single and batch consuming modes.
- Enabling consumer resume/pause functionality. Please refer to [its example](examples/with-pause-resume-consumer) and
[how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation.
- Bumped [kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer/releases) to the latest version:
- Backoff strategy support (linear, exponential options)
- Added message key for retried messages
Expand Down Expand Up @@ -197,6 +199,10 @@ After running `docker-compose up` command, you can run any application you want.

Please refer to [Tracing Example](examples/with-tracing/README.md)

#### With Pause & Resume Consumer

Please refer to [Pause Resume Example](examples/with-pause-resume-consumer)

#### With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can
Expand Down
8 changes: 8 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type batchConsumer struct {
messageGroupLimit int
}

func (b *batchConsumer) Pause() {
b.base.Pause()
}

func (b *batchConsumer) Resume() {
b.base.Resume()
}

func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency)
if err != nil {
Expand Down
50 changes: 50 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"errors"
"reflect"
"strconv"
Expand Down Expand Up @@ -354,6 +355,55 @@ func Test_batchConsumer_chunk(t *testing.T) {
}
}

func Test_batchConsumer_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
},
}

go func() {
<-bc.base.pause
}()

// When
bc.Pause()

// Then
if bc.base.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_batchConsumer_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
},
}

// When
bc.Resume()

// Then
if bc.base.consumerState != stateRunning {
t.Fatal("consumer state must be in resume!")
}
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
Expand Down
9 changes: 9 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ type consumer struct {
consumeFn func(*Message) error
}

func (c *consumer) Pause() {
c.base.Pause()
}

func (c *consumer) Resume() {
c.base.Resume()
}

func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.Concurrency)
if err != nil {
Expand All @@ -40,6 +48,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {

func (c *consumer) Consume() {
go c.subprocesses.Start()

c.wg.Add(1)
go c.startConsume()

Expand Down
55 changes: 53 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type Consumer interface {
// Consume starts consuming
Consume()

// Pause function pauses consumer, it is stop consuming new messages
Pause()

// Resume function resumes consumer, it is start to working
Resume()

// WithLogger for injecting custom log implementation
WithLogger(logger LoggerInterface)

Expand All @@ -33,6 +39,13 @@ type Reader interface {
CommitMessages(messages []kafka.Message) error
}

type state string

const (
stateRunning state = "running"
statePaused state = "paused"
)

type base struct {
cronsumer kcronsumer.Cronsumer
api API
Expand All @@ -42,6 +55,7 @@ type base struct {
r Reader
cancelFn context.CancelFunc
metric *ConsumerMetric
pause chan struct{}
quit chan struct{}
messageProcessedStream chan struct{}
incomingMessageStream chan *IncomingMessage
Expand All @@ -56,6 +70,7 @@ type base struct {
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
consumerState state
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand All @@ -79,6 +94,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
metric: &ConsumerMetric{},
incomingMessageStream: make(chan *IncomingMessage, messageChSize),
quit: make(chan struct{}),
pause: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
transactionalRetry: *cfg.TransactionalRetry,
Expand All @@ -90,6 +106,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
messageProcessedStream: make(chan struct{}, cfg.Concurrency),
singleConsumingStream: make(chan *Message, cfg.Concurrency),
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
consumerState: stateRunning,
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -125,6 +142,9 @@ func (c *base) startConsume() {

for {
select {
case <-c.pause:
c.logger.Debug("startConsume exited!")
return
case <-c.quit:
close(c.incomingMessageStream)
return
Expand Down Expand Up @@ -153,17 +173,48 @@ func (c *base) startConsume() {
}
}

func (c *base) Pause() {
c.logger.Info("Consumer is paused!")

c.cancelFn()

c.pause <- struct{}{}

c.consumerState = statePaused
}

func (c *base) Resume() {
c.logger.Info("Consumer is resumed!")

c.pause = make(chan struct{})
c.context, c.cancelFn = context.WithCancel(context.Background())
c.consumerState = stateRunning

c.wg.Add(1)
go c.startConsume()
}

func (c *base) WithLogger(logger LoggerInterface) {
c.logger = logger
}

func (c *base) Stop() error {
c.logger.Debug("Stop called!")
c.logger.Info("Stop is called!")

var err error
c.once.Do(func() {
c.subprocesses.Stop()
c.cancelFn()
c.quit <- struct{}{}

// In order to save cpu, we break startConsume loop in pause mode.
// If consumer is pause mode and Stop is called
// We need to close incomingMessageStream, because c.wg.Wait() blocks indefinitely.
if c.consumerState == stateRunning {
c.quit <- struct{}{}
} else if c.consumerState == statePaused {
close(c.incomingMessageStream)
}

c.wg.Wait()
err = c.r.Close()
})
Expand Down
55 changes: 52 additions & 3 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"time"

"github.com/google/go-cmp/cmp"

"github.com/segmentio/kafka-go"
)

func Test_base_startConsume(t *testing.T) {
t.Run("Return_When_Quit_Signal_Is_Came", func(t *testing.T) {
mc := mockReader{wantErr: true}
b := base{
wg: sync.WaitGroup{}, r: &mc,
wg: sync.WaitGroup{},
r: &mc,
incomingMessageStream: make(chan *IncomingMessage),
quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
logger: NewZapLogger(LogLevelError),
consumerState: stateRunning,
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand Down Expand Up @@ -57,6 +59,53 @@ func Test_base_startConsume(t *testing.T) {
})
}

func Test_base_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
}
go func() {
<-b.pause
}()

// When
b.Pause()

// Then
if b.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_base_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
}

// When
b.Resume()

// Then
if b.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
if ctx == b.context {
t.Fatal("contexts must be differ!")
}
}

type mockReader struct {
wantErr bool
}
Expand Down
50 changes: 50 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package kafka

import (
"context"
"errors"
"sync"
"testing"
)

Expand Down Expand Up @@ -114,3 +116,51 @@ func Test_consumer_process(t *testing.T) {
}
})
}

func Test_consumer_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
},
}
go func() {
<-c.base.pause
}()

// When
c.Pause()

// Then
if c.base.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_consumer_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
},
}

// When
c.Resume()

// Then
if c.base.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
}

0 comments on commit d0a5b26

Please sign in to comment.