Skip to content

Commit

Permalink
giving config flag a better name
Browse files Browse the repository at this point in the history
making it default to true

fixing tests
  • Loading branch information
jprobinson committed Feb 10, 2016
1 parent 2a003aa commit 48fedd8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
7 changes: 4 additions & 3 deletions config/aws.go
Expand Up @@ -30,9 +30,10 @@ type (
SleepInterval *time.Duration `envconfig:"AWS_SQS_SLEEP_INTERVAL"`
// DeleteBufferSize will override the DefaultSQSDeleteBufferSize.
DeleteBufferSize *int `envconfig:"AWS_SQS_DELETE_BUFFER_SIZE"`
// ConsumeProtobuf is a flag to signal the subscriber to base64 decode the payload.
// before returning it.
ConsumeProtobuf bool `envconfig:"AWS_SQS_CONSUME_PROTOBUF"`
// ConsumeBase64 is a flag to signal the subscriber to base64 decode the payload
// before returning it. If it is not set in the config, the flag will default
// to 'true'.
ConsumeBase64 *bool `envconfig:"AWS_SQS_CONSUME_BASE64"`
}

// SNS holds the info required to work with Amazon SNS.
Expand Down
25 changes: 16 additions & 9 deletions pubsub/aws.go
Expand Up @@ -86,15 +86,17 @@ var (
// defaultSQSTimeoutSeconds is the default number of seconds the
// SQS client will wait before timing out.
defaultSQSTimeoutSeconds int64 = 2
// SQSSleepInterval is the default time.Duration the
// defaultSQSSleepInterval is the default time.Duration the
// SQSSubscriber will wait if it sees no messages
// on the queue.
defaultSQSSleepInterval = 2 * time.Second

// SQSDeleteBufferSize is the default limit of messages
// defaultSQSDeleteBufferSize is the default limit of messages
// allowed in the delete buffer before
// executing a 'delete batch' request.
defaultSQSDeleteBufferSize = 0

defaultSQSConsumeBase64 = true
)

func defaultSQSConfig(cfg *config.SQS) {
Expand All @@ -113,6 +115,10 @@ func defaultSQSConfig(cfg *config.SQS) {
if cfg.DeleteBufferSize == nil {
cfg.DeleteBufferSize = &defaultSQSDeleteBufferSize
}

if cfg.ConsumeBase64 == nil {
cfg.ConsumeBase64 = &defaultSQSConsumeBase64
}
}

type (
Expand Down Expand Up @@ -187,14 +193,15 @@ func NewSQSSubscriber(cfg *config.SQS) (*SQSSubscriber, error) {
// Message will decode protobufed message bodies and simply return
// a byte slice containing the message body for all others types.
func (m *SQSMessage) Message() []byte {
if m.sub.cfg.ConsumeProtobuf {
msgBody, err := base64.StdEncoding.DecodeString(*m.message.Body)
if err != nil {
Log.Warnf("unable to parse message body: %s", err)
}
return msgBody
if !*m.sub.cfg.ConsumeBase64 {
return []byte(*m.message.Body)
}

msgBody, err := base64.StdEncoding.DecodeString(*m.message.Body)
if err != nil {
Log.Warnf("unable to parse message body: %s", err)
}
return []byte(*m.message.Body)
return msgBody
}

// Done will queue up a message to be deleted. By default,
Expand Down
10 changes: 6 additions & 4 deletions pubsub/awssub_test.go
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/golang/protobuf/proto"
)

func TestSQSSubscriber(t *testing.T) {
func TestSQSSubscriberNoBase64(t *testing.T) {
test1 := "hey hey hey!"
test2 := "ho ho ho!"
test3 := "yessir!"
Expand Down Expand Up @@ -42,7 +42,8 @@ func TestSQSSubscriber(t *testing.T) {
},
}

cfg := &config.SQS{}
fals := false
cfg := &config.SQS{ConsumeBase64: &fals}
defaultSQSConfig(cfg)
sub := &SQSSubscriber{
sqs: sqstest,
Expand Down Expand Up @@ -79,7 +80,7 @@ func verifySQSSub(t *testing.T, queue <-chan SubscriberMessage, testsqs *TestSQS
}
}

func TestSQSSubscriberProto(t *testing.T) {
func TestSQSSubscriber(t *testing.T) {
test1 := &TestProto{"hey hey hey!"}
test2 := &TestProto{"ho ho ho!"}
test3 := &TestProto{"yessir!"}
Expand Down Expand Up @@ -108,7 +109,8 @@ func TestSQSSubscriberProto(t *testing.T) {
},
},
}
cfg := &config.SQS{ConsumeProtobuf: true}

cfg := &config.SQS{}
defaultSQSConfig(cfg)
sub := &SQSSubscriber{
sqs: sqstest,
Expand Down

0 comments on commit 48fedd8

Please sign in to comment.