-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
strawman to show how we might do concurrency #9
Conversation
@serbrech is probably on a plane right now but should be intereste in this since I asssume his mental model was the same as mine that each message got its own go routine. Otherwise retry later holding the message makes no sense. |
listener/listener.go
Outdated
return nil | ||
} | ||
if l.concurrency == 0 { | ||
l.concurrency = 5 //what's the right default? 1? something higher? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this actually needs to be pretty high or retry later will still sink us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could remove any max concurrancy here and allways spin up as many go functions as we have messages. That is closer to what charlie was considering.
integration/pubsub_test.go
Outdated
events := []testEvent{} | ||
for i := 0; i < numOfEvents; i++ { | ||
j := i + 1 | ||
event := testEvent{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could just be anonymous empty struct we're not actually using the data here.
|
||
assert.Equal(suite.T(), msgstatus, expected) | ||
|
||
//time.Sleep(20 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is added to not close listener before it prints out the msgs, I think it's not need anymore as for the loop above blocks until we finish the checks.
@@ -114,6 +117,30 @@ func (suite *serviceBusSuite) TestPublishAndListenRetryLater() { | |||
}, event) | |||
} | |||
|
|||
// Note that, this test need manually verify, you should be able to see 3 msgs are receievd in a row from the test log |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's awesome to have the order verification, the comment here is outdated :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question, why do we have order verification?
servicebus can't guarantee order by design. especially with concurrent workers pulling from the subscription?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really we wanted to show that we recieved messages before we completed others (actually proved we were parallel) Showing the ordering was mostly for debugging the test could just be that we got a certain number of recieves before completes.
} | ||
var msgstatus []string | ||
expected := []string{ | ||
"recieved1", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think 123 are actually gartuneed in that order.
if l.concurrency == 0 { | ||
l.concurrency = 1 //RetryLater won't work with this. Come back and change after talking to strebec? | ||
} | ||
prefetch := servicebus.SubscriptionWithPrefetchCount(uint32(l.concurrency)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think prefetch is strictly like to concurrency.
prefetch will make the request fetch N messages at once. that's much better obviously.
but since the messages can be processed asynchronously from the fetching, it's not a requirement.
- fetch message1
- start process message 1
- fetch message2
- start process message 2
- done process message 2
- done process message 1
the above is concurrent processing with a prefetch of 1.
- fetch message1 and message2
- start process message 1
- start process message 2
- done process message 2
- done process message 1
The above is concurrent processing with prefetch of 2 (assuming the messages are present in the subscription when the receiver starts)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree it's not strictly tied though probably a good default.
Not sure this PR is useful to autoupgrader or replacing the rp without lock auto-renewal. Take a look at message pump here to see concurrency and auto renewal. I kind of want to get that up to the servicebus sdk as an optional handler any consumer can use.
No description provided.