pubsub: dynamically choose the number of messages for ReceiveBatch #1200

Merged (6 commits, Jan 31, 2019)
3 binary files changed (contents not shown).
pubsub/pubsub.go (102 changes: 85 additions, 17 deletions)
@@ -36,8 +36,10 @@ package pubsub // import "gocloud.dev/pubsub"
import (
"context"
"fmt"
"math"
"reflect"
"sync"
"time"

gax "github.com/googleapis/gax-go"
"gocloud.dev/gcerrors"
@@ -56,9 +58,12 @@ type Message struct {
// Metadata has key/value metadata for the message.
Metadata map[string]string

// processingStartTime is the time that this message was returned
// from Receive, or the zero time if it wasn't.
processingStartTime time.Time

// ack is a closure that queues this message for acknowledgement.
ack func()

// mu guards isAcked in case Ack() is called concurrently.
mu sync.Mutex

@@ -232,10 +237,47 @@ type Subscription struct {
ackBatcher driver.Batcher
cancel func() // for canceling all SendAcks calls

mu sync.Mutex // protects everything below
q []*Message // local queue of messages downloaded from server
err error // permanent error
waitc chan struct{} // for goroutines waiting on ReceiveBatch
mu sync.Mutex // protects everything below
q []*Message // local queue of messages downloaded from server
err error // permanent error
waitc chan struct{} // for goroutines waiting on ReceiveBatch
avgProcessTime float64 // moving average of the seconds to process a message
}

const (
// The desired duration of a subscription's queue of messages (the messages pulled
// and waiting in memory to be doled out to Receive callers). This is how long
// it would take to drain the queue at the current processing rate.
// The relationship to queue length (number of messages) is
//
// lengthInMessages = desiredQueueDuration / averageProcessTimePerMessage
//
// In other words, if it takes 100ms to process a message on average, and we want
// 2s worth of queued messages, then we need 2/.1 = 20 messages in the queue.
//
// If desiredQueueDuration is too small, then there won't be a large enough buffer
// of messages to handle fluctuations in processing time, and the queue is likely
// to become empty, reducing throughput. If desiredQueueDuration is too large, then
// messages will wait in memory for a long time, possibly timing out (that is,
// their ack deadline will be exceeded). Those messages could have been handled
// by another process receiving from the same subscription.
desiredQueueDuration = 2 * time.Second

// The factor by which old points decay when a new point is added to the moving
// average. The larger this number, the more weight will be given to the newest
// point in preference to older ones.
decay = 0.05
)

// Add message processing time d to the weighted moving average.
func (s *Subscription) addProcessingTime(d time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
if s.avgProcessTime == 0 {
s.avgProcessTime = d.Seconds()
} else {
s.avgProcessTime = s.avgProcessTime*(1-decay) + d.Seconds()*decay
}
}

// Receive receives and returns the next message from the Subscription's queue,
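
To see the sizing logic in isolation, here is a minimal, self-contained sketch of the arithmetic in this hunk: the weighted moving average from addProcessingTime plus the batch-size computation that Receive performs in the next hunk. It is not code from this PR; the sizer type and its methods are hypothetical names, and the 2s target, 0.05 decay, and 1000 cap are simply copied from the diff.

package main

import (
	"fmt"
	"math"
	"time"
)

const (
	desiredQueueDuration = 2 * time.Second // same target as the diff
	decay                = 0.05            // same decay factor as the diff
)

// sizer is a hypothetical stand-in for the Subscription fields used here.
type sizer struct {
	avgProcessTime float64 // seconds; 0 means no data yet
}

// add folds one observed per-message processing time into the moving average,
// mirroring Subscription.addProcessingTime.
func (s *sizer) add(d time.Duration) {
	if s.avgProcessTime == 0 {
		s.avgProcessTime = d.Seconds()
	} else {
		s.avgProcessTime = s.avgProcessTime*(1-decay) + d.Seconds()*decay
	}
}

// batchSize returns enough messages to cover desiredQueueDuration at the
// current average rate: at least 1, capped at 1000.
func (s *sizer) batchSize() int {
	if s.avgProcessTime == 0 {
		return 1
	}
	n := math.Ceil(desiredQueueDuration.Seconds() / s.avgProcessTime)
	return int(math.Min(n, 1000))
}

func main() {
	var s sizer
	fmt.Println(s.batchSize()) // 1: no data yet
	s.add(100 * time.Millisecond)
	fmt.Println(s.batchSize()) // 20: 2s of queue at 100ms per message
}
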
@@ -259,10 +301,13 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
// At least one message is available. Return it.
m := s.q[0]
s.q = s.q[1:]
m.processingStartTime = time.Now()
// TODO(jba): pre-fetch more messages if the queue gets too small.
return m, nil
}
if s.waitc != nil {
// A call to ReceiveBatch is in flight. Wait for it.
// TODO(jba): support multiple calls in flight simultaneously.
waitc := s.waitc
s.mu.Unlock()
select {
@@ -276,12 +321,30 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
}
// No messages are available and there are no calls to ReceiveBatch in flight.
// Make a call.
//
// Ask for a number of messages that will give us the desired queue length.
// If we don't yet have any information about processing time (at the
// beginning), just get one message.
nMessages := 1
if s.avgProcessTime > 0 {
// Using Ceil guarantees at least one message.
n := math.Ceil(desiredQueueDuration.Seconds() / s.avgProcessTime)
// Cap nMessages at some non-ridiculous value.
// Slight hack: we should be using a larger cap, like MaxInt32. But
// that messes up replay: since the tests take very little time to ack,
// n is very large, and since our averaging process is time-sensitive,
// values can differ slightly from run to run. The current cap happens
// to work, but we should come up with a more robust solution.
// (Currently it doesn't matter for performance, because gcppubsub
// caps maxMessages to 1000 anyway.)
nMessages = int(math.Min(n, 1000))
}
s.waitc = make(chan struct{})
s.mu.Unlock()
// Even though the mutex is unlocked, only one goroutine can be here.
// The only way here is if s.waitc was nil. This goroutine just set
// s.waitc to non-nil while holding the lock.
msgs, err := s.getNextBatch(ctx)
msgs, err := s.getNextBatch(ctx, nMessages)
Review thread on this line:

Contributor:

Test is failing:

    drivertest.go:271: pubsub (code=Unknown): replayer: request not found: subscription:"projects/go-cloud-test-216917/subscriptions/TestConformance_TestSendReceiveTwo-subscription-1" max_messages:936

That looks likely to be flaky....

Contributor Author:

It's actually 9364. The Mac tests run a bit slower. I changed the cap to 1000 for now.

Contributor:

9364? Why? (Why isn't this flaky?) Seems like it's dependent on timing, no?

Contributor Author:

I got the number 9364 from the actual Travis log. I think you dropped a digit when you copied it. The relevant point is that it's > 1000.

It is time-dependent. But with a cap of 1000, it would have to take longer than 1ms between calls to Receive on average, and considering that we do nothing but call Ack, that's extremely unlikely.

I agree, though, that this solution isn't great.
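
To make the numbers in this thread concrete, a back-of-the-envelope illustration with made-up timings (not output from the PR's tests): with desiredQueueDuration = 2s, the computed n falls below the cap only when the average processing time exceeds 2s/1000 = 2ms, and a conformance-test handler that does nothing but Ack stays far under that, so max_messages is pinned at 1000 on every run and the recorded replay requests stay stable.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const desiredQueueDuration = 2 * time.Second
	avg := 200 * time.Microsecond // hypothetical ack-only processing time
	n := math.Ceil(desiredQueueDuration.Seconds() / avg.Seconds())
	nMessages := int(math.Min(n, 1000))
	fmt.Println(nMessages) // prints 1000: n is far above the cap, so the cap wins
}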

s.mu.Lock()
close(s.waitc)
s.waitc = nil
@@ -297,14 +360,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
}

// getNextBatch gets the next batch of messages from the server and returns it.
func (s *Subscription) getNextBatch(ctx context.Context) ([]*Message, error) {
func (s *Subscription) getNextBatch(ctx context.Context, nMessages int) ([]*Message, error) {
var msgs []*driver.Message
for len(msgs) == 0 {
err := retry.Call(ctx, gax.Backoff{}, s.driver.IsRetryable, func() error {
var err error
// TODO(#691): dynamically adjust maxMessages
const maxMessages = 10
msgs, err = s.driver.ReceiveBatch(ctx, maxMessages)
msgs, err = s.driver.ReceiveBatch(ctx, nMessages)
return err
})
if err != nil {
@@ -314,15 +375,22 @@ func (s *Subscription) getNextBatch(ctx context.Context) ([]*Message, error) {
var q []*Message
for _, m := range msgs {
id := m.AckID
q = append(q, &Message{
m2 := &Message{
Body: m.Body,
Metadata: m.Metadata,
ack: func() {
// Ignore the error channel. Errors are dealt with
// in the ackBatcher handler.
_ = s.ackBatcher.AddNoWait(id)
},
})
}
m2.ack = func() {
// Note: This call locks s.mu, and m2.mu is locked here as well. Deadlock
// will result if Message.Ack is ever called with s.mu held. That
// currently cannot happen, but we should be careful if/when implementing
// features like auto-ack.
s.addProcessingTime(time.Since(m2.processingStartTime))

// Ignore the error channel. Errors are dealt with
// in the ackBatcher handler.
_ = s.ackBatcher.AddNoWait(id)
}
q = append(q, m2)
}
return q, nil
}
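
Finally, to place the new processingStartTime field in context: the interval it measures is the gap between Receive handing a message to the application and the application calling Ack. A sketch of the consumer loop this corresponds to, assuming a *pubsub.Subscription obtained elsewhere from some driver-specific constructor (the handler body here is made up):

package consumer

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

// consume pulls messages one at a time; the time spent between Receive
// returning and Ack being called is what feeds the moving average above.
func consume(ctx context.Context, sub *pubsub.Subscription) error {
	for {
		m, err := sub.Receive(ctx)
		if err != nil {
			return err
		}
		log.Printf("processing %d bytes", len(m.Body)) // stand-in for real work
		m.Ack()
	}
}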