
pubsub: dynamically choose the number of messages for ReceiveBatch #1200

Merged · 6 commits · Jan 31, 2019
Changes from 2 commits
80 changes: 80 additions & 0 deletions pubsub/averager.go
@@ -0,0 +1,80 @@
// Copyright 2019 The Go Cloud Development Kit Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
    "sync"
    "time"
)

// An averager keeps track of an average value over a time interval.
Contributor:

Have you considered using an existing package like ewma for this?

Contributor Author:

Well, I didn't know about it. (How did you find it?) It looks much more sophisticated than what I have, which is nice. But it says

> Current implementations assume an implicit time interval of 1.0 between every sample added. That is, the passage of time is treated as though it's the same as the arrival of samples. If you need time-based decay when samples are not arriving precisely at set intervals, then this package will not support your needs at present.

which means we can't use it.

But you raise the question whether time-based decay is what we want. I thought about implementing it, but decided the complexity wasn't worth it. I'm not sure it's going to matter much whether the contribution of 1-minute-old points is reduced or not. It's more important that their contribution eventually goes to zero, which my implementation achieves rather abruptly.

Contributor:

> which means we can't use it.

Is that true? I am convinced by your argument for using the predicted time-to-process of the queue to determine the batch size, but I'm not sure that implies we need time-based bucketing for the estimate of how long each message takes to process.

I'm not sure that library will do better than what you have here, but it's worth thinking about. Are there scenarios where exponential decay will do the wrong thing? (Probably.)

What about an app that gets bursts of messages periodically (e.g., every 15m it gets 1000 messages)? IIUC, your impl will start at a constant (currently 1) for each burst and ramp up (because after 1m it forgets all about previous history). The decaying moving average wouldn't forget anything during the idle 15m. Which is better? (Not obvious, TBH.)
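
To make the burst scenario concrete with this PR's averager (an illustrative fragment; addInternal is used here the way the tests use it, to simulate elapsed time):

```go
a := newAverager(time.Minute, 4)
start := time.Now()
a.addInternal(0.1, start) // burst 1: messages took ~100ms each
// 15 idle minutes pass, much longer than the 1m averaging window.
a.addInternal(0.5, start.Add(15*time.Minute)) // burst 2 begins
avg := a.average() // exactly 0.5: the 100ms history has been rotated out
// A decaying moving average would still blend in the old 100ms points.
_ = avg
```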

Contributor Author:

I see your point. It seemed obvious to me that we wanted the number to decay in proportion to elapsed time, but you're right, there's no strong reason for that (aside from a hand-wavy argument about things generally having some time locality).

ewma will forget in proportion to the number of messages processed, rather than time. Maybe that's fine. If it took you 100ms to process a message three days ago, why wouldn't it take the same time now? On the other hand, system behavior tends to be spiky. You want to ride the spikes when they happen, then quickly forget about them. But I'm really just speculating here.
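
For concreteness, the kind of per-sample decay ewma applies looks roughly like this (a minimal sketch under my reading of that approach, not the library's actual API; expAverager and alpha are hypothetical names):

```go
// expAverager is a minimal exponentially weighted moving average.
// Each new sample shrinks the weight of all older samples by (1-alpha),
// regardless of how much wall-clock time separates them.
type expAverager struct {
    alpha float64 // in (0, 1); larger values forget history faster
    value float64
    set   bool
}

func (e *expAverager) add(x float64) {
    if !e.set {
        e.value, e.set = x, true
        return
    }
    e.value = e.alpha*x + (1-e.alpha)*e.value
}
```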

I'm fine doing any of the following:

  1. Keeping my code.
  2. Switching to ewma.
  3. Doing a better job of a time-based moving average. I think doing it exactly requires saving every point, but we could combine bucketing with a decay factor.

In any case, we should add this discussion to the issue, or a new issue.

Contributor:

Let's discuss at stand-up tomorrow.

// It does so by tracking the average over several sub-intervals.
type averager struct {
    mu             sync.Mutex
    buckets        []bucket // always nBuckets of these, last is most recent
    bucketInterval time.Duration
    end            time.Time // latest time we can currently record (end of the last bucket's interval)
}

type bucket struct {
    total float64 // total of points in the bucket
    count float64 // number of points in the bucket
}

// newAverager returns an averager that will average a value over dur,
// by splitting it into nBuckets sub-intervals.
func newAverager(dur time.Duration, nBuckets int) *averager {
    bi := dur / time.Duration(nBuckets)
    return &averager{
        buckets:        make([]bucket, nBuckets),
        bucketInterval: bi,
        end:            time.Now().Add(bi),
    }
}

// average returns the average value.
// It returns NaN if there are no recorded points.
func (a *averager) average() float64 {
Contributor:

Nit: consider making this Average since it's an "external" function? (even though the whole struct isn't). Not sure what typical Go style for this is, but I think it can be useful to be clear about what the expected API of internal objects is even if it's not enforced.

This code could also be moved into the internal/batcher package.

Contributor Author:

Removed the whole thing.

    a.mu.Lock()
    defer a.mu.Unlock()
    var total, n float64
    for _, b := range a.buckets {
        total += b.total
        n += b.count
    }
    return total / n // when no points have been recorded, n == 0 and 0/0 is NaN
}

// add adds a point to the average at the present time.
func (a *averager) add(x float64) { a.addInternal(x, time.Now()) }

// addInternal adds a point x to the average, at time t.
// t should be time.Now() except during testing.
func (a *averager) addInternal(x float64, t time.Time) {
    a.mu.Lock()
    defer a.mu.Unlock()
    // Add new buckets and discard old ones until we can accommodate time t.
    for t.After(a.end) {
        a.buckets = append(a.buckets[1:], bucket{})
        a.end = a.end.Add(a.bucketInterval)
    }
    // We only support adding to the most recent bucket.
    if t.Before(a.end.Add(-a.bucketInterval)) {
        panic("time too early")
    }
    b := &a.buckets[len(a.buckets)-1]
    b.total += x
    b.count++
}
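
For reference, typical use of this type looks like the following sketch (the test file below exercises the same calls):

```go
a := newAverager(time.Minute, 4) // a 1m window split into four 15s buckets
avg := a.average()               // NaN: no points recorded yet
a.add(0.1)                       // record a point, e.g. 100ms of process time
avg = a.average()                // now 0.1
_ = avg
```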
57 changes: 57 additions & 0 deletions pubsub/averager_test.go
@@ -0,0 +1,57 @@
// Copyright 2019 The Go Cloud Development Kit Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
    "testing"
    "time"
)

func TestAverager(t *testing.T) {
    a := newAverager(time.Minute, 4)
    start := time.Now()
    for i := 0; i < 10; i++ {
        a.addInternal(5, start)
    }
    if got, want := a.average(), 5.0; got != want {
        t.Errorf("got %f, want %f", got, want)
    }

    a = newAverager(time.Minute, 4)
    start = time.Now()
    n := 60
    var total float64
    for i := 0; i < n; i++ {
        total += float64(i)
        a.addInternal(float64(i), start.Add(time.Duration(i)*time.Second))
    }
    // All the points are within one minute of each other, so they should all be counted.
    got := a.average()
    want := total / float64(n)
    if got != want {
        t.Errorf("got %f, want %f", got, want)
    }

    // Values older than the duration are dropped.
    a = newAverager(time.Minute, 4)
    a.add(10)
    if got, want := a.average(), 10.0; got != want {
        t.Errorf("got %f, want %f", got, want)
    }
    a.addInternal(3, a.end.Add(2*time.Minute))
    if got, want := a.average(), 3.0; got != want {
        t.Errorf("got %f, want %f", got, want)
    }
}
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay
71 changes: 60 additions & 11 deletions pubsub/pubsub.go
@@ -36,8 +36,10 @@ package pubsub // import "gocloud.dev/pubsub"
import (
    "context"
    "fmt"
+    "math"
    "reflect"
    "sync"
+    "time"

    gax "github.com/googleapis/gax-go"
    "gocloud.dev/gcerrors"
@@ -211,12 +213,33 @@ type Subscription struct {
    ackBatcher driver.Batcher
    cancel     func() // for canceling all SendAcks calls

-    mu    sync.Mutex    // protects everything below
-    q     []*Message    // local queue of messages downloaded from server
-    err   error         // permanent error
-    waitc chan struct{} // for goroutines waiting on ReceiveBatch
+    mu                  sync.Mutex    // protects everything below
+    q                   []*Message    // local queue of messages downloaded from server
+    err                 error         // permanent error
+    waitc               chan struct{} // for goroutines waiting on ReceiveBatch
+    lastReceiveReturn   time.Time     // time that the last call to Receive returned
+    processTimeAverager *averager     // keeps a running average of the seconds to process a message
}

+const (
+    // The desired length of a subscription's queue of messages (the messages pulled
+    // and waiting in memory to be doled out to Receive callers). The length is
+    // expressed in time; the relationship to number of messages is
+    //
+    //    lengthInMessages = desiredQueueLength / averageProcessTimePerMessage
+    //
+    // In other words, if it takes 100ms to process a message on average, and we want
+    // 2s worth of queued messages, then we need 2/.1 = 20 messages.
+    //
+    // If desiredQueueLength is too small, then there won't be a large enough buffer
+    // of messages to handle fluctuations in processing time, and the queue is likely
+    // to become empty, reducing throughput. If desiredQueueLength is too large, then
+    // messages will wait in memory for a long time, possibly timing out (that is,
+    // their ack deadline will be exceeded). Those messages could have been handled
+    // by another process receiving from the same subscription.
+    desiredQueueLength = 2 * time.Second
Contributor:

Queue lengths are not defined in units of time. They are counts, so this should have a different name. Would desiredTimeInQueuePerMessage be more accurate?

Contributor Author:

Are you okay with desiredQueueDuration?

Contributor:

Sure, as long as the comment explains what it means. I think it's how long we want a message to spend in the queue, although I find this idea a bit strange since I don't have a preference about how long messages stay in the queue so long as it's a lot less than the ack deadline.

Contributor Author:

> although I find this idea a bit strange

Exactly, that's why I don't think that's the right way to think about it. It's more like, we want to keep a few messages around in case Receive speeds up and starts chewing through them faster than it has been. In other words, a buffer. How many messages do we want to keep in the buffer? No, that's the wrong question: how much runway (in time) do we want before we run out of messages?

Yes, if Receive always takes the same time, then a message will spend all that time in the queue. But that's a side effect, the price we pay for having messages available on demand to improve throughput.

Contributor:

I see, so to write it in Javanese, it would be more like desiredAmountOfTimeBeforeQueueProbablyRunsOut. Makes sense.

+)
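
Spelling out that comment's arithmetic as code (an illustrative fragment; avgProcessTime here stands in for a hypothetical measured average, and this mirrors the computation Receive performs below):

```go
avgProcessTime := 0.1 // hypothetical: messages take 100ms each, in seconds
n := math.Ceil(desiredQueueLength.Seconds() / avgProcessTime)
// n == math.Ceil(2 / 0.1) == 20 messages of runway
```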

// Receive receives and returns the next message from the Subscription's queue,
// blocking and polling if none are available. This method can be called
// concurrently from multiple goroutines. The Ack() method of the returned
@@ -228,6 +251,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {

    s.mu.Lock()
    defer s.mu.Unlock()
+    defer func() { s.lastReceiveReturn = time.Now() }()
+
+    if !s.lastReceiveReturn.IsZero() {
+        s.processTimeAverager.add(float64(time.Since(s.lastReceiveReturn).Seconds()))
+    }

    for {
        // The lock is always held here, at the top of the loop.
        if s.err != nil {
@@ -238,10 +267,12 @@
            // At least one message is available. Return it.
            m := s.q[0]
            s.q = s.q[1:]
+            // TODO(jba): pre-fetch more messages if the queue gets too small.
            return m, nil
        }
        if s.waitc != nil {
            // A call to ReceiveBatch is in flight. Wait for it.
+            // TODO(jba): support multiple calls in flight simultaneously.
            waitc := s.waitc
            s.mu.Unlock()
            select {
@@ -255,12 +286,31 @@
        }
        // No messages are available and there are no calls to ReceiveBatch in flight.
        // Make a call.
+        //
+        // Ask for a number of messages that will give us the desired queue length.
+        // Unless we don't have information about process time (at the beginning), in
+        // which case just get one message.
+        nMessages := 1
+        avgProcessTime := s.processTimeAverager.average()
+        if !math.IsNaN(avgProcessTime) {
+            // Using Ceil guarantees at least one message.
+            n := math.Ceil(desiredQueueLength.Seconds() / avgProcessTime)
+            // Cap nMessages at some non-ridiculous value.
+            // Slight hack: we should be using a larger cap, like MaxInt32. But
+            // that messes up replay: since the tests take very little time to ack,
+            // n is very large, and since our averaging process is time-sensitive,
+            // values can differ slightly from run to run. The current cap happens
+            // to work, but we should come up with a more robust solution.
+            // (Currently it doesn't matter for performance, because gcppubsub
+            // caps maxMessages to 1000 anyway.)
+            nMessages = int(math.Min(n, 1000))
+        }
        s.waitc = make(chan struct{})
        s.mu.Unlock()
        // Even though the mutex is unlocked, only one goroutine can be here.
        // The only way here is if s.waitc was nil. This goroutine just set
        // s.waitc to non-nil while holding the lock.
-        msgs, err := s.getNextBatch(ctx)
+        msgs, err := s.getNextBatch(ctx, nMessages)
Contributor:

Test is failing:

 drivertest.go:271: pubsub (code=Unknown): replayer: request not found: subscription:"projects/go-cloud-test-216917/subscriptions/TestConformance_TestSendReceiveTwo-subscription-1" max_messages:936

That looks likely to be flaky....

Contributor Author:

It's actually 9364. The Mac tests run a bit slower. I changed the cap to 1000 for now.

Contributor:

9364? Why? (why isn't this flaky?) Seems like it's dependent on timing, no?

Contributor Author:

I got the number 9364 from the actual travis log. I think you dropped a digit when you copied it. The relevant point is that it's > 1000.

It is time-dependent. But with a cap of 1000, it would have to take longer than 1ms between calls to Receive on average, and considering that we do nothing but call Ack, that's extremely unlikely.

I agree, though, that this solution isn't great.

        s.mu.Lock()
        close(s.waitc)
        s.waitc = nil
Expand All @@ -276,14 +326,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
}

// getNextBatch gets the next batch of messages from the server and returns it.
-func (s *Subscription) getNextBatch(ctx context.Context) ([]*Message, error) {
+func (s *Subscription) getNextBatch(ctx context.Context, nMessages int) ([]*Message, error) {
    var msgs []*driver.Message
    for len(msgs) == 0 {
        err := retry.Call(ctx, gax.Backoff{}, s.driver.IsRetryable, func() error {
            var err error
-            // TODO(#691): dynamically adjust maxMessages
-            const maxMessages = 10
-            msgs, err = s.driver.ReceiveBatch(ctx, maxMessages)
+            msgs, err = s.driver.ReceiveBatch(ctx, nMessages)
            return err
        })
        if err != nil {
@@ -342,8 +390,9 @@ var NewSubscription = newSubscription
func newSubscription(d driver.Subscription, newAckBatcher func(context.Context, *Subscription) driver.Batcher) *Subscription {
    ctx, cancel := context.WithCancel(context.Background())
    s := &Subscription{
-        driver: d,
-        cancel: cancel,
+        driver:              d,
+        cancel:              cancel,
+        processTimeAverager: newAverager(time.Minute, 4),
    }
    if newAckBatcher == nil {
        newAckBatcher = defaultAckBatcher