-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bugfix: deadlock; flush delete buffer after stopping the subscriber #68
Conversation
@muharem What happens if multiple messages are received but not consumed? |
@jsafoodpanda Yes, only about those 5, which were explicitly marked as Now the subscriber just stops without flushing the delete buffer. |
LGTM, can you also provide unit test for this? |
@wilsont done |
@@ -65,6 +66,8 @@ var ( | |||
defaultSQSConsumeBase64 = true | |||
) | |||
|
|||
var sqsClientFactoryFunc = createSqsClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
helps to mock sqs client and use an actual constructor to create the subscriber for unittest
ReceiptHandle: m.message.ReceiptHandle, | ||
} | ||
if m.sub.isStopped() { | ||
return m.sub.deleteMessageBatch(batchInput) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the subscriber is stopped there is no goroutine to remove a message, remove it inside of the same gogoutine
atomic.SwapUint32(&s.stopped, uint32(0)) | ||
s.stop = make(chan chan error, 1) | ||
s.flush = make(chan chan error, 1) | ||
s.toDelete = make(chan *deleteRequest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initializing all channels inside of Start func for a multiple run via one subscriber instance
@@ -224,49 +251,54 @@ func (s *subscriber) Start() <-chan pubsub.Message { | |||
s.Logger.Printf("found %d messages", len(resp.Messages)) | |||
// for each message, pass to output | |||
for _, msg := range resp.Messages { | |||
output <- &subscriberMessage{ | |||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to avoid deadlock when a consumer stop the subscriber and does not reading from output channel anymore
var delRequest *deleteRequest | ||
var err error | ||
select { | ||
case flush := <-s.flush: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush and shutdown the goroutine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add the explicit test cases for:
- Deadlock scenario you fixed
- Test assurance for unprocessed message can be recovered from the queue if the subscriber is stopped in between
@@ -168,12 +180,16 @@ func (m *subscriberMessage) ExtendDoneDeadline(d time.Duration) error { | |||
// message has been deleted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this description still holds good ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The information that a message will be removed synchronically in case when the subscriber is stopped
could be add but in my opinion for the client it does not make any sense.
The doc should say that message will be deleted be this function.
And SQSDeleteBufferSize
configuration's doc should say to a client about delete buffer.
_, err = s.sqs.DeleteMessageBatch(batchInput) | ||
delRequest.receipt <- err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also please add suitable comments describing what this method is doing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
msq1.Done() | ||
assert.True(t, len(sqstest.Deleted) == 0, "Message unexpectedly was removed from the delete buffer") | ||
|
||
sub.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this scenario mimic the case when you have three messages to process and you only processed one and stopped the subscriber and then you are flushing all the remaining messages without processing them. What happens when you restart the subscriber ? Will you get the remaining two messages ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add the tests which also confirms that you will not lose any unprocessed messages from queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when you restart the subscriber ?
Is there a use case for this? The app should not restart consumption after stoping as the only stop scenario is shutting down.
What do you mean by not lose any unprocessed messages
? If they are not processed, they should remain in the SQS and not deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens when you restart the subscriber ?
Is there a use case for this? The app should not restart consumption after stoping as the only stop scenario is shutting down.
Sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by
not lose any unprocessed messages
? If they are not processed, they should remain in the SQS and not deleted.
Please ignore this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will you get the remaining two messages ?
Yes
you are flushing all the remaining messages without processing them.
We are flushing only those which were marked as Done by Done
method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case tests the deadlock
The subscriber gets 3 messages by one batch but the consumer reads from the channel only one and stop.
You can use this test case for the current version and see the deadlock
Already there
I do not get.
|
It means the subscriber starts and stopped during whatever process you were using subscriber. |
Fair enough |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @muharem confirmed that this PR is only for removing the already processed and marked done messages from the SQS and required tests cases are there approved from my side.
few fixes related to aws subscriber: