Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
made error handling in PartitionModeConsumer conform to docs (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevevls authored and dim committed Sep 18, 2018
1 parent 3a32293 commit f7d869d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 22 deletions.
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32,
// Start partition consumer goroutine
tomb.Go(func(stopper <-chan none) {
if c.client.config.Group.Mode == ConsumerModePartitions {
pc.waitFor(stopper, c.errors)
pc.waitFor(stopper)
} else {
pc.multiplex(stopper, c.messages, c.errors)
}
Expand Down
26 changes: 5 additions & 21 deletions partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,12 @@ func (c *partitionConsumer) Close() error {
return c.closeErr
}

func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
defer close(c.dead)

for {
select {
case err, ok := <-c.Errors():
if !ok {
return
}
select {
case errors <- err:
case <-stopper:
return
case <-c.dying:
return
}
case <-stopper:
return
case <-c.dying:
return
}
func (c *partitionConsumer) waitFor(stopper <-chan none) {
select {
case <-stopper:
case <-c.dying:
}
close(c.dead)
}

func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
Expand Down

0 comments on commit f7d869d

Please sign in to comment.