Skip to content

Commit

Permalink
Move Acknowledger to its own package (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Häusler committed Feb 27, 2018
1 parent e3978cb commit 4f4a5f5
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 98 deletions.
3 changes: 3 additions & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ checks:
method-complexity:
config:
threshold: 10
method-length:
config:
threshold: 32

plugins:
gofmt:
Expand Down
40 changes: 40 additions & 0 deletions acknowledger/acknowledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package acknowledger

import (
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
)

// Mapping of script exit codes and message acknowledgment.
const (
exitAck = 0
exitReject = 3
exitRejectRequeue = 4
exitNack = 5
exitNackRequeue = 6
)

// Acknowledger does message acknowledgment depending on the scripts exit code.
type Acknowledger interface {
Ack(d delivery.Delivery, code int) error
}

// New creates new Acknowledger using strict or default behaviour.
func New(strict bool, onFailure int) Acknowledger {
if strict {
return &Strict{}
}

return &Default{
OnFailure: onFailure,
}
}

// NewFromConfig creates a new Acknowledger from the given configuration.
func NewFromConfig(cfg *config.Config) Acknowledger {
if cfg.RabbitMq.Stricfailure {
return &Strict{}
}

return &Default{cfg.RabbitMq.Onfailure}
}
30 changes: 30 additions & 0 deletions acknowledger/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package acknowledger

import "github.com/corvus-ch/rabbitmq-cli-consumer/delivery"

// Default is an Acknowledger implementation using a configurable default behaviour for script errors.
type Default struct {
OnFailure int
}

// Ack acknowledges the message on success or negatively acknowledges or rejects the message according to the configured
// on error behaviour.
func (a Default) Ack(d delivery.Delivery, code int) error {
if code == exitAck {
d.Ack()
return nil
}
switch a.OnFailure {
case exitReject:
d.Reject(false)
case exitRejectRequeue:
d.Reject(true)
case exitNack:
d.Nack(false)
case exitNackRequeue:
d.Nack(true)
default:
d.Nack(true)
}
return nil
}
32 changes: 32 additions & 0 deletions acknowledger/strict.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package acknowledger

import (
"fmt"

"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
)

// Strict is an Acknowledger implementation strictly using the scripts exit code.
type Strict struct{}

// Ack acknowledges the message on success or negatively acknowledges or rejects the message according to the scripts
// exit code. It is an error if the script does not exit with one of the predefined exit codes.
func (a Strict) Ack(d delivery.Delivery, code int) error {
switch code {
case exitAck:
d.Ack()
case exitReject:
d.Reject(false)
case exitRejectRequeue:
d.Reject(true)
case exitNack:
d.Nack(false)
case exitNackRequeue:
d.Nack(true)
default:
d.Nack(true)
return fmt.Errorf("unexpected exit code %v", code)
}

return nil
}
19 changes: 10 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (

type Config struct {
RabbitMq struct {
AmqpUrl string
Host string
Username string
Password string
Port string
Vhost string
Queue string
Compression bool
Onfailure int
AmqpUrl string
Host string
Username string
Password string
Port string
Vhost string
Queue string
Compression bool
Onfailure int
Stricfailure bool
}
Prefetch struct {
Count int
Expand Down
84 changes: 0 additions & 84 deletions consumer/acknowledger.go

This file was deleted.

5 changes: 3 additions & 2 deletions consumer/consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/bouk/monkey"
log "github.com/corvus-ch/logr/buffered"
"github.com/corvus-ch/rabbitmq-cli-consumer/acknowledger"
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestProcessing(t *testing.T) {
body := []byte(test.name)
c := consumer.Consumer{
Builder: b,
Acknowledger: consumer.NewAcknowledger(test.strict, test.onFailure),
Acknowledger: acknowledger.New(test.strict, test.onFailure),
}

b.On("GetCommand", p, di, body).Return(cmd, nil)
Expand Down Expand Up @@ -108,7 +109,7 @@ func TestStrictDefault(t *testing.T) {
body := []byte("strictDefault")
c := consumer.Consumer{
Builder: b,
Acknowledger: &consumer.StrictAcknowledger{},
Acknowledger: &acknowledger.Strict{},
Log: log.New(0),
}

Expand Down
5 changes: 3 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"os"

"github.com/corvus-ch/rabbitmq-cli-consumer/acknowledger"
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
"github.com/corvus-ch/rabbitmq-cli-consumer/delivery"
Expand All @@ -15,7 +16,7 @@ import (
type Consumer struct {
Connection Connection
Builder command.Builder
Acknowledger Acknowledger
Acknowledger acknowledger.Acknowledger
Log logr.Logger
}

Expand Down Expand Up @@ -75,7 +76,7 @@ func (c *Consumer) ProcessMessage(d delivery.Delivery) {
}

// New returns a initialized consumer based on config
func New(cfg *config.Config, builder command.Builder, ack Acknowledger, l logr.Logger) (*Consumer, error) {
func New(cfg *config.Config, builder command.Builder, ack acknowledger.Acknowledger, l logr.Logger) (*Consumer, error) {
conn, err := NewConnection(cfg, l)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/codegangsta/cli"
"github.com/corvus-ch/rabbitmq-cli-consumer/acknowledger"
"github.com/corvus-ch/rabbitmq-cli-consumer/command"
"github.com/corvus-ch/rabbitmq-cli-consumer/config"
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
Expand Down Expand Up @@ -102,7 +103,7 @@ func Action(c *cli.Context) error {
return fmt.Errorf("failed to create command builder: %v", err)
}

ack := consumer.NewAcknowledger(c.Bool("strict-exit-code"), cfg.RabbitMq.Onfailure)
ack := acknowledger.NewFromConfig(cfg)

client, err := consumer.New(cfg, builder, ack, l)
if err != nil {
Expand Down Expand Up @@ -179,6 +180,10 @@ func LoadConfiguration(c *cli.Context) (*config.Config, error) {
cfg.Logs.Verbose = c.Bool("verbose")
}

if c.IsSet("strict-exit-code") {
cfg.RabbitMq.Stricfailure = c.Bool("strict-exit-code")
}

return cfg, nil
}

Expand Down

0 comments on commit 4f4a5f5

Please sign in to comment.