diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay
index aab2165097..d3e9a060ba 100644
Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay differ
diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay
index 2951187c05..7bf9d3dcd7 100644
Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay differ
diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay
index 9eb845832e..f480a0e5cc 100644
Binary files a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay and b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay differ
diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go
index 9e4146ed62..9ddbfd6309 100644
--- a/pubsub/pubsub.go
+++ b/pubsub/pubsub.go
@@ -36,8 +36,10 @@ package pubsub // import "gocloud.dev/pubsub"
 import (
 	"context"
 	"fmt"
+	"math"
 	"reflect"
 	"sync"
+	"time"
 
 	gax "github.com/googleapis/gax-go"
 	"gocloud.dev/gcerrors"
@@ -56,9 +58,12 @@ type Message struct {
 	// Metadata has key/value metadata for the message.
 	Metadata map[string]string
 
+	// processingStartTime is the time that this message was returned
+	// from Receive, or the zero time if it wasn't.
+	processingStartTime time.Time
+
 	// ack is a closure that queues this message for acknowledgement.
 	ack func()
-
 	// mu guards isAcked in case Ack() is called concurrently.
 	mu sync.Mutex
 
@@ -232,10 +237,47 @@ type Subscription struct {
 	ackBatcher driver.Batcher
 	cancel     func() // for canceling all SendAcks calls
 
-	mu    sync.Mutex    // protects everything below
-	q     []*Message    // local queue of messages downloaded from server
-	err   error         // permanent error
-	waitc chan struct{} // for goroutines waiting on ReceiveBatch
+	mu             sync.Mutex    // protects everything below
+	q              []*Message    // local queue of messages downloaded from server
+	err            error         // permanent error
+	waitc          chan struct{} // for goroutines waiting on ReceiveBatch
+	avgProcessTime float64       // moving average of the seconds to process a message
+}
+
+const (
+	// The desired duration of a subscription's queue of messages (the messages pulled
+	// and waiting in memory to be doled out to Receive callers). This is how long
+	// it would take to drain the queue at the current processing rate.
+	// The relationship to queue length (number of messages) is
+	//
+	//	lengthInMessages = desiredQueueDuration / averageProcessTimePerMessage
+	//
+	// In other words, if it takes 100ms to process a message on average, and we want
+	// 2s worth of queued messages, then we need 2/.1 = 20 messages in the queue.
+	//
+	// If desiredQueueDuration is too small, then there won't be a large enough buffer
+	// of messages to handle fluctuations in processing time, and the queue is likely
+	// to become empty, reducing throughput. If desiredQueueDuration is too large, then
+	// messages will wait in memory for a long time, possibly timing out (that is,
+	// their ack deadline will be exceeded). Those messages could have been handled
+	// by another process receiving from the same subscription.
+	desiredQueueDuration = 2 * time.Second
+
+	// The factor by which old points decay when a new point is added to the moving
+	// average. The larger this number, the more weight will be given to the newest
+	// point in preference to older ones.
+	decay = 0.05
+)
+
+// Add message processing time d to the weighted moving average.
+func (s *Subscription) addProcessingTime(d time.Duration) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.avgProcessTime == 0 {
+		s.avgProcessTime = d.Seconds()
+	} else {
+		s.avgProcessTime = s.avgProcessTime*(1-decay) + d.Seconds()*decay
+	}
 }
 
 // Receive receives and returns the next message from the Subscription's queue,
@@ -259,10 +301,13 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
 			// At least one message is available. Return it.
 			m := s.q[0]
 			s.q = s.q[1:]
+			m.processingStartTime = time.Now()
+			// TODO(jba): pre-fetch more messages if the queue gets too small.
 			return m, nil
 		}
 		if s.waitc != nil {
 			// A call to ReceiveBatch is in flight. Wait for it.
+			// TODO(jba): support multiple calls in flight simultaneously.
 			waitc := s.waitc
 			s.mu.Unlock()
 			select {
@@ -276,12 +321,30 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
 		}
 		// No messages are available and there are no calls to ReceiveBatch in flight.
 		// Make a call.
+		//
+		// Ask for a number of messages that will give us the desired queue length.
+		// Unless we don't have information about process time (at the beginning), in
+		// which case just get one message.
+		nMessages := 1
+		if s.avgProcessTime > 0 {
+			// Using Ceil guarantees at least one message.
+			n := math.Ceil(desiredQueueDuration.Seconds() / s.avgProcessTime)
+			// Cap nMessages at some non-ridiculous value.
+			// Slight hack: we should be using a larger cap, like MaxInt32. But
+			// that messes up replay: since the tests take very little time to ack,
+			// n is very large, and since our averaging process is time-sensitive,
+			// values can differ slightly from run to run. The current cap happens
+			// to work, but we should come up with a more robust solution.
+			// (Currently it doesn't matter for performance, because gcppubsub
+			// caps maxMessages to 1000 anyway.)
+			nMessages = int(math.Min(n, 1000))
+		}
 		s.waitc = make(chan struct{})
 		s.mu.Unlock()
 		// Even though the mutex is unlocked, only one goroutine can be here.
 		// The only way here is if s.waitc was nil. This goroutine just set
 		// s.waitc to non-nil while holding the lock.
-		msgs, err := s.getNextBatch(ctx)
+		msgs, err := s.getNextBatch(ctx, nMessages)
 		s.mu.Lock()
 		close(s.waitc)
 		s.waitc = nil
@@ -297,14 +360,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
 }
 
 // getNextBatch gets the next batch of messages from the server and returns it.
-func (s *Subscription) getNextBatch(ctx context.Context) ([]*Message, error) {
+func (s *Subscription) getNextBatch(ctx context.Context, nMessages int) ([]*Message, error) {
 	var msgs []*driver.Message
 	for len(msgs) == 0 {
 		err := retry.Call(ctx, gax.Backoff{}, s.driver.IsRetryable, func() error {
 			var err error
-			// TODO(#691): dynamically adjust maxMessages
-			const maxMessages = 10
-			msgs, err = s.driver.ReceiveBatch(ctx, maxMessages)
+			msgs, err = s.driver.ReceiveBatch(ctx, nMessages)
 			return err
 		})
 		if err != nil {
@@ -314,15 +375,22 @@ func (s *Subscription) getNextBatch(ctx context.Context) ([]*Message, error) {
 	var q []*Message
 	for _, m := range msgs {
 		id := m.AckID
-		q = append(q, &Message{
+		m2 := &Message{
 			Body:     m.Body,
 			Metadata: m.Metadata,
-			ack: func() {
-				// Ignore the error channel. Errors are dealt with
-				// in the ackBatcher handler.
-				_ = s.ackBatcher.AddNoWait(id)
-			},
-		})
+		}
+		m2.ack = func() {
+			// Note: This call locks s.mu, and m2.mu is locked here as well. Deadlock
+			// will result if Message.Ack is ever called with s.mu held. That
+			// currently cannot happen, but we should be careful if/when implementing
+			// features like auto-ack.
+			s.addProcessingTime(time.Since(m2.processingStartTime))
+
+			// Ignore the error channel. Errors are dealt with
+			// in the ackBatcher handler.
+			_ = s.ackBatcher.AddNoWait(id)
+		}
+		q = append(q, m2)
 	}
 	return q, nil
 }
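
Note on the sizing logic above: the batch size is derived from an exponentially weighted moving average of per-message processing time, using the same constants the patch defines. The standalone sketch below reproduces that arithmetic outside the Subscription type so it can be eyeballed or unit-tested in isolation; the emaSizer name and its methods are illustrative only and are not part of gocloud.dev/pubsub or of this patch.

// Standalone sketch of the sizing arithmetic introduced in this patch.
// emaSizer and its methods are illustrative names, not gocloud.dev API.
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	desiredQueueDuration = 2 * time.Second // target depth of the local queue, measured in time
	decay                = 0.05            // weight given to the newest sample
)

// emaSizer tracks a weighted moving average of per-message processing time
// and derives a batch size from it.
type emaSizer struct {
	avgProcessTime float64 // seconds; 0 means "no samples yet"
}

// add folds a new processing-time sample into the moving average,
// mirroring Subscription.addProcessingTime above.
func (e *emaSizer) add(d time.Duration) {
	if e.avgProcessTime == 0 {
		e.avgProcessTime = d.Seconds()
	} else {
		e.avgProcessTime = e.avgProcessTime*(1-decay) + d.Seconds()*decay
	}
}

// batchSize returns how many messages to request so the queue holds roughly
// desiredQueueDuration worth of work, mirroring the computation in Receive.
func (e *emaSizer) batchSize() int {
	if e.avgProcessTime == 0 {
		return 1 // no data yet: fetch a single message
	}
	n := math.Ceil(desiredQueueDuration.Seconds() / e.avgProcessTime)
	return int(math.Min(n, 1000)) // capped, as in the patch
}

func main() {
	var e emaSizer
	fmt.Println(e.batchSize()) // 1: no samples yet
	e.add(100 * time.Millisecond)
	fmt.Println(e.batchSize()) // 20: ceil(2s / 100ms)
	e.add(50 * time.Millisecond)
	fmt.Println(e.batchSize()) // 21: the average drifts down toward the faster sample
}

With a single 100ms sample, the 2s target yields ceil(2/0.1) = 20 messages, matching the worked example in the const comment; the cap of 1000 only comes into play when the average processing time falls below about 2ms.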