Skip to content

Commit

Permalink
Ensure events get handled in the right order (#47)
Browse files Browse the repository at this point in the history
Resolves #40
  • Loading branch information
corvus-ch committed Nov 5, 2018
1 parent 038d0ec commit cf17cd9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
7 changes: 7 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumer
import (
"context"
"fmt"
"sync"

"github.com/bketelsen/logr"
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
Expand All @@ -18,6 +19,9 @@ type Consumer struct {
Processor processor.Processor
Log logr.Logger
canceled bool

// wg is used to ensure NotifyClose() gets handled before the consumer exits.
wg sync.WaitGroup
}

// New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the
Expand Down Expand Up @@ -102,6 +106,7 @@ func (c *Consumer) consume(msgs <-chan amqp.Delivery, done chan error) {
return
}
}
c.wg.Wait()
done <- nil
}

Expand Down Expand Up @@ -129,13 +134,15 @@ func (c *Consumer) Close() error {
// The chan provided will be closed when the Channel is closed and on a Graceful close, no error will be sent.
func (c *Consumer) NotifyClose(receiver chan error) chan error {
if c.Channel != nil {
c.wg.Add(1)
realChan := make(chan *amqp.Error)
c.Channel.NotifyClose(realChan)

go func() {
for {
err, ok := <-realChan
if !ok {
c.wg.Done()
return
}
receiver <- err
Expand Down
30 changes: 30 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,36 @@ func TestEndToEnd(t *testing.T) {
})
}

var closeTests = []struct {
name string
closeArgs []string
}{
{"connection", []string{"exec", "-T", "rabbitmq", "/bin/bash", "-c", "until rabbitmqadmin list connections | grep -v 'No items' > /dev/null; do sleep 1; done; eval $(rabbitmqadmin list connections -f kvp); rabbitmqadmin close connection name=\"${name}\""}},
{"shutdown", []string{"stop"}},
}

func TestConnectionClose(t *testing.T) {
for _, test := range closeTests {
t.Run(test.name, func(t *testing.T) {
conn, ch := prepare(t)

args := []string{"-V", "-no-datetime", "-e", command, "-c", "fixtures/default.conf"}
cmd, stdout, _ := startConsumer(t, []string{}, args...)
waitForOutput(t, stdout, "Waiting for messages...")

conn.Close()
ch.Close()

stop := exec.Command("docker-compose", test.closeArgs...)
if err := stop.Run(); err != nil {
t.Fatalf("failed to close connection/shutdown server: %v", err)
}

assert.EqualError(t, cmd.Wait(), "exit status 10")
})
}
}

func assertOutput(t *testing.T, stdout, stderr *bytes.Buffer) {
goldie.Assert(t, t.Name()+"Output", bytes.Trim(stdout.Bytes(), "\x00"))
goldie.Assert(t, t.Name()+"Error", bytes.Trim(stderr.Bytes(), "\x00"))
Expand Down

0 comments on commit cf17cd9

Please sign in to comment.