Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

awssub: remove message from queue in non concurrent way #88

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 23 additions & 130 deletions pubsub/awssub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package awssub
import (
"errors"
"fmt"
"strconv"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/foodora/go-ranger/pubsub"
"strconv"
"sync/atomic"
"time"
)

type (
Expand All @@ -22,14 +23,9 @@ type (
cfg SQSConfig
queueURL *string

toDelete chan *deleteRequest
flush chan chan error
// inFlight and stopped are signals to manage delete requests
// at shutdown.
inFlight uint64
stopped uint32
stopped uint32

stop chan chan error
stop chan struct{}
sqsErr error

Logger pubsub.Logger
Expand All @@ -43,11 +39,6 @@ type (
sub *subscriber
message *sqs.Message
}

deleteRequest struct {
entry *sqs.DeleteMessageBatchRequestEntry
receipt chan error
}
)

var (
Expand All @@ -62,13 +53,6 @@ var (
// subscriber will wait if it sees no messages
// on the queue.
defaultSQSSleepInterval = 2 * time.Second

// defaultSQSDeleteBufferSize is the default limit of messages
// allowed in the delete buffer before
// executing a 'delete batch' request.
defaultSQSDeleteBufferSize = 0

defaultSQSConsumeBase64 = true
)

var sqsClientFactoryFunc = createSqsClient
Expand All @@ -85,26 +69,6 @@ func defaultSQSConfig(cfg *SQSConfig) {
if cfg.SleepInterval == 0 {
cfg.SleepInterval = defaultSQSSleepInterval
}

if cfg.DeleteBufferSize == nil {
cfg.DeleteBufferSize = &defaultSQSDeleteBufferSize
}
}

// incrementInflight will increment the add in flight count.
func (s *subscriber) incrementInFlight() {
atomic.AddUint64(&s.inFlight, 1)
}

// removeInfFlight will decrement the in flight count.
func (s *subscriber) decrementInFlight() {
atomic.AddUint64(&s.inFlight, ^uint64(0))
}

// inFlightCount returns the number of in-flight requests currently
// running on this server.
func (s *subscriber) inFlightCount() uint64 {
return atomic.LoadUint64(&s.inFlight)
}

// NewSubscriber will initiate a new Decrypter for the subscriber
Expand Down Expand Up @@ -153,13 +117,7 @@ func createSqsClient(cfg *SQSConfig) (sqsiface.SQSAPI, error) {
if err != nil {
return nil, err
}

sqsClient := sqs.New(sess, &aws.Config{
Region: cfg.Region,
Endpoint: cfg.Endpoint,
})

return sqsClient, nil
return sqs.New(sess, &cfg.Config), nil
}

// Message will decode message bodies and simply return string message.
Expand Down Expand Up @@ -188,24 +146,14 @@ func (m *subscriberMessage) ExtendDoneDeadline(d time.Duration) error {
return err
}

// Done will queue up a message to be deleted. By default,
// the `SQSDeleteBufferSize` will be 0, so this will block until the
// message has been deleted.
// Done removes a message from a queue
func (m *subscriberMessage) Done() error {
defer m.sub.decrementInFlight()
batchInput := &sqs.DeleteMessageBatchRequestEntry{
Id: m.message.MessageId,
deleteReq := &sqs.DeleteMessageInput{
QueueUrl: m.sub.queueURL,
ReceiptHandle: m.message.ReceiptHandle,
}
if m.sub.isStopped() {
return m.sub.deleteMessageBatch(batchInput)
}
receipt := make(chan error)
m.sub.toDelete <- &deleteRequest{
entry: batchInput,
receipt: receipt,
}
return <-receipt
_, err := m.sub.sqs.DeleteMessage(deleteReq)
return err
}

// Returns the number of times a message has been received from the queue but not deleted.
Expand All @@ -227,18 +175,15 @@ func (m *subscriberMessage) GetReceiveCount() (int, error) {
// and close the returned channel.
func (s *subscriber) Start() <-chan pubsub.Message {
if !s.isStopped() {
s.sqsErr = errors.New("subscriber already is running")
err := errors.New("subscriber already is running")
s.sqsErr = err
s.onErrorFunc(err)
return nil
}
atomic.SwapUint32(&s.stopped, uint32(0))
s.stop = make(chan chan error, 1)
s.flush = make(chan chan error, 1)
s.toDelete = make(chan *deleteRequest)

s.stop = make(chan struct{})
output := make(chan pubsub.Message)

go s.handleDeletes()

go func() {
defer close(output)
var (
Expand All @@ -247,8 +192,7 @@ func (s *subscriber) Start() <-chan pubsub.Message {
)
for {
select {
case exit := <-s.stop:
exit <- nil
case <-s.stop:
return
default:
}
Expand Down Expand Up @@ -281,14 +225,13 @@ func (s *subscriber) Start() <-chan pubsub.Message {
// for each message, pass to output
for _, msg := range resp.Messages {
select {
case exit := <-s.stop:
exit <- nil
case <-s.stop:
return
case output <- &subscriberMessage{
sub: s,
message: msg,
}:
s.incrementInFlight()
continue
}
}
}
Expand All @@ -301,41 +244,6 @@ func (s *subscriber) SetOnErrorFunc(fn func(error)) {
s.onErrorFunc = fn
}

func (s *subscriber) handleDeletes() {
var entriesBuffer []*sqs.DeleteMessageBatchRequestEntry
for {
var delRequest *deleteRequest
var err error
select {
case flush := <-s.flush:
if len(entriesBuffer) > 0 {
err = s.deleteMessageBatch(entriesBuffer...)
}
flush <- err
return
case delRequest = <-s.toDelete:
}
entriesBuffer = append(entriesBuffer, delRequest.entry)
// if buffer is full, send the request
if len(entriesBuffer) > *s.cfg.DeleteBufferSize {
err = s.deleteMessageBatch(entriesBuffer...)
entriesBuffer = []*sqs.DeleteMessageBatchRequestEntry{}
}
delRequest.receipt <- err
}
}

// deleteMessageBatch is helper function to remove messages from sqs in batches
func (s *subscriber) deleteMessageBatch(batchReq ...*sqs.DeleteMessageBatchRequestEntry) error {
batchInput := &sqs.DeleteMessageBatchInput{
QueueUrl: s.queueURL,
Entries: batchReq,
}
_, err := s.sqs.DeleteMessageBatch(batchInput)

return err
}

func (s *subscriber) isStopped() bool {
return atomic.LoadUint32(&s.stopped) == 1
}
Expand All @@ -346,26 +254,11 @@ func (s *subscriber) Stop() error {
if s.isStopped() {
return errors.New("sqs subscriber is not running")
}
defer func() {
close(s.stop)
close(s.toDelete)
close(s.flush)
}()
exit := make(chan error)
defer close(exit)
// stop subscriber
s.stop <- exit
atomic.SwapUint32(&s.stopped, uint32(1))
err := <-exit
if err != nil {
return err
}
//flush deleted msg buffer
flush := make(chan error)
defer close(flush)
s.flush <- flush

return <-flush
// stop subscriber
s.stop <- struct{}{}
close(s.stop)
return nil
}

// Err will contain any errors that occurred during
Expand Down
Loading