Skip to content

Commit

Permalink
feat(pubsub): add min extension period (#6041)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed May 31, 2022
1 parent bd1e4cc commit f2407c7
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 30 deletions.
78 changes: 62 additions & 16 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ import (
// of the actual deadline.
const gracePeriod = 5 * time.Second

// These are vars so tests can change them.
var (
maxDurationPerLeaseExtension = 10 * time.Minute
minDurationPerLeaseExtension = 10 * time.Second
minDurationPerLeaseExtensionExactlyOnce = 1 * time.Minute
)

type messageIterator struct {
ctx context.Context
cancel func() // the function that will cancel ctx; called in stop
Expand All @@ -46,7 +53,7 @@ type messageIterator struct {
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks (more frequent than acks)
nackTicker *time.Ticker // message nacks
pingTicker *time.Ticker // sends to the stream to keep it open
failed chan struct{} // closed on stream error
drained chan struct{} // closed when stopped && no more pending messages
Expand All @@ -61,11 +68,12 @@ type messageIterator struct {
// message arrives, we'll record now+MaxExtension in this table; whenever we have a chance
// to update ack deadlines (via modack), we'll consult this table and only include IDs
// that are not beyond their deadline.
keepAliveDeadlines map[string]time.Time
pendingAcks map[string]bool
pendingNacks map[string]bool
pendingModAcks map[string]bool // ack IDs whose ack deadline is to be modified
err error // error from stream failure
keepAliveDeadlines map[string]time.Time
pendingAcks map[string]bool
pendingNacks map[string]bool
pendingModAcks map[string]bool // ack IDs whose ack deadline is to be modified
err error // error from stream failure
enableExactlyOnceDelivery bool
}

// newMessageIterator starts and returns a new messageIterator.
Expand All @@ -85,7 +93,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
keepAlivePeriod := minAckDeadline / 2
keepAlivePeriod := minDurationPerLeaseExtension / 2

// Ack promptly so users don't lose work if client crashes.
ackTicker := time.NewTicker(100 * time.Millisecond)
Expand All @@ -106,7 +114,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
pingTicker: pingTicker,
failed: make(chan struct{}),
drained: make(chan struct{}),
ackTimeDist: distribution.New(int(maxAckDeadline/time.Second) + 1),
ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1),
keepAliveDeadlines: map[string]time.Time{},
pendingAcks: map[string]bool{},
pendingNacks: map[string]bool{},
Expand Down Expand Up @@ -148,9 +156,19 @@ func (it *messageIterator) checkDrained() {
}
}

// Given a receiveTime, add the elapsed time to the iterator's ack distribution.
// These values are bounded by the ModifyAckDeadline limits, which are
// min/maxDurationPerLeaseExtension.
func (it *messageIterator) addToDistribution(receiveTime time.Time) {
d := time.Since(receiveTime)
d = minDuration(d, minDurationPerLeaseExtension)
d = maxDuration(d, maxDurationPerLeaseExtension)
it.ackTimeDist.Record(int(d / time.Second))
}

// Called when a message is acked/nacked.
func (it *messageIterator) done(ackID string, ack bool, receiveTime time.Time) {
it.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
it.addToDistribution(receiveTime)
it.mu.Lock()
defer it.mu.Unlock()
delete(it.keepAliveDeadlines, ackID)
Expand Down Expand Up @@ -565,14 +583,42 @@ func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) {
func (it *messageIterator) ackDeadline() time.Duration {
pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second

if it.po.maxExtensionPeriod > 0 && pt > it.po.maxExtensionPeriod {
return it.po.maxExtensionPeriod
return boundedDuration(pt, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, it.enableExactlyOnceDelivery)
}

func boundedDuration(ackDeadline, minExtension, maxExtension time.Duration, exactlyOnce bool) time.Duration {
// If the user explicitly sets a maxExtensionPeriod, respect it.
if maxExtension > 0 {
ackDeadline = minDuration(ackDeadline, maxExtension)
}
if pt > maxAckDeadline {
return maxAckDeadline

// If the user explicitly sets a minExtensionPeriod, respect it.
if minExtension > 0 {
ackDeadline = maxDuration(ackDeadline, minExtension)
} else if exactlyOnce {
// Higher minimum ack_deadline for subscriptions with
// exactly-once delivery enabled.
ackDeadline = maxDuration(ackDeadline, minDurationPerLeaseExtensionExactlyOnce)
} else if ackDeadline < minDurationPerLeaseExtension {
// Otherwise, lower bound is min ack extension. This is normally bounded
// when adding datapoints to the distribution, but this is needed for
// the initial few calls to ackDeadline.
ackDeadline = minDurationPerLeaseExtension
}
if pt < minAckDeadline {
return minAckDeadline

return ackDeadline
}

func minDuration(x, y time.Duration) time.Duration {
if x < y {
return x
}
return y
}

func maxDuration(x, y time.Duration) time.Duration {
if x > y {
return x
}
return pt
return y
}
87 changes: 81 additions & 6 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,13 @@ func TestMaxExtensionPeriod(t *testing.T) {
if err != nil {
t.Fatal(err)
}
want := 1 * time.Second
want := 15 * time.Second
iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{
maxExtensionPeriod: want,
})

receiveTime := time.Now().Add(time.Duration(-3) * time.Second)
// Add a datapoint that's greater than maxExtensionPeriod.
receiveTime := time.Now().Add(time.Duration(-20) * time.Second)
iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))

if got := iter.ackDeadline(); got != want {
Expand All @@ -140,8 +141,8 @@ func TestAckDistribution(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

minAckDeadline = 1 * time.Second
pstest.SetMinAckDeadline(minAckDeadline)
minDurationPerLeaseExtension = 1 * time.Second
pstest.SetMinAckDeadline(minDurationPerLeaseExtension)
srv := pstest.NewServer()
defer srv.Close()
defer pstest.ResetMinAckDeadline()
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestAckDistribution(t *testing.T) {

modacks := modacksByTime(srv.Messages())
u := modackDeadlines(modacks)
initialDL := int32(minAckDeadline / time.Second)
initialDL := int32(minDurationPerLeaseExtension / time.Second)
if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
Expand Down Expand Up @@ -273,7 +274,7 @@ func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, i
recvdWg.Add(1)
msg++
queuedMsgs <- msg
<-time.After(minAckDeadline)
<-time.After(minDurationPerLeaseExtension)

t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
"when the next batch of messages go out.", initialProcessSecs)
Expand Down Expand Up @@ -421,3 +422,77 @@ func TestIterator_SynchronousPullCancel(t *testing.T) {
t.Fatalf("Got error in pullMessages: %v", err)
}
}

func TestIterator_BoundedDuration(t *testing.T) {
// Use exported fields for time.Duration fields so they
// print nicely. Otherwise, they will print as integers.
//
// AckDeadline is bounded by min/max ack deadline, which are
// 10 seconds and 600 seconds respectively. This is
// true for the real distribution data points as well.
testCases := []struct {
desc string
AckDeadline time.Duration
MinDuration time.Duration
MaxDuration time.Duration
exactlyOnce bool
Want time.Duration
}{
{
desc: "AckDeadline should be updated to the min duration",
AckDeadline: time.Duration(10 * time.Second),
MinDuration: time.Duration(15 * time.Second),
MaxDuration: time.Duration(10 * time.Minute),
exactlyOnce: false,
Want: time.Duration(15 * time.Second),
},
{
desc: "AckDeadline should be updated to 1 minute when using exactly once",
AckDeadline: time.Duration(10 * time.Second),
MinDuration: 0,
MaxDuration: time.Duration(10 * time.Minute),
exactlyOnce: true,
Want: time.Duration(1 * time.Minute),
},
{
desc: "AckDeadline should not be updated here, even though exactly once is enabled",
AckDeadline: time.Duration(10 * time.Second),
MinDuration: time.Duration(15 * time.Second),
MaxDuration: time.Duration(10 * time.Minute),
exactlyOnce: true,
Want: time.Duration(15 * time.Second),
},
{
desc: "AckDeadline should not be updated here",
AckDeadline: time.Duration(10 * time.Minute),
MinDuration: time.Duration(15 * time.Second),
MaxDuration: time.Duration(10 * time.Minute),
exactlyOnce: true,
Want: time.Duration(10 * time.Minute),
},
{
desc: "AckDeadline should not be updated when neither durations are set",
AckDeadline: time.Duration(5 * time.Minute),
MinDuration: 0,
MaxDuration: 0,
exactlyOnce: false,
Want: time.Duration(5 * time.Minute),
},
{
desc: "AckDeadline should should not be updated here since it is within both boundaries",
AckDeadline: time.Duration(5 * time.Minute),
MinDuration: time.Duration(1 * time.Minute),
MaxDuration: time.Duration(7 * time.Minute),
exactlyOnce: false,
Want: time.Duration(5 * time.Minute),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
got := boundedDuration(tc.AckDeadline, tc.MinDuration, tc.MaxDuration, tc.exactlyOnce)
if got != tc.Want {
t.Errorf("boundedDuration mismatch:\n%+v\ngot: %v, want: %v", tc, got, tc.Want)
}
})
}
}
2 changes: 0 additions & 2 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ const (
// ScopeCloudPlatform grants permissions to view and manage your data
// across Google Cloud Platform services.
ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"

maxAckDeadline = 10 * time.Minute
)

// Client is a Google Pub/Sub client scoped to a single project.
Expand Down
27 changes: 21 additions & 6 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,20 @@ type ReceiveSettings struct {
// bounds the maximum amount of time before a message redelivery in the
// event the subscriber fails to extend the deadline.
//
// MaxExtensionPeriod configuration can be disabled by specifying a
// duration less than (or equal to) 0.
// MaxExtensionPeriod must be between 10s and 600s (inclusive). This configuration
// can be disabled by specifying a duration less than (or equal to) 0.
MaxExtensionPeriod time.Duration

// MinExtensionPeriod is the the min duration for a single lease extension attempt.
// By default the 99th percentile of ack latency is used to determine lease extension
// periods but this value can be set to minimize the number of extraneous RPCs sent.
//
// MinExtensionPeriod must be between 10s and 600s (inclusive). This configuration
// can be disabled by specifying a duration less than (or equal to) 0.
// Defaults to off but set to 60 seconds if the subscription has exactly-once delivery enabled,
// which will be added in a future release.
MinExtensionPeriod time.Duration

// MaxOutstandingMessages is the maximum number of unprocessed messages
// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
Expand Down Expand Up @@ -589,13 +599,11 @@ type ReceiveSettings struct {
// idea of a duration that is short, but not so short that we perform excessive RPCs.
const synchronousWaitTime = 100 * time.Millisecond

// This is a var so that tests can change it.
var minAckDeadline = 10 * time.Second

// DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{
MaxExtension: 60 * time.Minute,
MaxExtensionPeriod: 0,
MinExtensionPeriod: 0,
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9, // 1G
NumGoroutines: 10,
Expand Down Expand Up @@ -850,6 +858,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes

s.checkOrdering(ctx)

// TODO(hongalex): move settings check to a helper function to make it more testable
maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
Expand All @@ -867,7 +876,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}
maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod
if maxExtPeriod < 0 {
maxExtPeriod = 0
maxExtPeriod = DefaultReceiveSettings.MaxExtensionPeriod
}
minExtPeriod := s.ReceiveSettings.MinExtensionPeriod
if minExtPeriod < 0 {
minExtPeriod = DefaultReceiveSettings.MinExtensionPeriod
}

var numGoroutines int
Expand All @@ -883,6 +896,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
po := &pullOptions{
maxExtension: maxExt,
maxExtensionPeriod: maxExtPeriod,
minExtensionPeriod: minExtPeriod,
maxPrefetch: trunc32(int64(maxCount)),
synchronous: s.ReceiveSettings.Synchronous,
maxOutstandingMessages: maxCount,
Expand Down Expand Up @@ -1060,6 +1074,7 @@ func (s *Subscription) checkOrdering(ctx context.Context) {
type pullOptions struct {
maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
minExtensionPeriod time.Duration // the minimum time to extend a message's lease duration per modack
maxPrefetch int32 // the max number of outstanding messages, used to calculate maxToPull
// If true, use unary Pull instead of StreamingPull, and never pull more
// than maxPrefetch messages.
Expand Down

0 comments on commit f2407c7

Please sign in to comment.