Skip to content

Commit

Permalink
Merge pull request #68 from foodora/bugfix-remove-messages-from-delet…
Browse files Browse the repository at this point in the history
…e-buffer

bugfix: deadlock; flush delete buffer after stopping the subscriber
  • Loading branch information
Muharem Ismailov committed Jul 31, 2019
2 parents f211fbe + 0df11a3 commit 7d6318b
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 82 deletions.
131 changes: 90 additions & 41 deletions pubsub/awssub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type (
queueURL *string

toDelete chan *deleteRequest
flush chan chan error
// inFlight and stopped are signals to manage delete requests
// at shutdown.
inFlight uint64
Expand Down Expand Up @@ -67,6 +68,8 @@ var (
defaultSQSConsumeBase64 = true
)

var sqsClientFactoryFunc = createSqsClient

func defaultSQSConfig(cfg *SQSConfig) {
if cfg.MaxMessages == 0 {
cfg.MaxMessages = defaultSQSMaxMessages
Expand Down Expand Up @@ -108,25 +111,20 @@ func NewSubscriber(cfg SQSConfig) (pubsub.Subscriber, error) {
var err error

s := &subscriber{
cfg: cfg,
toDelete: make(chan *deleteRequest),
stop: make(chan chan error, 1),
Logger: pubsub.DefaultLogger,
cfg: cfg,
stopped: 1,
Logger: pubsub.DefaultLogger,
}

if (len(cfg.QueueName) == 0) && (len(cfg.QueueURL) == 0) {
return s, errors.New("sqs queue name or url is required")
}

sess, err := session.NewSession()
sqsClient, err := sqsClientFactoryFunc(&cfg)
if err != nil {
return s, err
}

s.sqs = sqs.New(sess, &aws.Config{
Region: cfg.Region,
Endpoint: cfg.Endpoint,
})
s.sqs = sqsClient

if len(cfg.QueueURL) == 0 {
var urlResp *sqs.GetQueueUrlOutput
Expand All @@ -147,6 +145,20 @@ func NewSubscriber(cfg SQSConfig) (pubsub.Subscriber, error) {
return s, nil
}

// createSqsClient opens a new AWS session and returns an SQS client bound to
// it, configured with the Region and Endpoint taken from cfg. When the session
// cannot be created the error is returned and the client is nil.
func createSqsClient(cfg *SQSConfig) (sqsiface.SQSAPI, error) {
	awsSession, sessionErr := session.NewSession()
	if sessionErr != nil {
		return nil, sessionErr
	}

	// Only the region and endpoint are overridden for the client; everything
	// else comes from the session.
	clientCfg := aws.Config{
		Region:   cfg.Region,
		Endpoint: cfg.Endpoint,
	}
	return sqs.New(awsSession, &clientCfg), nil
}

// String will decode the message body and simply return it as a string.
func (m *subscriberMessage) String() string {
msgBody := aws.StringValue(m.message.Body)
Expand Down Expand Up @@ -178,12 +190,16 @@ func (m *subscriberMessage) ExtendDoneDeadline(d time.Duration) error {
// message has been deleted.
func (m *subscriberMessage) Done() error {
defer m.sub.decrementInFlight()
batchInput := &sqs.DeleteMessageBatchRequestEntry{
Id: m.message.MessageId,
ReceiptHandle: m.message.ReceiptHandle,
}
if m.sub.isStopped() {
return m.sub.deleteMessageBatch(batchInput)
}
receipt := make(chan error)
m.sub.toDelete <- &deleteRequest{
entry: &sqs.DeleteMessageBatchRequestEntry{
Id: m.message.MessageId,
ReceiptHandle: m.message.ReceiptHandle,
},
entry: batchInput,
receipt: receipt,
}
return <-receipt
Expand All @@ -207,8 +223,19 @@ func (m *subscriberMessage) GetReceiveCount() (int, error) {
// If it encounters any issues, it will populate the Err() error
// and close the returned channel.
func (s *subscriber) Start() <-chan pubsub.Message {
if !s.isStopped() {
s.sqsErr = errors.New("subscriber already is running")
return nil
}
atomic.SwapUint32(&s.stopped, uint32(0))
s.stop = make(chan chan error, 1)
s.flush = make(chan chan error, 1)
s.toDelete = make(chan *deleteRequest)

output := make(chan pubsub.Message)

go s.handleDeletes()

go func() {
defer close(output)
var (
Expand Down Expand Up @@ -249,49 +276,55 @@ func (s *subscriber) Start() <-chan pubsub.Message {
s.Logger.Printf("found %d messages", len(resp.Messages))
// for each message, pass to output
for _, msg := range resp.Messages {
output <- &subscriberMessage{
select {
case exit := <-s.stop:
exit <- nil
return
case output <- &subscriberMessage{
sub: s,
message: msg,
}:
s.incrementInFlight()
}
s.incrementInFlight()
}
}
}()
return output
}

func (s *subscriber) handleDeletes() {
batchInput := &sqs.DeleteMessageBatchInput{
QueueUrl: s.queueURL,
}
var (
err error
entriesBuffer []*sqs.DeleteMessageBatchRequestEntry
delRequest *deleteRequest
)
for delRequest = range s.toDelete {
entriesBuffer = append(entriesBuffer, delRequest.entry)
// if the subscriber is stopped and this is the last request,
// flush quit!
if s.isStopped() && s.inFlightCount() == 1 {
break
var entriesBuffer []*sqs.DeleteMessageBatchRequestEntry
for {
var delRequest *deleteRequest
var err error
select {
case flush := <-s.flush:
if len(entriesBuffer) > 0 {
err = s.deleteMessageBatch(entriesBuffer...)
}
flush <- err
return
case delRequest = <-s.toDelete:
}
entriesBuffer = append(entriesBuffer, delRequest.entry)
// if buffer is full, send the request
if len(entriesBuffer) > *s.cfg.DeleteBufferSize {
batchInput.Entries = entriesBuffer
_, err = s.sqs.DeleteMessageBatch(batchInput)
// clear buffer
err = s.deleteMessageBatch(entriesBuffer...)
entriesBuffer = []*sqs.DeleteMessageBatchRequestEntry{}
}

delRequest.receipt <- err
}
// clear any remainders before shutdown
if len(entriesBuffer) > 0 {
batchInput.Entries = entriesBuffer
_, err = s.sqs.DeleteMessageBatch(batchInput)
delRequest.receipt <- err
}

// deleteMessageBatch is a helper that removes the given entries from the
// subscriber's queue with a single SQS DeleteMessageBatch call.
//
// It returns the error of the call itself, if any. SQS can also accept the
// request as a whole while rejecting individual entries; those rejections are
// reported in the Failed list of the response instead of the returned error,
// so that list is inspected here and the first rejected entry is surfaced as
// an error. A nil result therefore means no entry was reported as failed.
func (s *subscriber) deleteMessageBatch(batchReq ...*sqs.DeleteMessageBatchRequestEntry) error {
	out, err := s.sqs.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
		QueueUrl: s.queueURL,
		Entries:  batchReq,
	})
	if err != nil {
		return err
	}
	// The client is injectable, so a substitute implementation may return a
	// nil output together with a nil error; treat that as "nothing failed".
	if out == nil {
		return nil
	}
	for _, failed := range out.Failed {
		if failed == nil {
			continue
		}
		// Report the first rejected entry; without this the message would be
		// considered deleted by the caller while it is still in the queue.
		return errors.New("sqs: delete message batch: entry " +
			aws.StringValue(failed.Id) + " failed: " +
			aws.StringValue(failed.Code) + ": " +
			aws.StringValue(failed.Message))
	}
	return nil
}

func (s *subscriber) isStopped() bool {
Expand All @@ -302,12 +335,28 @@ func (s *subscriber) isStopped() bool {
// messages.
func (s *subscriber) Stop() error {
if s.isStopped() {
return errors.New("sqs subscriber is already stopped")
return errors.New("sqs subscriber is not running")
}
defer func() {
close(s.stop)
close(s.toDelete)
close(s.flush)
}()
exit := make(chan error)
defer close(exit)
// stop subscriber
s.stop <- exit
atomic.SwapUint32(&s.stopped, uint32(1))
return <-exit
err := <-exit
if err != nil {
return err
}
// flush the buffer of pending message deletions
flush := make(chan error)
defer close(flush)
s.flush <- flush

return <-flush
}

// Err will contain any errors that occurred during
Expand Down
Loading

0 comments on commit 7d6318b

Please sign in to comment.